ilias-sync2/ilias_sync2/main.py
import logging
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from pathvalidate import sanitize_filepath
from .utils import ContentDisposition, heading_sanitization

BASE_URL = "https://ilias3.uni-stuttgart.de"

root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

class NoIliasFilename(Exception):
    pass


class FileDownloadError(Exception):
    pass


class LoginFailed(Exception):
    pass


class RequestError(Exception):
    pass


class NoContentType(Exception):
    pass

# Pass errors up to the caller.
# Page:
#   fetch the page
#   parse the page - for each link:
#     - find the name
#     - find the link
#     - create a new crawl object
#   handle the discovered links recursively
# If the page is not HTML:
#   - compare the HEAD data against the file on disk
#   - if it is missing or differs, download and save it
@dataclass
class CrawlObject:
    path: Path
    url: str
    session: requests.Session
    head: Optional[requests.Response] = None
    items: List["CrawlObject"] = field(default_factory=list)
    discovered: Set[str] = field(default_factory=set)

    def request_head(self) -> requests.Response:
        return self.request(method="HEAD")

    def request_get(self, **kwargs) -> requests.Response:
        return self.request(method="GET", **kwargs)

    def request(self, method="GET", **kwargs) -> requests.Response:
        response = self.session.request(method=method, url=self.url, **kwargs)
        if response.status_code != requests.codes.ok:
            raise RequestError(f"Error getting '{ method }' for { self.url }",
                               response)
        return response
    def parse(self) -> None:
        pass
        # self.get_content
    def process(self):
        if self.head is None:
            self.head = self.request_head()
        content_type = self.head.headers.get("Content-Type", None)
        if content_type is None:
            raise NoContentType(f"No Content-Type for { self.url }")
        typestr = content_type.split(";")[0]
        logging.debug(f"Content-Type is '{ typestr }'.")
        if typestr == "text/html":
            self.parse()
            self.process_items()
        else:
            self.improve_name()
            if self.update_required():
                self.save()
    def improve_name(self):
        """
        Reads the Content-Disposition header (e.g.
        'attachment; filename="sheet_1.pdf"') and tries to improve the
        filename from it.
        """
        disposition = self.head.headers.get("Content-Disposition")
        if disposition is not None:
            disposition_obj = ContentDisposition.from_str(disposition)
            if disposition_obj.filename is not None:
                logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'")
                self.path = self.path.parent / disposition_obj.filename
                # self.path = Path(sanitize_filepath(self.path))
    def process_items(self) -> None:
        subitems = list()
        self.page_request = self.request_get()
        self.html = BeautifulSoup(self.page_request.text, "html.parser")
        content = self.html.find(id="ilContentContainer")
        if content is None:
            logging.info("no ilias content block found, nothing to do.")
        else:
            for block in content.find_all("div", class_="ilContainerBlock"):
                header = block.find("div", class_="ilContainerBlockHeader")
                heading = header.find("h2").text
                items = block.find_all("div", class_="il_ContainerListItem")
                for item in items:
                    # logging.debug(item.prettify())
                    link = item.find("a")
                    if link is None:
                        continue
                    name = sanitize_filepath(link.text)
                    url = link.attrs["href"]
                    url = urljoin(self.url, url)
                    if url in self.discovered:
                        logging.info(f"{ url } already discovered, skipping it")
                        continue
                    self.discovered.add(url)
                    subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name,
                                                url,
                                                self.session, discovered=self.discovered))
            # 4-column video download, as used by Introduction to Modern Cryptography
            for block in content.find_all("tr"):
                items = block.find_all("td")
                if len(items) == 4:
                    _, name, _, download = items
                    link_element = download.find("a")
                    name_element = name.find("a")
                    if link_element is not None and name_element is not None:
                        link = link_element.attrs["href"]
                        name = sanitize_filepath(name_element.text + ".mp4")  # rough estimate that all files are mp4
                        url = urljoin(self.url, link)
                        path = self.path / name
                        if url in self.discovered:
                            logging.info(f"{ url } already discovered, skipping it")
                            continue
                        self.discovered.add(url)
                        subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered))
            # download things from exercise sections
            exercise_div = content.find("div", class_="ilExcOverview")
            if exercise_div is not None:
                logging.debug(exercise_div.prettify())
                containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer")
                logging.info(f"found { len(containers) } containers.")
                for block in containers:
                    header = block.find("span", class_="ilAssignmentHeader")
                    if header is None:
                        logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'")
                        continue
                    logging.debug(header.prettify())
                    header_text = header.text
                    forms = block.find_all("div", class_="form-group")
                    for form in forms:
                        logging.debug(form.prettify())
                        divs = form.find_all("div")
                        if len(divs) == 2:
                            name_div, link_div = divs
                        else:
                            logging.info("number of divs in form group != 2, don't know what to do.")
                            continue
                        name = sanitize_filepath(name_div.text)
                        link = link_div.find("a")
                        if link is None:
                            continue
                        link = link.attrs["href"]
                        url = urljoin(self.url, link)
                        if url in self.discovered:
                            logging.info(f"{ url } already discovered, skipping it")
                            continue
                        self.discovered.add(url)
                        subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered))
            # # contents for videos with chapters
            # menu_div = self.html.find("div", class_="il-mainbar-slates")
            # if menu_div is not None:
            #
            #     blocks = menu_div.find_all("li")
            #     logging.info(f"found { len(blocks) } blocks in menu")
            #     for block in blocks:
            #         logging.debug(block.prettify())
            #         a_element = block.find("a")
            #         if a_element is None:
            #             continue
            #         href = a_element.attrs["href"]
            #         logging.info(href)
            # find the il-card thumbnail videos
            for item in content.find_all("div", class_=["il-card", "thumbnail"]):
                captions = item.find_all("div", class_="caption")
                if not captions:
                    logging.debug("no captions found")
                    continue
                if len(captions) < 3:
                    logging.debug(f"too few captions found: { len(captions) }")
                    continue
                heading = captions[0].find("a")
                if heading is None:
                    logging.debug(f"No <a> found in { captions[0].prettify() }")
                    continue
                name = heading.text
                logging.debug(f"Found Heading: { name }")
                name = sanitize_filepath(name + ".mp4")
                link = captions[2].find("a")
                if link is None:
                    logging.debug(f"No <a> found in { captions[2].prettify() }")
                    continue
                url = link.attrs["href"]
                if url in self.discovered:
                    logging.info(f"{ url } already discovered, skipping it")
                    continue
                self.discovered.add(url)
                subitems.append(CrawlObject(self.path / name, url, self.session, discovered=self.discovered))
        for item in subitems:
            item.process()
    def save(self) -> None:
        """
        Makes a GET request to the URL of this object and saves the
        response body to the path of this object.
        If needed, the parent directories are created.
        """
        response = self.request_get(stream=True)
        if not self.path.parent.is_dir():
            self.path.parent.mkdir(parents=True, exist_ok=True)
        logging.info(f"Writing to file '{ self.path }'.")
        with open(self.path, "wb+") as f:
            for chunk in response.iter_content(chunk_size=10 * (2 ** 10)):
                f.write(chunk)
    def update_required(self) -> bool:
        """
        Decides whether the file behind the URL needs to be (re)downloaded.
        Currently this only checks whether the file exists on disk.
        Returns True if the file needs an update.
        """
        if not self.path.is_file():
            logging.info(f"Update required for '{ self.path }' because the file does not exist.")
            return True
        logging.info(f"No update required for '{ self.path }'.")
        return False
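
    # A possible refinement of update_required(), following the plan in the
    # comments near the top of this file ("compare the HEAD data against the
    # file on disk"): compare the Content-Length header from the HEAD request
    # with the local file size. This is an illustrative sketch, not used
    # anywhere in this module; relying on Content-Length being present in the
    # HEAD response is an assumption.
    #
    # def update_required(self) -> bool:
    #     if not self.path.is_file():
    #         return True
    #     content_length = self.head.headers.get("Content-Length")
    #     if content_length is not None and int(content_length) != self.path.stat().st_size:
    #         logging.info(f"Update required for '{ self.path }' because the size differs.")
    #         return True
    #     return False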


@dataclass
class Synchronizer:
    username: str
    password: str
    directories: List[Tuple[str, str]] = field(default_factory=list)

    def login(self):
        self._session = requests.Session()
        # fill the session with the correct cookies
        login_page = self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en")
        login_page = BeautifulSoup(login_page.content, "html.parser")
        login_form = login_page.find("form", attrs={"name": "formlogin"})
        if login_form is None:
            raise LoginFailed("Login failed, login form not found!")
        logging.debug(login_form)
        login_url = login_form.attrs.get("action")
        if login_url is None:
            raise LoginFailed("Could not find the action URL in the login form!")
        logging.debug(f"Login URL: {login_url}")
        login_data = {
            "username": self.username,
            "password": self.password,
            "cmd[doStandardAuthentication]": "Anmelden"
        }
        # do the actual login
        self._session.post(f"{ BASE_URL }/{ login_url }",
                           data=login_data)
    def synchronize(self):
        self.login()
        for path, url in self.directories:
            obj = CrawlObject(Path(path), url, self._session)
            obj.process()
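

# Minimal usage sketch (run with `python -m ilias_sync2.main`). The
# credentials and the (path, URL) pair below are placeholder assumptions,
# not values shipped with the repository; a real URL would point at an
# ILIAS object the account can read, e.g. a course page.
if __name__ == "__main__":
    sync = Synchronizer(username="st123456", password="secret")
    sync.directories.append(("courses/example",
                             f"{ BASE_URL }/ilias.php?ref_id=123456&cmd=view"))
    sync.synchronize()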