-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
101 lines (83 loc) · 3.71 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#
# Click server in Python
# Binds REP socket to tcp://*:5555
# Expects Message from client, replies with Message
#
import os
import signal
from argparse import ArgumentParser
from pclick import Server, MessageFactory, ControlMessageType, HandshakeInitMessageType, ValueType
from pclick.server import SizeCollectorChanges
def parse_args():
parser = ArgumentParser(description='Demo client connecting to click server')
parser.add_argument('--host', metavar='<host>', type=str, default="*",
help=f'server to connect to, default is *')
parser.add_argument('--port', metavar='<port>', type=str, default="5555",
help=f'port to connect to, default is 5555')
parser.add_argument('--addr', metavar='<addr>', type=str, default="",
help=f'set addr. Ie ipc:///tmp/click.ipc. host and port will be ignored')
parser.add_argument('--trace', action='store_true',
help=f'print what is sent/received')
parser.add_argument('--trace-sizes', action='store_true',
help=f'print size of what is sent/received')
return parser.parse_args()
def handler(signum, frame):
os._exit(0)
def main():
signal.signal(signal.SIGINT, handler)
args = parse_args()
addr = f"tcp://{args.host}:{args.port}"
if args.addr:
addr = args.addr
server = Server(addr)
if args.trace_sizes:
server.size_collector = SizeCollectorChanges()
# Note: Below code uses the protobuf API directly, a future version might wrap this in a higher level API.
while True:
request = server.recv()
if request is None:
continue
if args.trace:
print(f"Received request: {request}")
if request.messageType == HandshakeInitMessageType:
response = handshake_message()
if args.trace:
print(f"Sending handshake: {response}")
server.send(response)
if request.messageType == ControlMessageType:
response = sensor_message()
if args.trace:
print(f"Sending sensormessage: {response}")
server.send(response)
if args.trace_sizes and server.size_collector.is_updated:
print(f"Received {server.size_collector.recv_size} from client, Sent {server.size_collector.send_size} bytes to client")
def handshake_message():
handshake = MessageFactory.create_handshake()
handshake.controlType = ValueType.Force3D
object = handshake.objects["robot"]
object.controlsInOrder.extend(["joint1", "joint2"])
object.controlTypesInOrder.extend([ValueType.Angle, ValueType.Angle])
object.jointSensorsInOrder.extend(["joint1", "joint2"])
object.controlEvents["gripper"] = ValueType.Activated
object.jointSensors.extend([ValueType.Angle, ValueType.AngularVelocity1D, ValueType.Torque1D])
object.objectSensors.append(ValueType.Position)
object.sensors["external_1"].types.extend([ValueType.Force3D, ValueType.AngularAcceleration3D])
return handshake
def sensor_message():
sensor_m = MessageFactory.create_sensormessage()
robot = sensor_m.objects["robot1"]
size = 2
robot.angleSensors.extend([1.0] * size)
robot.angularVelocitySensors.extend([2.0] * size)
robot.torqueSensors.extend([3.0] * size)
box = sensor_m.objects["box"]
sensor = box.objectSensors.add()
sensor.position.arr.extend([1.0, 2.0, 3.0])
sensor = box.objectSensors.add()
sensor.rpy.arr.extend([4.0, 5.0, 6.0])
val = robot.sensors["external_1"].sensor.add()
val.force3d.arr.extend([4.0, 4.1, 4.2])
val = robot.sensors["external_1"].sensor.add()
val.angularAcceleration3d.arr.extend([5.0, 5.1, 5.2])
return sensor_m
main()