From 5ae7f78cd77265cebc69a8acb5c1daf98bb28588 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]"
<66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 24 Feb 2025 08:57:41 +0000
Subject: [PATCH] style(pre-commit): autofix
---
map/autoware_tp_collector/package.xml | 2 +-
.../scripts/tp_checker.py | 168 +++++++++++-------
.../scripts/tp_collector.py | 163 ++++++++++-------
.../scripts/tp_utility.py | 140 ++++++++-------
.../scripts/tp_visualizer.py | 97 +++++-----
5 files changed, 328 insertions(+), 242 deletions(-)
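
The rotation-matrix construction reformatted in tp_utility.__pose_to_mat uses the standard
scalar-last quaternion (x, y, z, w) convention. Below is a minimal sketch that cross-checks that
convention against SciPy; the scipy.spatial.transform.Rotation helper and the sample quaternion
values are illustrative assumptions and are not part of this patch.

    import numpy as np
    from scipy.spatial.transform import Rotation

    # Sample unit quaternion in scalar-last order (x, y, z, w)
    qx, qy, qz, qw = 0.1, 0.2, 0.3, (1.0 - 0.14) ** 0.5

    # Rotation block written out the same way as in __pose_to_mat
    manual = np.array(
        [
            [1 - 2 * (qy * qy + qz * qz), 2 * (qx * qy - qw * qz), 2 * (qw * qy + qx * qz)],
            [2 * (qx * qy + qw * qz), 1 - 2 * (qx * qx + qz * qz), 2 * (qy * qz - qw * qx)],
            [2 * (qx * qz - qw * qy), 2 * (qw * qx + qy * qz), 1 - 2 * (qx * qx + qy * qy)],
        ]
    )

    # SciPy also expects scalar-last quaternions
    reference = Rotation.from_quat([qx, qy, qz, qw]).as_matrix()
    assert np.allclose(manual, reference)
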
diff --git a/map/autoware_tp_collector/package.xml b/map/autoware_tp_collector/package.xml
index 7e9bb440..611caeeb 100644
--- a/map/autoware_tp_collector/package.xml
+++ b/map/autoware_tp_collector/package.xml
@@ -14,8 +14,8 @@
tier4_debug_msgs
libpcl-all-dev
- yaml-cpp
tier4_debug_msgs
+ yaml-cpp
ament_cmake
diff --git a/map/autoware_tp_collector/scripts/tp_checker.py b/map/autoware_tp_collector/scripts/tp_checker.py
index 00968084..a86038e7 100755
--- a/map/autoware_tp_collector/scripts/tp_checker.py
+++ b/map/autoware_tp_collector/scripts/tp_checker.py
@@ -14,35 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import argparse
-from rosbag2_py import (
- SequentialReader,
- StorageOptions,
- ConverterOptions,
- Info
-)
-
-from rclpy.serialization import deserialize_message, serialize_message
-import os
import csv
-import yaml
-from scipy import spatial as sp
-import numpy as np
+import os
+import shutil
+import struct
+import time
+
+from builtin_interfaces.msg import Time
from geometry_msgs.msg import PoseWithCovarianceStamped
-import sensor_msgs.msg as sensor_msgs
-import std_msgs.msg as std_msgs
+import numpy as np
+import open3d as o3d
import rclpy
from rclpy.node import Node
-from tier4_debug_msgs.msg import Float32Stamped
-from builtin_interfaces.msg import Time
-import tqdm
-import time
-import shutil
-import open3d as o3d
+from rclpy.serialization import deserialize_message
+from rclpy.serialization import serialize_message
+from rosbag2_py import ConverterOptions
+from rosbag2_py import Info
+from rosbag2_py import SequentialReader
+from rosbag2_py import StorageOptions
+from scipy import spatial as sp
+import sensor_msgs.msg as sensor_msgs
import sensor_msgs_py.point_cloud2 as pc2
-import struct
+import std_msgs.msg as std_msgs
+from tier4_debug_msgs.msg import Float32Stamped
import tp_utility as tpu
+import tqdm
+import yaml
+
def mark_changes(candidate_segments, rosbag_tp: float, segment_dict, segment_df):
tp_sum = 0.0
@@ -57,21 +56,23 @@ def mark_changes(candidate_segments, rosbag_tp: float, segment_dict, segment_df)
else:
expected_tp = 0
- if (expected_tp > 0 and abs(expected_tp - rosbag_tp) / expected_tp >= 0.2) or \
- (expected_tp == 0 and rosbag_tp > 0):
+ if (expected_tp > 0 and abs(expected_tp - rosbag_tp) / expected_tp >= 0.2) or (
+ expected_tp == 0 and rosbag_tp > 0
+ ):
for key in candidate_segments:
if key in segment_dict:
segment_df[segment_dict[key], 2] += 1
+
class TPChecker(Node):
def __init__(self):
- super().__init__('tp_checker')
+ super().__init__("tp_checker")
self.segment_df = None # Segment indices, TP
self.segment_dict = {} # key: segment name, value: index to the segment_df
- self.pcd_path = None # Path to the directory containing PCD segments
- self.changed_dir = None # A directory contains the segments that need to be examined
- self.result_csv = None # Path to the result CSV file
+ self.pcd_path = None # Path to the directory containing PCD segments
+        self.changed_dir = None  # A directory containing the segments that need to be examined
+ self.result_csv = None # Path to the result CSV file
self.tp_path = None # Path to the file that contains TPs of map segments
def __initialize(self, pcd_map_dir: str):
@@ -94,7 +95,11 @@ def __initialize(self, pcd_map_dir: str):
self.tp_path = os.path.join(pcd_map_dir, "scores.csv")
if not os.path.exists(self.tp_path):
- print("Error: A TP file, which contains the TPs of map segments, is not found at {0}! Abort!".format(self.tp_path))
+ print(
+ "Error: A TP file, which contains the TPs of map segments, is not found at {0}! Abort!".format(
+ self.tp_path
+ )
+ )
exit()
# Read the input map directory and setup the segment dictionary
@@ -112,8 +117,8 @@ def __get_pcd_segments_and_scores(self):
elif key == "x_resolution":
self.resolution = value
- self.segment_df = np.array(self.segment_df, dtype = object)
-
+ self.segment_df = np.array(self.segment_df, dtype=object)
+
# Load the TPs
with open(self.tp_path, "r") as f:
reader = csv.reader(f)
@@ -136,15 +141,21 @@ def __show(self):
dtype = np.float32
itemsize = np.dtype(dtype).itemsize
- fields = [sensor_msgs.PointField(name = "x", offset = 0, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "y", offset = itemsize, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "z", offset = itemsize * 2, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "rgba", offset = itemsize * 3, datatype = ros_uint32_dtype, count = 1)]
+ fields = [
+ sensor_msgs.PointField(name="x", offset=0, datatype=ros_float_dtype, count=1),
+ sensor_msgs.PointField(name="y", offset=itemsize, datatype=ros_float_dtype, count=1),
+ sensor_msgs.PointField(
+ name="z", offset=itemsize * 2, datatype=ros_float_dtype, count=1
+ ),
+ sensor_msgs.PointField(
+ name="rgba", offset=itemsize * 3, datatype=ros_uint32_dtype, count=1
+ ),
+ ]
points = []
pc2_width = 0
- progress_bar = tqdm.tqdm(total = len(self.segment_df))
+ progress_bar = tqdm.tqdm(total=len(self.segment_df))
origin = None
for i in range(self.segment_df.shape[0]):
@@ -167,65 +178,84 @@ def __show(self):
header.frame_id = "map"
pc2_msg = pc2.create_cloud(header, fields, points)
- pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, '/autoware_tp_checker', 10)
+ pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, "/autoware_tp_checker", 10)
while True:
pcd_publisher.publish(pc2_msg)
time.sleep(5)
-
def __set_color_based_on_mark(self, mark) -> int:
# The may-be-changed segments are colored red
# The may-not-be-changed segments are colored white
if mark >= 100:
r = 255
g = 0
- b = 0
+ b = 0
else:
r = 255
g = 255
b = 255
a = 255
- tmp_rgb = struct.pack('BBBB', b, g, r, a)
- rgba = struct.unpack('I', tmp_rgb)[0]
+ tmp_rgb = struct.pack("BBBB", b, g, r, a)
+ rgba = struct.unpack("I", tmp_rgb)[0]
return rgba
- def processing(self,
- pcd_path: str,
- rosbag_path: str,
- result_path: str,
- pose_topic: str,
- tp_topic: str,
- scan_topic: str):
+ def processing(
+ self,
+ pcd_path: str,
+ rosbag_path: str,
+ result_path: str,
+ pose_topic: str,
+ tp_topic: str,
+ scan_topic: str,
+ ):
self.__initialize(pcd_path)
self.__get_pcd_segments_and_scores()
- tpu.collect_rosbag_tp(rosbag_path, pose_topic, tp_topic, scan_topic, self.resolution,
- mark_changes, self.segment_dict, self.segment_df)
+ tpu.collect_rosbag_tp(
+ rosbag_path,
+ pose_topic,
+ tp_topic,
+ scan_topic,
+ self.resolution,
+ mark_changes,
+ self.segment_dict,
+ self.segment_df,
+ )
self.__save_results()
self.__show()
+
if __name__ == "__main__":
rclpy.init()
parser = argparse.ArgumentParser()
parser.add_argument("map_path", help="The path to the PCD folder")
parser.add_argument("bag_path", help="The path to the input rosbag")
parser.add_argument("result_path", help="The path to the result folder")
- parser.add_argument("--pose_topic",
- help = "Pose topic",
- default = "/localization/pose_twist_fusion_filter/pose_with_covariance_without_yawbias",
- required = False, type = str)
- parser.add_argument("--tp_topic",
- help = "TP topic",
- default = "/localization/pose_estimator/transform_probability",
- required = False, type = str)
- parser.add_argument("--scan_topic",
- help = "Point cloud topic",
- default = "/localization/util/downsample/pointcloud",
- required = False, type = str)
+ parser.add_argument(
+ "--pose_topic",
+ help="Pose topic",
+ default="/localization/pose_twist_fusion_filter/pose_with_covariance_without_yawbias",
+ required=False,
+ type=str,
+ )
+ parser.add_argument(
+ "--tp_topic",
+ help="TP topic",
+ default="/localization/pose_estimator/transform_probability",
+ required=False,
+ type=str,
+ )
+ parser.add_argument(
+ "--scan_topic",
+ help="Point cloud topic",
+ default="/localization/util/downsample/pointcloud",
+ required=False,
+ type=str,
+ )
args = parser.parse_args()
@@ -241,9 +271,11 @@ def processing(self,
checker = TPChecker()
# checker.processing(args.map_path, args.bag_path, args.result_path)
- checker.processing(args.map_path,
- args.bag_path,
- args.result_path,
- args.pose_topic,
- args.tp_topic,
- args.scan_topic)
+ checker.processing(
+ args.map_path,
+ args.bag_path,
+ args.result_path,
+ args.pose_topic,
+ args.tp_topic,
+ args.scan_topic,
+ )
diff --git a/map/autoware_tp_collector/scripts/tp_collector.py b/map/autoware_tp_collector/scripts/tp_collector.py
index 4186edc4..9b38e5c4 100755
--- a/map/autoware_tp_collector/scripts/tp_collector.py
+++ b/map/autoware_tp_collector/scripts/tp_collector.py
@@ -14,32 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import argparse
-from rosbag2_py import (
- SequentialReader,
- StorageOptions,
- ConverterOptions,
- Info
-)
-
-from rclpy.serialization import deserialize_message, serialize_message
-import os
import csv
-import yaml
-from scipy import spatial as sp
-import numpy as np
-from geometry_msgs.msg import PoseWithCovarianceStamped, Pose
-from tier4_debug_msgs.msg import Float32Stamped
+import os
+import shutil
+from subprocess import call
+
from builtin_interfaces.msg import Time
-import tqdm
-import sensor_msgs.msg as sensor_msgs
-import sensor_msgs_py.point_cloud2 as pc2
+from geometry_msgs.msg import Pose
+from geometry_msgs.msg import PoseWithCovarianceStamped
+import numpy as np
import rclpy
from rclpy.node import Node
-import shutil
-from subprocess import call
+from rclpy.serialization import deserialize_message
+from rclpy.serialization import serialize_message
+from rosbag2_py import ConverterOptions
+from rosbag2_py import Info
+from rosbag2_py import SequentialReader
+from rosbag2_py import StorageOptions
+from scipy import spatial as sp
+import sensor_msgs.msg as sensor_msgs
+import sensor_msgs_py.point_cloud2 as pc2
+from tier4_debug_msgs.msg import Float32Stamped
import tp_utility as tpu
+import tqdm
+import yaml
+
# Update the TP of a segment at index @idx, given a newly TP value
def update_avg_tp(candidate_segments, new_tp, segment_dict, segment_df):
@@ -49,14 +49,15 @@ def update_avg_tp(candidate_segments, new_tp, segment_dict, segment_df):
tp, counter = segment_df[i, [1, 2]]
segment_df[i, [1, 2]] = [tp + 1.0 / (counter + 1) * (new_tp - tp), counter + 1]
+
class TPCollector(Node):
def __init__(self):
- super().__init__('tp_collector')
+ super().__init__("tp_collector")
self.pcd_path = None
self.yaml_path = None
self.score_path = None
self.segment_df = None
- self.segment_dict = {} # Pairs of 2D coordinate and index
+ self.segment_dict = {} # Pairs of 2D coordinate and index
self.resolution = None
def __initialize(self, pcd_map_dir: str, output_path: str, resolution: float):
@@ -93,11 +94,19 @@ def __get_pcd_segments_and_scores(self):
self.yaml_path = os.path.join(self.output_path, "pointcloud_map_metadata.yaml")
if not os.path.exists(self.yaml_path):
- ds_cmd = "ros2 launch autoware_pointcloud_divider pointcloud_divider.launch.xml " + \
- "input_pcd_or_dir:=" + self.pcd_path + " output_pcd_dir:=" + self.output_path + \
- " prefix:=test leaf_size:=0.5" + " grid_size_x:=" + str(self.resolution) + \
- " grid_size_y:=" + str(self.resolution)
- call(ds_cmd, shell = True)
+ ds_cmd = (
+ "ros2 launch autoware_pointcloud_divider pointcloud_divider.launch.xml "
+ + "input_pcd_or_dir:="
+ + self.pcd_path
+ + " output_pcd_dir:="
+ + self.output_path
+ + " prefix:=test leaf_size:=0.5"
+ + " grid_size_x:="
+ + str(self.resolution)
+ + " grid_size_y:="
+ + str(self.resolution)
+ )
+ call(ds_cmd, shell=True)
self.pcd_path = os.path.join(self.output_path, "pointcloud_map.pcd")
# Now scan the downsample directory and get the segment list
@@ -108,7 +117,7 @@ def __get_pcd_segments_and_scores(self):
seg_key = str(value[0]) + "_" + str(value[1])
self.segment_dict[seg_key] = len(self.segment_df) - 1
- self.segment_df = np.array(self.segment_df, dtype = object)
+ self.segment_df = np.array(self.segment_df, dtype=object)
# Load the score map
self.score_path = os.path.join(self.output_path, "scores.csv")
@@ -119,29 +128,30 @@ def __get_pcd_segments_and_scores(self):
# Skip the header
next(reader)
- # Load the current maps' TPs
+ # Load the current maps' TPs
for index, row in enumerate(reader):
self.segment_df[index, [1, 2]] = [float(row[1]), 1]
-
+
##### Save the segment TPs #####
def __save_tps(self):
print("Saving TP to files")
with open(self.score_path, "w") as f:
f.write("segment,tp\n")
print("Number of segments = {0}".format(self.segment_df.shape[0]))
- for i in np.arange(0, self.segment_df.shape[0], dtype = int):
+ for i in np.arange(0, self.segment_df.shape[0], dtype=int):
f.write("{0},{1}\n".format(self.segment_df[i, 0], self.segment_df[i, 1]))
print("Done. Segments' TPs are saved at {0}.".format(self.score_path))
-
- def processing(self,
- pcd_map_dir: str,
- rosbag_path: str,
- output_path: str,
- resolution: float,
- pose_topic: str,
- tp_topic: str,
- scan_topic: str):
+ def processing(
+ self,
+ pcd_map_dir: str,
+ rosbag_path: str,
+ output_path: str,
+ resolution: float,
+ pose_topic: str,
+ tp_topic: str,
+ scan_topic: str,
+ ):
# Initialize paths to directories
self.__initialize(pcd_map_dir, output_path, resolution)
@@ -149,32 +159,51 @@ def processing(self,
self.__get_pcd_segments_and_scores()
# Read the rosbag and get the ndt poses and corresponding tps
- tpu.collect_rosbag_tp(rosbag_path, pose_topic, tp_topic, scan_topic,
- self.resolution, update_avg_tp,
- self.segment_dict, self.segment_df)
+ tpu.collect_rosbag_tp(
+ rosbag_path,
+ pose_topic,
+ tp_topic,
+ scan_topic,
+ self.resolution,
+ update_avg_tp,
+ self.segment_dict,
+ self.segment_df,
+ )
# Save the new TPs
self.__save_tps()
+
if __name__ == "__main__":
rclpy.init()
parser = argparse.ArgumentParser()
- parser.add_argument("map_path", help = "The path to the PCD folder", type = str)
- parser.add_argument("bag_path", help = "The path to the input rosbag", type = str)
- parser.add_argument("output", help = "The path to the output directory", type = str)
- parser.add_argument("--resolution", help = "Map segment resolution", default = 20, required = False, type = float)
- parser.add_argument("--pose_topic",
- help = "Pose topic",
- default = "/localization/pose_twist_fusion_filter/pose_with_covariance_without_yawbias",
- required = False, type = str)
- parser.add_argument("--tp_topic",
- help = "TP topic",
- default = "/localization/pose_estimator/transform_probability",
- required = False, type = str)
- parser.add_argument("--scan_topic",
- help = "Point cloud topic",
- default = "/localization/util/downsample/pointcloud",
- required = False, type = str)
+ parser.add_argument("map_path", help="The path to the PCD folder", type=str)
+ parser.add_argument("bag_path", help="The path to the input rosbag", type=str)
+ parser.add_argument("output", help="The path to the output directory", type=str)
+ parser.add_argument(
+ "--resolution", help="Map segment resolution", default=20, required=False, type=float
+ )
+ parser.add_argument(
+ "--pose_topic",
+ help="Pose topic",
+ default="/localization/pose_twist_fusion_filter/pose_with_covariance_without_yawbias",
+ required=False,
+ type=str,
+ )
+ parser.add_argument(
+ "--tp_topic",
+ help="TP topic",
+ default="/localization/pose_estimator/transform_probability",
+ required=False,
+ type=str,
+ )
+ parser.add_argument(
+ "--scan_topic",
+ help="Point cloud topic",
+ default="/localization/util/downsample/pointcloud",
+ required=False,
+ type=str,
+ )
args = parser.parse_args()
@@ -189,10 +218,12 @@ def processing(self,
# Run
tp_collector = TPCollector()
- tp_collector.processing(args.map_path,
- args.bag_path,
- args.output,
- args.resolution,
- args.pose_topic,
- args.tp_topic,
- args.scan_topic)
+ tp_collector.processing(
+ args.map_path,
+ args.bag_path,
+ args.output,
+ args.resolution,
+ args.pose_topic,
+ args.tp_topic,
+ args.scan_topic,
+ )
diff --git a/map/autoware_tp_collector/scripts/tp_utility.py b/map/autoware_tp_collector/scripts/tp_utility.py
index 759b367c..fa3beb4d 100644
--- a/map/autoware_tp_collector/scripts/tp_utility.py
+++ b/map/autoware_tp_collector/scripts/tp_utility.py
@@ -1,26 +1,27 @@
-from rosbag2_py import (
- SequentialReader,
- StorageOptions,
- ConverterOptions,
- Info
-)
-from rclpy.serialization import deserialize_message, serialize_message
-from geometry_msgs.msg import PoseWithCovarianceStamped, Pose, Point, Quaternion
-from tier4_debug_msgs.msg import Float32Stamped
-import sensor_msgs_py.point_cloud2 as pc2
-import sensor_msgs.msg as sensor_msgs
+from geometry_msgs.msg import Point
+from geometry_msgs.msg import Pose
+from geometry_msgs.msg import PoseWithCovarianceStamped
+from geometry_msgs.msg import Quaternion
import numpy as np
+from rclpy.serialization import deserialize_message
+from rclpy.serialization import serialize_message
+from rosbag2_py import ConverterOptions
+from rosbag2_py import Info
+from rosbag2_py import SequentialReader
+from rosbag2_py import StorageOptions
+import sensor_msgs.msg as sensor_msgs
+import sensor_msgs_py.point_cloud2 as pc2
+from tier4_debug_msgs.msg import Float32Stamped
import tqdm
-
-class FixQueue():
+class FixQueue:
def __init__(self):
- self.queue_limit = 100 # Max number of items in the queue
- self.data_queue = None # Queue of data
- self.head = 0 # Index to append a new item (enqueue index)
- self.tail = 0 # Index to start remove/process items (dequeue index)
- self.current_size = 0 # Number of items in the queue
+ self.queue_limit = 100 # Max number of items in the queue
+ self.data_queue = None # Queue of data
+ self.head = 0 # Index to append a new item (enqueue index)
+ self.tail = 0 # Index to start remove/process items (dequeue index)
+ self.current_size = 0 # Number of items in the queue
def enqueue(self, item):
if self.is_full():
@@ -28,7 +29,7 @@ def enqueue(self, item):
return False
if self.is_empty() or self.data_queue.shape[1] != len(item):
- self.data_queue = np.ndarray((self.queue_limit, len(item)), dtype = object)
+ self.data_queue = np.ndarray((self.queue_limit, len(item)), dtype=object)
self.data_queue[self.tail, :] = item
self.tail += 1
@@ -39,7 +40,7 @@ def enqueue(self, item):
return True
- def drop(self, limitless = False, callback_func = None, *args):
+ def drop(self, limitless=False, callback_func=None, *args):
if limitless == True:
end_id = self.tail
else:
@@ -64,16 +65,17 @@ def is_full(self) -> bool:
if self.current_size == self.queue_limit:
return True
return False
-
+
def is_empty(self) -> bool:
if self.current_size == 0:
return True
return False
+
# Some utility functions that is shared by both the tp collector and tp checker
# Convert a pose to a 4x4 transformation matrix
def __pose_to_mat(pose: Pose) -> np.array:
- t = pose.position # Translation
+ t = pose.position # Translation
q = pose.orientation # Rotation
qx = q.x
@@ -81,16 +83,22 @@ def __pose_to_mat(pose: Pose) -> np.array:
qz = q.z
qw = q.w
- return np.asarray([[1 - 2 * (qy * qy + qz * qz), 2 * (qx * qy - qw * qz), 2 * (qw * qy + qx * qz), t.x],
- [2 * (qx * qy + qw * qz), 1 - 2 * (qx * qx + qz * qz), 2 * (qy * qz - qw * qz), t.y],
- [2 * (qx * qz - qw * qy), 2 * (qw * qx + qy * qz), 1 - 2 * (qx * qx + qy * qy), t.z],
- [0.0, 0.0, 0.0, 1.0]])
+ return np.asarray(
+ [
+ [1 - 2 * (qy * qy + qz * qz), 2 * (qx * qy - qw * qz), 2 * (qw * qy + qx * qz), t.x],
+            [2 * (qx * qy + qw * qz), 1 - 2 * (qx * qx + qz * qz), 2 * (qy * qz - qw * qx), t.y],
+ [2 * (qx * qz - qw * qy), 2 * (qw * qx + qy * qz), 1 - 2 * (qx * qx + qy * qy), t.z],
+ [0.0, 0.0, 0.0, 1.0],
+ ]
+ )
+
-# Point-wise transform
+# Point-wise transform
def transform_p(p, trans_mat_t: np.ndarray) -> np.ndarray:
tp = np.asarray([p[0], p[1], p[2], 1.0])
return np.matmul(tp, trans_mat_t)
+
##### Stamp search #####
def __stamp_search(stamp: int, ref_df: np.ndarray, search_size: int) -> int:
left = 0
@@ -110,47 +118,46 @@ def __stamp_search(stamp: int, ref_df: np.ndarray, search_size: int) -> int:
return res
+
def __get_message_count(rosbag_path: str):
info = Info()
metadata = info.read_metadata(rosbag_path, "sqlite3")
- output = {item.topic_metadata.name : item.message_count for item in metadata.topics_with_message_count}
+ output = {
+ item.topic_metadata.name: item.message_count for item in metadata.topics_with_message_count
+ }
output["total"] = metadata.message_count
return output
-def __scan_callback(scan_tuple,
- pose_df,
- pose_df_len,
- tp_df,
- tp_df_len,
- resolution,
- callback_func =None,
- *args):
+
+def __scan_callback(
+ scan_tuple, pose_df, pose_df_len, tp_df, tp_df_len, resolution, callback_func=None, *args
+):
stamp, scan = scan_tuple
- tmp_scan = pc2.read_points(scan, skip_nans = True)
+ tmp_scan = pc2.read_points(scan, skip_nans=True)
- # Find the closest pose
+ # Find the closest pose
pid = __stamp_search(stamp, pose_df, pose_df_len)
if pid <= 0:
return
-
+
# Get the transposed transformation matrix
closest_p_t = pose_df[pid, 1].T
# Skip some 0 poses at the beginning of the rosbag
if closest_p_t[3, 0] == 0 and closest_p_t[3, 1] == 0 and closest_p_t[3, 2] == 0:
return
-
- # Find the closest tp
+
+ # Find the closest tp
tid = __stamp_search(stamp, tp_df, tp_df_len)
if tid <= 0:
return
-
+
closest_tp = tp_df[tid, 1]
-
+
# Transform the scan and find the segments that cover the transformed points
# Record the segments that cover the scan
segment_set = set()
@@ -163,38 +170,45 @@ def __scan_callback(scan_tuple,
sy = int(tp[1] / resolution) * int(resolution)
seg_idx = str(sx) + "_" + str(sy)
- segment_set.add(seg_idx)
+ segment_set.add(seg_idx)
# Update/examine the segments' avg TP
callback_func(segment_set, closest_tp, *args)
+
##### Read the input rosbag and update map's TP #####
-def collect_rosbag_tp(bag_path: str,
- pose_topic: str,
- tp_topic: str,
- scan_topic: str,
- *args):
+def collect_rosbag_tp(bag_path: str, pose_topic: str, tp_topic: str, scan_topic: str, *args):
reader = SequentialReader()
- bag_storage_options = StorageOptions(uri = bag_path, storage_id = "sqlite3")
+ bag_storage_options = StorageOptions(uri=bag_path, storage_id="sqlite3")
bag_converter_options = ConverterOptions(
- input_serialization_format = "cdr", output_serialization_format = "cdr"
+ input_serialization_format="cdr", output_serialization_format="cdr"
)
reader.open(bag_storage_options, bag_converter_options)
print("Reading rosbag...")
msg_count_by_topic = __get_message_count(bag_path)
- progress_bar = tqdm.tqdm(total = msg_count_by_topic["total"])
+ progress_bar = tqdm.tqdm(total=msg_count_by_topic["total"])
if msg_count_by_topic[pose_topic] == 0:
- print("Error: the input rosbag contains no message from the topic {0}. Exit!".format(pose_topic))
+ print(
+ "Error: the input rosbag contains no message from the topic {0}. Exit!".format(
+ pose_topic
+ )
+ )
if msg_count_by_topic[tp_topic] == 0:
- print("Error: the input rosbag contains no message from the topic {0}. Exit!".format(tp_topic))
+ print(
+ "Error: the input rosbag contains no message from the topic {0}. Exit!".format(tp_topic)
+ )
if msg_count_by_topic[scan_topic] == 0:
- print("Error: the input rosbag contains no message from the topic {0}. Exit!".format(scan_topic))
-
- pose_df = np.ndarray((msg_count_by_topic[pose_topic], 2), dtype = object)
- tp_df = np.ndarray((msg_count_by_topic[tp_topic], 2), dtype = object)
+ print(
+ "Error: the input rosbag contains no message from the topic {0}. Exit!".format(
+ scan_topic
+ )
+ )
+
+ pose_df = np.ndarray((msg_count_by_topic[pose_topic], 2), dtype=object)
+ tp_df = np.ndarray((msg_count_by_topic[tp_topic], 2), dtype=object)
pose_df_idx = 0
tp_df_idx = 0
scan_df = FixQueue()
@@ -205,7 +219,7 @@ def collect_rosbag_tp(bag_path: str,
progress_bar.update(1)
(topic, data, stamp) = reader.read_next()
- if (skip_count <= 2000):
+ if skip_count <= 2000:
skip_count += 1
continue
@@ -220,16 +234,16 @@ def collect_rosbag_tp(bag_path: str,
stamp = tp_msg.stamp.sec * 1e9 + tp_msg.stamp.nanosec
tp_df[tp_df_idx, :] = [stamp, tp_msg.data]
tp_df_idx += 1
-
+
elif topic == scan_topic:
pc_msg = deserialize_message(data, sensor_msgs.PointCloud2)
- stamp = pc_msg.header.stamp.sec * 1e9 + pc_msg.header.stamp.nanosec
-
+ stamp = pc_msg.header.stamp.sec * 1e9 + pc_msg.header.stamp.nanosec
+
scan_df.enqueue([stamp, pc_msg])
if scan_df.is_full():
scan_df.drop(False, __scan_callback, pose_df, pose_df_idx, tp_df, tp_df_idx, *args)
-
+
# Process the rest of the queue
scan_df.drop(True, __scan_callback, pose_df, pose_df_idx, tp_df, tp_df_idx, *args)
diff --git a/map/autoware_tp_collector/scripts/tp_visualizer.py b/map/autoware_tp_collector/scripts/tp_visualizer.py
index d5bf79ec..ac95c4c6 100644
--- a/map/autoware_tp_collector/scripts/tp_visualizer.py
+++ b/map/autoware_tp_collector/scripts/tp_visualizer.py
@@ -14,55 +14,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import argparse
-from rosbag2_py import (
- SequentialReader,
- StorageOptions,
- ConverterOptions,
- SequentialWriter,
- BagMetadata,
- TopicMetadata,
- Info
-)
-
-from rclpy.serialization import deserialize_message, serialize_message
-import os
import csv
-import yaml
-from scipy import spatial as sp
-import numpy as np
-from geometry_msgs.msg import PoseWithCovarianceStamped
-from tier4_debug_msgs.msg import Float32Stamped
+import os
+import struct
+import time
+
from builtin_interfaces.msg import Time
-import pandas as pd
-import tqdm
+from geometry_msgs.msg import PoseWithCovarianceStamped
+import numpy as np
import open3d as o3d
-import sensor_msgs.msg as sensor_msgs
-import std_msgs.msg as std_msgs
-import sensor_msgs_py.point_cloud2 as pc2
+import pandas as pd
import rclpy
from rclpy.node import Node
-import time
-import struct
+from rclpy.serialization import deserialize_message
+from rclpy.serialization import serialize_message
+from rosbag2_py import BagMetadata
+from rosbag2_py import ConverterOptions
+from rosbag2_py import Info
+from rosbag2_py import SequentialReader
+from rosbag2_py import SequentialWriter
+from rosbag2_py import StorageOptions
+from rosbag2_py import TopicMetadata
+from scipy import spatial as sp
+import sensor_msgs.msg as sensor_msgs
+import sensor_msgs_py.point_cloud2 as pc2
+import std_msgs.msg as std_msgs
+from tier4_debug_msgs.msg import Float32Stamped
+import tqdm
+import yaml
+
class TPVisualizer(Node):
def __init__(self):
- super().__init__('tp_visualizer')
+ super().__init__("tp_visualizer")
self.pcd_map_dir = None
self.pcd_path = None
self.yaml_path = None
self.score_path = None
# Color based on rounded TP
- self.color = {0 : [255, 255, 255], # White
- 1 : [255, 255, 51], # Yellow
- 2 : [255, 128, 0], # Orange
- 3 : [204, 0, 0], # Dark red
- 4 : [51, 153, 255], # Blue
- 5 : [51, 153, 255],
- 6 : [51, 153, 255],
- 7 : [51, 153, 255],
- 8 : [51, 153, 255]}
+ self.color = {
+ 0: [255, 255, 255], # White
+ 1: [255, 255, 51], # Yellow
+ 2: [255, 128, 0], # Orange
+ 3: [204, 0, 0], # Dark red
+ 4: [51, 153, 255], # Blue
+ 5: [51, 153, 255],
+ 6: [51, 153, 255],
+ 7: [51, 153, 255],
+ 8: [51, 153, 255],
+ }
##### Read the YAML file to get the list of PCD segments and scores #####
def __get_pcd_segments_and_scores(self, pcd_map_dir: str):
@@ -91,15 +92,21 @@ def __show(self):
dtype = np.float32
itemsize = np.dtype(dtype).itemsize
- fields = [sensor_msgs.PointField(name = "x", offset = 0, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "y", offset = itemsize, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "z", offset = itemsize * 2, datatype = ros_float_dtype, count = 1),
- sensor_msgs.PointField(name = "rgba", offset = itemsize * 3, datatype = ros_uint32_dtype, count = 1)]
+ fields = [
+ sensor_msgs.PointField(name="x", offset=0, datatype=ros_float_dtype, count=1),
+ sensor_msgs.PointField(name="y", offset=itemsize, datatype=ros_float_dtype, count=1),
+ sensor_msgs.PointField(
+ name="z", offset=itemsize * 2, datatype=ros_float_dtype, count=1
+ ),
+ sensor_msgs.PointField(
+ name="rgba", offset=itemsize * 3, datatype=ros_uint32_dtype, count=1
+ ),
+ ]
points = []
pc2_width = 0
- progress_bar = tqdm.tqdm(total = len(self.segment_df))
+ progress_bar = tqdm.tqdm(total=len(self.segment_df))
origin = None
for tuple in self.segment_df.itertuples():
@@ -125,19 +132,20 @@ def __show(self):
header.frame_id = "map"
pc2_msg = pc2.create_cloud(header, fields, points)
- pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, '/autoware_tp_visualizer', 10)
+ pcd_publisher = self.create_publisher(
+ sensor_msgs.PointCloud2, "/autoware_tp_visualizer", 10
+ )
while True:
pcd_publisher.publish(pc2_msg)
time.sleep(1)
-
def __set_color_based_on_score(self, score) -> int:
r, g, b = self.color[int(score)]
a = 255
- tmp_rgb = struct.pack('BBBB', b, g, r, a)
- rgba = struct.unpack('I', tmp_rgb)[0]
+ tmp_rgb = struct.pack("BBBB", b, g, r, a)
+ rgba = struct.unpack("I", tmp_rgb)[0]
return rgba
@@ -147,6 +155,7 @@ def processing(self, pcd_map_dir: str):
# Publish to rviz
self.__show()
+
if __name__ == "__main__":
rclpy.init()
parser = argparse.ArgumentParser()