import roslibpy import math import time # This script connects to a ROS 2 robot (here, turtlesim) using the rosbridge protocol. # In a 1st terminal, run: #sudo apt install ros-humble-rosbridge-suite #ros2 launch rosbridge_server rosbridge_websocket_launch.xml # In a 2nd terminal, run: #ros2 launch turtlesim multisim.launch.py # In another terminal possibly from another computer with only python and no full ROS 2 installation (change IP address below): #pip install roslibpy #python roslib_sample_ros2.py def pose_callback(message): x = message.get('x', None) y = message.get('y', None) theta = message.get('theta', None) print(f"[POSE] x: {x:.2f}\ty: {y:.2f}\ttheta: {theta:.2f}") # Global publisher variable to be used within callbacks cmd_vel_topic = None def start_motors(): global cmd_vel_topic msg = roslibpy.Message({ 'linear': {'x': 0.5, 'y': 0.0, 'z': 0.0}, 'angular': {'x': 0.0, 'y': 0.0, 'z': 0.2} }) cmd_vel_topic.publish(msg) print("[CMD_VEL] Published start command.") def stop_motors(): global cmd_vel_topic msg = roslibpy.Message({ 'linear': {'x': 0.0, 'y': 0.0, 'z': 0.0}, 'angular': {'x': 0.0, 'y': 0.0, 'z': 0.0} }) cmd_vel_topic.publish(msg) print("[CMD_VEL] Published stop command.") def main(): print("Please change the IP address in the code and check your network configuration if needed!") # Create the connection to rosbridge (update with your robot's IP and port) ros = roslibpy.Ros(host='127.0.0.1', port=9090) # Create subscribers for sensor topics pose_topic = roslibpy.Topic(ros, '/turtlesim1/turtle1/pose', 'turtlesim/msg/Pose') pose_topic.subscribe(pose_callback) # Create a publisher for the motor command /cmd_vel topic global cmd_vel_topic cmd_vel_topic = roslibpy.Topic(ros, '/turtlesim1/turtle1/cmd_vel', 'geometry_msgs/msg/Twist') # Run the connection ros.run() print("Listening for Pose events. Press CTRL+C to exit.") try: while True: start_motors() # Idle loop keeps the script running. time.sleep(1) except KeyboardInterrupt: print("CTRL+C pressed. Shutting down...") finally: stop_motors() time.sleep(1) # Clean up: unsubscribe and end the connection. pose_topic.unsubscribe() cmd_vel_topic.unadvertise() ros.terminate() if __name__ == '__main__': main()