157 lines
4.2 KiB
Python
157 lines
4.2 KiB
Python
from pathlib import Path
|
|
from subprocess import run
|
|
from collections import defaultdict
|
|
from typing import Any
|
|
from logging import getLogger
|
|
import shutil
|
|
|
|
from django.contrib.auth.models import User
|
|
|
|
from .models import BorgRepository
|
|
|
|
logger = getLogger("deployments")
|
|
|
|
# sync users
|
|
|
|
|
|
def get_users(
    user_prefix: str = "u-",
    passwd_path: Path = Path("/etc/passwd"),
) -> dict[str, dict[str, Any]]:
    """Return the system accounts whose login name starts with ``user_prefix``.

    Args:
        user_prefix: Only accounts whose name starts with this prefix are
            included in the result.
        passwd_path: passwd-format file to read. Defaults to ``/etc/passwd``.

    Returns:
        A mapping of account name to ``{"homedir": Path}``, where the path is
        taken from the sixth field of the passwd entry.
    """
    users: dict[str, dict[str, Any]] = {}

    for line in Path(passwd_path).read_text().splitlines():
        fields = line.split(":")
        # A passwd entry has exactly seven colon-separated fields. Anything
        # else (blank lines, bare NIS "+"/"-" markers, ...) is skipped instead
        # of blowing up in a tuple unpack.
        if len(fields) != 7:
            continue

        name, homedir = fields[0], fields[5]
        if name.startswith(user_prefix):
            users[name] = {"homedir": Path(homedir)}

    return users
|
|
|
|
|
|
def cleanup_users(
    usernames: set[str],
    system_users: dict[str, dict[str, Any]],
    dry_run=False,
):
    """Clean up all unused user accounts and directories on the system.

    Args:
        usernames: The set of users that should exist.
        system_users: All the users that currently exist (with the correct
            prefix), keyed by account name.
        dry_run: When true, only report what would be deleted; neither
            ``userdel`` nor any directory removal is executed.
    """
    base_dir = Path("/home/")

    # cleanup users
    for user in system_users:
        if user in usernames:
            continue

        logger.info("Deleting user '%s'.", user)
        print(f"Deleting user '{user}'.")
        if not dry_run:
            # The exit status of userdel is not checked, so a failed deletion
            # does not abort the rest of the clean-up.
            run(["userdel", user])

    # cleanup directories
    for home in base_dir.iterdir():
        is_stale = (
            home.is_dir()
            and home.name.startswith("u-")
            and home.name not in usernames
        )
        if not is_stale:
            continue

        logger.info("Deleting home directory '%s'", home)
        print(f"Deleting home directory '{home}'")
        if not dry_run:
            shutil.rmtree(home)
|
|
|
|
|
|
def create_users(
    usernames: set[str],
    system_users: dict[str, dict[str, Any]],
    dry_run=False,
):
    """Create a system account for every expected user that has none yet.

    Args:
        usernames: The set of users that should exist.
        system_users: The users that currently exist, keyed by account name.
        dry_run: When true, only report which accounts would be added.
    """
    for username in usernames:
        if username in system_users:
            continue

        logger.info("Adding user '%s'", username)
        print(f"Adding user '{username}'")
        if not dry_run:
            add_user(username)
|
|
|
|
|
|
def add_user(username: str):
    """Create the system account ``username`` together with its home directory.

    The account is created with ``useradd --no-user-group --create-home``.

    Args:
        username: Login name of the account to create.

    Raises:
        AssertionError: If ``get_users()`` (called with its defaults) already
            lists ``username``.
        subprocess.CalledProcessError: If ``useradd`` exits with a non-zero
            status.
    """
    # Raised explicitly rather than through an `assert` statement so that the
    # check is still performed when Python runs with -O; the exception type is
    # kept as-is for existing callers.
    if username in get_users():
        raise AssertionError("User already exists")

    run(["useradd", "--no-user-group", "--create-home", username], check=True)
|
|
|
|
|
|
def sync_users(dry_run=False):
    """Synchronise system accounts and their repositories with the database.

    For every ``User`` row an account named ``u-<pk>`` is expected. Accounts
    and home directories that are no longer expected are cleaned up, missing
    accounts are created, and finally ``sync_repos`` is run for each user.

    Args:
        dry_run: Forwarded to the clean-up, creation and repository steps so
            that they only report what they would do.
    """
    # gather data
    users = User.objects.all()
    usernames = {f"u-{user.pk}" for user in users}
    system_users = get_users()

    # clean up old users
    cleanup_users(usernames=usernames, system_users=system_users, dry_run=dry_run)

    # create the new users
    create_users(usernames=usernames, system_users=system_users, dry_run=dry_run)

    # The snapshot above was taken before any account was created, so it does
    # not contain the new accounts (or their home directories). Re-read it.
    if not dry_run:
        system_users = get_users()

    for user in users:
        username = f"u-{user.pk}"
        system_user = system_users.get(username)
        if system_user is None:
            # Happens in a dry run for accounts that would have been created:
            # there is no home directory to inspect yet.
            logger.warning(
                "Skipping repository sync for '%s': no system account.", username
            )
            print(f"Skipping repository sync for '{username}': no system account.")
            continue

        sync_repos(
            username=username,
            user=user,
            system_user=system_user,
            dry_run=dry_run,
        )
|
|
|
|
|
|
# sync authorized_keys and repository directories
|
|
def sync_repos(username: str, user: User, system_user: dict[str, Any], dry_run=False):
    """Bring a user's home directory in line with their Borg repositories.

    Three things happen, in this order:

    1. ``<homedir>/.ssh/authorized_keys`` is rewritten with one forced
       ``borg serve`` entry per distinct repository key, restricted to the
       repositories that use that key.
    2. Every non-hidden directory in the home directory whose name does not
       match one of the user's repositories is removed.
    3. A directory is created for every repository and handed over to
       ``username``.

    Args:
        username: System account that should own the repository directories.
        user: Database user whose ``BorgRepository`` rows are synchronised.
        system_user: Entry for the account as produced by ``get_users``; only
            its ``"homedir"`` value is used.
        dry_run: When true, print what would be written/removed but leave the
            file system untouched.
    """
    # Evaluate the query once so every step below works on the same snapshot.
    repos = list(BorgRepository.objects.filter(user=user))

    repos_by_key = defaultdict(list)
    for repo in repos:
        repos_by_key[repo.key].append(repo)

    homedir = system_user["homedir"]
    ssh_dir = homedir / ".ssh"
    authorized_keys = ssh_dir / "authorized_keys"

    # NOTE(review): repo names and keys are interpolated verbatim and the key
    # type is fixed to ssh-rsa — presumably both are validated on the model;
    # confirm.
    commands = []
    # The per-key list gets its own name: rebinding `repos` here would leave
    # only the last key's repositories for the removal and creation steps.
    for key, key_repos in repos_by_key.items():
        restrictions = " ".join(
            f"--restrict-to-repository {homedir / repo.name}" for repo in key_repos
        )
        commands.append(
            f'command="borg serve {restrictions} --quota=500G",restrict ssh-rsa {key}'
        )

    print("\n".join(commands))

    if not dry_run:
        # create .ssh directory and the authorized_keys file
        ssh_dir.mkdir(mode=0o750, exist_ok=True, parents=True)
        authorized_keys.write_text("\n".join(commands) + "\n")

    # remove repositories that no longer exist
    repo_names = {repo.name for repo in repos}
    print(repo_names)
    # Materialise the listing first: entries are deleted while looping. In a
    # dry run the home directory may not exist, as nothing was created above.
    entries = list(homedir.iterdir()) if homedir.is_dir() else []
    for entry in entries:
        print(entry.name)
        if not entry.is_dir() or entry.name.startswith("."):
            continue
        if entry.name in repo_names:
            continue

        print(f"removing unused repo '{entry}'")
        if not dry_run:
            shutil.rmtree(entry)

    if dry_run:
        return

    # create the repositories
    for repo in repos:
        path = homedir / repo.name
        path.mkdir(mode=0o750, exist_ok=True, parents=True)
        shutil.chown(path, user=username)
|