forked from labgrid-project/labgrid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlabgrid-bound-connect
executable file
·81 lines (67 loc) · 2.22 KB
/
labgrid-bound-connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/python3
# This is intended to be used via sudo. For example, add via visudo:
# %developers ALL = NOPASSWD: /usr/local/sbin/labgrid-bound-connect
import argparse
import os
import sys
import ipaddress
import subprocess
def main(ifname, address, port):
    """Connect STDIO to a TCP endpoint through a specific network interface.

    Validates the arguments, makes a best-effort attempt to delete the
    neighbour-cache entry for ``address`` on ``ifname`` and then replaces the
    current process with ``socat`` bound to that interface.  When ``socat``
    can be executed this function does not return.

    Args:
        ifname: Name of the network interface the socket is bound to.
        address: Destination IPv4 or IPv6 address, as a string.
        port: Destination TCP port as an integer in the range 1-65535.

    Raises:
        ValueError: If the interface name, the address or the port is invalid.
        RuntimeError: If the ``socat`` binary cannot be found.
    """
    # Linux interface names live in an IFNAMSIZ (16 byte) buffer that includes
    # the terminating NUL, so at most 15 characters are usable.
    max_ifname_len = 15

    if not ifname:
        raise ValueError("Empty interface name.")
    # "," is rejected in addition to "/" and whitespace: the name is spliced
    # into socat's comma-separated option list below, so a comma would let the
    # caller append arbitrary socat options (this script is meant to be run
    # via sudo).
    if any((c in "/," or c.isspace()) for c in ifname):
        raise ValueError(f"Interface name '{ifname}' contains invalid characters.")
    if len(ifname) > max_ifname_len:
        raise ValueError(f"Interface name '{ifname}' is too long.")

    address = ipaddress.ip_address(address)
    # An IPv6 zone identifier ("%zone") is free-form text and ends up in the
    # socat address string as well, so apply the same comma restriction to it.
    if "," in str(address):
        raise ValueError(f"Address '{address}' contains invalid characters.")

    # TCP ports are 16 bit wide; 0 is reserved, 65535 (0xFFFF) is valid.
    if not 0 < port <= 0xFFFF:
        raise ValueError(f"Invalid port '{port}'.")

    if address.version == 4:
        prefix = f"TCP4:{address}:{port}"
    elif address.version == 6:
        prefix = f"TCP6:[{address}]:{port}"
    else:
        raise RuntimeError(f"Invalid IP version '{address.version}'")

    # Delete the IP lookup cache for the address in case it is stale.  This is
    # best effort: output is discarded and the exit status is ignored.
    subprocess.run(
        ["ip", "neigh", "del", str(address), "dev", ifname],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    args = [
        "socat",
        "STDIO",
        ','.join([
            prefix,
            f'so-bindtodevice={ifname}',
            'connect-timeout=15',
            'keepalive',
            'keepcnt=3',
            'keepidle=15',
            'keepintvl=15',
            'nodelay',
        ]),
    ]

    try:
        # Replaces the current process image; only returns by raising.
        os.execvp(args[0], args)
    except FileNotFoundError as e:
        raise RuntimeError("Missing socat binary") from e
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, hand them to main() and
    # report any failure on stderr with a non-zero exit status.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        default=False,
        help="enable debug mode"
    )
    parser.add_argument('interface', type=str, help='interface name')
    parser.add_argument('address', type=str, help='destination IP address')
    parser.add_argument('port', type=int, help='destination TCP port')

    args = parser.parse_args()
    try:
        main(args.interface, args.address, args.port)
    except Exception as e:  # pylint: disable=broad-except
        if args.debug:
            # Imported lazily; only needed when a traceback is requested.
            import traceback
            traceback.print_exc()
        print(f"ERROR: {e}", file=sys.stderr)
        # Signal the failure to the caller; without this the script would
        # fall off the end and exit with status 0 despite the error.
        sys.exit(1)