用Python处理NGSIM轨迹数据从原始CSV到模型输入的完整实战指南附代码当面对上千万行的交通轨迹数据时数据科学家最常遇到的困境是如何在保证处理效率的同时提取出对模型真正有价值的时空特征NGSIM数据集作为交通行为研究的黄金标准其每0.1秒采样的高精度轨迹既蕴含着丰富的驾驶行为模式也隐藏着数据清洗和特征工程的诸多挑战。本文将手把手带你用Python实现从原始CSV到模型输入的完整处理流程特别针对LSTM和Transformer等序列模型的输入需求分享处理大规模时空数据的实战技巧。1. 数据加载与初步探索面对25列、上千万行的CSV文件直接使用pandas的read_csv()可能会耗尽内存。这里推荐使用分块加载策略import pandas as pd chunk_size 100000 traj_chunks pd.read_csv(us101_trajectories.csv, chunksizechunk_size, dtype{Vehicle_ID: int32, Frame_ID: int32}) df pd.concat([chunk for chunk in traj_chunks])关键参数优化技巧指定dtype参数减少内存占用使用chunksize分批加载避免内存溢出优先加载必要列usecols参数筛选关键字段初步探索时建议重点关注以下统计量指标异常值判断标准处理建议v_Vel0 或 120 ft/s标记为缺失值v_Acc-10 或 10 ft/s²前后帧插值Space_Headway0设为前车距离阈值2. 时空数据清洗实战2.1 坐标系统一与异常点检测NGSIM的Local_X/Y坐标是相对于路段局部零点的不同路段需要转换到统一坐标系。这里给出UTM坐标转换函数import pyproj def local_to_utm(local_x, local_y, zone_number10): utm_proj pyproj.Proj(projutm, zonezone_number, ellpsWGS84) return utm_proj(local_x, local_y) df[utm_x], df[utm_y] zip(*df.apply( lambda row: local_to_utm(row[Local_X], row[Local_Y]), axis1))常见坑点不同路段需指定不同的UTM分区号摩托车v_Class1的加速度波动较大需单独处理相邻帧坐标突变可能是标注误差建议使用移动平均平滑2.2 运动学特征修复计算Jerk加速度变化率时原始加速度的噪声会被放大。推荐使用Savitzky-Golay滤波器from scipy.signal import savgol_filter def smooth_trajectory(group, window5, polyorder2): group[smoothed_acc] savgol_filter(group[v_Acc], window, polyorder) group[jerk] group[smoothed_acc].diff() / 0.1 # 0.1秒采样间隔 return group df df.groupby(Vehicle_ID).apply(smooth_trajectory)提示滤波窗口大小应根据车辆类型动态调整卡车建议window7轿车window53. 高级特征工程3.1 微观交通流特征除基础速度、加速度外这些衍生特征对行为预测至关重要# 跟车特征 df[TTC] df[Space_Headway] / (df[v_Vel] 1e-6) # 避免除零 df[DRAC] (df[v_Vel]**2 - df[Preceding_Vel]**2) / (2 * df[Space_Headway]) # 车道变换指标 df[lane_deviation] df.groupby(Vehicle_ID)[Lane_ID].diff().abs()3.2 上下文感知特征利用周围车辆状态构建环境特征def get_surrounding_vehicles(df, target_id, frame_window30): current_frame df[df[Vehicle_ID] target_id][Frame_ID].values[0] nearby df[(df[Frame_ID] current_frame - frame_window) (df[Frame_ID] current_frame frame_window)] return nearby.groupby(Lane_ID).agg({ v_Vel: mean, Space_Headway: min }).to_dict(records)4. 模型输入格式化4.1 序列样本生成为LSTM生成滑动窗口样本的优化实现def create_sequences(values, window_size50, stride10): sequences [] for i in range(0, len(values) - window_size 1, stride): seq values[i:i window_size] sequences.append(seq) return np.array(sequences) # 按车辆分组处理 features [utm_x, utm_y, v_Vel, smoothed_acc, jerk] vehicle_groups df.groupby(Vehicle_ID)[features] sequences [create_sequences(group.values) for _, group in vehicle_groups]4.2 Transformer专用格式化对于Transformer模型需要额外添加位置编码def add_positional_encoding(sequences): pos np.arange(sequences.shape[1])[:, np.newaxis] i np.arange(sequences.shape[2])[np.newaxis, :] angle_rates 1 / np.power(10000, (2 * (i//2)) / np.float32(sequences.shape[2])) angle_rads pos * angle_rates angle_rads[:, 0::2] np.sin(angle_rads[:, 0::2]) angle_rads[:, 1::2] np.cos(angle_rads[:, 1::2]) return sequences angle_rads[np.newaxis, ...]5. 性能优化技巧处理千万级数据时的关键优化手段并行化处理使用swifter加速apply操作import swifter df df.groupby(Vehicle_ID).swifter.apply(smooth_trajectory)内存映射对于超大文件使用dask或modinimport dask.dataframe as dd ddf dd.read_csv(i80_trajectories.csv, blocksize25e6)特征存储处理好的特征保存为Parquet格式df.to_parquet(processed_data.parquet, enginepyarrow)在实际项目中处理US-101路段数据时约270万行上述优化方案将处理时间从原来的46分钟缩短到8分钟。特别值得注意的是车辆ID的排序对分组操作效率影响显著——预先按Vehicle_ID排序可使处理速度提升3倍以上。