'''
##############################################################
# Created Date: Monday, April 14th 2025
# Contact Info: luoxiangyong01@gmail.com
# Author/Copyright: Mr. Xiangyong Luo
##############################################################
'''
from pathlib import Path
from shapely.geometry import LineString
from shapely.ops import linemerge, transform
from geopandas import GeoDataFrame
import geopandas as gpd
import pandas as pd
from pyproj import CRS, Transformer
import xmltodict
from loguru import logger
# ignore RuntimeWarning, UserWarning
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)
try:
from .geocoding_vissim_coord import cvt_vissim_to_wgs1984
except ImportError:
from geocoding_vissim_coord import cvt_vissim_to_wgs1984
SELECTED_INPX_FIELDS = {
"@vissimVersion": None,
"@version": None,
"simulation": None,
"netPara": None,
"links": "link",
"nodes": "node",
"signalHeads": "signalHead",
"signalControllers": "signalController",
"laneMarkingTypes": "laneMarkingType",
"levels": "level",
"linkBehaviorTypes": "linkBehaviorType",
"drivingBehaviors": "drivingBehavior",
"occupancyDistributions": "occupancyDistribution",
"pedestrianClasses": "pedestrianClass",
"pedestrianTypes": "pedestrianType",
"parkLotGrps": "parkLotGrp",
"parkingLots": "parkingLot",
"stopSigns": "stopSign",
"vehicleClasses": "vehicleClass",
"vehicleInputs": "vehicleInput",
"vehicleTypes": "vehicleType",
"walkingBehaviors": "walkingBehavior",
}
ADDITIONAL_INPX_FIELDS = {
"anmDefaults": None,
"areaBehaviorTypes": "areaBehaviorType",
"backgroundImages": "backgroundImage",
"colorDistributions": "colorDistribution",
"conflictAreas": "conflictArea",
"desAccelerationFunctions": "desAccelerationFunction",
"desDecelerationFunctions": "desDecelerationFunction",
"desSpeedDecisions": "desSpeedDecision",
"desSpeedDistributions": "desSpeedDistribution",
"displayTypes": "displayType",
"dynamicAssignment": None,
"evaluation": None,
"pedestrianCompositions": "pedestrianComposition",
"locationDistributions": "locationDistribution",
"maxAccelerationFunctions": "maxAccelerationFunction",
"maxDecelerationFunctions": "maxDecelerationFunction",
"model2D3DDistributions": "model2D3DDistribution",
"models2D3D": "model2D3D",
"powerDistributions": "powerDistribution",
"reducedSpeedAreas": "reducedSpeedArea",
"timeDistributions": "timeDistribution",
"timeIntervalSets": "timeIntervalSet",
"vehicleCompositions": "vehicleComposition",
"vehicleRoutingDecisionsParking": "vehicleRoutingDecisionParking",
"vehicleRoutingDecisionsStatic": "vehicleRoutingDecisionStatic",
"weightDistributions": "weightDistribution",
}
[docs]
def create_lane_geometries(link_geometry: str | list | LineString,
num_lanes: int,
lane_width: float | list[float]):
"""Create lane centerline geometries from a link geometry.
Args:
link_geometry: A Shapely LineString or an iterable of (x, y) coordinate pairs.
num_lanes (int): Number of lanes to generate.
lane_width (float | list[float]): Lane width in meters. A single value is applied to
every lane. A list must have the same length as ``num_lanes`` and is interpreted
from rightmost lane to leftmost lane.
Notes:
- The lane geometries are generated by offsetting the link geometry.
The leftmost index starts from 1, and the lane index increases from leftmost to rightmost lane (VISSIM Lane ordering).
Example:
>>> from shapely.geometry import LineString
>>> link_geom = LineString([(0, 0), (100, 0)])
>>> lane_geoms = create_lane_geometries(link_geom, num_lanes=3, lane_width=3.5)
>>> for idx, lane_geom in lane_geoms.items():
... print(f"Lane {idx}: {lane_geom}")
... Lane 1: LINESTRING (0 3.5, 100 3.5)
... Lane 2: LINESTRING (0 0, 100 0)
... Lane 3: LINESTRING (0 -3.5, 100 -3.5)
Returns:
list[LineString]: Lane geometries ordered from rightmost to leftmost relative to the link direction.
"""
if num_lanes < 1:
raise ValueError("num_lanes must be at least 1.")
if isinstance(lane_width, (int, float)):
lane_widths = [float(lane_width)] * num_lanes
else:
lane_widths = [float(width) for width in lane_width]
if len(lane_widths) == 1:
lane_widths *= num_lanes
elif len(lane_widths) != num_lanes:
# use the first value if lane_width is a list but have different length with num_lanes
lane_widths = [lane_widths[0]] * num_lanes
logger.warning(f" :Length of lane_width list does not match num_lanes. Using the first value {lane_widths[0]} for all lanes.")
if any(width <= 0 for width in lane_widths):
raise ValueError("All lane widths must be greater than 0.")
if isinstance(link_geometry, LineString):
link_line = link_geometry
else:
link_line = LineString(link_geometry)
if len(link_line.coords) < 2:
raise ValueError("link_geometry must contain at least two points.")
centroid = link_line.centroid
local_crs = CRS.from_proj4(
f"+proj=aeqd +lat_0={centroid.y} +lon_0={centroid.x} +datum=WGS84 +units=m +no_defs"
)
to_local = Transformer.from_crs("EPSG:4326", local_crs, always_xy=True).transform
to_wgs84 = Transformer.from_crs(local_crs, "EPSG:4326", always_xy=True).transform
link_local = transform(to_local, link_line)
lane_geometries = {}
total_lane_width = sum(lane_widths)
left_edge_offset = -total_lane_width / 2.0
accumulated_width = 0.0
for lane_idx in range(num_lanes):
offset_distance = left_edge_offset + accumulated_width + lane_widths[lane_idx] / 2.0
accumulated_width += lane_widths[lane_idx]
if abs(offset_distance) < 1e-9:
lane_local = link_local
else:
side = "left" if offset_distance > 0 else "right"
lane_local = link_local.parallel_offset(
abs(offset_distance),
side,
join_style=2,
)
if lane_local.geom_type == "MultiLineString":
lane_local = linemerge(lane_local)
if lane_local.geom_type != "LineString":
lane_local = max(
(
geom
for geom in getattr(lane_local, "geoms", [])
if geom.geom_type == "LineString"
),
key=lambda geom: geom.length,
default=link_local,
)
lane_geometries[num_lanes - lane_idx] = transform(to_wgs84, lane_local)
return lane_geometries
@logger.catch
def extract_inpx_data(path_vissim_inpx: str, inc_fields: list = None) -> dict:
"""Extract data from vissim inpx file.
Args:
path_vissim_inpx (str): the path to the vissim inpx file.
inc_fields (list): the fields to be extracted from inpx file. Defaults to None, which means only minimum fields will be extracted.
Notes:
- The inpx file is an XML file, and the data is stored in a hierarchical structure. We need to parse the XML file and extract the relevant data. The data is stored in a dictionary format, where the keys are the names of the elements and attributes in the XML file, and the values are the corresponding values.
- keys starting with "@" are attributes in the original xml file and keys without "@" are elements in the original xml file.
- the inc_fields include: ["anmDefaults", "areaBehaviorTypes", "backgroundImages", "colorDistributions", "conflictAreas", "desAccelerationFunctions", "desDecelerationFunctions", "desSpeedDecisions", "desSpeedDistributions", "displayTypes", "dynamicAssignment", "evaluation", "locationDistributions", "maxAccelerationFunctions", "maxDecelerationFunctions", "model2D3DDistributions", "models2D3D", "powerDistributions", "reducedSpeedAreas", "timeDistributions", "timeIntervalSets", "vehicleCompositions", "vehicleRoutingDecisionsParking", "vehicleRoutingDecisionsStatic", "weightDistributions"]
- default fields include: ["drivingBehaviors", "laneMarkingTypes", "levels", "linkBehaviorTypes", "links", "netPara", "nodes", "occupancyDistributions", "parkLotGrps", "parkingLots", "simulation", "stopSigns", "vehicleClasses", "vehicleInputs", "vehicleTypes", "walkingBehaviors"]
Returns:
dict: extracted data from inpx file.
"""
inpx_dict = {}
with open(path_vissim_inpx, "r") as f:
xmlstring = f.read()
root_dict = xmltodict.parse(xmlstring, encoding="utf-8", process_namespaces=True)
root_network = root_dict.get("network", {})
if not root_network:
logger.error(f" :Failed to extract network data from inpx file: {path_vissim_inpx}")
return {}
# use SELECTED_INPX_FIELDS as default fields to be extracted from inpx file,
if inc_fields is None:
INC_INPX_FIELDS = SELECTED_INPX_FIELDS
else:
ALL_INPX_FIELDS = {**SELECTED_INPX_FIELDS, **ADDITIONAL_INPX_FIELDS}
INC_INPX_FIELDS = {
**SELECTED_INPX_FIELDS,
**{
field: ADDITIONAL_INPX_FIELDS[field]
for field in inc_fields
if field in ALL_INPX_FIELDS
},
}
# add vissim version
for each_field in INC_INPX_FIELDS:
# skip fields that are not in the inpx file
field_data = root_network.get(each_field, {})
if not field_data:
continue
# load the field data into the inpx_dict
if INC_INPX_FIELDS[each_field]:
each_field_data = field_data.get(INC_INPX_FIELDS[each_field], {})
# only one item within the field
if isinstance(each_field_data, dict):
inpx_dict[each_field] = each_field_data
# multiple items within the field
elif isinstance(each_field_data, list):
# record the data by no attribute of each field
inpx_dict[each_field] = {}
for each_item in each_field_data:
item_id = each_item.get("@no", None)
if item_id is not None:
inpx_dict[each_field][item_id] = each_item
else:
logger.warning(f" :Failed to extract {each_field} data from inpx file: {path_vissim_inpx}. Missing @no attribute in one of the items.")
else:
inpx_dict[each_field] = field_data
return inpx_dict
# @func_running_time
[docs]
@logger.catch
def vissim_inpx(path_vissim_inpx: str,
output_dir: str = "", **kwargs) -> dict:
"""Convert vissim inpx file to geopandas dataframe.
Args:
path_vissim_inpx (str): the path to the vissim inpx file.
output_dir (str): the directory to save the output files. Defaults to "".
**kwargs: other parameters for the conversion, such as isShp, isGeojson, isCsv.
Notes:
- output_dir: if not provide no data will be saved.
- isShp: whether to save the output as shapefile. Default is False.
- isGeojson: whether to save the output as geojson. Default is True.
- isCsv: whether to save the output as csv. Default is True.
Example:
>>> import vissim2gmns as vg
>>> path_vissim_inpx = "path/to/vissim.inpx"
>>> output_dir = "path/to/output"
>>> # get the inpx data as a dictionary without saving any output files
>>> inpx_dict = vg.vissim_inpx(path_vissim_inpx)
>>>
>>> # get the inpx data and save csv and geojson files (default) to the output_dir.
>>> inpx_dict = vg.vissim_inpx(path_vissim_inpx, output_dir=output_dir)
>>>
>>> # get the inpx data and control the output file format by setting isShp, isGeojson and isCsv parameters.
>>> inpx_dict = vg.vissim_inpx(path_vissim_inpx, output_dir=output_dir, isShp=True, isGeojson=True, isCsv=True)
Returns:
dict: the inpx data as a dictionary.
"""
inpx_dict = extract_inpx_data(path_vissim_inpx, inc_fields=None)
# extract refPoint coordinates from inpx file
ref_point_map = inpx_dict.get("netPara", {}).get("refPointMap", {})
ref_point_net = inpx_dict.get("netPara", {}).get("refPointNet", {})
x_refmap = float(ref_point_map.get("@x"))
y_refmap = float(ref_point_map.get("@y"))
x_refnet = float(ref_point_net.get("@x"))
y_refnet = float(ref_point_net.get("@y"))
# Extract link data from inpx file and convert the coordinates to lonlat
inpx_links = inpx_dict.get("links", {})
lanes_dict = {}
for link_id, link_data in inpx_links.items():
# extract link geometry and convert to lonlat coordinates
link_geometry = link_data.get("geometry").get("linkPolyPts").get("linkPolyPoint", [])
points_lst = []
for point in link_geometry:
vissim_x = float(point.get("@x"))
vissim_y = float(point.get("@y"))
lon, lat = cvt_vissim_to_wgs1984(vissim_x, vissim_y,
x_refmap, y_refmap,
x_refnet, y_refnet)
points_lst.append((lon, lat))
# convert LineString to WKT format
# link_data["geom"] = LineString(points_lst)
link_data["geom"] = LineString(points_lst)
# link_data["geom_points"] = points_lst
# extract lane data and add link_id to each lane
# lane index in leftmost to rightmost order with index starting from 1.
link_lanes = link_data.get("lanes").get("lane")
if link_lanes:
if isinstance(link_lanes, dict):
link_data["num_lanes"] = 1
link_lanes["link_id"] = link_id
link_lanes["lane_index"] = 1
link_lanes["lane_id"] = f"{link_id}_1"
link_lanes["geom"] = link_data["geom"]
# link_lanes["geom_points"] = link_data["geom_points"]
lanes_dict[f"{link_id}_1"] = link_lanes
elif isinstance(link_lanes, list):
# update num_lanes in link_data, which will be used to create lane geometries later
link_data["num_lanes"] = len(link_lanes)
# get lane width from link_lanes, if lane width is not specified, use the default lane width of 3.0 meters
lane_width = [float(lane.get("@width", 3.0)) for lane in link_lanes]
lane_geometries = create_lane_geometries(
link_data["geom"],
len(link_lanes),
lane_width,
)
# update link_data geom with MultiplineString geometry created from lane geometries, which will be used for mapping the vehicles on the link geometry later
link_data["geom"] = linemerge(list(lane_geometries.values()))
for lane_idx, lane_data in enumerate(link_lanes):
add_on = {"link_id": link_id, "lane_index": lane_idx + 1, "lane_id": f"{link_id}_{lane_idx + 1}"}
lane_data.update(add_on)
lane_data["geom"] = lane_geometries[lane_idx + 1]
# lane_data["geom_points"] = list(lane_geometries[lane_idx].coords)
lanes_dict[f"{link_id}_{lane_idx + 1}"] = lane_data
else:
continue
# remove geometry and lanes attribute from inpx_links, since they are not needed in the final output
for link_id, link_data in inpx_links.items():
link_data.pop("geometry", None)
link_data.pop("lanes", None)
# create line series
df_links = pd.DataFrame.from_dict(inpx_links, orient="index").reset_index(drop=True)
df_lanes = pd.DataFrame.from_dict(lanes_dict, orient="index").reset_index(drop=True)
# line_series = gpd.GeoSeries(link_linestring_lst)
# line_df = gpd.GeoDataFrame({"geometry": line_series}, crs="EPSG:4326")
gdf_links = gpd.GeoDataFrame(df_links, geometry="geom", crs="EPSG:4326")
gdf_lanes = gpd.GeoDataFrame(df_lanes, geometry="geom", crs="EPSG:4326")
# add link id column at the first column
gdf_links.insert(0, "link_id", gdf_links["@no"])
# rename num_lanes to lanes to match with GMNS link standards
gdf_links.rename(columns={"num_lanes": "lanes"}, inplace=True)
# add lane_num from lane_index column in gdf_lanes to match with GMNS lane standards.
gdf_lanes.insert(1, "lane_num", gdf_lanes["lane_index"])
# rename @width to width in gdf_lanes
gdf_lanes.rename(columns={"@width": "width"}, inplace=True)
# order columns in gdf_lanes to have first columns as lane_id, link_id, lane_num, and then the rest of the columns
lane_cols = gdf_lanes.columns.tolist()
lane_cols = [col for col in lane_cols if col not in ["lane_id", "link_id", "lane_num", "width"]]
gdf_lanes = gdf_lanes[["lane_id", "link_id", "lane_num", "width"] + lane_cols]
# save the results to csv and geojson files if output_dir is specified
if output_dir:
isCsv = kwargs.get("isCsv", False)
isGeojson = kwargs.get("isGeojson", True)
isShp = kwargs.get("isShp", False)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# get input inpx file name without suffix to use as output file name
output_link = output_dir / f"{Path(path_vissim_inpx).stem}_inpx_links"
output_lane = output_dir / f"{Path(path_vissim_inpx).stem}_inpx_lanes"
# save to csv file
if isCsv:
output_link_csv = output_link.with_suffix(f"{output_link.suffix}.csv")
output_lane_csv = output_lane.with_suffix(f"{output_lane.suffix}.csv")
gdf_links.to_csv(output_link_csv, index=False)
gdf_lanes.to_csv(output_lane_csv, index=False)
logger.info(f"Successfully saved inpx file to csv: {output_dir}")
# save to geojson file
if isGeojson:
output_link_geojson = output_link.with_suffix(f"{output_link.suffix}.geojson")
output_lane_geojson = output_lane.with_suffix(f"{output_lane.suffix}.geojson")
gdf_links.to_file(output_link_geojson, driver="GeoJSON")
gdf_lanes.to_file(output_lane_geojson, driver="GeoJSON")
logger.info(f"Successfully saved inpx file to geojson: {output_dir}")
# save to shapefile
if isShp:
output_link_shp = output_link.with_suffix(".shp")
output_lane_shp = output_lane.with_suffix(".shp")
gdf_links.to_file(output_link_shp, driver="ESRI Shapefile")
gdf_lanes.to_file(output_lane_shp, driver="ESRI Shapefile")
logger.info(f"Successfully saved inpx file to shapefile: {output_dir}")
# add lanes data to inpx_dict
inpx_dict["lanes"] = lanes_dict
return inpx_dict
if __name__ == "__main__":
path_vissim_inpx = r"C:\Users\xyluo25\anaconda3_workspace\001_GitHub\vissim2gmns\datasets\aveiro_port_net\Aveiro_Port_Train_Network_25_03_2026.inpx"
inpx_dict = vissim_inpx(path_vissim_inpx, output_dir=Path.cwd())