# Source: link/py/link.py (430 lines, 16 KiB, Python)

import argparse
import os
import subprocess
import sys
from pyroute2 import IPRoute, NetNS
class NetworkNamespaceManager:
    """
    Helper for inspecting named network namespaces.
    Named namespaces are represented by entries under '/var/run/netns'.
    """

    @staticmethod
    def list_netns():
        """
        Print every entry found in '/var/run/netns'.
        Prints a header followed by one '- name' line per entry, a notice
        when the directory is empty, or an error message when the directory
        cannot be read (the OSError is reported, not raised).
        """
        try:
            entries = os.listdir("/var/run/netns")
            if not entries:
                print("No network namespaces found.")
                return
            print("List of available network namespaces:")
            for name in entries:
                print(f"- {name}")
        except OSError as err:
            print(f"Error listing network namespaces: {err}")
class ContainerNetnsExposer:
    """
    Responsible for exposing network namespaces of containers.
    Supports Docker, LXC, LXD, and Incus containers. Exposing a container
    means symlinking '/proc/<pid>/ns/net' into '/var/run/netns/<pid>' so the
    namespace can be addressed by name (the container's init PID).
    Every failure is reported on stdout and terminates via sys.exit(1).
    """

    def __init__(self):
        """
        Initializes the ContainerNetnsExposer instance.
        """
        # PID (as a string) of the most recently resolved container.
        self.netns_pid = None
        # Path of the symlink created for that PID under /var/run/netns.
        self.netns_path = None

    @staticmethod
    def _is_running_pid(pid):
        """Return True if `pid` is a decimal string naming a positive PID."""
        return bool(pid) and pid.isdecimal() and int(pid) > 0

    @staticmethod
    def _pid_from_info(output):
        """Return the value of the first 'PID:' line in `output`, or None."""
        for line in output.splitlines():
            if line.strip().startswith("PID:"):
                return line.split(':', 1)[1].strip()
        return None

    def expose_container_netns(self, container_id_or_name, container_type='docker'):
        """
        Expose the network namespace of a specified container.
        Parameters:
        container_id_or_name (str): The identifier or name of the container.
        container_type (str): The type of the container ('docker', 'lxc', 'lxd', or 'incus').
        Returns:
        str: The PID of the container as a string.
        Raises:
        SystemExit: If the container type is unsupported or if there is an error
        in retrieving the container's PID or creating the symlink.
        """
        if container_type == 'docker':
            self.get_docker_container_pid(container_id_or_name)
        elif container_type in ('lxc', 'lxd', 'incus'):
            self.get_lxc_container_pid(container_id_or_name, container_type)
        else:
            print("Unsupported container type. Only 'docker', 'lxc', 'lxd', and 'incus' are supported.")
            sys.exit(1)
        self.create_netns_directory()
        self.create_or_remove_netns_symlink()
        return str(self.netns_pid)

    def get_docker_container_pid(self, container_id_or_name):
        """
        Retrieve the PID of a Docker container and store it in self.netns_pid.
        Parameters:
        container_id_or_name (str): The identifier or name of the Docker container.
        Raises:
        SystemExit: If 'docker inspect' fails, cannot be executed, or reports
        no live PID (Docker reports 0 for a container that is not running).
        """
        try:
            pid = subprocess.check_output(
                ["sudo", "docker", "inspect", "-f", "{{.State.Pid}}", container_id_or_name],
                universal_newlines=True
            ).strip()
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing 'sudo' executable.
            pid = ""
        if not self._is_running_pid(pid):
            print("Error retrieving Docker container PID. Make sure the container exists and is running.")
            sys.exit(1)
        self.netns_pid = pid

    def get_lxc_container_pid(self, container_name, container_type='lxc'):
        """
        Retrieve the PID of an LXC, LXD, or Incus container and store it in
        self.netns_pid.
        Parameters:
        container_name (str): The name of the container.
        container_type (str): The type of the container ('lxc', 'lxd', or 'incus').
        Any other value leaves the instance untouched.
        Raises:
        SystemExit: If the query command fails or cannot be executed, or if
        its output contains no usable PID.
        """
        commands = {
            'lxc': ["lxc-info", "-n", container_name, "-p"],
            'lxd': ["sudo", "lxc", "info", container_name],
            'incus': ["sudo", "incus", "info", container_name],
        }
        command = commands.get(container_type)
        if command is None:
            # Unknown types are a no-op here; expose_container_netns rejects
            # them before this method is reached.
            return

        try:
            output = subprocess.check_output(command, universal_newlines=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing executable (e.g. lxc-info not installed).
            print(f"Error retrieving {container_type.upper()} container PID for '{container_name}'. Error: {e}")
            sys.exit(1)

        if container_type == 'lxc':
            # 'lxc-info -p' prints 'PID: <n>'; the output may be empty, so
            # guard against indexing an empty list.
            fields = output.split()
            pid = fields[-1] if fields else ""
        else:
            pid = self._pid_from_info(output)
            if pid is None:
                tool = 'lxc' if container_type == 'lxd' else 'incus'
                print(f"PID not found in '{tool} info' output for container '{container_name}'.")
                sys.exit(1)

        if not self._is_running_pid(pid):
            print(f"{container_type.upper()} container '{container_name}' did not report a usable PID. "
                  "Make sure the container is running.")
            sys.exit(1)
        self.netns_pid = pid

    def create_netns_directory(self):
        """
        Create the network namespace directory if it does not exist.
        Ensures that the directory '/var/run/netns' exists, which is used
        to store network namespace symlinks.
        Raises:
        SystemExit: If the directory cannot be created.
        """
        try:
            subprocess.run(["sudo", "mkdir", "-p", "/var/run/netns"], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # stderr is not captured, so e.stderr would be None; report the
            # exception itself instead.
            print(f"Error creating network namespace directory: {e}")
            sys.exit(1)

    def create_or_remove_netns_symlink(self):
        """
        (Re)create the symlink to the network namespace of a container.
        Sets up '/var/run/netns/<pid>' pointing to '/proc/<pid>/ns/net', using
        the PID stored in self.netns_pid. If an entry with the same name
        already exists, it is removed before creating a new one.
        Raises:
        SystemExit: If removing the old entry or creating the symlink fails.
        """
        self.netns_path = f"/var/run/netns/{self.netns_pid}"
        try:
            # lexists() also detects a dangling symlink left behind by a
            # process that has since exited; exists() would miss it and the
            # following 'ln -s' would fail with "File exists".
            if os.path.lexists(self.netns_path):
                subprocess.run(["sudo", "rm", "-rf", self.netns_path], check=True)
            subprocess.run(["sudo", "ln", "-s", f"/proc/{self.netns_pid}/ns/net", self.netns_path], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"Error creating or removing symlink: {e}")
            sys.exit(1)
class IfaceManager:
    """
    Manages network interfaces, including creation, deletion, and configuration
    of veth pairs, VLANs, and bridges.
    All public methods are best-effort: any exception raised while talking to
    netlink is caught and reported on stdout, and the method returns None.
    """

    @staticmethod
    def _open(namespace=None):
        """Return a NetNS handle for `namespace`, or an IPRoute handle if it is falsy."""
        return NetNS(namespace) if namespace else IPRoute()

    @staticmethod
    def _index_of(handle, name, label="Interface"):
        """Return the index of link `name` on `handle`; raise ValueError if absent."""
        matches = handle.link_lookup(ifname=name)
        if not matches:
            raise ValueError(f"{label} {name} not found.")
        return matches[0]

    def delete(self, iface_name, namespace=None):
        """
        Delete a network interface.
        Parameters:
        iface_name (str): The name of the interface to delete.
        namespace (str, optional): The network namespace where the interface exists.
        Errors (including a missing interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('del', index=self._index_of(ns, iface_name))
        except Exception as e:
            print(f"Error deleting interface {iface_name} in namespace {namespace}: {e}")

    def create_veth(self, iface1, iface2, namespace=None):
        """
        Create a veth pair.
        Parameters:
        iface1 (str): The name of the first interface.
        iface2 (str): The name of the second interface.
        namespace (str, optional): The network namespace where to create the veth pair.
        Errors are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('add', ifname=iface1, peer={'ifname': iface2}, kind='veth')
        except Exception as e:
            print(f"Error creating veth pair {iface1} and {iface2}: {e}")

    def create_vlan(self, base_iface, vlan_id, namespace=None):
        """
        Create a VLAN interface named '<base_iface>.<vlan_id>'.
        Parameters:
        base_iface (str): The base interface name.
        vlan_id (int): The VLAN ID.
        namespace (str, optional): The network namespace where to create the VLAN interface.
        Errors (including a missing base interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                base_iface_idx = self._index_of(ns, base_iface, "Base interface")
                vlan_iface = f"{base_iface}.{vlan_id}"
                ns.link('add', ifname=vlan_iface, link=base_iface_idx, kind='vlan', vlan_info={'id': vlan_id})
        except Exception as e:
            print(f"Error creating VLAN on interface {base_iface}: {e}")

    def create_bridge(self, bridge_name, namespace=None):
        """
        Create a bridge interface.
        Parameters:
        bridge_name (str): The name of the bridge.
        namespace (str, optional): The network namespace where to create the bridge.
        Errors are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link('add', ifname=bridge_name, kind='bridge')
        except Exception as e:
            print(f"Error creating bridge {bridge_name}: {e}")

    def move(self, iface, namespace):
        """
        Move a network interface from the current namespace to another one.
        Parameters:
        iface (str): The interface name to move.
        namespace (str): The target network namespace.
        Errors (including a missing interface) are printed, not raised.
        """
        try:
            # Use the handle as a context manager so its netlink socket is
            # closed, as every other method in this class does.
            with IPRoute() as ipr:
                ipr.link('set', index=self._index_of(ipr, iface), net_ns_fd=namespace)
        except Exception as e:
            print(f"Error moving interface {iface} to namespace {namespace}: {e}")

    def set_interface_up(self, iface_name, namespace=None):
        """
        Set a network interface up.
        Parameters:
        iface_name (str): The name of the interface.
        namespace (str, optional): The network namespace where the interface exists.
        Errors (including a missing interface) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                ns.link("set", index=self._index_of(ns, iface_name), state="up")
        except Exception as e:
            print(f"Error setting up interface {iface_name} in namespace {namespace}: {e}")

    def attach_to_bridge(self, iface_name, bridge_name, namespace=None):
        """
        Attach an interface to a bridge.
        Parameters:
        iface_name (str): The name of the interface.
        bridge_name (str): The name of the bridge.
        namespace (str, optional): The network namespace where the interface and bridge exist.
        Errors (including a missing interface or bridge) are printed, not raised.
        """
        try:
            with self._open(namespace) as ns:
                iface_idx = self._index_of(ns, iface_name)
                bridge_idx = self._index_of(ns, bridge_name, "Bridge")
                ns.link("set", index=iface_idx, master=bridge_idx)
        except Exception as e:
            print(f"Error attaching interface {iface_name} to bridge {bridge_name} in namespace {namespace}: {e}")
def interpret_namespace(namespace_arg):
    """
    Translate a command-line namespace value.
    The string '1' stands for the default namespace and is mapped to None;
    every other value is handed back untouched.
    Parameters:
    namespace_arg (str): The namespace argument.
    Returns:
    str or None: None for '1', otherwise the argument itself.
    """
    if namespace_arg == '1':
        return None
    return namespace_arg
def _build_arg_parser():
    """Build the command-line parser for the veth-linking tool."""
    parser = argparse.ArgumentParser(description="Create veth pairs between containers with optional bridge attachment.")
    parser.add_argument("-ns1", "--namespace1", default=None, help="Name of the first namespace or container, or '1' for the default namespace.")
    parser.add_argument("-ns2", "--namespace2", default=None, help="Name of the second namespace or container, or '1' for the default namespace.")
    parser.add_argument("-n1", "--name1", required=True, help="Name of the first veth interface.")
    parser.add_argument("-n2", "--name2", required=True, help="Name of the second veth interface.")
    parser.add_argument("-b1", "--bridge1", default=None, help="Name of the network bridge for ns1.")
    parser.add_argument("-b2", "--bridge2", default=None, help="Name of the network bridge for ns2.")
    parser.add_argument("-t1", "--type1", default=None, help="Container type for ns1 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")
    parser.add_argument("-t2", "--type2", default=None, help="Container type for ns2 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")
    return parser


def _normalize_container_type(value):
    """Map the literal 'None' (any case) advertised by --help to None."""
    if value is None or value.strip().lower() == 'none':
        return None
    return value


def main(argv=None):
    """
    Create a veth pair and place its ends in the requested namespaces.
    Steps: resolve container namespaces (when a container type is given),
    create the veth pair in the current namespace, move each end to its
    target namespace, optionally attach each end to a bridge, and bring
    both ends up.
    Parameters:
    argv (list of str, optional): Arguments to parse; defaults to sys.argv[1:].
    Raises:
    SystemExit: On invalid arguments, or when a container namespace cannot
    be exposed.
    """
    args = _build_arg_parser().parse_args(argv)

    # '1' selects the default namespace; the string 'None' means "no container type".
    ns1 = interpret_namespace(args.namespace1)
    ns2 = interpret_namespace(args.namespace2)
    type1 = _normalize_container_type(args.type1)
    type2 = _normalize_container_type(args.type2)

    iface_manager = IfaceManager()
    container_exposer = ContainerNetnsExposer()

    # For containers, the namespace name becomes the container's PID, which
    # is the name under which its namespace gets exposed.
    if type1 and ns1:
        ns1 = container_exposer.expose_container_netns(ns1, type1)
    if type2 and ns2:
        ns2 = container_exposer.expose_container_netns(ns2, type2)

    # The pair is always created in the current namespace, then each end is
    # moved to its target namespace if one was requested.
    iface_manager.create_veth(args.name1, args.name2)
    if ns1:
        iface_manager.move(args.name1, ns1)
    if ns2:
        iface_manager.move(args.name2, ns2)

    # Attach to a bridge when one is given; bring the interface up either way.
    for iface, bridge, namespace in (
        (args.name1, args.bridge1, ns1),
        (args.name2, args.bridge2, ns2),
    ):
        if not iface:
            continue
        if bridge:
            iface_manager.attach_to_bridge(iface, bridge, namespace)
        iface_manager.set_interface_up(iface, namespace)
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()