Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set dataset index #28

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions driving_log_replayer_v2/launch/driving_log_replayer_v2.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ def check_launch_component(conf: dict) -> dict:
return launch_component


def get_dataset_index(idx_str: str, dataset_length: int) -> int | None:
if idx_str == "": # default value
if dataset_length == 1:
return 0
return None
try:
idx_int = int(idx_str)
if idx_int < 0 or idx_int > dataset_length:
return None
except ValueError:
return None


def ensure_arg_compatibility(context: LaunchContext) -> list:
conf = context.launch_configurations
scenario_path = Path(conf["scenario_path"])
Expand All @@ -118,24 +131,16 @@ def ensure_arg_compatibility(context: LaunchContext) -> list:

with scenario_path.open() as scenario_file:
yaml_obj = yaml.safe_load(scenario_file)
# check datasets length and index

datasets = yaml_obj["Evaluation"]["Datasets"]
idx_str = conf["dataset_index"]
if idx_str == "": # default value
if len(datasets) == 1:
dataset_index = 0
else:
return [
LogInfo(msg="launch argument 'dataset_index:=i' is required"),
]
else:
dataset_index = int(idx_str)
dataset_index = get_dataset_index(conf["dataset_index"], len(datasets))
if dataset_index is None:
return [LogInfo(msg=f"dataset_index={conf['dataset_index']} is invalid")]

for k, v in datasets[dataset_index].items():
dataset_path = dataset_dir.joinpath(k)
t4_dataset_path = dataset_dir.joinpath(k)
conf["vehicle_id"] = v["VehicleId"]
init_pose: dict | None = v.get(
"InitialPose",
) # nullに設定されている。または書かれてない場合はNone
init_pose: dict | None = v.get("InitialPose")
if init_pose is not None:
conf["initial_pose"] = json.dumps(init_pose)
direct_pose: dict | None = v.get("DirectInitialPose")
Expand All @@ -144,19 +149,19 @@ def ensure_arg_compatibility(context: LaunchContext) -> list:
goal_pose: dict | None = v.get("GoalPose")
if goal_pose is not None:
conf["goal_pose"] = json.dumps(goal_pose)
conf["map_path"] = dataset_path.joinpath("map").as_posix()
conf["t4_dataset_path"] = t4_dataset_path.as_posix()
conf["vehicle_model"] = yaml_obj["VehicleModel"]
conf["sensor_model"] = yaml_obj["SensorModel"]
conf["t4_dataset_path"] = dataset_path.as_posix()
conf["input_bag"] = dataset_path.joinpath("input_bag").as_posix()
conf["map_path"] = t4_dataset_path.joinpath("map").as_posix()
conf["input_bag"] = t4_dataset_path.joinpath("input_bag").as_posix()
conf["result_json_path"] = output_dir.joinpath("result.json").as_posix()
conf["result_bag_path"] = output_dir.joinpath("result_bag").as_posix()
conf["result_archive_path"] = output_dir.joinpath("result_archive").as_posix()
conf["use_case"] = yaml_obj["Evaluation"]["UseCaseName"]

return [
LogInfo(
msg=f"{dataset_path=}, {dataset_index=}, {output_dir=}, use_case={conf['use_case']}",
msg=f"{t4_dataset_path=}, {dataset_index=}, {output_dir=}, use_case={conf['use_case']}",
),
LogInfo(
msg=f"{check_launch_component(conf)=}",
Expand Down