from pprint import pformat
from pathlib import Path
from subprocess import run
import shutil
from ipaddress import (
    IPv4Interface,
    IPv6Interface,
    IPv6Address,
    IPv4Address,
    ip_interface,
    ip_address,
)
from typing import List, Dict, Optional
from tempfile import TemporaryDirectory
import logging
from getpass import getpass

from yaml import dump
from python_graphql_client import GraphqlClient
from pynetbox_config_helper import build_argparser, resolve_graphql_config
import importlib.resources

logger = logging.getLogger(__name__)

IPvAnyInterface = IPv6Interface | IPv4Interface
IPvAnyAddress = IPv6Address | IPv4Address

# GraphQL query texts are shipped as package resources and read once at import.
my_resources = importlib.resources.files("swagspace_cloud_init")
query_base_path = Path("swagspace_cloud_init/")
vm_query = my_resources.joinpath("vm.graphql").read_text()
prefix_query = my_resources.joinpath("prefixes.graphql").read_text()
address_query = my_resources.joinpath("ip_address.graphql").read_text()

# Resolver addresses written into every generated network-config.
nameservers = [ip_address(nameserver) for nameserver in ["2620:fe::fe", "2620:fe::9"]]


class MultiLineFormatter(logging.Formatter):
    """Formatter that indents continuation lines to align under the header."""

    def get_header_length(self, record):
        """Return the length of the formatted prefix that precedes the message.

        The length is measured by formatting a copy of *record* that carries
        an empty message and no exception info.
        """
        return len(
            super().format(
                logging.LogRecord(
                    name=record.name,
                    level=record.levelno,
                    pathname=record.pathname,
                    lineno=record.lineno,
                    msg="",
                    args=(),
                    exc_info=None,
                )
            )
        )

    def format(self, record):
        """Format *record*, indenting every line after the first."""
        lines = super().format(record).splitlines(True)
        if not lines:
            # An empty formatted record has no first line to unpack.
            return ""
        indent = " " * self.get_header_length(record)
        head, *trailing = lines
        return head + "".join(indent + line for line in trailing)


def graphql_query(client: GraphqlClient, query, variables: Optional[Dict] = None):
    """Execute *query* with *variables* on *client* and return the response.

    Both the request and the response are logged at DEBUG level.
    """
    if variables is None:
        variables = dict()
    logger.debug(f"running GraphQL query:\n{query}\nwith variables:\n{variables}")
    data = client.execute(query, variables=variables)
    logger.debug(f"received data from GraphQL:\n{pformat(data)}")
    return data


def _lookup_gateway(
    client: GraphqlClient, interface_addr: IPvAnyInterface
) -> Optional[IPvAnyAddress]:
    """Return the gateway configured on the prefix containing *interface_addr*.

    Returns None when no prefix matches or the matching prefix has no gateway
    set in its ``gateway`` custom field.
    """
    prefixes = graphql_query(
        client,
        prefix_query,
        variables={
            "address": str(interface_addr.ip),
            "prefix_length": str(interface_addr.network.prefixlen),
            "vrf": "null",
        },
    )["data"]["prefixes"]
    if not prefixes:
        logger.info("No matching prefix found.")
        return None
    if len(prefixes) > 1:
        # Ambiguous result: report it, then fall back to the first match.
        logger.error("Expected at most one prefix containing the IP address")
    gateway_id = prefixes[0]["custom_fields"]["gateway"]
    if gateway_id is None:
        return None
    gateway = graphql_query(client, address_query, variables={"id": gateway_id})[
        "data"
    ]["ip_address"]["address"]
    gateway_address = ip_interface(gateway).ip
    logger.debug(f"found gateway {gateway_address} for address {interface_addr}")
    return gateway_address


def build_vm():
    """Command-line entry point: build a cloud-init VM from a queried VM record.

    Parses the command line, prompts for the user's password, queries the VM
    and its addresses over GraphQL, writes the cloud-init files into a
    temporary directory (which is kept afterwards) and, unless ``--dry-run``
    is given, creates the disk image and runs ``virt-install``.
    """
    stderr_formatter = logging.StreamHandler(None)
    stderr_formatter.setFormatter(
        MultiLineFormatter("%(levelname)s:%(name)s:%(message)s")
    )
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=[stderr_formatter],
    )

    parser = build_argparser()
    parser.add_argument("vm_name", type=str, help="The name of the VM to build")
    parser.add_argument(
        "--base_image",
        type=Path,
        help="The path to the cloud-init image to use. Default: 'debian13-cloud-init.qcow2'",
        default=Path("debian13-cloud-init.qcow2"),
    )
    parser.add_argument(
        "--image_dir",
        type=Path,
        help="The location where the image should be stored.",
        default=Path("/var/lib/libvirt/images/"),
    )
    parser.add_argument(
        "--network-bridge",
        type=str,
        help="Name of the bridge the VM should be attached to",
        default="br-libvirt0",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Query NetBox, build the cloud-init files but don't actually build the image",
    )
    parser.add_argument(
        "--overwrite", action="store_true", help="overwrite the image, if it exists"
    )
    parser.add_argument(
        "--username", type=str, help="name of the user, Default: jo", default="jo"
    )
    args = parser.parse_args()

    endpoint, headers = resolve_graphql_config(args)
    headers.update({
        "Content-Type": "application/json",
        "Accept": "application/json",
    })
    client = GraphqlClient(endpoint=endpoint, headers=headers)

    password = getpass(f"Password for user {args.username}:")

    data = graphql_query(client, vm_query, variables={"name": args.vm_name})["data"]
    if not data["vms"]:
        raise SystemExit(f"No VM named '{args.vm_name}' found in NetBox")
    vm = data["vms"][0]
    cpu_cores = int(float(vm["vcpus"]))
    memory_size_mbytes = int(float(vm["memory"]))
    disk_size = f"{int(vm['disk'])}M"
    hostname = args.vm_name

    assert len(vm["interfaces"]) == 1, (
        "VMs can have at most 1 interface for automatic installation"
    )
    interface = vm["interfaces"][0]
    primary_interface = interface["name"]

    logger.debug(prefix_query)
    addresses = list()
    gateways = list()
    # Collect every address of the interface and the gateway of the prefix
    # containing it, where one is configured.
    for address in interface["ip_addresses"]:
        interface_addr = ip_interface(address["address"])
        addresses.append(interface_addr)
        gateway_address = _lookup_gateway(client, interface_addr)
        if gateway_address is not None:
            gateways.append(gateway_address)

    # A dry run must not touch the image directory.
    disk_path = None
    if not args.dry_run:
        disk_path = create_image(
            args.base_image,
            args.image_dir,
            f"{hostname}.qcow2",
            disk_size,
            args.overwrite,
        )

    # delete=False keeps the directory on disk after this process exits.
    cloud_init_tempdir = TemporaryDirectory(delete=False)
    cloud_init_tempdir_path = Path(cloud_init_tempdir.name)
    network_config = build_network_config(
        addresses,
        gateways,
        nameservers,
        cloud_init_tempdir_path,
        default_interface_name=primary_interface,
    )
    user_data = build_user_data(
        username=args.username,
        password=password,
        target_dir=cloud_init_tempdir_path,
    )
    meta_data = cloud_init_tempdir_path / "meta-data"
    meta_data.touch()

    if not args.dry_run:
        install_image(
            args.vm_name,
            memory_size_mbytes=memory_size_mbytes,
            disk_path=disk_path,
            user_data=user_data,
            network_config=network_config,
            meta_data=meta_data,
            network_bridge=args.network_bridge,
            num_cpu_cores=cpu_cores,
        )


def build_network_config(
    addresses: List[IPvAnyInterface],
    default_gateways: List[IPvAnyAddress],
    nameservers: List[IPvAnyAddress],
    target_dir: Path,
    default_interface_name: str = "enp1s0",
) -> Path:
    """Build the network-config file in *target_dir* and return the Path to it.

    The file describes a single ethernet device matched by
    *default_interface_name*, carrying all *addresses*, one default route per
    entry in *default_gateways* and the given *nameservers*.
    """
    id0 = {
        "match": {"name": default_interface_name},
        "addresses": [str(addr) for addr in addresses],
        "routes": [
            {"to": "default", "via": str(gateway)} for gateway in default_gateways
        ],
        "nameservers": {"addresses": [str(nameserver) for nameserver in nameservers]},
    }
    network_config = {
        "network": {
            "version": 2,
            "ethernets": {"id0": id0},
        }
    }
    logger.debug(f"Network Config is:\n{pformat(network_config)}")
    target_path = target_dir / "network-config"
    with target_path.open("w+") as f:
        dump(network_config, f)
    return target_path


def password_hash(password: str) -> str:
    """Return the SHA-512 crypt hash of *password* as computed by ``mkpasswd``.

    The password is passed on stdin so it never appears on the command line.

    Raises:
        RuntimeError: if ``mkpasswd`` exits with a non-zero return code.
    """
    pw_hash = run(
        ["mkpasswd", "-m", "sha-512", "-s"],
        input=password,
        text=True,
        capture_output=True,
    )
    if pw_hash.returncode != 0:
        raise RuntimeError(
            f"Could not calculate password hash, non-zero return code: '{pw_hash.returncode}'.\nstdout: '{pw_hash.stdout}'\nstderr: '{pw_hash.stderr}'"
        )
    # Drop the trailing newline of the command output; it is not part of the hash.
    return pw_hash.stdout.strip()


def build_user_data(username: str, password: str, target_dir: Path) -> Path:
    """Build the user-data file in *target_dir* and return a Path to it.

    The file defines one sudo-capable user with the hashed *password*, a
    German pc105 keyboard layout, and a power-off once cloud-init finishes.
    """
    users = [
        {
            "name": username,
            "hashed_passwd": password_hash(password),
            "lock_passwd": False,
            "shell": "/bin/bash",
            "sudo": "ALL=(ALL) ALL",
        }
    ]
    power_state = {
        "mode": "poweroff",
        "message": "finished cloud init install, powering off",
    }
    keyboard = {"layout": "de", "model": "pc105"}
    user_data = {"users": users, "power_state": power_state, "keyboard": keyboard}
    target_path = target_dir / "user-data"
    with target_path.open("w+") as f:
        # The "#cloud-config" header must be the first line of the file.
        f.write("#cloud-config\n")
        dump(user_data, f)
    return target_path


def create_image(
    base_image: Path, destination: Path, name: str, size: str, overwrite: bool = False
) -> Path:
    """Copy *base_image* to *destination*/*name* and resize it to *size*.

    Returns the final path of the image.

    Raises:
        AssertionError: if *base_image* is not a file, *destination* is not a
            directory, or the target exists and *overwrite* is false.
        subprocess.CalledProcessError: if ``qemu-img resize`` fails.
    """
    assert base_image.is_file(), f"base_image '{base_image}' is not a file"
    assert destination.is_dir(), f"destination '{destination}' is not a directory"
    target = destination / name
    if not overwrite:
        assert not target.exists(), f"target file '{target}' already exists"
    shutil.copyfile(base_image, target)
    run(["qemu-img", "resize", str(target), size], check=True)
    return target


def install_image(
    hostname: str,
    memory_size_mbytes: int,
    disk_path: Path,
    user_data: Path,
    meta_data: Path,
    network_config: Path,
    network_bridge: str,
    num_cpu_cores: int,
):
    """Run ``virt-install`` to define and start the VM on ``qemu:///system``.

    The command line, return code, stdout and stderr are logged at INFO
    level; a non-zero return code does not raise.

    Raises:
        subprocess.TimeoutExpired: if ``virt-install`` runs longer than 60 s.
    """
    command = [
        "virt-install",
        "--name",
        hostname,
        "--connect",
        "qemu:///system",
        "--memory",
        str(memory_size_mbytes),
        "--noreboot",
        "--noautoconsole",
        "--os-variant",
        "name=debian13",
        # No shell is involved, so the path must not be wrapped in quotes.
        "--disk",
        str(disk_path),
        "--cloud-init",
        f"user-data={user_data},meta-data={meta_data},network-config={network_config}",
        "--vcpus",
        f"{num_cpu_cores}",
    ]
    if network_bridge is not None:
        command.extend([
            "--network",
            f"bridge={network_bridge}",
        ])
    logger.info(" ".join(command))
    result = run(command, capture_output=True, env=dict(), timeout=60, text=True)
    logger.info(result.returncode)
    logger.info(result.stdout)
    logger.info(result.stderr)