import logging import sys from pathlib import Path from dataclasses import dataclass, field from typing import List, Tuple, Set from urllib.parse import urljoin import requests from bs4 import BeautifulSoup from .utils import ContentDisposition, heading_sanitization BASE_URL = "https://ilias3.uni-stuttgart.de" root = logging.getLogger() root.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) root.addHandler(handler) class NoIliasFilename(Exception): pass class FileDownloadError(Exception): pass class LoginFailed(Exception): pass class RequestError(Exception): pass class NoContentType(Exception): pass # Fehler nach oben durchreichen # Seite # seite holen # seite parsen - pro Link # - Namen suchen # - Link suchen # - Neues Crawl-Objekt erstellen # rekursiv sich mit den Links beschäftigen # Wenn seite != HTML # - Daten aus HEAD mit Datei im System abgleichen # - wenn nicht vorhanden oder unterschiedlich runterladen und speichern @dataclass class CrawlObject(): path: Path url: str session: requests.Session head: requests.Response = None items: List["CrawlObject"] = field(default_factory=list) discovered: Set[str] = field(default_factory=set) def request_head(self) -> requests.Response: return self.request(method="HEAD") def request_get(self) -> requests.Response: return self.request(method="GET") def request(self, method="GET") -> requests.Response: response = self.session.request(method=method, url=self.url) if response.status_code != requests.codes.ok: raise RequestError(f"Error getting '{ method }' for { self.url }", response) return response def parse(self) -> None: pass # self.get_content def process(self): if self.head is None: self.head = self.request_head() content_type = self.head.headers.get("Content-Type", None) if content_type is None: raise NoContentType(f"No Content-Type for { self.url }") typestr = content_type.split(";")[0] logging.debug(f"Content-Type is '{ typestr }'.") if typestr == "text/html": self.parse() self.process_items() else: self.improve_name() if self.update_requiered(): self.save() def improve_name(self): """ This method reads the header and tries to improve the filename """ disposition = self.head.headers.get("Content-Disposition") if disposition is not None: disposition_obj = ContentDisposition.from_str(disposition) if disposition_obj.filename is not None: logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'") self.path = self.path.parent / disposition_obj.filename def process_items(self) -> None: subitems = list() self.page_request = self.request_get() self.html = BeautifulSoup(self.page_request.text, "html.parser") content = self.html.find(id="ilContentContainer") if content is None: logging.info("no ilias content block found, nothing to do.") else: for block in content.find_all("div", class_="ilContainerBlock"): header = block.find("div", class_="ilContainerBlockHeader") heading = header.find("h2").text items = block.find_all("div", class_="il_ContainerListItem") for item in items: #logging.debug(item.prettify()) link = item.find("a") if link is None: continue name = link.text url = link.attrs["href"] url = urljoin(self.url, url) if url in self.discovered: continue logging.info(f"{ url } already discovered, skipping it") self.discovered.add(url) subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name, url, self.session, discovered=self.discovered)) # 4 column video download like used by Introduction to Modern Cryptography for block in content.find_all("tr"): items = block.find_all("td") if len(items) == 4: _, name, _, download = items link_element = download.find("a") name_element = name.find("a") if link_element is not None and name_element is not None: link = link_element.attrs["href"] name = name_element.text + ".mp4" # rough estimate that all files are mp4 url = urljoin(self.url, link) path = self.path / name if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered)) # download things from exercise sections exercise_div = content.find("div", class_="ilExcOverview") if exercise_div is not None: logging.debug(exercise_div.prettify()) containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer") logging.info(f"found { len(containers) } containers.") for block in containers: header = block.find("span", class_="ilAssignmentHeader") logging.debug(header.prettify()) if header is None: logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'") continue header_text = header.text forms = block.find_all("div", class_="form-group") for form in forms: logging.debug(form.prettify()) divs = form.find_all("div") if len(divs) == 2: name_div, link_div = divs else: logging.info("number of divs in formgroup != 2, dont know what to do.") continue name = name_div.text link = link_div.find("a") if link is None: continue link = link.attrs["href"] url = urljoin(self.url, link) if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered)) # # contents for videos with chapters # menu_div = self.html.find("div", class_="il-mainbar-slates") # if menu_div is not None: # # blocks = menu_div.find_all("li") # logging.info(f"found { len(blocks) } blocks in menu") # for block in blocks: # logging.debug(block.prettify()) # a_element = block.find("a") # if a_element is None: # continue # href = a_element.attrs["href"] # logging.info(href) for item in subitems: item.process() def save(self) -> None: """ makes a GET request to the URL of this objects and saves it to the path of this object. If needed the directories are created. """ content = self.request_get().content if not self.path.parent.is_dir(): self.path.parent.mkdir(parents=True, exist_ok=True) logging.info(f"Writing to file '{ self.path }'.") with open(self.path, "wb+") as f: f.write(content) def update_requiered(self) -> bool: """ Tries to compare the file from the URL to the file on the filesystem. Returns True if the file needs an update. """ if not self.path.is_file(): logging.info(f"Update required for '{ self.path }' because the file does not exist.") return True logging.info(f"No update required for '{ self.path }'.") return False @dataclass class Synchronizer: username: str password: str directories: List[Tuple[str, str]] = field(default_factory=list) def login(self): self._session = requests.Session() # fill the session with the correct cookies self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en") login_data = { "username": self.username, "password": self.password, "cmd[doStandardAuthentication]": "Anmelden" } # do the actual login self._session.post(f"{ BASE_URL }/ilias.php?lang=en&client_id=Uni_Stuttgart&cmd=post&cmdClass=ilstartupgui&cmdNode=123&baseClass=ilStartUpGUI&rtoken=", data=login_data) def synchronize(self): self.login() for path, url in self.directories: obj = CrawlObject(path, url, self._session) obj.process()