Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: launch argument #98

Merged
merged 4 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 41 additions & 30 deletions driving_log_replayer_v2/driving_log_replayer_v2/launch/argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,43 @@ def is_arg_valid(conf: dict) -> LogInfo | None:
return None


def load_scenario(scenario_path: Path) -> dict:
with scenario_path.open() as scenario_file:
return yaml.safe_load(scenario_file)


def get_dataset_index_from_conf(conf: dict, datasets: list[dict]) -> int | str:
if conf["t4_dataset_path"] != "":
return extract_index_from_id(conf["t4_dataset_id"], datasets)
return get_dataset_index(conf["dataset_index"], len(datasets))


def update_conf_with_dataset_info(
conf: dict,
t4_dataset_path: Path,
yaml_obj: dict,
dataset_info: dict,
output_dir: Path,
) -> None:
v = dataset_info
conf["vehicle_id"] = v["VehicleId"]
conf["initial_pose"] = json.dumps(v.get("InitialPose", {}))
conf["direct_initial_pose"] = json.dumps(v.get("DirectInitialPose", {}))
conf["goal_pose"] = json.dumps(v.get("GoalPose", {}))
conf["t4_dataset_path"] = t4_dataset_path.as_posix()
conf["vehicle_model"] = yaml_obj["VehicleModel"]
conf["sensor_model"] = yaml_obj["SensorModel"]
conf["map_path"] = t4_dataset_path.joinpath("map").as_posix()
conf["input_bag"] = t4_dataset_path.joinpath("input_bag").as_posix()
conf["result_json_path"] = output_dir.joinpath("result.json").as_posix()
conf["result_bag_path"] = output_dir.joinpath("result_bag").as_posix()
conf["result_archive_path"] = output_dir.joinpath("result_archive").as_posix()
conf["use_case"] = yaml_obj["Evaluation"]["UseCaseName"]

if conf["use_case"] == "dlr_all":
conf["record_only"] = "true"


def ensure_arg_compatibility(context: LaunchContext) -> list:
conf = context.launch_configurations
is_valid = is_arg_valid(conf)
Expand All @@ -200,43 +237,17 @@ def ensure_arg_compatibility(context: LaunchContext) -> list:
output_dir = create_output_dir(conf["output_dir"], scenario_path)
conf["output_dir"] = output_dir.as_posix()

with scenario_path.open() as scenario_file:
yaml_obj = yaml.safe_load(scenario_file)

yaml_obj = load_scenario(scenario_path)
datasets = yaml_obj["Evaluation"]["Datasets"]
if conf["t4_dataset_path"] != "":
dataset_index = extract_index_from_id(conf["t4_dataset_id"], datasets)
else:
dataset_index = get_dataset_index(conf["dataset_index"], len(datasets))
dataset_index = get_dataset_index_from_conf(conf, datasets)
if isinstance(dataset_dir, str):
return [LogInfo(msg=dataset_index)]

k, v = next(iter(datasets[dataset_index].items()))
t4_dataset_path = (
Path(conf["t4_dataset_path"]) if conf["t4_dataset_path"] != "" else dataset_dir.joinpath(k)
) # t4_dataset_pathが引数で渡されていたら更新しない。指定ない場合はdata_dirから作る
conf["vehicle_id"] = v["VehicleId"]
init_pose: dict | None = v.get("InitialPose")
if init_pose is not None:
conf["initial_pose"] = json.dumps(init_pose)
direct_pose: dict | None = v.get("DirectInitialPose")
if direct_pose is not None:
conf["direct_initial_pose"] = json.dumps(direct_pose)
goal_pose: dict | None = v.get("GoalPose")
if goal_pose is not None:
conf["goal_pose"] = json.dumps(goal_pose)
conf["t4_dataset_path"] = t4_dataset_path.as_posix()
conf["vehicle_model"] = yaml_obj["VehicleModel"]
conf["sensor_model"] = yaml_obj["SensorModel"]
conf["map_path"] = t4_dataset_path.joinpath("map").as_posix()
conf["input_bag"] = t4_dataset_path.joinpath("input_bag").as_posix()
conf["result_json_path"] = output_dir.joinpath("result.json").as_posix()
conf["result_bag_path"] = output_dir.joinpath("result_bag").as_posix()
conf["result_archive_path"] = output_dir.joinpath("result_archive").as_posix()
conf["use_case"] = yaml_obj["Evaluation"]["UseCaseName"]

if conf["use_case"] == "dlr_all":
conf["record_only"] = "true"
) # Do not update if t4_dataset_path is set by argument. If not, create t4_dataset_path from data_dir
update_conf_with_dataset_info(conf, t4_dataset_path, yaml_obj, v, output_dir)

return [
LogInfo(
Expand Down
20 changes: 11 additions & 9 deletions driving_log_replayer_v2/launch/driving_log_replayer_v2.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,18 @@ def launch_topic_state_monitor(context: LaunchContext) -> list:

def launch_initial_pose_node(context: LaunchContext) -> list:
conf = context.launch_configurations
initial_pose = conf.get("initial_pose", "")
direct_initial_pose = conf.get("direct_initial_pose", "")
initial_pose = conf["initial_pose"]
direct_initial_pose = conf["direct_initial_pose"]

if initial_pose == "{}" and direct_initial_pose == "{}":
return [LogInfo(msg="initial_pose_node is not activated")]

params = {
"use_sim_time": True,
"initial_pose": initial_pose,
"direct_initial_pose": direct_initial_pose,
}

if initial_pose == "" and direct_initial_pose == "":
return [LogInfo(msg="initial_pose_node is not activated")]

return [
Node(
package="driving_log_replayer_v2",
Expand All @@ -176,15 +177,16 @@ def launch_initial_pose_node(context: LaunchContext) -> list:

def launch_goal_pose_node(context: LaunchContext) -> list:
conf = context.launch_configurations
goal_pose = conf.get("goal_pose", "")
goal_pose = conf["goal_pose"]

if goal_pose == "{}":
return [LogInfo(msg="goal_pose_node is not activated")]

params = {
"use_sim_time": True,
"goal_pose": goal_pose,
}

if goal_pose == "":
return [LogInfo(msg="goal_pose_node is not activated")]

return [
Node(
package="driving_log_replayer_v2",
Expand Down
2 changes: 1 addition & 1 deletion driving_log_replayer_v2/scripts/goal_pose_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self) -> None:
self.declare_parameter("goal_pose", "")
self._goal_pose_str = self.get_parameter("goal_pose").get_parameter_value().string_value

if self._goal_pose_str == "":
if self._goal_pose_str == "{}":
rclpy.shutdown()

self._goal_pose_running: bool = False
Expand Down
6 changes: 3 additions & 3 deletions driving_log_replayer_v2/scripts/initial_pose_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ def __init__(self) -> None:
self.get_logger().error(f"{self._direct_initial_pose_str=}")
"""

if self._initial_pose_str == "" and self._direct_initial_pose_str == "":
if self._initial_pose_str == "{}" and self._direct_initial_pose_str == "{}":
rclpy.shutdown()

# initial pose estimation
self._initial_pose_running: bool = False
if self._initial_pose_str != "":
if self._initial_pose_str != "{}":
self._initial_pose = arg_to_initial_pose(self._initial_pose_str)
self._initial_pose_method: int = InitializeLocalization.Request.AUTO
if self._direct_initial_pose_str != "":
if self._direct_initial_pose_str != "{}":
self._initial_pose = arg_to_initial_pose(self._direct_initial_pose_str)
self._initial_pose_method: int = InitializeLocalization.Request.DIRECT

Expand Down
Loading