"""Create a veth pair and place its ends into network namespaces or containers.

The script can expose the network namespace of a Docker, LXC, LXD, or Incus
container under '/var/run/netns', create a veth pair, move each end into the
requested namespace, optionally attach it to a bridge, and bring it up.
"""

import argparse
import os
import subprocess
import sys

from pyroute2 import IPRoute, NetNS

# Directory holding named network namespace entries (symlinks or bind mounts).
NETNS_DIR = "/var/run/netns"


class NetworkNamespaceManager:
    """
    Provides functionalities to manage network namespaces.

    Network namespaces partition network resources such as network links,
    IP addresses, and port numbers into disjoint sets.
    """

    @staticmethod
    def list_netns():
        """
        List all available network namespaces.

        Scans the '/var/run/netns' directory and prints every entry found
        there. If the directory is empty, prints a message saying so. An
        OSError while listing (for example, the directory does not exist)
        is reported on stdout instead of being raised.
        """
        try:
            netns_files = os.listdir(NETNS_DIR)
            if netns_files:
                print("List of available network namespaces:")
                for netns_file in netns_files:
                    print(f"- {netns_file}")
            else:
                print("No network namespaces found.")
        except OSError as e:
            print(f"Error listing network namespaces: {e}")


class ContainerNetnsExposer:
    """
    Responsible for exposing network namespaces of containers.

    Supports Docker, LXC, LXD, and Incus containers. "Exposing" means
    creating a symlink '/var/run/netns/<pid>' that points at
    '/proc/<pid>/ns/net', where <pid> is the PID reported for the container.
    """

    def __init__(self):
        """
        Initializes the ContainerNetnsExposer instance.
        """
        # PID of the container (string), set by the get_*_container_pid methods.
        self.netns_pid = None
        # Path of the symlink created for that PID under '/var/run/netns'.
        self.netns_path = None

    def expose_container_netns(self, container_id_or_name, container_type='docker'):
        """
        Expose the network namespace of a specified container.

        Parameters:
        container_id_or_name (str): The identifier or name of the container.
        container_type (str): The type of the container
            ('docker', 'lxc', 'lxd', or 'incus').

        Returns:
        str: The PID of the container as a string.

        Raises:
        SystemExit: If the container type is unsupported or if there is an
            error in retrieving the container's PID or creating the symlink.
        """
        if container_type == 'docker':
            self.get_docker_container_pid(container_id_or_name)
        elif container_type in ('lxc', 'lxd', 'incus'):
            self.get_lxc_container_pid(container_id_or_name, container_type)
        else:
            print("Unsupported container type. Only 'docker', 'lxc', 'lxd', and 'incus' are supported.")
            sys.exit(1)

        self.create_netns_directory()
        self.create_or_remove_netns_symlink()
        return str(self.netns_pid)

    @staticmethod
    def _is_running_pid(pid):
        """Return True when *pid* is a decimal string greater than zero."""
        return pid.isdigit() and int(pid) > 0

    @staticmethod
    def _parse_info_pid(output):
        """Return the value of the first 'PID:' line in *output*, or None."""
        for line in output.splitlines():
            if line.strip().startswith("PID:"):
                return line.split(':')[1].strip()
        return None

    def get_docker_container_pid(self, container_id_or_name):
        """
        Retrieve the PID of a Docker container.

        Runs 'sudo docker inspect -f {{.State.Pid}} <container>' and stores
        the stripped output in self.netns_pid.

        Parameters:
        container_id_or_name (str): The identifier or name of the Docker container.

        Raises:
        SystemExit: If the command fails, cannot be started, or does not
            report a positive PID.
        """
        try:
            pid = subprocess.check_output(
                ["sudo", "docker", "inspect", "-f", "{{.State.Pid}}", container_id_or_name],
                universal_newlines=True
            ).strip()
        except (subprocess.CalledProcessError, OSError):
            print("Error retrieving Docker container PID. Make sure the container exists and is running.")
            sys.exit(1)

        # A PID of 0 (or any non-numeric output) cannot name a live process;
        # linking '/proc/0/ns/net' would only produce a dangling symlink.
        if not self._is_running_pid(pid):
            print("Error retrieving Docker container PID. Make sure the container exists and is running.")
            sys.exit(1)

        self.netns_pid = pid

    def get_lxc_container_pid(self, container_name, container_type='lxc'):
        """
        Retrieve the PID of an LXC, LXD, or Incus container.

        For 'lxc' the last whitespace-separated token of
        'lxc-info -n <name> -p' is used. For 'lxd' and 'incus' the value of
        the first 'PID:' line of 'sudo lxc info <name>' or
        'sudo incus info <name>' is used. Any other container_type leaves
        self.netns_pid untouched.

        Parameters:
        container_name (str): The name of the container.
        container_type (str): The type of the container ('lxc', 'lxd', or 'incus').

        Raises:
        SystemExit: If the command fails, cannot be started, or its output
            contains no PID.
        """
        if container_type == 'lxc':
            command = ["lxc-info", "-n", container_name, "-p"]
            tool = "lxc-info"
        elif container_type == 'lxd':
            command = ["sudo", "lxc", "info", container_name]
            tool = "lxc info"
        elif container_type == 'incus':
            command = ["sudo", "incus", "info", container_name]
            tool = "incus info"
        else:
            return

        try:
            output = subprocess.check_output(command, universal_newlines=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"Error retrieving {container_type.upper()} container PID for '{container_name}'. Error: {e}")
            sys.exit(1)

        if container_type == 'lxc':
            # Empty output would make an unconditional [-1] raise IndexError.
            tokens = output.split()
            pid = tokens[-1] if tokens else None
        else:
            pid = self._parse_info_pid(output)

        if pid is None:
            print(f"PID not found in '{tool}' output for container '{container_name}'.")
            sys.exit(1)

        self.netns_pid = pid

    def create_netns_directory(self):
        """
        Create the network namespace directory if it does not exist.

        Runs 'sudo mkdir -p /var/run/netns'.

        Raises:
        SystemExit: If the command exits with a non-zero status.
        """
        try:
            subprocess.run(["sudo", "mkdir", "-p", NETNS_DIR], check=True)
        except subprocess.CalledProcessError as e:
            # stderr is not captured above, so e.stderr is None; fall back to
            # the exception text instead of calling .strip() on None.
            detail = (e.stderr or str(e)).strip()
            print(f"Error creating network namespace directory: {detail}")
            sys.exit(1)

    def create_or_remove_netns_symlink(self):
        """
        Create (replacing if necessary) the netns symlink for the container.

        Sets self.netns_path to '/var/run/netns/<pid>' and links it to
        '/proc/<pid>/ns/net'. An existing entry with the same name is removed
        first.

        Raises:
        SystemExit: If removing the old entry or creating the link fails.
        """
        self.netns_path = f"{NETNS_DIR}/{self.netns_pid}"
        try:
            # lexists() also reports dangling symlinks (e.g. left behind by a
            # process that has exited); exists() would miss them and the
            # following 'ln -s' would fail with "File exists".
            if os.path.lexists(self.netns_path):
                subprocess.run(["sudo", "rm", "-rf", self.netns_path], check=True)
            subprocess.run(
                ["sudo", "ln", "-s", f"/proc/{self.netns_pid}/ns/net", self.netns_path],
                check=True
            )
        except subprocess.CalledProcessError as e:
            print(f"Error creating or removing symlink: {e}")
            sys.exit(1)


class IfaceManager:
    """
    Manages network interfaces, including creation, deletion, and
    configuration of veth pairs, VLANs, and bridges.

    Every public method catches any exception raised while talking to
    pyroute2, prints an error message, and returns None; errors are not
    propagated to the caller.
    """

    @staticmethod
    def _open(namespace):
        """Return a NetNS handle for *namespace*, or an IPRoute handle if it is falsy."""
        return NetNS(namespace) if namespace else IPRoute()

    @staticmethod
    def _lookup_index(handle, ifname, label="Interface"):
        """Return the index of *ifname* on *handle*; raise ValueError if it is absent."""
        matches = handle.link_lookup(ifname=ifname)
        if not matches:
            raise ValueError(f"{label} {ifname} not found.")
        return matches[0]

    def delete(self, iface_name, namespace=None):
        """
        Delete a network interface.

        Parameters:
        iface_name (str): The name of the interface to delete.
        namespace (str, optional): The network namespace where the interface exists.

        Failures (including a missing interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('del', index=self._lookup_index(ns, iface_name))
        except Exception as e:
            print(f"Error deleting interface {iface_name} in namespace {namespace}: {e}")

    def create_veth(self, iface1, iface2, namespace=None):
        """
        Create a veth pair.

        Parameters:
        iface1 (str): The name of the first interface.
        iface2 (str): The name of the second interface.
        namespace (str, optional): The network namespace where to create the veth pair.

        Failures are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('add', ifname=iface1, peer={'ifname': iface2}, kind='veth')
        except Exception as e:
            print(f"Error creating veth pair {iface1} and {iface2}: {e}")

    def create_vlan(self, base_iface, vlan_id, namespace=None):
        """
        Create a VLAN interface named '<base_iface>.<vlan_id>'.

        Parameters:
        base_iface (str): The base interface name.
        vlan_id (int): The VLAN ID.
        namespace (str, optional): The network namespace where to create the VLAN interface.

        Failures (including a missing base interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                base_iface_idx = self._lookup_index(ns, base_iface, "Base interface")
                vlan_iface = f"{base_iface}.{vlan_id}"
                ns.link('add', ifname=vlan_iface, link=base_iface_idx, kind='vlan',
                        vlan_info={'id': vlan_id})
        except Exception as e:
            print(f"Error creating VLAN on interface {base_iface}: {e}")

    def create_bridge(self, bridge_name, namespace=None):
        """
        Create a bridge interface.

        Parameters:
        bridge_name (str): The name of the bridge.
        namespace (str, optional): The network namespace where to create the bridge.

        Failures are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('add', ifname=bridge_name, kind='bridge')
        except Exception as e:
            print(f"Error creating bridge {bridge_name}: {e}")

    def move(self, iface, namespace):
        """
        Move a network interface from the default namespace to another one.

        Parameters:
        iface (str): The interface name to move.
        namespace (str): The target network namespace.

        Failures (including a missing interface) are printed, not raised.
        """
        try:
            # Use the handle as a context manager so the netlink socket is
            # closed even when the lookup or the move fails.
            with IPRoute() as ipr:
                idx = self._lookup_index(ipr, iface)
                ipr.link('set', index=idx, net_ns_fd=namespace)
        except Exception as e:
            print(f"Error moving interface {iface} to namespace {namespace}: {e}")

    def set_interface_up(self, iface_name, namespace=None):
        """
        Set a network interface up.

        Parameters:
        iface_name (str): The name of the interface.
        namespace (str, optional): The network namespace where the interface exists.

        Failures (including a missing interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link("set", index=self._lookup_index(ns, iface_name), state="up")
        except Exception as e:
            print(f"Error setting up interface {iface_name} in namespace {namespace}: {e}")

    def attach_to_bridge(self, iface_name, bridge_name, namespace=None):
        """
        Attach an interface to a bridge.

        Parameters:
        iface_name (str): The name of the interface.
        bridge_name (str): The name of the bridge.
        namespace (str, optional): The network namespace where the interface
            and bridge exist.

        Failures (including a missing interface or bridge) are printed,
        not raised.
        """
        try:
            with self._open(namespace) as ns:
                iface_idx = self._lookup_index(ns, iface_name)
                bridge_idx = self._lookup_index(ns, bridge_name, "Bridge")
                ns.link("set", index=iface_idx, master=bridge_idx)
        except Exception as e:
            print(f"Error attaching interface {iface_name} to bridge {bridge_name} in namespace {namespace}: {e}")


def interpret_namespace(namespace_arg):
    """
    Interpret the namespace argument.

    If the argument is '1', converts it to None (representing the default
    namespace), otherwise returns the argument as is.

    Parameters:
    namespace_arg (str): The namespace argument.

    Returns:
    str or None: The interpreted namespace.
    """
    return None if namespace_arg == '1' else namespace_arg


def main():
    """Parse the command line and build the requested veth pair."""
    parser = argparse.ArgumentParser(
        description="Create veth pairs between containers with optional bridge attachment."
    )
    parser.add_argument("-ns1", "--namespace1", default=None,
                        help="Name of the first namespace or container, or '1' for the default namespace.")
    parser.add_argument("-ns2", "--namespace2", default=None,
                        help="Name of the second namespace or container, or '1' for the default namespace.")
    parser.add_argument("-n1", "--name1", required=True, help="Name of the first veth interface.")
    parser.add_argument("-n2", "--name2", required=True, help="Name of the second veth interface.")
    parser.add_argument("-b1", "--bridge1", default=None, help="Name of the network bridge for ns1.")
    parser.add_argument("-b2", "--bridge2", default=None, help="Name of the network bridge for ns2.")
    parser.add_argument("-t1", "--type1", default=None,
                        help="Container type for ns1 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")
    parser.add_argument("-t2", "--type2", default=None,
                        help="Container type for ns2 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")

    args = parser.parse_args()

    # Processing namespace arguments and container types
    ns1, type1 = interpret_namespace(args.namespace1), args.type1
    ns2, type2 = interpret_namespace(args.namespace2), args.type2

    # Instantiate management classes
    iface_manager = IfaceManager()
    container_exposer = ContainerNetnsExposer()

    # Expose container network namespaces if applicable; from here on the
    # namespace is referred to by the container's PID.
    if type1 and ns1:
        ns1_pid = container_exposer.expose_container_netns(ns1, type1)
        ns1 = ns1_pid
    if type2 and ns2:
        ns2_pid = container_exposer.expose_container_netns(ns2, type2)
        ns2 = ns2_pid

    # Create veth pair
    iface_manager.create_veth(args.name1, args.name2)

    # Move ends of the veth pair to appropriate namespaces if required
    if ns1:
        iface_manager.move(args.name1, ns1)
    if ns2:
        iface_manager.move(args.name2, ns2)

    # Optional: Attach to network bridge and bring interfaces up
    if args.bridge1 and args.name1:
        iface_manager.attach_to_bridge(args.name1, args.bridge1, ns1)
        iface_manager.set_interface_up(args.name1, ns1)
    if args.bridge2 and args.name2:
        iface_manager.attach_to_bridge(args.name2, args.bridge2, ns2)
        iface_manager.set_interface_up(args.name2, ns2)

    # Bring up interfaces if not already up
    if not args.bridge1 and args.name1:
        iface_manager.set_interface_up(args.name1, ns1)
    if not args.bridge2 and args.name2:
        iface_manager.set_interface_up(args.name2, ns2)


if __name__ == "__main__":
    main()