-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathg1chat_node.py
More file actions
124 lines (94 loc) · 3.97 KB
/
Copy pathg1chat_node.py
File metadata and controls
124 lines (94 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
"""g1chat ROS2 节点
- 启动底层 G1Chat 服务(等价于 g1.py, 启动后会一直运行直到进程退出)
- 将 G1Chat 的 text_queue 映射为 ROS2 topic:
- qa: 发布 user/assistant 文本对话,形如 "user:...", "assistant:..."
- location:发布位置信息,形如 "location:{...json...}"
- signal: 发布内部控制信号,形如 "signal:some_signal_name"
- 将 G1Chat 的 control_queue 映射为 ROS2 订阅 topic:
- control: 订阅控制信号字符串,放入 control_queue
使用前请先安装 g1chat 的 whl 包,并在同一环境中运行。
"""
import threading
from queue import Empty
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from .g1 import G1Chat
class G1ChatNode(Node):
"""封装 G1Chat 的 ROS2 节点."""
def __init__(self) -> None:
super().__init__("g1chat_node")
# 创建底层 G1Chat 实例
self._chat = G1Chat()
# 发布者:qa / location / signal
self._qa_pub = self.create_publisher(String, "qa", 10)
self._location_pub = self.create_publisher(String, "location", 10)
self._signal_pub = self.create_publisher(String, "signal", 10)
# 订阅者:control
self._control_sub = self.create_subscription(String, "control", self._on_control_msg, 10)
# 启动底层 G1Chat 服务(在独立线程中运行 asyncio 循环)
self._chat_thread = threading.Thread(target=self._run_chat_loop, daemon=True)
self._chat_thread.start()
# 启动队列 -> ROS topic 的桥接线程
self._bridge_thread = threading.Thread(target=self._bridge_text_queue, daemon=True)
self._bridge_thread.start()
self.get_logger().info("g1chat_node 已启动:G1Chat 服务运行中,队列与 ROS topic 已建立映射。")
# -------------------- G1Chat 启动 --------------------
def _run_chat_loop(self) -> None:
"""在独立线程中以 g1.py 的方式运行 G1Chat(直到进程结束)"""
import asyncio
async def _main():
await self._chat.start()
try:
# 等价于 g1.py 中的 asyncio.Future(),一直运行直到进程退出
await asyncio.Future()
except asyncio.CancelledError:
pass
finally:
await self._chat.stop()
asyncio.run(_main())
# -------------------- ROS <-> G1Chat 桥接 --------------------
def _bridge_text_queue(self) -> None:
"""持续从 G1Chat.text_queue 取数据并发布到对应 topic."""
q = self._chat.text_queue
while rclpy.ok():
try:
item = q.get(timeout=0.1)
except Empty:
continue
if not isinstance(item, str) or not item:
continue
# 形如 "user:xxx" / "assistant:yyy" / "location:{...}" / "some_signal"
if item.startswith("user:") or item.startswith("assistant:"):
msg = String()
msg.data = item
self._qa_pub.publish(msg)
continue
if item.startswith("location:"):
msg = String()
msg.data = item[len("location:") :].strip()
self._location_pub.publish(msg)
continue
# 其它非空字符串统一视为 signal
msg = String()
msg.data = item
self._signal_pub.publish(msg)
def _on_control_msg(self, msg: String) -> None:
"""订阅 control topic, 将消息内容写入 G1Chat.control_queue."""
control_signal = msg.data.strip()
if not control_signal:
return
self._chat.control_queue.put(control_signal)
def main(args=None) -> None:
rclpy.init(args=args)
node = G1ChatNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()