| import argparse |
| import glob |
| import os |
| import pickle |
| import pickle as pkl |
| from typing import Dict, List |
|
|
| import utils.sensor as box_sensor |
| import utils.transform as box_transform |
| import utils.vis as box_vis |
| import utils.rotation as object_rotation |
| import utils.calibration as calib_utils |
| import numpy as np |
| import pyquaternion |
| import scipy |
| from utils.miscell import find_unique_common_from_lists |
| from utils.data_structure import Box3D |
| from nuscenes.eval.common.utils import Quaternion, quaternion_yaw |
|
|
| from utils.nuplan_pointcloud import PointCloud |
|
|
| np.set_printoptions(precision=3, suppress=True) |
|
|
| FPS = 2 |
| FRAME_INTERVAL = 1 |
| FPS_KEYFRAME = FPS / FRAME_INTERVAL |
|
|
| global_track_id = 1 |
| mapping_tracktoken2globalid = dict() |
|
|
|
|
| def parse_ego_sensor_calib(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
| """Parse calibration between world, ego, lidar and camera""" |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| sdc_center_ego = np.array([[0, 0, 0, 1.0]]).transpose() |
| sdc_center_lidar = np.linalg.inv(anno["lidar2ego"]) @ sdc_center_ego |
| yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
| np.linalg.inv(anno["lidar2ego"])[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| l, w, h = 4.52, 2.1, 1.56 |
| sdc_vel_ego = anno["can_bus"][10:13] |
| sdc_vel_global = anno["ego2global"][:3, :3] @ sdc_vel_ego |
| sdc_vel_lidar = np.linalg.inv(anno["lidar2ego"][:3, :3]) @ sdc_vel_ego |
| |
| |
| |
|
|
| gt_sdc_bbox_lidar = np.array( |
| [ |
| sdc_center_lidar[0, 0], |
| sdc_center_lidar[1, 0], |
| sdc_center_lidar[2, 0], |
| l, |
| w, |
| h, |
| yaw_lidar, |
| sdc_vel_lidar[0], |
| sdc_vel_lidar[1], |
| ] |
| ) |
| |
| |
|
|
| |
| cams = anno["cams"] |
| for cam_type, cam_dict in cams.items(): |
| sensor2lidar = np.identity(4) |
| sensor2lidar[:3, :3] = cam_dict["sensor2lidar_rotation"] |
| sensor2lidar[:3, 3] = cam_dict["sensor2lidar_translation"] |
| sensor2ego: np.ndarray = anno["lidar2ego"] @ sensor2lidar |
|
|
| |
| cams[cam_type].update( |
| { |
| "type": cam_type, |
| "sensor2ego_translation": sensor2ego[:3, 3], |
| "sensor2ego_rotation": object_rotation.convert_mat2qua( |
| sensor2ego[:3, :3] |
| ), |
| } |
| ) |
|
|
| pts_filename = anno["lidar_path"] |
| src_split = data_split.split("_")[0] |
| data_root = f"./data/openscene-v1.1/sensor_blobs/{src_split}" |
| if data_root not in pts_filename: |
| pts_filename = os.path.join(data_root, pts_filename) |
|
|
| points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
| points = points[:3].T |
|
|
| |
| if vis: |
| log_root = pts_filename.split("/")[5] |
| save_root = ( |
| f"./data/openscene-v1.1/vis/{data_split}/{log_root}/{anno['scene_name']}" |
| ) |
|
|
| if not os.path.exists(save_root): |
| os.makedirs(save_root) |
| save_path = os.path.join( |
| save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
| ) |
| box_vis.vis_box_on_lidar( |
| lidar=points, |
| save_path=save_path, |
| left_hand=False, |
| ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
| ) |
|
|
| command = np.argmax(anno["driving_command"]) |
| |
| anno.update( |
| { |
| "lidar_pts": points, |
| "sweeps": [], |
| "cams": cams, |
| "sdc_vel_global": sdc_vel_global, |
| |
| "gt_sdc_bbox_lidar": gt_sdc_bbox_lidar, |
| "command": command, |
| } |
| ) |
|
|
| return anno |
|
|
|
|
| def parse_bbox(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
| """Parse each one of the bounding boxes and compute properties""" |
|
|
| num_obj: int = anno["anns"]["gt_boxes"].shape[0] |
| anno.update( |
| { |
| "gt_velocity": anno["anns"]["gt_velocity_3d"][:, :2], |
| "gt_boxes": anno["anns"]["gt_boxes"], |
| "gt_bboxes_global": np.zeros((num_obj, 9), dtype="float32"), |
| "gt_names": anno["anns"]["gt_names"], |
| "gt_inds": np.zeros((num_obj), dtype="int64"), |
| "num_lidar_pts": np.zeros((num_obj,), dtype="float32"), |
| "valid_flag": np.zeros((num_obj,), dtype="bool"), |
| } |
| ) |
|
|
| |
| bbox_gt_in_ego_list: List[np.ndarray] = list() |
| valid_id_list: List[np.ndarray] = list() |
| for bbox_index in range(num_obj): |
| |
| box_7dof_lidar = anno["anns"]["gt_boxes"][bbox_index] |
| instance_token = anno["anns"]["instance_tokens"][bbox_index] |
| track_token = anno["anns"]["track_tokens"][bbox_index] |
|
|
| |
| global mapping_tracktoken2globalid |
| if track_token not in mapping_tracktoken2globalid: |
| global global_track_id |
| mapping_tracktoken2globalid[track_token] = global_track_id |
| anno["gt_inds"][bbox_index] = global_track_id |
|
|
| |
| global_track_id += 1 |
| else: |
| anno["gt_inds"][bbox_index] = mapping_tracktoken2globalid[track_token] |
|
|
| |
| |
| |
| |
| |
|
|
| |
| bbox2lidar = calib_utils.create_transform( |
| box_7dof_lidar[:3], np.array([0, box_7dof_lidar[-1], 0]) |
| ) |
| |
| |
|
|
| |
| bbox_center_lidar = np.array([[0, 0, 0, 1.0]]).transpose() |
| bbox_center_lidar[:3, 0] = box_7dof_lidar[:3] |
| l, w, h = box_7dof_lidar[3:6] |
| bbox_center_world = anno["lidar2global"] @ bbox_center_lidar |
| bbox2global = anno["lidar2global"] @ bbox2lidar |
| yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
| bbox2global[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| velocity_lidar = np.array([[0, 0, 1.0]]).transpose() |
| velocity_lidar[:2, 0] = anno["gt_velocity"][bbox_index] |
| velocity_world = anno["lidar2global"][:3, :3] @ velocity_lidar |
|
|
| |
| box_7dof_global = np.array( |
| [ |
| float(bbox_center_world[0]), |
| float(bbox_center_world[1]), |
| float(bbox_center_world[2]), |
| ] |
| + [l, w, h] |
| + [float(yaw_global)] |
| ) |
| anno["gt_bboxes_global"][bbox_index, :7] = box_7dof_global |
| anno["gt_bboxes_global"][bbox_index, 7:] = velocity_world[:2, 0] |
| |
| |
|
|
| |
| box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar) |
| box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
| pc_sub: np.ndarray = box_sensor.extract_pc_in_box3d( |
| anno["lidar_pts"], box_in_lidar |
| )[ |
| 0 |
| ] |
| |
|
|
| |
| anno["num_lidar_pts"][bbox_index] = pc_sub.shape[0] |
|
|
| |
| if pc_sub.shape[0] > 0: |
| anno["valid_flag"][bbox_index] = True |
|
|
| |
| if vis: |
| box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
| box_in_lidar, lidar2ego=anno["lidar2ego"], |
| ) |
| bbox_gt_in_ego_list.append(box_in_ego) |
| valid_id_list.append(anno["gt_inds"][bbox_index]) |
|
|
| if vis: |
| pts_filename = anno["lidar_path"] |
| log_root = pts_filename.split("/")[0] |
| save_root = f"./data/openscene-v1.1/vis_bbox/{data_split}/{log_root}/{anno['scene_name']}" |
|
|
| if not os.path.exists(save_root): |
| os.makedirs(save_root) |
| save_path = os.path.join( |
| save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
| ) |
|
|
| |
| box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7]) |
| sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
| sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
| sdc_box_in_lidar, lidar2ego=anno["lidar2ego"], |
| ) |
| bbox_gt_in_ego_list.append(sdc_box_in_ego) |
| valid_id_list.append(-1) |
|
|
| |
| box_vis.vis_box_on_lidar( |
| anno["lidar_pts"], |
| save_path, |
| bbox_gt_in_ego_list=bbox_gt_in_ego_list, |
| bev=True, |
| id_list=valid_id_list, |
| left_hand=False, |
| ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
| ) |
|
|
| return anno |
|
|
|
|
| def parse_data(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
| """Convert carla data into the collection of path needed by mmdetection3D dataloader""" |
|
|
| anno: Dict = parse_ego_sensor_calib(anno, data_split, vis=vis) |
| anno: Dict = parse_bbox(anno, data_split, vis=vis) |
|
|
| return anno |
|
|
|
|
| def parse_sequence( |
| seq: str, |
| annos: List[Dict], |
| seq_index: List[int], |
| data_split: str, |
| pre_sec: float = 1.5, |
| fut_sec: float = 4, |
| vis: bool = False, |
| ) -> List[Dict]: |
| """Add past/future trajectory into data at each frame to allow prediction""" |
|
|
| anno_seq: List[Dict] = [annos[index] for index in seq_index] |
| print(f"processing: {seq}, number of frames {len(anno_seq)}") |
|
|
| |
| |
| def sort_data(data_dict: Dict): |
| return data_dict["frame_idx"] |
| anno_seq.sort(key=sort_data) |
|
|
| |
| min_frame_ID: int = anno_seq[0]["frame_idx"] |
| max_frame_ID: int = anno_seq[-1]["frame_idx"] |
| if len(anno_seq) < max_frame_ID - min_frame_ID + 1: |
| for index in seq_index: |
| annos[index] = None |
| |
| return annos |
|
|
| |
|
|
| |
| |
| |
|
|
| |
| for index in range(len(seq_index)): |
| anno: Dict = anno_seq[index] |
| frame_ID: int = anno["frame_idx"] |
| |
|
|
| |
| scene_token: str = anno["scene_token"] |
| assert scene_token == seq, "error, not the same sequence" |
|
|
| |
| |
| gt_bboxes_lidar: np.ndarray = anno["anns"]["gt_boxes"] |
| num_obj: int = gt_bboxes_lidar.shape[0] |
|
|
| |
| obj_ids: List[int] = anno["gt_inds"].tolist() |
| |
| |
|
|
| |
| l, w, h = anno["gt_sdc_bbox_lidar"][3:6] |
| lidar2global: np.ndarray = anno["lidar2global"] |
|
|
| |
| pre_frames: int = int(pre_sec * FPS_KEYFRAME) |
| fut_frames: int = int(fut_sec * FPS_KEYFRAME) |
|
|
| |
| |
| gt_fut_bbox_lidar: np.ndarray = np.zeros((num_obj, fut_frames, 9)) |
| gt_pre_bbox_lidar: np.ndarray = np.zeros((num_obj, pre_frames, 9)) |
|
|
| |
| gt_fut_bbox_mask: np.ndarray = np.zeros((num_obj, fut_frames, 1)) |
| gt_pre_bbox_mask: np.ndarray = np.zeros((num_obj, pre_frames, 1)) |
|
|
| |
| |
| |
| |
| gt_fut_bbox_lidar_all: List[np.ndarray] = [ |
| [np.empty([0])] for _ in range(fut_frames) |
| ] |
| gt_pre_bbox_lidar_all: List[np.ndarray] = [ |
| [np.empty([0])] for _ in range(pre_frames) |
| ] |
|
|
| |
| gt_pre_bbox_sdc_lidar: np.ndarray = np.zeros((1, pre_frames, 9)) |
| gt_fut_bbox_sdc_lidar: np.ndarray = np.zeros((1, fut_frames, 9)) |
| gt_pre_bbox_sdc_global: np.ndarray = np.zeros((1, pre_frames, 9)) |
| gt_fut_bbox_sdc_global: np.ndarray = np.zeros((1, fut_frames, 9)) |
| gt_pre_bbox_sdc_mask: np.ndarray = np.zeros((1, pre_frames, 1)) |
| gt_fut_bbox_sdc_mask: np.ndarray = np.zeros((1, fut_frames, 1)) |
| gt_pre_command_sdc: np.ndarray = np.zeros((1, pre_frames, 1)) |
| gt_fut_command_sdc: np.ndarray = np.zeros((1, fut_frames, 1)) |
|
|
| |
| |
| for pre_frame_index in range(1, pre_frames + 1): |
| pre_frame_ID = int(frame_ID - pre_frame_index * FRAME_INTERVAL) |
| pre_frame_index_in_seq: int = index - pre_frame_index |
|
|
| |
| if pre_frame_ID < min_frame_ID: |
| continue |
|
|
| |
| anno_pre_tmp: Dict = anno_seq[pre_frame_index_in_seq] |
| gt_pre_bbox_global_tmp: np.ndarray = anno_pre_tmp[ |
| "gt_bboxes_global" |
| ] |
| num_obj_pre: int = gt_pre_bbox_global_tmp.shape[0] |
| gt_pre_bbox_lidar_tmp = np.zeros((num_obj_pre, 9)) |
|
|
| |
| |
|
|
| |
| loc_global = np.concatenate( |
| (gt_pre_bbox_global_tmp[:, :3], np.ones((num_obj_pre, 1))), axis=1 |
| ).T |
| loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
| gt_pre_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
| |
| gt_pre_bbox_lidar_tmp[:, 3:6] = gt_pre_bbox_global_tmp[:, 3:6] |
|
|
| |
| gt_pre_rot_global_rad = np.concatenate( |
| ( |
| np.zeros((num_obj_pre, 1)), |
| gt_pre_bbox_global_tmp[:, [6]], |
| np.zeros((num_obj_pre, 1)), |
| ), |
| axis=1, |
| ) |
| gt_pre_rot_global_deg = gt_pre_rot_global_rad / np.pi * 180 |
| for obj_index in range(num_obj_pre): |
| gt_pre_trans_global = calib_utils.create_transform( |
| gt_pre_bbox_global_tmp[obj_index, :3], |
| gt_pre_rot_global_deg[obj_index], |
| ) |
| gt_pre_trans_lidar = ( |
| np.linalg.inv(anno["lidar2global"]) @ gt_pre_trans_global |
| ) |
| gt_pre_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
| gt_pre_trans_lidar[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| gt_pre_bbox_lidar_tmp[obj_index, 6] = gt_pre_yaw_lidar |
|
|
| |
| gt_pre_vel_global = np.concatenate( |
| (gt_pre_bbox_global_tmp[:, 7:], np.zeros((num_obj_pre, 1)),), |
| axis=1, |
| ) |
| gt_pre_vel_lidar: np.ndarray = ( |
| np.linalg.inv(lidar2global[:3, :3]) @ gt_pre_vel_global.T |
| ).T |
| gt_pre_bbox_lidar_tmp[:, 7:] = gt_pre_vel_lidar[:, :2] |
|
|
| |
| gt_pre_bbox_lidar_all[pre_frame_index - 1] = gt_pre_bbox_lidar_tmp |
|
|
| |
| |
| obj_ids_pre: List[int] = anno_pre_tmp["gt_inds"].tolist() |
| ( |
| obj_ids_common, |
| index_cur, |
| index_past, |
| ) = find_unique_common_from_lists(obj_ids, obj_ids_pre) |
|
|
| |
| if len(index_cur) > 0: |
| gt_pre_bbox_mask[np.array(index_cur), pre_frame_index - 1] = 1 |
|
|
| |
| gt_pre_bbox_lidar[ |
| np.array(index_cur), pre_frame_index - 1, : |
| ] = gt_pre_bbox_lidar_tmp[np.array(index_past), :] |
|
|
| |
| sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
| anno_pre_tmp["ego2global"] |
| ) |
| sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
| sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
| sdc_vel_global: np.ndarray = anno_pre_tmp["sdc_vel_global"] |
| sdc_vel_lidar: np.ndarray = ( |
| np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
| ) |
| sdc_trans_lidar = ( |
| np.linalg.inv(lidar2global) @ anno_pre_tmp["ego2global"] |
| ) |
| yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
| sdc_trans_lidar[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
| anno_pre_tmp["ego2global"][:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| gt_pre_bbox_sdc_mask[0, pre_frame_index - 1] = 1 |
|
|
| |
| gt_sdc_bbox_lidar = np.array( |
| [ |
| sdc_loc_lidar[0], |
| sdc_loc_lidar[1], |
| sdc_loc_lidar[2], |
| l, |
| w, |
| h, |
| yaw_lidar, |
| sdc_vel_lidar[0], |
| sdc_vel_lidar[1], |
| ] |
| ) |
| gt_pre_bbox_sdc_lidar[0, pre_frame_index - 1] = gt_sdc_bbox_lidar |
| gt_sdc_bbox_global = np.array( |
| [ |
| sdc_loc_global[0], |
| sdc_loc_global[1], |
| sdc_loc_global[2], |
| l, |
| w, |
| h, |
| yaw_global, |
| sdc_vel_global[0], |
| sdc_vel_global[1], |
| ] |
| ) |
| gt_pre_bbox_sdc_global[0, pre_frame_index - 1] = gt_sdc_bbox_global |
|
|
| |
| gt_pre_command_sdc[0, pre_frame_index - 1]: int = anno_pre_tmp["command"] |
|
|
| |
| |
| for fut_frame_index in range(1, fut_frames + 1): |
| fut_frame_ID = int(frame_ID + fut_frame_index * FRAME_INTERVAL) |
| fut_frame_index_in_seq: int = index + fut_frame_index |
|
|
| |
| if fut_frame_ID > max_frame_ID: |
| continue |
|
|
| |
| try: |
| anno_fut_tmp: Dict = anno_seq[fut_frame_index_in_seq] |
| except: |
| print('fut_frame_index_in_seq', fut_frame_index_in_seq) |
| anno_fut_tmp: Dict = anno_seq[fut_frame_index_in_seq] |
|
|
| gt_fut_bbox_global_tmp: np.ndarray = anno_fut_tmp[ |
| "gt_bboxes_global" |
| ] |
| num_obj_fut: int = gt_fut_bbox_global_tmp.shape[0] |
| gt_fut_bbox_lidar_tmp = np.zeros((num_obj_fut, 9)) |
|
|
| |
| |
|
|
| |
| loc_global = np.concatenate( |
| (gt_fut_bbox_global_tmp[:, :3], np.ones((num_obj_fut, 1))), axis=1, |
| ).T |
| loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
| gt_fut_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
| |
| gt_fut_bbox_lidar_tmp[:, 3:6] = gt_fut_bbox_global_tmp[:, 3:6] |
|
|
| |
| gt_fut_rot_global_rad = np.concatenate( |
| ( |
| np.zeros((num_obj_fut, 1)), |
| gt_fut_bbox_global_tmp[:, [6]], |
| np.zeros((num_obj_fut, 1)), |
| ), |
| axis=1, |
| ) |
| gt_fut_rot_global_deg = gt_fut_rot_global_rad / np.pi * 180 |
| for obj_index in range(num_obj_fut): |
| gt_trans_global_future = calib_utils.create_transform( |
| gt_fut_bbox_global_tmp[obj_index, :3], |
| gt_fut_rot_global_deg[obj_index], |
| ) |
| gt_fut_trans_lidar = ( |
| np.linalg.inv(anno["lidar2global"]) @ gt_trans_global_future |
| ) |
| gt_fut_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
| gt_fut_trans_lidar[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| gt_fut_bbox_lidar_tmp[obj_index, 6] = gt_fut_yaw_lidar |
|
|
| |
| gt_fut_vel_global = np.concatenate( |
| (gt_fut_bbox_global_tmp[:, 7:], np.zeros((num_obj_fut, 1)),), |
| axis=1, |
| ) |
| gt_fut_vel_lidar: np.ndarray = ( |
| np.linalg.inv(lidar2global[:3, :3]) @ gt_fut_vel_global.T |
| ).T |
| gt_fut_bbox_lidar_tmp[:, 7:] = gt_fut_vel_lidar[:, :2] |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| gt_fut_bbox_lidar_all[fut_frame_index - 1] = gt_fut_bbox_lidar_tmp |
|
|
| |
| |
| obj_ids_fut: List[int] = anno_fut_tmp["gt_inds"].tolist() |
| ( |
| obj_ids_common, |
| index_cur, |
| index_fut, |
| ) = find_unique_common_from_lists(obj_ids, obj_ids_fut) |
|
|
| |
| if len(index_cur) > 0: |
| gt_fut_bbox_mask[np.array(index_cur), fut_frame_index - 1] = 1 |
| |
| gt_fut_bbox_lidar[ |
| np.array(index_cur), fut_frame_index - 1, : |
| ] = gt_fut_bbox_lidar_tmp[np.array(index_fut), :] |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
| anno_fut_tmp["ego2global"] |
| ) |
| sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
| sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
| sdc_vel_global: np.ndarray = anno_fut_tmp["sdc_vel_global"] |
| sdc_vel_lidar: np.ndarray = ( |
| np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
| ) |
| sdc_trans_lidar = ( |
| np.linalg.inv(lidar2global) @ anno_fut_tmp["ego2global"] |
| ) |
| yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
| sdc_trans_lidar[:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
| anno_fut_tmp["ego2global"][:3, :3] |
| ).as_euler("xyz", degrees=False)[-1] |
| gt_fut_bbox_sdc_mask[0, fut_frame_index - 1] = 1 |
|
|
| |
| gt_sdc_bbox_lidar = np.array( |
| [ |
| sdc_loc_lidar[0], |
| sdc_loc_lidar[1], |
| sdc_loc_lidar[2], |
| l, |
| w, |
| h, |
| yaw_lidar, |
| sdc_vel_lidar[0], |
| sdc_vel_lidar[1], |
| ] |
| ) |
| gt_fut_bbox_sdc_lidar[0, fut_frame_index - 1] = gt_sdc_bbox_lidar |
| gt_sdc_bbox_global = np.array( |
| [ |
| sdc_loc_global[0], |
| sdc_loc_global[1], |
| sdc_loc_global[2], |
| l, |
| w, |
| h, |
| yaw_global, |
| sdc_vel_global[0], |
| sdc_vel_global[1], |
| ] |
| ) |
| gt_fut_bbox_sdc_global[0, fut_frame_index - 1] = gt_sdc_bbox_global |
|
|
| |
| gt_fut_command_sdc[0, fut_frame_index - 1]: int = anno_fut_tmp["command"] |
|
|
| |
| gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1) |
| gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1) |
| gt_pre_bbox_sdc_lidar = np.flip(gt_pre_bbox_sdc_lidar, axis=1) |
| gt_pre_bbox_sdc_global = np.flip(gt_pre_bbox_sdc_global, axis=1) |
| gt_pre_bbox_sdc_mask = np.flip(gt_pre_bbox_sdc_mask, axis=1) |
| gt_pre_command_sdc = np.flip(gt_pre_command_sdc, axis=1) |
|
|
| |
| |
| anno["gt_pre_bbox_lidar"] = gt_pre_bbox_lidar |
| anno["gt_fut_bbox_lidar"] = gt_fut_bbox_lidar |
| anno["gt_pre_bbox_mask"] = gt_pre_bbox_mask |
| anno["gt_fut_bbox_mask"] = gt_fut_bbox_mask |
| anno["gt_pre_bbox_sdc_lidar"] = gt_pre_bbox_sdc_lidar |
| anno["gt_fut_bbox_sdc_lidar"] = gt_fut_bbox_sdc_lidar |
| anno["gt_pre_bbox_sdc_global"] = gt_pre_bbox_sdc_global |
| anno["gt_fut_bbox_sdc_global"] = gt_fut_bbox_sdc_global |
| anno["gt_pre_bbox_sdc_mask"] = gt_pre_bbox_sdc_mask |
| anno["gt_fut_bbox_sdc_mask"] = gt_fut_bbox_sdc_mask |
| anno["gt_pre_command_sdc"] = gt_pre_command_sdc |
| anno["gt_fut_command_sdc"] = gt_fut_command_sdc |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| if vis: |
| |
|
|
| pts_filename = anno["lidar_path"] |
| log_root = pts_filename.split("/")[0] |
| save_root = f"./data/openscene-v1.1/vis_seq/{data_split}/{log_root}/{anno['scene_name']}" |
| if not os.path.exists(save_root): |
| os.makedirs(save_root) |
| save_path = os.path.join( |
| save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
| ) |
|
|
| |
| bbox_gt_in_ego_list: List[np.ndarray] = list() |
| for bbox_index in range(num_obj): |
| box_7dof_lidar: np.ndarray = anno["gt_boxes"][bbox_index, :] |
| box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar) |
| box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
| box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
| box_in_lidar, lidar2ego=anno["lidar2ego"], |
| ) |
| bbox_gt_in_ego_list.append(box_in_ego) |
|
|
| |
| box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7]) |
| sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
| sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
| sdc_box_in_lidar, lidar2ego=anno["lidar2ego"], |
| ) |
| bbox_gt_in_ego_list.append(sdc_box_in_ego) |
|
|
| |
| bbox_fut_lidar_all = np.concatenate( |
| (anno["gt_fut_bbox_lidar"], anno["gt_fut_bbox_sdc_lidar"]), axis=0 |
| ) |
| bbox_fut_mask_all = np.concatenate( |
| (anno["gt_fut_bbox_mask"], anno["gt_fut_bbox_sdc_mask"]), axis=0 |
| ) |
| bbox_pre_lidar_all = np.concatenate( |
| (anno["gt_pre_bbox_lidar"], anno["gt_pre_bbox_sdc_lidar"]), axis=0 |
| ) |
| bbox_pre_mask_all = np.concatenate( |
| (anno["gt_pre_bbox_mask"], anno["gt_pre_bbox_sdc_mask"]), axis=0 |
| ) |
|
|
| |
| box_vis.vis_box_on_lidar( |
| anno["lidar_pts"], |
| save_path, |
| bbox_gt_in_ego_list=bbox_gt_in_ego_list, |
| bev=True, |
| id_list=obj_ids + [-1], |
| fut_traj=bbox_fut_lidar_all, |
| fut_traj_mask=bbox_fut_mask_all, |
| pre_traj=bbox_pre_lidar_all, |
| pre_traj_mask=bbox_pre_mask_all, |
| left_hand=False, |
| ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
| ) |
|
|
| |
| del anno["lidar_pts"] |
|
|
| return annos |
|
|
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--data_root", |
| type=str, |
| default="/mnt/hdd2/datasets/navsim/openscene-v1.1", |
| help="root directory of raw carla data", |
| ) |
| parser.add_argument( |
| "--output_dir", |
| type=str, |
| default="/mnt/hdd2/datasets/navsim/openscene-v1.1/infos", |
| help="path of output file", |
| ) |
| parser.add_argument( |
| "--split", |
| type=str, |
| default="mini_val", |
| help="trainval_train/trainval_val/mini_train/mini_val", |
| ) |
| parser.add_argument( |
| "--vis", |
| action='store_true', |
| help="turn on the visualization", |
| ) |
| args = parser.parse_args() |
|
|
| |
| |
| |
| |
| |
|
|
| |
| data_src = f"{args.data_root}/infos/openscene_{args.split}.pkl" |
| with open(data_src, "rb") as f: |
| data_src = pickle.load(f) |
|
|
| seq_index: List[int] = [] |
| scene_token_pre = None |
|
|
| for frame_count_global, frame_data in enumerate(data_src): |
| |
| log_name = frame_data["log_name"] |
| log_token = frame_data["log_token"] |
| scene_name = frame_data["scene_name"] |
| scene_token = frame_data["scene_token"] |
| token = frame_data["token"] |
| frame_idx = frame_data["frame_idx"] |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| if scene_token != scene_token_pre and scene_token_pre is not None: |
| |
| |
| data_src: List[Dict] = parse_sequence( |
| scene_token_pre, data_src, seq_index, args.split, vis=args.vis, |
| ) |
| seq_index: List[int] = [] |
|
|
| |
| frame_data: Dict = parse_data(frame_data, args.split, vis=args.vis) |
| data_src[frame_count_global] = frame_data |
|
|
| |
| seq_index.append(frame_count_global) |
| scene_token_pre = scene_token |
|
|
| if frame_count_global % 10 == 0: |
| print("finished: ", frame_count_global) |
|
|
| |
| data_src: List[Dict] = parse_sequence( |
| scene_token_pre, data_src, seq_index, args.split |
| ) |
| print("total length: ", len(data_src)) |
|
|
| |
| save_data = [] |
| for data_dict in data_src: |
| if data_dict is not None: |
| save_data.append(data_dict) |
| print("total length after cleaning: ", len(save_data)) |
|
|
| |
| data_to_save = { |
| "infos": save_data, |
| "mapping_tracktoken2globalid": mapping_tracktoken2globalid, |
| } |
| output_path = os.path.join(args.output_dir, f"nuplan_{args.split}.pkl") |
| with open(output_path, "wb") as f: |
| pkl.dump(data_to_save, f) |
|
|