define-networks (19029B)
#!/usr/bin/python3
"""Find interface groups and assign an IPv4 subnet to each of them.

Interface groups (interfaces sharing an L2 segment) are obtained from the
external ``/bin/iface-groups`` tool.  Each group then receives a subnet from,
in order of precedence: a DHCP server discovered with nmap, a manual
``'<iface> <cidr>'`` pair given on the command line, or a free range carved
out of the subnet pool.  Progress goes to stderr, results go to stdout.
"""

# KNOWN BUGS
# - can use mac addresses, which dont work for all iface types
# - duplicate mac addresses are ignored
# - only eth and tun iface types are tested

import ipaddress
import argparse
import queue
import subprocess
import re
import sys
import os
from typing import Optional

# ANSI foreground colour codes understood by eprint().
_COLOR_CODES = {
    'black': '30',
    'red': '31',
    'green': '32',
    'yellow': '33',
    'blue': '34',
    'magenta': '35',
    'cyan': '36',
    'white': '37',
}

# 4 spaces per indent level, adjust as desired
_INDENT = '    '

_MAC_RE = re.compile(r"(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}|[0-9a-fA-F]{12}")

# Interface name at the start of an ifconfig block.  Stops at whitespace
# (old "eth0   Link encap:" layout) or at a colon (new "eth0: flags=" layout).
_IFACE_NAME_RE = re.compile(r"[^\s:]+")

# One field line of nmap's broadcast-dhcp-discover output.  The last line of a
# script's output starts with "|_" instead of "|".
_DHCP_FIELD_RE = re.compile(
    r'^\|_?\s*(Interface|Server Identifier|Subnet Mask):\s*(.*?)\s*$')


def eprint(*args, color=None, indents=0, **kwargs):
    """Print *args* to stderr, optionally coloured and indented.

    Each positional argument is converted with ``str()``, prefixed with
    ``indents`` indent levels and, when ``color`` is a known colour name,
    wrapped in the matching ANSI escape sequence.  Remaining keyword
    arguments are passed to ``print``.
    """
    indent_str = _INDENT * indents
    code = _COLOR_CODES.get(color)

    if code is not None:
        args = [f'\033[{code}m{indent_str}{arg}\033[0m' for arg in args]
    else:
        args = [f'{indent_str}{arg}' for arg in args]

    print(*args, file=sys.stderr, **kwargs)


def check_root():
    """Exit with an explanatory message unless running as root."""
    if os.geteuid() != 0:
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit("You need to have root privileges to run this script.\n"
                 "Please try again, this time using 'sudo'. Exiting.")


def is_mac_address(string: str) -> bool:
    """Check if the string is a valid MAC address"""
    return _MAC_RE.fullmatch(string) is not None


def _normalize_mac(mac_address: str) -> str:
    """Return *mac_address* as lowercase, colon-separated hex pairs."""
    digits = re.sub(r'[:-]', '', mac_address).lower()
    return ':'.join(digits[i:i + 2] for i in range(0, len(digits), 2))


def _find_interface_in_ifconfig(ifconfig_output: str, mac_address: str) -> Optional[str]:
    """Return the name of the first ifconfig block mentioning *mac_address*."""
    for block in ifconfig_output.split('\n\n'):
        # Compare case-insensitively: some ifconfig versions print upper case.
        if mac_address in block.lower():
            name = _IFACE_NAME_RE.match(block)
            if name:
                return name.group()
    return None


def get_interface_by_mac(mac_address: str) -> Optional[str]:
    """Return the network interface corresponding to a given MAC address.

    The address may be written with ``:`` or ``-`` separators or as 12 bare
    hex digits, in any letter case.  Returns ``None`` when no interface
    matches or when ``ifconfig`` cannot be executed.
    """
    mac_address = _normalize_mac(mac_address)

    try:
        result = subprocess.run(['ifconfig', '-a'], capture_output=True, text=True)
    except OSError as err:
        eprint("could not run ifconfig: " + str(err), color='red', indents=1)
        return None

    return _find_interface_in_ifconfig(result.stdout, mac_address)


def get_network_interface(string: str) -> Optional[str]:
    """Get the network interface given a MAC address or interface name.

    Interface names are returned unchanged.  A MAC address is resolved to its
    interface name, or to ``None`` if it cannot be resolved.
    """
    if not is_mac_address(string):
        return string

    iface = get_interface_by_mac(string)
    eprint("converted " + str(string) + " to " + str(iface), indents=1)
    return iface


def _parse_dhcp_discover_output(output: str):
    """Extract ``(interface, "server/prefixlen")`` pairs from nmap output.

    An entry is emitted at each "Subnet Mask" line that follows both an
    "Interface" and a "Server Identifier" line; the remembered interface and
    server are then cleared for the next response.
    """
    servers = []
    current_interface = None
    current_server = None

    for line in output.splitlines():
        field_match = _DHCP_FIELD_RE.match(line)
        if not field_match:
            continue

        field, value = field_match.groups()

        if field == 'Interface':
            current_interface = value
        elif field == 'Server Identifier':
            current_server = value
        elif current_interface and current_server:
            try:
                # Convert subnet mask to CIDR prefix length
                cidr = ipaddress.ip_network(f"0.0.0.0/{value}").prefixlen
            except ValueError:
                eprint("ignoring invalid subnet mask: " + str(value), color='red', indents=1)
                continue

            servers.append((current_interface, f"{current_server}/{cidr}"))
            current_interface = None
            current_server = None

    return servers


def extract_dhcp_info():
    """Run nmap's broadcast-dhcp-discover script and return the DHCP servers.

    Returns a list of ``(interface, "server_ip/prefixlen")`` tuples.
    Raises ``RuntimeError`` if nmap cannot be started or exits non-zero.
    """
    eprint("Running nmap to extract DHCP info...")

    command = ['nmap', '-T4', '--script', 'broadcast-dhcp-discover']

    try:
        result = subprocess.run(command, capture_output=True, text=True,
                                encoding='utf-8', errors='replace')
    except OSError as err:
        raise RuntimeError(f"Command failed with error {err}") from err

    # Ensure the command didn't produce an error
    if result.returncode != 0:
        raise RuntimeError(f"Command failed with error {result.stderr}")

    servers = _parse_dhcp_discover_output(result.stdout)

    for server in servers:
        eprint("found server: " + str(server), indents=1)

    return servers


def merge_interface_groups(iface_groups, join_groups):
    """Merge interface groups that the user declared to share an L2 network.

    ``join_groups`` is a list of whitespace-separated interface names (or MAC
    addresses).  All groups containing any interface of one join group are
    replaced by a single merged group appended to the end of the result.
    The input list is not modified; a new list is returned.
    """
    eprint("merging interface groups...")

    merged_iface_groups = list(iface_groups)

    for raw_join_group in join_groups:

        eprint("join group: " + str(raw_join_group), indents=1)

        join_group = [get_network_interface(name) for name in raw_join_group.split()]

        # Find the indices of the groups that contain interfaces from the join_group
        indices_to_merge = [
            index for index, group in enumerate(merged_iface_groups)
            if any(iface in group for iface in join_group)
        ]

        if not indices_to_merge:
            continue

        # Merge the groups together; dict.fromkeys removes duplicates while
        # keeping a deterministic, first-seen order.
        merged_members = {}
        for index in indices_to_merge:
            merged_members.update(dict.fromkeys(merged_iface_groups[index]))
        merged_group = list(merged_members)

        eprint(f"Merging groups at indices {indices_to_merge} into {merged_group}", indents=1)

        # Remove the old groups (highest index first so the remaining indices
        # stay valid) and add the merged group
        for index in reversed(indices_to_merge):
            del merged_iface_groups[index]
        merged_iface_groups.append(merged_group)

    return merged_iface_groups


def get_iface_groups(iface_types, timelimit):
    """Run ``/bin/iface-groups`` and return its interface groups.

    Each non-empty output line is one group, given as whitespace-separated
    interface names.  Returns a list of lists of names.
    Raises ``RuntimeError`` if the tool cannot be started or exits non-zero.
    """
    eprint("running iface-groups to get interface groups...")

    eprint("given iface types: " + str(iface_types), indents=1)
    eprint("time limit: " + str(timelimit), indents=1)

    cmd = ['/bin/iface-groups']

    for iface_type in iface_types:
        cmd.extend(['-i', iface_type])

    cmd.extend(['-t', str(timelimit)])

    try:
        output = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as err:
        raise RuntimeError(f"Error running iface-groups: {err}") from err

    if output.returncode != 0:
        raise RuntimeError(f"Error running iface-groups: {output.stderr.strip()}")

    interface_groups = []
    for line in output.stdout.splitlines():
        interface_group = line.split()
        if not interface_group:
            # A blank line is not a group; keeping it would later hand a
            # subnet to an empty list of interfaces.
            continue
        interface_groups.append(interface_group)

        eprint("interface group: " + str(interface_group), indents=1)

    return interface_groups


def assign_discovered_subnets(dhcp_info, iface_groups):
    """Pair every interface group with the DHCP subnet discovered on it.

    Returns a list of ``[iface_group, subnet, status]`` entries, where the
    status is ``'discovered'`` when one of the group's interfaces appears in
    ``dhcp_info`` and otherwise ``'none'`` with ``subnet`` set to ``None``.
    """
    eprint("assigning discovered subnets...")

    assigned_subnets = []

    for iface_group in iface_groups:

        discovered = None

        for raw_iface in iface_group:

            # convert MAC to iface
            iface = get_network_interface(raw_iface)

            discovered = next((s for i, s in dhcp_info if i == iface), None)
            if discovered is not None:
                break

        if discovered is None:
            assigned_subnets.append([iface_group, None, 'none'])
            continue

        assigned_subnets.append([iface_group, discovered, 'discovered'])
        eprint("ASSIGNED: " + str(assigned_subnets[-1]), color='green')

    return assigned_subnets


def assigned_subnet_overlap(assigned_subnets, new_subnet):
    """Check that ``new_subnet`` is free of overlaps with assigned subnets.

    NOTE: despite the name, this returns ``True`` when there is NO overlap
    and ``False`` as soon as an overlap with an assigned subnet is found.

    ``assigned_subnets`` has the shape
    ``[[['enp0s20f0u3', 'enp0s31f6'], '192.168.1.1/24', 'discovered'], [['eth0'], None, 'none']]``.
    """
    eprint("checking if " + str(new_subnet) + " overlaps with existing subnets...", indents=1)

    new_subnet = ipaddress.IPv4Network(str(new_subnet), False)

    for _, a_subnet, _ in assigned_subnets:

        if a_subnet is None:
            continue

        if new_subnet.overlaps(ipaddress.IPv4Network(str(a_subnet), False)):
            eprint("subnet " + str(new_subnet) + " overlaps existing subnet " + str(a_subnet), indents=2)
            return False

    return True


def assign_manual_subnets(assigned_subnets, manual_subnets):
    """Apply user-supplied ``(iface, subnet)`` pairs to ``assigned_subnets``.

    ``assigned_subnets`` is modified in place.  Pairs whose second element is
    a bare number are prefix-length preferences for dynamic assignment and
    are skipped here.  A manual subnet is ignored if it overlaps an already
    assigned subnet or if the interface's group already has a subnet.
    """
    eprint("Manually assigning subnets...")

    if manual_subnets is None:
        eprint("No manual subnets provided", indents=1)
        return

    for raw_iface, m_subnet in manual_subnets:

        m_iface = get_network_interface(raw_iface)

        if m_subnet.isdigit():
            eprint('manual pair (' + str(m_iface) + ', ' + str(m_subnet) + ') will be used for dynamic generation', indents=1)
            continue

        # we assign manual subnets in order of precedence
        if not assigned_subnet_overlap(assigned_subnets, m_subnet):
            continue

        for entry in assigned_subnets:

            a_group, a_subnet = entry[0], entry[1]

            if m_iface not in a_group:
                continue

            eprint('found ' + str(m_iface) + ' in ' + str(a_group) + ' with subnet ' + str(a_subnet), indents=1)

            if a_subnet is None:

                eprint("Manually assigning " + str(m_iface) + ' the subnet ' + str(m_subnet), indents=1)

                entry[1] = m_subnet
                entry[2] = 'manual'

                eprint("ASSIGNED: " + str(entry), color='green')

            else:

                eprint("DHCP server already exists, ignoring manual assignment", color='red', indents=1)

            # an interface belongs to one group only; stop at the first hit
            break


def possible_subnets(main, taken):
    """Return the parts of network ``main`` that overlap nothing in ``taken``.

    Breadth-first search: a network that overlaps a taken one is split into
    its two halves, which are examined in turn; a network without overlap is
    reported whole.  Overlapping /31 and /32 networks are dropped instead of
    being split further.
    """
    # we assume no subnets are available initially
    available = []
    pending = queue.Queue()

    # add first node for expansion in the BFS process
    pending.put(main)

    while not pending.empty():
        subnet = pending.get()

        if taken and any(subnet.overlaps(taken_subnet) for taken_subnet in taken):
            # still has overlaps somewhere in children, keep expanding
            if subnet.prefixlen < 31:
                for sub_subnet in subnet.subnets():
                    pending.put(sub_subnet)
            continue

        # no overlaps with taken - this subnet is entirely available
        available.append(subnet)

    return available


def get_new_subnet(available_subnets, taken_subnets, desired_subnet_size):
    """Pick a free subnet with prefix length ``desired_subnet_size``.

    The taken ranges are subtracted from every network in
    ``available_subnets``; the smallest remaining network that is at least as
    large as requested is chosen and its first sub-network of the requested
    size is returned.  Returns ``None`` when nothing fits.
    """
    eprint("generating a list of subnets already in use...", indents=1)

    if taken_subnets:
        for taken_subnet in taken_subnets:
            eprint("unavailable subnet: " + str(taken_subnet), indents=2)
    else:
        eprint("no known subnets are in use", indents=2)

    eprint("generating a sorted list of available subnets, after subtracting unavailable ranges...", indents=1)

    for available_supernet in available_subnets:
        eprint("candidate supernet: " + str(available_supernet), indents=2)

    available_subnets_complete = []
    for available_subnet in available_subnets:
        available_subnets_complete.extend(possible_subnets(available_subnet, taken_subnets))

    # now we have an exhaustive list of the known free space;
    # longest prefix (i.e. smallest network) first
    sorted_subnets = sorted(available_subnets_complete, key=lambda subnet: subnet.prefixlen, reverse=True)

    desired_prefix = int(desired_subnet_size)

    eprint("searching available subnets for smallest that is at least a /" + str(desired_subnet_size) + " ...", indents=1)

    # find the smallest subnet that fits our needs
    matching_subnet = None
    for subnet in sorted_subnets:

        eprint("candidate subnet: " + str(subnet), indents=2)

        if subnet.prefixlen <= desired_prefix:
            matching_subnet = subnet
            break

    if matching_subnet is None:
        eprint("no matching subnet found", indents=3)
        # None (not a truthy error code) so callers can test the result
        return None

    resized = next(matching_subnet.subnets(new_prefix=desired_prefix))
    eprint("available subnet found: " + str(resized), indents=3)
    return resized


def assign_dynamic_subnets(assigned_subnets, manual_subnets, subnet_pool, default_subnet_size):
    """Give every group still lacking a subnet a free range from the pool.

    ``assigned_subnets`` is modified in place; filled entries get the status
    ``'dynamic'``.  The prefix length is ``default_subnet_size`` unless a
    manual ``(iface, "<digits>")`` pair names an interface of the group.
    Groups for which the pool has no room are left unassigned.
    """
    eprint("dynamically assigning subnets...")

    existing_subnets = [ipaddress.IPv4Network(subnet, False) for _, subnet, _ in assigned_subnets if subnet]

    # Create a dictionary mapping interfaces to preferred subnet sizes for easy lookup
    preferred_sizes = {}

    manual_subnets = manual_subnets if manual_subnets is not None else []

    for raw_iface, size in manual_subnets:

        iface = get_network_interface(raw_iface)

        if size.isdigit():
            preferred_sizes[iface] = int(size)

    # ensure every pool entry is an IPv4Network object
    pool_networks = [ipaddress.IPv4Network(pool_subnet, False) for pool_subnet in subnet_pool]

    # Iterate over assigned_subnets looking for interface groups without assigned subnets
    for entry in assigned_subnets:

        iface_group, assigned_subnet = entry[0], entry[1]

        if assigned_subnet is not None:
            continue  # Skip interface groups that already have a subnet assigned

        # Get the preferred subnet size for the first interface in the group that has a preference,
        # or use the default subnet size if no preference is found
        subnet_size = default_subnet_size
        for raw_iface in iface_group:

            iface = get_network_interface(raw_iface)

            if iface in preferred_sizes:
                subnet_size = preferred_sizes[iface]
                break

        eprint("Finding a /" + str(subnet_size) + " subnet for interface group " + str(iface_group), indents=1)

        # Free space of the pool; existing_subnets always mirrors the subnets
        # currently recorded in assigned_subnets
        generated_subnets = []
        for pool_subnet in pool_networks:
            generated_subnets.extend(possible_subnets(pool_subnet, existing_subnets))

        new_subnet = get_new_subnet(generated_subnets, existing_subnets, subnet_size)

        if new_subnet is None:
            eprint(f"No available subnets for {iface_group}", indents=1)
            continue

        eprint(f"assigning {new_subnet} to {iface_group}", indents=1)
        existing_subnets.append(new_subnet)
        entry[1] = str(new_subnet)
        entry[2] = 'dynamic'

        eprint("ASSIGNED: " + str(entry), color='green')


def find_and_assign_subnets(iface_types, timelimit, subnet_pool, manual_subnets, default_subnet_size, join_groups):
    """Run the whole pipeline and return ``[iface_group, subnet, status]`` entries.

    Steps: discover interface groups, merge the user's join groups, assign
    DHCP-discovered subnets, then manual subnets, then dynamic subnets.
    """
    eprint("Finding and assigning subnets...", color='green')

    iface_groups = get_iface_groups(iface_types, timelimit)

    if join_groups:
        iface_groups = merge_interface_groups(iface_groups, join_groups)

    dhcp_info = extract_dhcp_info()

    assigned_subnets = assign_discovered_subnets(dhcp_info, iface_groups)

    assign_manual_subnets(assigned_subnets, manual_subnets)

    assign_dynamic_subnets(assigned_subnets, manual_subnets, subnet_pool, default_subnet_size)

    eprint("done finding and assigning subnets", color='green')

    return assigned_subnets


def valid_cidr(string):
    """argparse type: accept an IPv4 CIDR block and return it unchanged.

    Only IPv4 is accepted because the rest of the script builds
    ``IPv4Network`` objects from these values.
    """
    try:
        ipaddress.IPv4Network(string)
    except ValueError:
        msg = "%r is not a valid CIDR block" % string
        raise argparse.ArgumentTypeError(msg) from None
    return string


def valid_pair(string):
    """argparse type: accept ``'<iface> <cidr>'`` or ``'<iface> <prefixlen>'``.

    Returns the ``(iface, value)`` tuple of strings.  The second element is
    either an IPv4 CIDR block or a plain decimal prefix length from 0 to 32.
    """
    parts = string.split()
    if len(parts) != 2:
        msg = "%r is not a valid interface-subnet pair" % string
        raise argparse.ArgumentTypeError(msg)
    iface, cidr = parts

    # Check if cidr is a valid IP network
    try:
        ipaddress.IPv4Network(cidr)
        return iface, cidr
    except ValueError:
        pass

    # Check if cidr is a plain digit string between 0 and 32.  Signed forms
    # such as "+24" are refused: later stages recognise sizes via isdigit().
    if cidr.isascii() and cidr.isdigit() and 0 <= int(cidr) <= 32:
        return iface, cidr

    msg = "%r in the pair is not a valid CIDR block or digit" % cidr
    raise argparse.ArgumentTypeError(msg)


def main(args):
    """Parse ``args``, run the subnet assignment and print one line per group."""
    parser = argparse.ArgumentParser(description="Find and assign subnets")

    parser.add_argument('-i', '--interfaces', nargs='+', required=True,
                        choices=['eth', 'wlan', 'bridge', 'vlan', 'bond', 'tap', 'dummy', 'ppp', 'ipip',
                                 'ib', 'ibchild',
                                 'ip6tnl', 'lo', 'sit', 'gre', 'irda', 'wlan_aux', 'tun', 'isdn', 'mip6mnha'],
                        help="Network interfaces")
    parser.add_argument('-t', '--timeout', type=int, required=True, help="Timeout period")
    parser.add_argument('-s', '--subnets', nargs='+', required=True, type=valid_cidr, help="Subnet pools in CIDR format")
    parser.add_argument('-m', '--manual', nargs='+', type=valid_pair, help="Manually assigned interface-subnet pairs")
    parser.add_argument('-d', '--subnetsize', type=int, default=24, help="Default size for dynamically generated subnets")
    parser.add_argument('-j', '--join', nargs='+', type=str, help="Specify interfaces to be assumed on the same L2 network")

    # TODO (later - important ones are done)
    # define a default subnet size associated with an interface: -d 'eth0 21' 'eth5 16'
    # exclude interfaces and their interface groups: -X 'eth0 eth4'
    # exclude interfaces but not their interface groups: -x 'eth2 eth3'
    # only use these interfaces, and their interface groups: -O 'eth1 eth8'
    # only use these interfaces, not even others in their iface group: -o 'eth2 eth4'

    args = parser.parse_args(args)

    if not 0 <= args.subnetsize <= 32:
        parser.error("--subnetsize must be between 0 and 32")

    check_root()

    try:
        results = find_and_assign_subnets(args.interfaces, args.timeout, args.subnets,
                                          args.manual, args.subnetsize, args.join)
    except RuntimeError as err:
        # failure of an external tool (iface-groups, nmap)
        eprint(str(err), color='red')
        sys.exit(1)

    for result in results:
        print('ifaces: ' + str(' '.join(result[0])) + ' status: ' + str(result[2]) + ' subnet: ' + str(result[1]))


if __name__ == "__main__":
    main(sys.argv[1:])