added code

This commit is contained in:
Johannes Erwerle 2025-07-13 14:23:13 +02:00
parent 9edd602376
commit 894687da0f
7 changed files with 1489 additions and 0 deletions

32
README.md Normal file
View file

@ -0,0 +1,32 @@
# Swagspace Cloud Init
This script builds VMs for the Swagspace infrastructure.
It takes gets the name of a VM, collect further data (CPUs, memory, IP addresses) from NetBox via GraphQL and creates a VM using [virt-install](https://virt-manager.org/index.html) and [cloud-init](https://cloudinit.readthedocs.io/en/latest/index.html).
It asks for a password for the user.
```
usage: build-vm [-h] [--instance INSTANCE] [--config CONFIG] [--url URL] [--token TOKEN] [--graphql-url GRAPHQL_URL] [--base_image BASE_IMAGE] [--image_dir IMAGE_DIR] [--network-bridge NETWORK_BRIDGE] [--dry-run] [--overwrite] [--username USERNAME] vm_name
positional arguments:
vm_name The name of the VM to build
options:
-h, --help show this help message and exit
--instance INSTANCE the name of the instance to use
--config CONFIG the config file to use
--url URL the URL of the NetBox API endpoint
--token TOKEN the token for the NetBox REST API
--graphql-url GRAPHQL_URL
The URL for the GraphQL endpoint
--base_image BASE_IMAGE
The path to the cloud-init image to use. Default: 'debian13-cloud-init.qcow2'
--image_dir IMAGE_DIR
The location where the image should be stored.
--network-bridge NETWORK_BRIDGE
Name of the bridge the VM should be attached to
--dry-run Query NetBox, build the cloud-init files but don't actually build the image
--overwrite overwrite the image, if it exists
--username USERNAME name of the user, Default: jo
```

1032
poetry.lock generated Normal file

File diff suppressed because it is too large Load diff

22
pyproject.toml Normal file
View file

@ -0,0 +1,22 @@
[project]
name = "swagspace_cloud_init"
version = "0.1.0"
description = ""
authors = [
{name = "Johannes Erwerle",email = "jo@swagspace.org"}
]
readme = "README.md"
requires-python = ">=3.12,<4"
dependencies = [
"pynetbox-config-helper @ git+https://github.com/BelWue/pynetbox-config-helper",
"pyyaml (>=6.0.2,<7.0.0)",
"python-graphql-client (>=0.4.3,<0.5.0)"
]
[project.scripts]
build-vm = "swagspace_cloud_init:build_vm"
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

View file

@ -0,0 +1,370 @@
from pprint import pformat
from pathlib import Path
from subprocess import run
import shutil
from ipaddress import (
IPv4Interface,
IPv6Interface,
IPv6Address,
IPv4Address,
ip_interface,
ip_address,
)
from typing import List, Dict, Optional
from tempfile import TemporaryDirectory
import logging
from getpass import getpass
from yaml import dump
from python_graphql_client import GraphqlClient
from pynetbox_config_helper import build_argparser, resolve_graphql_config
import importlib.resources
logger = logging.getLogger(__name__)
IPvAnyInterface = IPv6Interface | IPv4Interface
IPvAnyAddress = IPv6Address | IPv4Address
my_resources = importlib.resources.files("swagspace_cloud_init")
query_base_path = Path("swagspace_cloud_init/")
vm_query = my_resources.joinpath("vm.graphql").read_text()
prefix_query = my_resources.joinpath("prefixes.graphql").read_text()
address_query = my_resources.joinpath("ip_address.graphql").read_text()
nameservers = [ip_address(nameserver) for nameserver in ["2620:fe::fe", "2620:fe::9"]]
class MultiLineFormatter(logging.Formatter):
"""Multi-line formatter."""
def get_header_length(self, record):
"""Get the header length of a given record."""
return len(
super().format(
logging.LogRecord(
name=record.name,
level=record.levelno,
pathname=record.pathname,
lineno=record.lineno,
msg="",
args=(),
exc_info=None,
)
)
)
def format(self, record):
"""Format a record with added indentation."""
indent = " " * self.get_header_length(record)
head, *trailing = super().format(record).splitlines(True)
return head + "".join(indent + line for line in trailing)
def graphql_query(client: GraphqlClient, query, variables: Optional[Dict] = None):
if variables is None:
variables = dict()
logger.debug(f"running GraphQL query:\n{query}\nwith variables:\n{variables}")
data = client.execute(query, variables=variables)
logger.debug(f"received data from GraphQL:\n{pformat(data)}")
return data
def build_vm():
stderr_formatter = logging.StreamHandler(None)
stderr_formatter.setFormatter(
MultiLineFormatter("%(levelname)s:%(name)s:%(message)s")
)
logging.basicConfig(
level=logging.DEBUG,
handlers=[stderr_formatter],
)
parser = build_argparser()
parser.add_argument("vm_name", type=str, help="The name of the VM to build")
parser.add_argument(
"--base_image",
type=Path,
help="The path to the cloud-init image to use. Default: 'debian13-cloud-init.qcow2'",
default=Path("debian13-cloud-init.qcow2"),
)
parser.add_argument(
"--image_dir",
type=Path,
help="The location where the image should be stored.",
default=Path("/var/lib/libvirt/images/"),
)
parser.add_argument(
"--network-bridge",
type=str,
help="Name of the bridge the VM should be attached to",
default="br-libvirt0",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Query NetBox, build the cloud-init files but don't actually build the image",
)
parser.add_argument(
"--overwrite", action="store_true", help="overwrite the image, if it exists"
)
parser.add_argument(
"--username", type=str, help="name of the user, Default: jo", default="jo"
)
args = parser.parse_args()
endpoint, headers = resolve_graphql_config(parser.parse_args())
headers.update({
"Content-Type": "application/json",
"Accept": "application/json",
})
client = GraphqlClient(endpoint=endpoint, headers=headers)
password = getpass(f"Password for user {args.username}:")
data = graphql_query(client, vm_query)["data"]
vm = data["vms"][0]
cpu_cores = int(float(vm["vcpus"]))
memory_size_mbytes = int(float(vm["memory"]))
disk_size = f"{int(vm['disk'])}M"
hostname = args.vm_name
assert len(vm["interfaces"]) == 1, (
"VMs can have at most 1 interface for automatic installation"
)
interface = vm["interfaces"][0]
primary_interface = interface["name"]
query = prefix_query
logger.debug(query)
addresses = list()
gateways = list()
# find prefix containing the address:
for address in interface["ip_addresses"]:
interface_addr = ip_interface(address["address"])
addresses.append(interface_addr)
prefixes = graphql_query(
client,
query,
variables={
"address": str(interface_addr.ip),
"prefix_length": str(interface_addr.network.prefixlen),
"vrf": "null",
},
)["data"]
if len(prefixes["prefixes"]) >= 1:
logger.error("Expected at most one prefix containing the IP address")
if len(prefixes["prefixes"]) == 0:
logger.info("No matching prefix found.")
prefix = prefixes["prefixes"][0]
gateway_id = prefix["custom_fields"]["gateway"]
if gateway_id is not None:
query = address_query
gateway = graphql_query(client, query, variables={"id": gateway_id})[
"data"
]["ip_address"]["address"]
gateway_address = ip_interface(gateway).ip
logger.debug(
f"found gateway {gateway_address} for address {interface_addr}"
)
gateways.append(gateway_address)
disk_path = create_image(
args.base_image, args.image_dir, f"{hostname}.qcow2", disk_size, args.overwrite
)
cloud_init_tempdir = TemporaryDirectory(delete=False)
cloud_init_tempdir_path = Path(cloud_init_tempdir.name)
network_config = build_network_config(
addresses,
gateways,
nameservers,
cloud_init_tempdir_path,
default_interface_name=primary_interface,
)
user_data = build_user_data(
username=args.username,
password=password,
target_dir=cloud_init_tempdir_path,
)
meta_data = cloud_init_tempdir_path / "meta-data"
meta_data.touch()
if not args.dry_run:
install_image(
args.vm_name,
memory_size_mbytes=memory_size_mbytes,
disk_path=disk_path,
user_data=user_data,
network_config=network_config,
meta_data=meta_data,
network_bridge=args.network_bridge,
num_cpu_cores=cpu_cores,
)
def build_network_config(
addresses: List[IPvAnyInterface],
default_gateways: List[IPvAnyAddress],
nameservers: List[IPvAnyAddress],
target_dir: Path,
default_interface_name: str = "enp1s0",
) -> Path:
"""Builds the network-config file as a in *target_dir* and returns the Path to it."""
id0 = {
"match": {"name": default_interface_name},
"addresses": [str(addr) for addr in addresses],
"routes": [
{"to": "default", "via": str(gateway)} for gateway in default_gateways
],
"nameservers": {"addresses": [str(nameserver) for nameserver in nameservers]},
}
network_config = {
"network": {
"version": 2,
"ethernets": {"id0": id0},
}
}
logger.debug(f"Network Config is:\n{pformat(network_config)}")
target_path = target_dir / "network-config"
with target_path.open("w+") as f:
dump(network_config, f)
return target_path
def password_hash(password: str) -> str:
"""
Calculates a SHA512 hash of the password
"""
pw_hash = run(
["mkpasswd", "-m", "sha-512", "-s"],
input=password,
text=True,
capture_output=True,
)
if pw_hash.returncode != 0:
raise Exception(
f"Could not calculate password hash, non-zero return code: '{pw_hash.returncode}'.\nstdout: '{pw_hash.stdout}'\nstderr: '{pw_hash.stderr}'"
)
return pw_hash.stdout
def build_user_data(username: str, password: str, target_dir: Path) -> Path:
"""
Builds the user-data file in *target_dir* and returns a Path to it.
"""
users = [
{
"name": username,
"hashed_passwd": password_hash(password),
"lock_passwd": False,
"shell": "/bin/bash",
"sudo": "ALL=(ALL) ALL",
}
]
power_state = {
"mode": "poweroff",
"message": "finished cloud init install, powering off",
}
keyboard = {"layout": "de", "model": "pc105"}
user_data = {"users": users, "power_state": power_state, "keyboard": keyboard}
target_path = target_dir / "user-data"
with target_path.open("w+") as f:
f.write("#cloud-config\n")
dump(user_data, f)
return target_path
def create_image(
base_image: Path, destination: Path, name: str, size: str, overwrite: bool = False
) -> Path:
"""
Copies the *base_image* to *destination* with the name *name* and resizes it to *size*
Returns the final path of the image.
"""
assert base_image.is_file(), f"base_image '{base_image}' is not a file"
assert destination.is_dir(), f"destination '{destination}' is not a directory"
target = destination / name
if not overwrite:
assert not target.exists(), f"target file '{target}' already exists"
shutil.copyfile(base_image, target)
run(["qemu-img", "resize", str(target), size], check=True)
return target
def install_image(
hostname: str,
memory_size_mbytes: int,
disk_path: Path,
user_data: Path,
meta_data: Path,
network_config: Path,
network_bridge: str,
num_cpu_cores: int,
):
# virt-install --name $HOSTNAME --memory 4000 --noreboot \
# --os-variant detect=on,name=debian13 \
# --disk="$(pwd)/${HOSTNAME}.qcow2" \
# --cloud-init user-data="$(pwd)/user-data,meta-data=$(pwd)/meta-data,network-config=$(pwd)/network-config"
command = [
"virt-install",
"--name",
hostname,
"--connect",
"qemu:///system",
"--memory",
str(memory_size_mbytes),
"--noreboot",
"--noautoconsole",
"--os-variant",
"name=debian13",
f"--disk='{disk_path}'",
"--cloud-init",
f"user-data={user_data},meta-data={meta_data},network-config={network_config}",
"--vcpus",
f"{num_cpu_cores}",
]
if network_bridge is not None:
command.extend([
"--network",
f"bridge={network_bridge}",
])
logger.info(" ".join(command))
result = run(command, capture_output=True, env=dict(), timeout=60, text=True)
logger.info(result.returncode)
logger.info(result.stdout)
logger.info(result.stderr)

View file

@ -0,0 +1,6 @@
query IPAddresses($id: ID!){
ip_address(id: $id){
address
id
}
}

View file

@ -0,0 +1,8 @@
query Prefixes($address: String, $prefix_length: [String!], $vrf: [String!]) {
prefixes: prefix_list(
filters: {contains: $address, AND: {mask_length: $prefix_length, AND: {vrf: $vrf}}}
) {
prefix
custom_fields
}
}

View file

@ -0,0 +1,19 @@
query vms {
vms: virtual_machine_list(filters: {name: {exact: "test"}}) {
name
id
vcpus
disk
memory
interfaces {
name
ip_addresses {
id
vrf {
id
}
address
}
}
}
}