‹ projects

netscan

tool for scanning networks and calculating available subnets
Log | Files | Refs | README

define-networks (19029B)


      1 #!/usr/bin/python3
      2 
      3 # KNOWN BUGS
      4 # - can use MAC addresses, which don't work for all iface types
      5 # - duplicate mac addresses are ignored
      6 # - only eth and tun iface types are tested
      7 
      8 import ipaddress
      9 import argparse
     10 import queue
     11 import subprocess
     12 import re
     13 import sys
     14 import os
     15 from typing import Optional
     16 
     17 def eprint(*args, color=None, indents=0, **kwargs):
     18     color_codes = {
     19         'black': '30',
     20         'red': '31',
     21         'green': '32',
     22         'yellow': '33',
     23         'blue': '34',
     24         'magenta': '35',
     25         'cyan': '36',
     26         'white': '37'
     27     }
     28 
     29     indent_str = '    ' * indents  # 4 spaces per indent level, adjust as desired
     30 
     31     if color in color_codes:
     32         args = ['\033[' + color_codes[color] + 'm' + indent_str + arg + '\033[0m' for arg in args]
     33     else:
     34         args = [indent_str + arg for arg in args]
     35 
     36     print(*args, file=sys.stderr, **kwargs)
     37 
     38 def check_root():
     39     if os.geteuid() != 0:
     40         exit("You need to have root privileges to run this script.\nPlease try again, this time using 'sudo'. Exiting.")
     41 
     42 def is_mac_address(string: str) -> bool:
     43     """Check if the string is a valid MAC address"""
     44     return re.fullmatch(r"(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}|[0-9a-fA-F]{12}", string) is not None
     45 
     46 def get_interface_by_mac(mac_address: str) -> Optional[str]:
     47     """Return the network interface corresponding to a given MAC address"""
     48     if len(mac_address) == 12:  # If MAC address has no separators
     49         # Add colons to MAC address
     50         mac_address = ':'.join(mac_address[i:i+2] for i in range(0, 12, 2))
     51     
     52     result = subprocess.run(['ifconfig', '-a'], capture_output=True, text=True)
     53     interfaces = result.stdout.split('\n\n')
     54 
     55     for interface in interfaces:
     56         if mac_address in interface:
     57             interface_name = re.match(r"^\w+", interface).group()
     58             return interface_name.replace('Link', '') if 'Link' in interface_name else interface_name
     59 
     60     return None
     61 
     62 def get_network_interface(string: str) -> Optional[str]:
     63     """Get the network interface given a MAC address or interface name"""
     64     if is_mac_address(string):
     65         mac = get_interface_by_mac(string)
     66         eprint("converted " + str(string) + " to " + str(mac), indents=1)
     67         return mac
     68     else:
     69         return string
     70 
     71 def extract_dhcp_info():
     72 
     73     eprint("Running nmap to extract DHCP info...")
     74 
     75     command = ['nmap', '-T4', '--script', 'broadcast-dhcp-discover']
     76 
     77     # Run the command and get the output
     78     result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     79 
     80     # Ensure the command didn't produce an error
     81     if result.returncode != 0:
     82         raise Exception(f"Command failed with error {result.stderr.decode()}")
     83 
     84     # print(result)
     85 
     86     # Split the output into lines
     87     output = result.stdout.decode('utf-8').split('\n')
     88 
     89     servers = []
     90 
     91     # Define regex to match interface, server identifier and subnet mask
     92     interface_regex = re.compile(r'^\|\s*Interface:\s*(.*)$')
     93     server_regex = re.compile(r'^\|\s*Server Identifier:\s*(.*)$')
     94     subnet_regex = re.compile(r'^\|\s*Subnet Mask:\s*(.*)$')
     95 
     96     current_interface = None
     97     current_server = None
     98 
     99     for line in output:
    100 
    101         # If the line contains an interface, save it
    102         interface_match = interface_regex.match(line)
    103         if interface_match:
    104             current_interface = interface_match.group(1)
    105             # eprint("found interface: " + str(current_interface))
    106 
    107         # If the line contains a server identifier, save it
    108         server_match = server_regex.match(line)
    109         if server_match:
    110             current_server = server_match.group(1)
    111             # eprint("found server: " + str(current_server))
    112 
    113         # If the line contains a subnet mask, save it and convert to CIDR
    114         subnet_match = subnet_regex.match(line)
    115         if subnet_match and current_interface and current_server:
    116 
    117             subnet_mask = subnet_match.group(1)
    118             cidr = ipaddress.ip_network(f"0.0.0.0/{subnet_mask}").prefixlen  # Convert subnet mask to CIDR
    119             # eprint("found CIDR subnet range: " + str(cidr))
    120 
    121             servers.append((current_interface, f"{current_server}/{cidr}"))
    122             current_interface = None
    123             current_server = None
    124 
    125             # eprint("--appended new subnet entry--")
    126 
    127     for server in servers:
    128         eprint("found server: " + str(server), indents=1)
    129 
    130     return servers
    131 
    132 def merge_interface_groups(iface_groups, join_groups):
    133 
    134     eprint("merging interface groups...")
    135 
    136     merged_iface_groups = iface_groups.copy()
    137 
    138     for _join_group in join_groups:
    139 
    140         eprint("join group: " + str(_join_group), indents=1)
    141 
    142         join_group = _join_group.split()
    143 
    144         for i in range(len(join_group)):
    145             join_group[i] = get_network_interface(join_group[i])
    146 
    147         indices_to_merge = []
    148 
    149         # Find the indices of the groups that contain interfaces from the join_group
    150         for i, group in enumerate(merged_iface_groups):
    151             if any(iface in group for iface in join_group):
    152                 indices_to_merge.append(i)
    153 
    154         # Merge the groups together
    155         if indices_to_merge:
    156             merged_group = []
    157             for index in indices_to_merge:
    158                 merged_group.extend(merged_iface_groups[index])
    159             merged_group = list(set(merged_group))  # remove duplicates
    160 
    161             eprint(f"Merging groups at indices {indices_to_merge} into {merged_group}", indents=1)
    162             
    163             # Remove the old groups and add the merged group
    164             indices_to_merge.sort(reverse=True)
    165             for index in indices_to_merge:
    166                 del merged_iface_groups[index]
    167             merged_iface_groups.append(merged_group)
    168 
    169     return merged_iface_groups
    170 
    171 def get_iface_groups(iface_types, timelimit):
    172 
    173     eprint("running iface-groups to get interface groups...")
    174 
    175     eprint("given iface types: " + str(iface_types), indents=1)
    176     eprint("time limit: " + str(timelimit), indents=1)
    177 
    178     cmd = ['/bin/iface-groups']
    179 
    180     for iface_type in iface_types:
    181         cmd.extend(['-i', iface_type])
    182 
    183     cmd.extend(['-t', str(timelimit)])
    184 
    185     output = subprocess.run(cmd, capture_output=True, text=True)
    186 
    187     if output.returncode != 0:
    188         eprint('Error running iface-groups')
    189         return 1
    190 
    191     interface_groups = []
    192     for line in output.stdout.splitlines():
    193         interface_group = line.split()
    194         interface_groups.append(interface_group)
    195         
    196         eprint("interface group: " + str(interface_group), indents=1)
    197 
    198     return interface_groups
    199 
    200 def assign_discovered_subnets(dhcp_info, iface_groups):
    201 
    202     eprint("assigning discovered subnets...") 
    203     
    204     assigned_subnets = []
    205     
    206     for iface_group in iface_groups:
    207 
    208         subnet_found = False
    209         
    210         for _iface in iface_group:
    211 
    212             # convert MAC to iface
    213             iface = get_network_interface(_iface)
    214 
    215             subnet = next((s for i, s in dhcp_info if i == iface), None)
    216             
    217             if subnet is not None:
    218                 assigned_subnets.append([iface_group, subnet, 'discovered'])
    219                 subnet_found = True
    220 
    221                 eprint("ASSIGNED: " + str(assigned_subnets[-1]), color='green')
    222                 break
    223         
    224         if not subnet_found:
    225 
    226             assigned_subnets.append([iface_group, None, 'none'])
    227 
    228     # assigned_subnet_overlap(assigned_subnets, '192.168.0.0/8')
    229     
    230     return assigned_subnets
    231 
    232 def assigned_subnet_overlap(assigned_subnets, new_subnet):
    233 
    234     eprint("checking if " + str(new_subnet) + " overlaps with existing subnets...", indents=1)
    235 
    236     new_subnet = ipaddress.IPv4Network(str(new_subnet), False)
    237 
    238     # [[['enp0s20f0u3', 'enp0s31f6'], '192.168.1.1/24', 'discovered'], [['eth0'], None, 'none']]
    239     for assigned_subnet in assigned_subnets:
    240 
    241         a_subnet = assigned_subnet[1]
    242 
    243         if a_subnet is not None:
    244             if new_subnet.overlaps(ipaddress.IPv4Network(str(a_subnet), False)):
    245 
    246                 eprint("subnet " + str(new_subnet) + " overlaps existing subnet " + str(a_subnet), indents=2)
    247                 return False
    248 
    249     return True
    250 
    251 def assign_manual_subnets(assigned_subnets, manual_subnets):
    252 
    253     eprint("Manually assigning subnets...")
    254     
    255     if manual_subnets is None:
    256         eprint("No manual subnets provided", indents=1)
    257         return
    258 
    259     for _m_iface, m_subnet in manual_subnets:
    260 
    261         m_iface = get_network_interface(_m_iface)
    262 
    263         if m_subnet.isdigit():
    264             eprint('manual pair (' + str(m_iface) + ', ' + str(m_subnet) + ') will be used for dynamic generation', indents=1)
    265             continue
    266 
    267         # we assign manual subnets in order of precedence
    268         if not assigned_subnet_overlap(assigned_subnets, m_subnet):
    269             continue
    270 
    271         # [[['enp0s20f0u3', 'enp0s31f6'], '192.168.1.1/24', 'discovered'], [['eth0'], None, 'none']]
    272         for index, assigned_subnet in enumerate(assigned_subnets):
    273 
    274             a_group  = assigned_subnet[0]
    275             a_subnet = assigned_subnet[1]
    276 
    277             if m_iface in a_group:
    278 
    279                 eprint('found ' + str(m_iface) + ' in ' + str(a_group) + ' with subnet ' + str(a_subnet), indents=1)
    280 
    281                 if a_subnet is None:
    282 
    283                     eprint("Manually assigning " + str(m_iface) + ' the subnet ' + str(m_subnet), indents=1)
    284 
    285                     assigned_subnets[index][1] = m_subnet
    286                     assigned_subnets[index][2] = 'manual'
    287 
    288                     eprint("ASSIGNED: " + str(assigned_subnets[index]), color='green')
    289 
    290                 else:
    291 
    292                     eprint("DHCP server already exists, ignoring manual assignment", color='red', indents=1)
    293 
    294                 break
    295 
    296 def possible_subnets(main, taken):
    297 
    298     # eprint("searching for possible subnets within " + str(main) + "...", indents=2)
    299 
    300     # we assume no subnets are available intially
    301     available = []
    302     q = queue.Queue()
    303 
    304     # add first node for expansion in the BFS process
    305     q.put(main)
    306 
    307     while q.qsize() > 0:
    308         subnet = q.get()
    309         has_overlap = False
    310 
    311         if taken:
    312 
    313             for taken_subnet in taken:
    314 
    315                 if subnet.overlaps(taken_subnet):
    316                     # still has overlaps somewhere in children, keep expanding
    317 
    318                     if subnet.prefixlen < 31: # avoid adding /32 and /31 subnets to the queue
    319                         for sub_subnet in subnet.subnets():
    320                             q.put(sub_subnet)
    321                     has_overlap = True
    322                     break
    323 
    324             if not has_overlap:
    325                 # no overlaps with taken - this subnet is entirely available
    326                 available.append(subnet)
    327         else:
    328             # if no subnets are taken, all subnets are available
    329             available.append(subnet)
    330 
    331     return available
    332 
    333 def get_new_subnet(available_subnets, taken_subnets, desired_subnet_size):
    334 
    335     eprint("generating a list of subnets already in use...", indents=1)
    336 
    337     if taken_subnets:
    338         for taken_subnet in taken_subnets:
    339             eprint("unavailable subnet: " + str(taken_subnet), indents=2)
    340     else:
    341         eprint("no known subnets are in use", indents=2)
    342 
    343     available_subnets_complete = []
    344 
    345     eprint("generating a sorted list of available subnets, after subtracting unavailable ranges...", indents=1)
    346 
    347     for available_supernet in available_subnets:
    348         eprint("candidate supernet: " + str(available_supernet), indents=2)
    349 
    350     for available_subnet in available_subnets:
    351         available_subnets_complete.extend(possible_subnets(available_subnet, taken_subnets))
    352 
    353     # now we have an exhaustive list of the known free space
    354 
    355     sorted_subnets = sorted(available_subnets_complete, key=lambda subnet: subnet.prefixlen, reverse=True)
    356 
    357     # find the smallest subnet that fits our needs
    358 
    359     matching_subnet = []
    360 
    361     eprint("searching available subnets for smallest that is at least a /" + str(desired_subnet_size) + " ...", indents=1)
    362 
    363     for subnet in sorted_subnets:
    364 
    365         eprint("candidate subnet: " + str(subnet), indents=2)
    366 
    367         if int(subnet.prefixlen) <= int(desired_subnet_size):
    368             matching_subnet = subnet
    369             # eprint("subnet is suitable: " + str(subnet))
    370             break
    371 
    372     if not matching_subnet:
    373         eprint("no matching subnet found", indents=3)
    374         return 1
    375     else:
    376         resized = next(matching_subnet.subnets(new_prefix=int(desired_subnet_size)))
    377         eprint("available subnet found: " + str(resized), indents=3)
    378         return resized
    379 
    380 def assign_dynamic_subnets(assigned_subnets, manual_subnets, subnet_pool, default_subnet_size):
    381 
    382     eprint("dynamically assigning subnets...")
    383 
    384     existing_subnets = [ipaddress.IPv4Network(subnet, False) for _, subnet, _ in assigned_subnets if subnet]
    385 
    386     # Create a dictionary mapping interfaces to preferred subnet sizes for easy lookup
    387     preferred_sizes = {}
    388 
    389     manual_subnets = manual_subnets if manual_subnets is not None else []
    390 
    391     for _iface, size in manual_subnets:
    392 
    393         iface = get_network_interface(_iface)
    394 
    395         if size.isdigit():
    396             preferred_sizes[iface] = int(size)
    397 
    398     # Iterate over assigned_subnets looking for interface groups without assigned subnets
    399     for index, (iface_group, assigned_subnet, method) in enumerate(assigned_subnets):
    400 
    401         if assigned_subnet is not None:
    402             continue  # Skip interface groups that already have a subnet assigned
    403 
    404         # Get the preferred subnet size for the first interface in the group that has a preference,
    405         # or use the default subnet size if no preference is found
    406         subnet_size = default_subnet_size
    407         for _iface in iface_group:
    408 
    409             iface = get_network_interface(_iface)
    410 
    411             if iface in preferred_sizes:
    412                 subnet_size = preferred_sizes[iface]
    413                 break
    414 
    415         eprint("Finding a /" + str(subnet_size) + " subnet for interface group " + str(iface_group), indents=1)
    416 
    417         # Create a list to hold the generated subnets
    418         generated_subnets = []
    419 
    420         # Generate all possible subnets of the required size from the subnet pool
    421         for pool_subnet in subnet_pool:
    422             pool_subnet = ipaddress.IPv4Network(pool_subnet, False)  # ensure pool_subnet is an IPv4Network object
    423             generated_subnets.extend(possible_subnets(pool_subnet, [ipaddress.IPv4Network(s[1], False) for s in assigned_subnets if s[1] is not None]))
    424 
    425         new_subnet = get_new_subnet(generated_subnets, existing_subnets, subnet_size)
    426 
    427         if new_subnet:
    428 
    429             eprint(f"assigning {new_subnet} to {iface_group}", indents=1)
    430             existing_subnets.append(new_subnet)
    431             assigned_subnets[index][1] = str(new_subnet)
    432             assigned_subnets[index][2] = 'dynamic'
    433 
    434             eprint("ASSIGNED: " + str(assigned_subnets[index]), color='green')
    435             # continue
    436                 
    437         else:
    438             eprint(f"No available subnets for {iface_group}", indents=1)
    439 
    440 def find_and_assign_subnets(iface_types, timelimit, subnet_pool, manual_subnets, default_subnet_size, join_groups):
    441 
    442     eprint("Finding and assigning subnets...", color='green')
    443     
    444     iface_groups = get_iface_groups(iface_types, timelimit)
    445 
    446     # eprint("iface groups: " + str(iface_groups))
    447 
    448     if join_groups:
    449         iface_groups = merge_interface_groups(iface_groups, join_groups)
    450 
    451     dhcp_info    = extract_dhcp_info()
    452 
    453     assigned_subnets = assign_discovered_subnets(dhcp_info, iface_groups)
    454 
    455     # eprint("dhcp results: " + str(dhcp_info), indents=1)
    456     # eprint("RAW GROUPS       > " + str(iface_groups))
    457     # eprint("ASSIGNED: " + str(assigned_subnets), color='green')
    458     # eprint("MANUAL SUBNETS   > " + str(manual_subnets))
    459     # eprint("IFACE GROUPS     > " + str(iface_groups))
    460 
    461     assign_manual_subnets(assigned_subnets, manual_subnets)
    462 
    463     # eprint("ASSIGNED: " + str(assigned_subnets), color='green')
    464 
    465     assign_dynamic_subnets(assigned_subnets, manual_subnets, subnet_pool, default_subnet_size)
    466 
    467     # eprint("ASSIGNED: " + str(assigned_subnets), color='green')
    468 
    469     eprint("done finding and assigning subnets", color='green')
    470 
    471     return assigned_subnets
    472 
    473 def valid_cidr(string):
    474     try:
    475         ipaddress.ip_network(string)
    476         return string
    477     except ValueError:
    478         msg = "%r is not a valid CIDR block" % string
    479         raise argparse.ArgumentTypeError(msg)
    480 
    481 def valid_pair(string):
    482     parts = string.split()
    483     if len(parts) != 2:
    484         msg = "%r is not a valid interface-subnet pair" % string
    485         raise argparse.ArgumentTypeError(msg)
    486     iface, cidr = parts
    487 
    488     # Check if cidr is a valid IP network
    489     try:
    490         ipaddress.ip_network(cidr)
    491         return iface, cidr
    492     except ValueError:
    493         pass
    494 
    495     # Check if cidr is a valid digit between 32 and 0
    496     try:
    497         cidr_value = int(cidr)
    498         if 0 <= cidr_value <= 32:
    499             return iface, cidr
    500         else:
    501             msg = "%r in the pair is not a valid CIDR block or digit" % cidr
    502             raise argparse.ArgumentTypeError(msg)
    503     except ValueError:
    504         pass
    505 
    506     msg = "%r in the pair is not a valid CIDR block or digit" % cidr
    507     raise argparse.ArgumentTypeError(msg)
    508 
    509 def main(args):
    510 
    511     parser = argparse.ArgumentParser(description="Find and assign subnets")
    512 
    513     parser.add_argument('-i', '--interfaces', nargs='+', required=True,
    514                         choices=['eth', 'wlan', 'bridge', 'vlan', 'bond', 'tap', 'dummy', 'ppp', 'ipip',
    515                                  'ib', 'ibchild', 
    516                                  'ip6tnl', 'lo', 'sit', 'gre', 'irda', 'wlan_aux', 'tun', 'isdn', 'mip6mnha'],
    517                         help="Network interfaces")
    518     parser.add_argument('-t', '--timeout', type=int, required=True, help="Timeout period")
    519     parser.add_argument('-s', '--subnets', nargs='+', required=True, type=valid_cidr, help="Subnet pools in CIDR format")
    520     parser.add_argument('-m', '--manual', nargs='+', type=valid_pair, help="Manually assigned interface-subnet pairs")
    521     parser.add_argument('-d', '--subnetsize', type=int, default=24, help="Default size for dynamically generated subnets")
    522     parser.add_argument('-j', '--join', nargs='+', type=str, help="Specify interfaces to be assumed on the same L2 network")
    523 
    524     # TODO (later - important ones are done)
    525     # define a default subnet size associated with an interface: -d 'eth0 21' 'eth5 16'
    526     # exclude interfaces and their interface groups: -X 'eth0 eth4'
    527     # exclude interfaces but not their interface groups: -x 'eth2 eth3'
    528     # only use these interfaces, and their interface groups: -O 'eth1 eth8'
    529     # only use these interfaces, not even others in their iface group: -o 'eth2 eth4'
    530 
    531     args = parser.parse_args(args)
    532 
    533     check_root()
    534 
    535     results = find_and_assign_subnets(args.interfaces, args.timeout, args.subnets, args.manual, args.subnetsize, args.join)
    536 
    537     for result in results:
    538         print('ifaces: ' + str(' '.join(result[0])) + ' status: ' + str(result[2]) + ' subnet: ' + str(result[1]))
    539 
    540 if __name__ == "__main__":
    541     main(sys.argv[1:])