commit 4742e417f794704ea13d6d88a96517e45eacea29
Author: Johannes Erwerle
Date:   Tue Mar 29 15:13:31 2022 +0200

    initial commit

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d5f6050
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# Ilias Sync
diff --git a/ilias_sync2/__init__.py b/ilias_sync2/__init__.py
new file mode 100644
index 0000000..96be6e8
--- /dev/null
+++ b/ilias_sync2/__init__.py
@@ -0,0 +1 @@
+from .main import CrawlObject, Synchronizer
diff --git a/ilias_sync2/main.py b/ilias_sync2/main.py
new file mode 100644
index 0000000..d402567
--- /dev/null
+++ b/ilias_sync2/main.py
@@ -0,0 +1,282 @@
+import logging
+import sys
+from pathlib import Path
+from dataclasses import dataclass, field
+from typing import List, Optional, Tuple, Set
+from urllib.parse import urljoin
+
+import requests
+from bs4 import BeautifulSoup
+
+from .utils import ContentDisposition, heading_sanitization
+
+BASE_URL = "https://ilias3.uni-stuttgart.de"
+
+
+root = logging.getLogger()
+root.setLevel(logging.DEBUG)
+
+handler = logging.StreamHandler(sys.stdout)
+handler.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+handler.setFormatter(formatter)
+root.addHandler(handler)
+
+
+class NoIliasFilename(Exception):
+    pass
+
+
+class FileDownloadError(Exception):
+    pass
+
+
+class LoginFailed(Exception):
+    pass
+
+
+class RequestError(Exception):
+    pass
+
+
+class NoContentType(Exception):
+    pass
+
+# propagate errors upwards
+# page
+# fetch the page
+# parse the page - per link
+# - find the name
+# - find the link
+# - create a new crawl object
+# deal with the links recursively
+# if page != HTML
+# - compare the data from HEAD with the file on disk
+# - if missing or different, download and save it
+
+
+@dataclass
+class CrawlObject():
+
+    path: Path
+    url: str
+    session: requests.Session
+    head: Optional[requests.Response] = None
+    items: List["CrawlObject"] = field(default_factory=list)
+    discovered: Set[str] = field(default_factory=set)
+
+    def request_head(self) -> requests.Response:
+        return self.request(method="HEAD")
+
+    def request_get(self) -> requests.Response:
+        return self.request(method="GET")
+
+    def request(self, method="GET") -> requests.Response:
+        response = self.session.request(method=method, url=self.url)
+        if response.status_code != requests.codes.ok:
+            raise RequestError(f"Error getting '{ method }' for { self.url }",
+                               response)
+
+        return response
+
+    def parse(self) -> None:
+        pass
+        # self.get_content
+
+    def process(self):
+        if self.head is None:
+            self.head = self.request_head()
+        content_type = self.head.headers.get("Content-Type", None)
+
+        if content_type is None:
+            raise NoContentType(f"No Content-Type for { self.url }")
+
+        typestr = content_type.split(";")[0]
+        logging.debug(f"Content-Type is '{ typestr }'.")
+        if typestr == "text/html":
+            self.parse()
+            self.process_items()
+        else:
+            self.improve_name()
+            if self.update_required():
+                self.save()
+
+    def improve_name(self):
+        """
+        This method reads the header and tries to improve the filename.
+        """
+
+        disposition = self.head.headers.get("Content-Disposition")
+        if disposition is not None:
+            disposition_obj = ContentDisposition.from_str(disposition)
+
+            if disposition_obj.filename is not None:
+                logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'")
+                self.path = self.path.parent / disposition_obj.filename
+
+    def process_items(self) -> None:
+
+        subitems = list()
+        self.page_request = self.request_get()
+        self.html = BeautifulSoup(self.page_request.text, "html.parser")
+
+        content = self.html.find(id="ilContentContainer")
+
+        if content is None:
+            logging.info("no ilias content block found, nothing to do.")
+        else:
+            for block in content.find_all("div", class_="ilContainerBlock"):
+                header = block.find("div", class_="ilContainerBlockHeader")
+                heading = header.find("h2").text
+
+                items = block.find_all("div", class_="il_ContainerListItem")
+                for item in items:
+                    # logging.debug(item.prettify())
+                    link = item.find("a")
+                    if link is None:
+                        continue
+                    name = link.text
+                    url = link.attrs["href"]
+                    url = urljoin(self.url, url)
+
+                    if url in self.discovered:
+                        logging.info(f"{ url } already discovered, skipping it")
+                        continue
+                    self.discovered.add(url)
+                    subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name,
+                                                url,
+                                                self.session, discovered=self.discovered))
+
+            # 4 column video download as used by Introduction to Modern Cryptography
+            for block in content.find_all("tr"):
+                items = block.find_all("td")
+                if len(items) == 4:
+                    _, name, _, download = items
+
+                    link = download.find("a").attrs["href"]
+                    name = name.find("a").text + ".mp4"  # rough estimate that all files are mp4
+                    url = urljoin(self.url, link)
+
+                    path = self.path / name
+
+                    if url in self.discovered:
+                        logging.info(f"{ url } already discovered, skipping it")
+                        continue
+                    self.discovered.add(url)
+                    subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered))
+
+            # download things from exercise sections
+            exercise_div = content.find("div", class_="ilExcOverview")
+            if exercise_div is not None:
+                logging.debug(exercise_div.prettify())
+                containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer")
+                logging.info(f"found { len(containers) } containers.")
+                for block in containers:
+                    header = block.find("span", class_="ilAssignmentHeader")
+                    if header is None:
+                        logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'")
+                        continue
+                    logging.debug(header.prettify())
+                    header_text = header.text
+
+                    forms = block.find_all("div", class_="form-group")
+                    for form in forms:
+                        logging.debug(form.prettify())
+                        divs = form.find_all("div")
+                        if len(divs) == 2:
+                            name_div, link_div = divs
+                        else:
+                            logging.info("number of divs in form group != 2, don't know what to do.")
+                            continue
+
+                        name = name_div.text
+                        link = link_div.find("a")
+                        if link is None:
+                            continue
+                        link = link.attrs["href"]
+                        url = urljoin(self.url, link)
+
+                        if url in self.discovered:
+                            logging.info(f"{ url } already discovered, skipping it")
+                            continue
+                        self.discovered.add(url)
+                        subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered))
+
+#        # contents for videos with chapters
+#        menu_div = self.html.find("div", class_="il-mainbar-slates")
+#        if menu_div is not None:
+#
+#            blocks = menu_div.find_all("li")
+#            logging.info(f"found { len(blocks) } blocks in menu")
+#            for block in blocks:
+#                logging.debug(block.prettify())
+#                a_element = block.find("a")
+#                if a_element is None:
+#                    continue
+#                href = a_element.attrs["href"]
+#                logging.info(href)
+
+        for item in subitems:
+            item.process()
+
+    def save(self) -> None:
+        """
+        Makes a GET request to the URL of this object and saves it to the
+        path of this object.
+
+        If needed the directories are created.
+        """
+
+        content = self.request_get().content
+
+        if not self.path.parent.is_dir():
+            self.path.parent.mkdir(parents=True, exist_ok=True)
+
+        logging.info(f"Writing to file '{ self.path }'.")
+        with open(self.path, "wb+") as f:
+            f.write(content)
+
+    def update_required(self) -> bool:
+        """
+        Tries to compare the file from the URL to the file on the filesystem.
+
+        Returns True if the file needs an update.
+        """
+
+        if not self.path.is_file():
+            logging.info(f"Update required for '{ self.path }' because the file does not exist.")
+            return True
+
+        logging.info(f"No update required for '{ self.path }'.")
+        return False
+
+
+@dataclass
+class Synchronizer:
+    username: str
+    password: str
+    directories: List[Tuple[str, str]] = field(default_factory=list)
+
+    def login(self):
+
+        self._session = requests.Session()
+
+        # fill the session with the correct cookies
+        self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en")
+
+        login_data = {
+            "username": self.username,
+            "password": self.password,
+            "cmd[doStandardAuthentication]": "Anmelden"
+        }
+
+        # do the actual login
+        self._session.post(f"{ BASE_URL }/ilias.php?lang=en&client_id=Uni_Stuttgart&cmd=post&cmdClass=ilstartupgui&cmdNode=123&baseClass=ilStartUpGUI&rtoken=", data=login_data)
+
+    def synchronize(self):
+
+        self.login()
+
+        for path, url in self.directories:
+            obj = CrawlObject(Path(path), url, self._session)
+            obj.process()
diff --git a/ilias_sync2/test/__init__.py b/ilias_sync2/test/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ilias_sync2/test/test_page_parser.py b/ilias_sync2/test/test_page_parser.py
new file mode 100644
index 0000000..887e8c8
--- /dev/null
+++ b/ilias_sync2/test/test_page_parser.py
@@ -0,0 +1,6 @@
+from unittest import TestCase
+
+class TestGetPageContent(TestCase):
+
+    def test_placeholder(self):
+        pass  # placeholder: tests for the page parser still need to be written
diff --git a/ilias_sync2/utils.py b/ilias_sync2/utils.py
new file mode 100644
index 0000000..279f4ef
--- /dev/null
+++ b/ilias_sync2/utils.py
@@ -0,0 +1,37 @@
+import re
+from typing import Optional
+from dataclasses import dataclass
+
+@dataclass
+class ContentDisposition:
+
+    method: str
+    filename: Optional[str] = None
+
+    @classmethod
+    def from_str(cls, disposition: str):
+
+        m = re.match("(?P<type>(inline)|(attachment))(;\\s*(filename=\"(?P<filename>.+)\"\\s*))?", disposition)
+
+        if m is None:
+            raise ValueError(f"Error while parsing disposition string '{ disposition }'.")
+
+        d = m.groupdict()
+
+        return cls(d["type"], d.get("filename", None))
+
+def heading_sanitization(heading: str) -> str:
+    """
+    Removes some parts of headings and path names.
+
+    Currently these optimizations are done:
+    - if the heading is "Inhalt" it is replaced by the empty string
+
+    Otherwise the heading is returned as is.
+    """
+
+    if heading == "Inhalt":
+        return ""
+
+    return heading
+
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..4bfb8ec
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,19 @@
+from setuptools import setup, find_packages
+
+with open('README.md') as f:
+    readme = f.read()
+
+setup(
+    name='ilias_sync2',
+    version="0.0.1",
+    packages=find_packages(),
+    url="",
+    author='Johannes Erwerle',
+    author_email='jo@swagsapce.org',
+    description="A library to synchronize the content of ILIAS to a directory",
+    include_package_data=True,
+    zip_safe=False,
+    long_description=readme,
+    install_requires=["bs4", "requests"],
+    long_description_content_type="text/markdown",
+)
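
For reference, a minimal usage sketch of the Synchronizer API from this commit. The username, password, target directory, and the ILIAS folder URL (the ref_id) below are placeholders, not values taken from the repository.

# Minimal usage sketch; credentials, paths, and the ref_id URL are placeholders.
from ilias_sync2 import Synchronizer

sync = Synchronizer(
    username="st123456",   # placeholder ILIAS account name
    password="secret",     # placeholder password
    directories=[
        # (local target directory, ILIAS page URL) - the ref_id below is made up
        ("Modern_Cryptography", "https://ilias3.uni-stuttgart.de/ilias.php?ref_id=123456&cmd=view"),
    ],
)

sync.synchronize()  # logs in, then crawls each URL and downloads files not yet on disk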