import logging import sys from pathlib import Path from dataclasses import dataclass, field from typing import List, Tuple, Set from urllib.parse import urljoin import requests from bs4 import BeautifulSoup from pathvalidate import sanitize_filepath from .utils import ContentDisposition, heading_sanitization BASE_URL = "https://ilias3.uni-stuttgart.de" root = logging.getLogger() root.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) root.addHandler(handler) class NoIliasFilename(Exception): pass class FileDownloadError(Exception): pass class LoginFailed(Exception): pass class RequestError(Exception): pass class NoContentType(Exception): pass # Fehler nach oben durchreichen # Seite # seite holen # seite parsen - pro Link # - Namen suchen # - Link suchen # - Neues Crawl-Objekt erstellen # rekursiv sich mit den Links beschäftigen # Wenn seite != HTML # - Daten aus HEAD mit Datei im System abgleichen # - wenn nicht vorhanden oder unterschiedlich runterladen und speichern @dataclass class CrawlObject(): path: Path url: str session: requests.Session head: requests.Response = None items: List["CrawlObject"] = field(default_factory=list) discovered: Set[str] = field(default_factory=set) def request_head(self) -> requests.Response: return self.request(method="HEAD") def request_get(self, **kwargs) -> requests.Response: return self.request(method="GET", **kwargs) def request(self, method="GET", **kwargs) -> requests.Response: response = self.session.request(method=method, url=self.url, **kwargs) if response.status_code != requests.codes.ok: raise RequestError(f"Error getting '{ method }' for { self.url }", response) return response def parse(self) -> None: pass # self.get_content def process(self): if self.head is None: self.head = self.request_head() content_type = self.head.headers.get("Content-Type", None) if content_type is None: raise NoContentType(f"No Content-Type for { self.url }") typestr = content_type.split(";")[0] logging.debug(f"Content-Type is '{ typestr }'.") if typestr == "text/html": self.parse() self.process_items() else: self.improve_name() if self.update_requiered(): self.save() def improve_name(self): """ This method reads the header and tries to improve the filename """ disposition = self.head.headers.get("Content-Disposition") if disposition is not None: disposition_obj = ContentDisposition.from_str(disposition) if disposition_obj.filename is not None: logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'") self.path = self.path.parent / disposition_obj.filename # self.path = Path(sanitize_filepath(self.path)) def process_items(self) -> None: subitems = list() self.page_request = self.request_get() self.html = BeautifulSoup(self.page_request.text, "html.parser") content = self.html.find(id="ilContentContainer") if content is None: logging.info("no ilias content block found, nothing to do.") else: for block in content.find_all("div", class_="ilContainerBlock"): header = block.find("div", class_="ilContainerBlockHeader") heading = header.find("h2").text items = block.find_all("div", class_="il_ContainerListItem") for item in items: # logging.debug(item.prettify()) link = item.find("a") if link is None: continue name = sanitize_filepath(link.text) url = link.attrs["href"] url = urljoin(self.url, url) if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name, url, self.session, discovered=self.discovered)) # 4 column video download like used by Introduction to Modern Cryptography for block in content.find_all("tr"): items = block.find_all("td") if len(items) == 4: _, name, _, download = items link_element = download.find("a") name_element = name.find("a") if link_element is not None and name_element is not None: link = link_element.attrs["href"] name = sanitize_filepath(name_element.text + ".mp4") # rough estimate that all files are mp4 url = urljoin(self.url, link) path = self.path / name if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered)) # download things from exercise sections exercise_div = content.find("div", class_="ilExcOverview") if exercise_div is not None: logging.debug(exercise_div.prettify()) containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer") logging.info(f"found { len(containers) } containers.") for block in containers: header = block.find("span", class_="ilAssignmentHeader") logging.debug(header.prettify()) if header is None: logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'") continue header_text = header.text forms = block.find_all("div", class_="form-group") for form in forms: logging.debug(form.prettify()) divs = form.find_all("div") if len(divs) == 2: name_div, link_div = divs else: logging.info("number of divs in formgroup != 2, dont know what to do.") continue name = sanitize_filepath(name_div.text) link = link_div.find("a") if link is None: continue link = link.attrs["href"] url = urljoin(self.url, link) if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered)) # # contents for videos with chapters # menu_div = self.html.find("div", class_="il-mainbar-slates") # if menu_div is not None: # # blocks = menu_div.find_all("li") # logging.info(f"found { len(blocks) } blocks in menu") # for block in blocks: # logging.debug(block.prettify()) # a_element = block.find("a") # if a_element is None: # continue # href = a_element.attrs["href"] # logging.info(href) # find the il-card thumbnail videos for item in content.find_all("div", class_=["il-card", "thumbnail"]): captions = item.find_all("div", class_="caption") if captions is None: logging.debug("no captions found") continue if len(captions) < 3: logging.debug(f"to few captions found: { len(captions) }") continue heading = captions[0].find("a") if heading is None: logging.debug(f"No found in { captions.prettify() }") continue name = heading.text logging.debug(f"Found Heading: { name }") name = sanitize_filepath(name + ".mp4") link = captions[2].find("a") if link is None: logging.debug("No found in { link.prettify() }") continue url = link.attrs["href"] if url in self.discovered: logging.info(f"{ url } already discovered, skipping it") continue self.discovered.add(url) subitems.append(CrawlObject(self.path / name, url, self.session, discovered=self.discovered)) for item in subitems: item.process() def save(self) -> None: """ makes a GET request to the URL of this objects and saves it to the path of this object. If needed the directories are created. """ response = self.request_get(stream=True) if not self.path.parent.is_dir(): self.path.parent.mkdir(parents=True, exist_ok=True) logging.info(f"Writing to file '{ self.path }'.") with open(self.path, "wb+") as f: for chunk in response.iter_content(chunk_size=10*(2**10)): f.write(chunk) def update_requiered(self) -> bool: """ Tries to compare the file from the URL to the file on the filesystem. Returns True if the file needs an update. """ if not self.path.is_file(): logging.info(f"Update required for '{ self.path }' because the file does not exist.") return True logging.info(f"No update required for '{ self.path }'.") return False @dataclass class Synchronizer: username: str password: str directories: List[Tuple[str, str]] = field(default_factory=list) def login(self): self._session = requests.Session() # fill the session with the correct cookies login_page = self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en") login_page = BeautifulSoup(login_page.content) login_form = login_page.find("form", attrs={"name": "formlogin"}) if login_form is None: logging.error("Login failed, login form not found!") exit(1) logging.debug(login_form) login_url = login_form.attrs.get("action") if login_url is None: logging.error("Could not find the action URL in the login form!") exit(1) logging.debug(f"Login URL: {login_url}") login_data = { "username": self.username, "password": self.password, "cmd[doStandardAuthentication]": "Anmelden" } # do the actual login self._session.post(f"{ BASE_URL }/{ login_url }", data=login_data) def synchronize(self): self.login() for path, url in self.directories: obj = CrawlObject(path, url, self._session) obj.process()