Skip to content

Commit

Permalink
refactor: text hmi and implement hmi task as ros2 action (#176)
Browse files Browse the repository at this point in the history
Signed-off-by: Bartłomiej Boczek <[email protected]>
  • Loading branch information
boczekbartek authored and maciejmajek committed Sep 9, 2024
1 parent f5245bf commit a01a8e4
Show file tree
Hide file tree
Showing 8 changed files with 651 additions and 277 deletions.
82 changes: 44 additions & 38 deletions src/rai/rai/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
from collections import deque
from dataclasses import dataclass, field
from pprint import pformat
from queue import Queue
from threading import Thread
from typing import Any, Callable, Deque, Dict, List, Literal, Optional, Tuple

import rcl_interfaces.msg
Expand All @@ -30,13 +28,13 @@
import rclpy.subscription
import rclpy.task
import sensor_msgs.msg
import std_msgs.msg
from langchain.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph.graph import CompiledGraph
from rclpy.action.graph import get_action_names_and_types
from rclpy.action.server import ActionServer
from rclpy.node import Node
from rclpy.qos import (
DurabilityPolicy,
Expand All @@ -51,6 +49,7 @@
from rai.messages.multimodal import HumanMultimodalMessage
from rai.tools.ros.utils import convert_ros_img_to_base64, import_message_from_str
from rai.tools.utils import wait_for_message
from rai_interfaces.action import Task as TaskAction


class RosoutBuffer:
Expand Down Expand Up @@ -338,7 +337,6 @@ def __init__(

# ---------- ROS configuration ----------

self.initialize_task_subscriber()
self.rosout_sub = self.create_subscription(
rcl_interfaces.msg.Log,
"/rosout",
Expand All @@ -347,50 +345,62 @@ def __init__(
qos_profile=self.qos_profile,
)

# ---------- Task Queue ----------
self.task_action_server = ActionServer(
self, TaskAction, "perform_task", self.agent_loop
)

# ---------- LLM Agents ----------
self.AGENT_RECURSION_LIMIT = 100
self.llm_app: CompiledGraph = None

self.task_queue = Queue()
self.agent_loop_thread = Thread(target=self.agent_loop)
self.agent_loop_thread.start()
# self.task_queue = Queue()
# self.agent_loop_thread = Thread(target=self.agent_loop)
# self.agent_loop_thread.start()

def agent_loop(self, goal_handle: TaskAction.Goal):
self.get_logger().info(f"Received goal handle: {goal_handle}")
action_request = goal_handle.request
task = dict(
task=action_request.task,
description=action_request.description,
priority=action_request.priority,
)
self.get_logger().info(f"Received task: {task}")

# ---- LLM Task Handling ----
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=f"Task: {task}"),
]

def agent_loop(self):
while True:
if self.task_queue.empty():
time.sleep(0.1)
continue
data = self.task_queue.get()
self.get_logger().info(f"Agent loop consuming task: {data}")
payload = State(messages=messages)

messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=f"Task: {data}"),
]
state: State = self.llm_app.invoke(
payload, {"recursion_limit": self.AGENT_RECURSION_LIMIT}
) # type: ignore

payload = State(messages=messages)
# ---- Share Action feedback ----
# TODO(boczekbartek): add graph node to langgraph which will send ros2 action feedback to HMI

state: State = self.llm_app.invoke(
payload, {"recursion_limit": self.AGENT_RECURSION_LIMIT}
) # type: ignore
# ---- Share Action Result ----
report = state["messages"][-1]
report = pformat(report.json())

report = state["messages"][-1]
result = TaskAction.Result()
result.success = (
True # TODO(boczekbartek): ask llm if the action has been successful
)
result.report = report

self.get_logger().info(f"Finished task:\n{report}")
self.clear_state()

report = pformat(report.json())
self.get_logger().info(f"Finished task:\n{report}")
self.clear_state()
return report

def set_app(self, app: CompiledGraph):
self.llm_app = app

def initialize_task_subscriber(self):
self.task_sub = self.create_subscription(
std_msgs.msg.String,
self.task_topic,
callback=self.task_callback,
qos_profile=self.qos_profile,
)

def get_robot_state(self) -> Dict[str, str]:
state_dict = dict()
for t in self.state_subscribers:
Expand All @@ -412,10 +422,6 @@ def get_robot_state(self) -> Dict[str, str]:
self.get_logger().info(f"{state_dict=}")
return state_dict

def task_callback(self, msg: std_msgs.msg.String):
self.get_logger().info(f"Received task: {msg.data}")
self.task_queue.put(msg.data)

def clear_state(self):
self.rosout_buffer.clear()

Expand Down
4 changes: 3 additions & 1 deletion src/rai/rai/tools/ros/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def _run(self):

class Ros2GetRobotInterfaces(Ros2BaseTool):
name: str = "ros2_robot_interfaces"
description: str = "A tool for getting all ros2 robot interfaces"
description: str = (
"A tool for getting all ros2 robot interfaces: topics, services and actions"
)

def _run(self):
return self.node.ros_discovery_info.dict()
Expand Down
82 changes: 0 additions & 82 deletions src/rai_hmi/rai_hmi/action_handler_mixin.py

This file was deleted.

Loading

0 comments on commit a01a8e4

Please sign in to comment.