# Source code for vissim2gmns.func_lib.cvt_fzp

'''
##############################################################
# Created Date: Tuesday, April 15th 2025
# Contact Info: luoxiangyong01@gmail.com
# Author/Copyright: Mr. Xiangyong Luo
##############################################################
'''


from pathlib import Path

from loguru import logger
import pandas as pd
from geopandas import GeoDataFrame
from shapely.geometry import Point
import numpy as np
from pyufunc import str_strip

# ignore RuntimeWarning
import warnings

warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)

try:
    from .geocoding_vissim_coord import cvt_vissim_to_wgs1984
    from .cvt_inpx import vissim_inpx
    from .link_projection import point_on_wkt_line_by_distance
except ImportError:
    from geocoding_vissim_coord import cvt_vissim_to_wgs1984
    from cvt_inpx import vissim_inpx
    from link_projection import point_on_wkt_line_by_distance


def remove_stripe_values(value):
    """Strip bytes-repr residue and line-break markers from a string.

    Non-string inputs are returned unchanged. For strings, the following
    substrings are deleted, in this order: the two-character escape texts
    ``\\r`` and ``\\n`` (a literal backslash followed by a letter), real
    newline characters, the ``b'`` prefix, and any remaining single quotes.
    """
    if not isinstance(value, str):
        return value

    # Order matters: "b'" has to be removed before the bare quote is.
    cleaned = value
    for token in ("\\r", "\\n", "\n", "b'", "'"):
        cleaned = cleaned.replace(token, "")
    return cleaned


def vissim_fzp(path_vissim_fzp: str,
               path_vissim_inpx: str,
               *,
               x_col_name: str = "POS",
               y_col_name: str = "POSLAT",
               output_dir: str = "",
               **kwargs) -> GeoDataFrame:
    """Convert vissim fzp file to geopandas dataframe.

    Each vehicle record is placed on its lane geometry (taken from the inpx
    file) at the distance stored in ``x_col_name``; the resulting coordinates
    are written to ``{x_col_name}_wgs`` / ``{y_col_name}_wgs`` and used as the
    point geometry. Records that cannot be matched to a lane geometry keep
    NaN coordinates.

    Args:
        path_vissim_fzp (str): the path to the vissim fzp file.
        path_vissim_inpx (str): the path to the vissim inpx file.
        x_col_name (str): the fzp column holding the vehicle's distance along
            its lane; it is the value projected onto the lane geometry.
        y_col_name (str): the fzp column for the lateral position; it is
            stripped and used to name the second output coordinate column.
        output_dir (str): the directory to save the output files. Defaults to "".
        **kwargs: other parameters for the conversion, such as isShp, isGeojson, isCsv.

    Notes:
        - output_dir: if not provided, no data will be saved.
        - isShp: whether to save the output as shapefile. Default is False.
        - isGeojson: whether to save the output as geojson. Default is True.
        - isCsv: whether to save the output as csv. Default is False.

    Example:
        >>> import vissim2gmns as vg
        >>> path_vissim_fzp = "./vissim_data/xl_002_001.fzp"
        >>> path_vissim_inpx = "./vissim_data/xl_002_001.inpx"
        >>> output_dir = "./output"

        >>> # get the fzp data as a geopandas dataframe without saving to file.
        >>> df_fzp = vg.vissim_fzp(path_vissim_fzp, path_vissim_inpx)

        >>> # get fzp data and save a geojson file to the output_dir.
        >>> df_fzp = vg.vissim_fzp(path_vissim_fzp, path_vissim_inpx, output_dir=output_dir)

        >>> # get fzp data and save csv, geojson and shapefile to the output_dir.
        >>> df_fzp = vg.vissim_fzp(path_vissim_fzp, path_vissim_inpx, output_dir=output_dir,
        ...                        isShp=True, isGeojson=True, isCsv=True)

    Returns:
        GeoDataFrame: converted geopandas dataframe.
    """
    # Get inpx data (specially the lane geometry) to map the vehicles onto.
    inpx_dict = vissim_inpx(path_vissim_inpx)
    lanes = inpx_dict.get("lanes", {})

    # Read fzp file as binary and create a DataFrame of byte lines.
    with open(path_vissim_fzp, 'rb') as ff:
        df_fzp = pd.DataFrame(ff.readlines())
    logger.info(f"Total {len(df_fzp)} vehicle points in fzp file")

    # Retrieve the VISSIM running date from the fourth line of the file.
    # The text is the str() of a pandas row, hence the "Name" split below.
    fzp_date = remove_stripe_values(str(df_fzp.iloc[3, :]))

    # The data table starts at the first line whose repr reads b'$VEHICLE...;
    # fall back to the first line when no such header is found.
    raw_lines = df_fzp.iloc[:, 0]
    start_fzp = next(
        (i for i, line in enumerate(raw_lines) if str(line)[3:10] == "VEHICLE"),
        0,
    )

    # Split every line into cells and strip bytes-repr residue from each cell.
    fzp_data = pd.DataFrame(
        [str(line).split(';') for line in raw_lines.iloc[start_fzp:]]
    )
    fzp_data = fzp_data.map(remove_stripe_values)

    # First row is the header; the cells were already cleaned above, so no
    # further trimming of line-ending residue is needed.
    fzp_data.columns = list(fzp_data.iloc[0])
    fzp_data = fzp_data.iloc[1:].reset_index(drop=True)

    time_col = fzp_data.columns[0]
    fzp_data[time_col] = pd.to_numeric(fzp_data[time_col], errors="coerce")

    # Create a datetime column based on the fzp file's date and time offset.
    run_date = fzp_date.split("Name")[0].split("Date:")[1].lstrip()
    fzp_data["datetime"] = pd.to_datetime(run_date) + pd.to_timedelta(
        fzp_data.iloc[:, 0], unit="s")

    # Clean the coordinate column values.
    fzp_data[x_col_name] = fzp_data[x_col_name].map(str_strip)
    fzp_data[y_col_name] = fzp_data[y_col_name].map(str_strip)

    # add new columns for wgs84 coordinates
    x_wgs_col = f"{x_col_name}_wgs"
    y_wgs_col = f"{y_col_name}_wgs"
    fzp_data[x_wgs_col] = np.nan
    fzp_data[y_wgs_col] = np.nan

    # Fallback columns are the same for every row, so resolve them once.
    # Header cells can be None when a data row is longer than the header.
    text_columns = [col for col in fzp_data.columns if isinstance(col, str)]
    link_fallback = next(
        (col for col in text_columns
         if "lane" in col.lower() and "link" in col.lower()),
        None,
    )
    lane_fallback = next(
        (col for col in text_columns
         if "lane" in col.lower() and "link" not in col.lower()),
        None,
    )

    logger.info("Start projecting vehicle points on the lane geometry to get accurate coordinates in wgs84...")

    for idx, row in fzp_data.iterrows():
        # Link id: exact column name first, then any lane+link column.
        link_id = row.get(r"LANE\\LINK")
        if pd.isna(link_id) and link_fallback is not None:
            link_id = row.get(link_fallback)

        # Lane index: exact column name first, then any lane (non-link) column.
        lane_index = row.get("LANE")
        if pd.isna(lane_index) and lane_fallback is not None:
            lane_index = row.get(lane_fallback)

        # Blank or short rows have no lane value; leave their coordinates NaN.
        if not isinstance(lane_index, str):
            continue

        # A lane value containing "-" already carries the link part.
        if "-" in lane_index:
            lane_id = lane_index.replace("-", "_")
        else:
            lane_id = rf"{link_id}_{lane_index}"

        lane_geom = lanes.get(lane_id, {}).get("geom")
        if not lane_geom:
            continue

        # Distance of the vehicle from the start point of the lane.
        distance_on_lane = row.get(x_col_name)
        if pd.isna(distance_on_lane):
            continue
        try:
            distance = float(distance_on_lane)
        except (TypeError, ValueError):
            # Non-numeric position (e.g. an empty cell): nothing to project.
            continue

        # Project the vehicle onto the lane geometry to get its coordinates.
        projected_point = point_on_wkt_line_by_distance(lane_geom, distance)
        if projected_point:
            fzp_data.at[idx, x_wgs_col] = projected_point.x
            fzp_data.at[idx, y_wgs_col] = projected_point.y

    # Create geometry for the GeoDataFrame.
    geometry = [Point(xy) for xy in zip(fzp_data[x_wgs_col], fzp_data[y_wgs_col])]
    gdf_fzp = GeoDataFrame(fzp_data, crs="EPSG:4326", geometry=geometry)

    if output_dir:
        isCsv = kwargs.get("isCsv", False)
        isGeojson = kwargs.get("isGeojson", True)
        isShp = kwargs.get("isShp", False)

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        output_fzp = output_dir / f"{Path(path_vissim_fzp).stem}_fzp"

        # save to csv file
        if isCsv:
            output_fzp_csv = output_fzp.with_suffix(f"{output_fzp.suffix}.csv")
            gdf_fzp.to_csv(output_fzp_csv, index=False)
            logger.info(f"Successfully saved fzp file to csv: {output_dir}")

        # save to geojson file
        if isGeojson:
            output_fzp_geojson = output_fzp.with_suffix(f"{output_fzp.suffix}.geojson")
            gdf_fzp.to_file(output_fzp_geojson, driver="GeoJSON")
            logger.info(f"Successfully saved fzp file to geojson: {output_dir}")

        # save to shapefile
        if isShp:
            output_fzp_shp = output_fzp.with_suffix(".shp")
            gdf_fzp.to_file(output_fzp_shp, driver="ESRI Shapefile")
            logger.info(f"Successfully saved fzp file to shapefile: {output_dir}")

    return gdf_fzp

if __name__ == "__main__":
    # Manual run against a local sample dataset: convert the fzp file and
    # export the first 1000 records as GeoJSON and as a shapefile.
    path_vissim_inpx = r"C:\Users\xyluo25\anaconda3_workspace\001_GitHub\vissim2gmns\datasets\one_intersection\xl_002_001.inpx"
    path_vissim_fzp = r"C:\Users\xyluo25\anaconda3_workspace\001_GitHub\vissim2gmns\datasets\one_intersection\xl_002_001.fzp"

    x_col_name: str = "POS"
    y_col_name: str = "POSLAT"

    gdf_fzp = vissim_fzp(path_vissim_fzp, path_vissim_inpx)
    sample_records = gdf_fzp.head(1000)

    # (output file, driver) pairs, written in this order.
    exports = (
        ("vissim_fzp_1000.geojson", "GeoJSON"),
        ("vissim_fzp_1000.shp", "ESRI Shapefile"),
    )
    for target, driver in exports:
        sample_records.to_file(target, driver=driver)

    # The full dataset export is intentionally left disabled:
    # gdf_fzp.to_file("vissim_fzp.geojson", driver="GeoJSON")