-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathserial_proxy.py
64 lines (49 loc) · 1.87 KB
/
serial_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import asyncio
import serial.aio
import serial
import typing
import logging
import binascii
loop = asyncio.get_event_loop()
class SocketForwardingProtocol(asyncio.Protocol):
def __init__(self, tty):
self.peer = tty # type: ProxyConnection
self.transport = None # type: asyncio.Transport
def data_received(self, data):
if not self.peer:
logging.error("Peer is None, wtf?")
return
if self.peer.transport is None:
logging.error("Peer not ready")
return
logging.warning('cli->dev: %s', binascii.hexlify(data).decode())
self.peer.transport.write(data)
def connection_made(self, transport):
self.transport = transport
class ProxyConnection(asyncio.Protocol):
def __init__(self):
self.socket = None # type: SocketForwardingProtocol
self.transport = serial.aio.SerialTransport(loop, self, serial.Serial('/dev/ttyS0', baudrate=38400))
def data_received(self, data):
super().data_received(data)
logging.warning("dev->cli: %s", binascii.hexlify(data).decode())
if self.socket and self.socket.transport:
self.socket.transport.write(data)
else:
logging.warning("No active socket connection")
def socket_factory(self):
logging.warning("Got new connection")
if self.socket:
logging.warning("Closing existing client socket")
self.socket.transport.close()
self.socket.peer = None
self.socket = SocketForwardingProtocol(self)
return self.socket
def main():
logging.basicConfig(format='[%(asctime)s - %(message)s')
pc = ProxyConnection()
server = loop.create_server(pc.socket_factory, host='0.0.0.0', port=9999, reuse_address=True)
asyncio.ensure_future(server)
loop.run_forever()
if __name__ == '__main__':
main()