added parsing for mathematical foundations recordings; added filepath sanitization; worked around the changing login URL; added streaming of requests for large downloads; added gitignore; added example
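"""Crawl the ILIAS installation of the University of Stuttgart and download
course material (files, lecture recordings and exercise files) into a local
directory tree."""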
import logging
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from pathvalidate import sanitize_filepath

from .utils import ContentDisposition, heading_sanitization

BASE_URL = "https://ilias3.uni-stuttgart.de"
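

# Log everything (DEBUG and above) to stdout.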
root = logging.getLogger()
root.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)


class NoIliasFilename(Exception):
    pass


class FileDownloadError(Exception):
    pass


class LoginFailed(Exception):
    pass


class RequestError(Exception):
    pass


class NoContentType(Exception):
    pass


# Propagate errors upwards
# Page:
#   fetch the page
#   parse the page - per link:
#     - look for the name
#     - look for the link
#     - create a new crawl object
#   handle the links recursively
# If the page is not HTML:
#   - compare the data from HEAD with the file on the system
#   - if it is missing or differs, download and save it


@dataclass
class CrawlObject:
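    """One node in the ILIAS page tree.

    ``path`` is the local filesystem location this node maps to and ``url``
    the ILIAS page or file it points to.  ``discovered`` is shared between
    parent and child objects so that every URL is crawled at most once.
    """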

    path: Path
    url: str
    session: requests.Session
    head: Optional[requests.Response] = None
    items: List["CrawlObject"] = field(default_factory=list)
    discovered: Set[str] = field(default_factory=set)

    def request_head(self) -> requests.Response:
        return self.request(method="HEAD")

    def request_get(self, **kwargs) -> requests.Response:
        return self.request(method="GET", **kwargs)

    def request(self, method="GET", **kwargs) -> requests.Response:
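        """Send a request to ``self.url`` and raise RequestError on any non-200 status."""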
        response = self.session.request(method=method, url=self.url, **kwargs)
        if response.status_code != requests.codes.ok:
            raise RequestError(f"Error getting '{ method }' for { self.url }",
                               response)

        return response

    def parse(self) -> None:
        pass
        # self.get_content

    def process(self):
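        """Request the HEAD of ``self.url`` (unless it is already cached) and
        dispatch on the Content-Type: HTML pages are parsed for further items,
        everything else is treated as a file and downloaded if an update is
        required."""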
        if self.head is None:
            self.head = self.request_head()
        content_type = self.head.headers.get("Content-Type", None)

        if content_type is None:
            raise NoContentType(f"No Content-Type for { self.url }")

        typestr = content_type.split(";")[0]
        logging.debug(f"Content-Type is '{ typestr }'.")
        if typestr == "text/html":
            self.parse()
            self.process_items()
        else:
            self.improve_name()
            if self.update_required():
                self.save()

    def improve_name(self):
        """
        Reads the response headers and tries to improve the filename.
        """

        disposition = self.head.headers.get("Content-Disposition")
        if disposition is not None:
            disposition_obj = ContentDisposition.from_str(disposition)

            if disposition_obj.filename is not None:
                logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'")
                self.path = self.path.parent / disposition_obj.filename

        # self.path = Path(sanitize_filepath(self.path))

    def process_items(self) -> None:
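        """Fetch the page and collect child CrawlObjects from the layouts
        ILIAS uses: container blocks, four-column video tables, exercise
        assignments and il-card video thumbnails.  Every child is processed
        recursively afterwards."""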

        subitems = list()
        self.page_request = self.request_get()
        self.html = BeautifulSoup(self.page_request.text, "html.parser")

        content = self.html.find(id="ilContentContainer")

        if content is None:
            logging.info("no ilias content block found, nothing to do.")
        else:
            for block in content.find_all("div", class_="ilContainerBlock"):
                header = block.find("div", class_="ilContainerBlockHeader")
                heading = header.find("h2").text

                items = block.find_all("div", class_="il_ContainerListItem")
                for item in items:
                    # logging.debug(item.prettify())
                    link = item.find("a")
                    if link is None:
                        continue
                    name = sanitize_filepath(link.text)
                    url = link.attrs["href"]
                    url = urljoin(self.url, url)

                    if url in self.discovered:
                        logging.info(f"{ url } already discovered, skipping it")
                        continue
                    self.discovered.add(url)
                    subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name,
                                                url,
                                                self.session, discovered=self.discovered))

            # 4-column video table as used by Introduction to Modern Cryptography
            for block in content.find_all("tr"):
                items = block.find_all("td")
                if len(items) == 4:
                    _, name, _, download = items

                    link_element = download.find("a")
                    name_element = name.find("a")
                    if link_element is not None and name_element is not None:
                        link = link_element.attrs["href"]
                        name = sanitize_filepath(name_element.text + ".mp4")  # rough assumption that all files are mp4
                        url = urljoin(self.url, link)

                        path = self.path / name

                        if url in self.discovered:
                            logging.info(f"{ url } already discovered, skipping it")
                            continue
                        self.discovered.add(url)
                        subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered))

            # download things from exercise sections
            exercise_div = content.find("div", class_="ilExcOverview")
            if exercise_div is not None:
                logging.debug(exercise_div.prettify())
                containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer")
                logging.info(f"found { len(containers) } containers.")
                for block in containers:
                    header = block.find("span", class_="ilAssignmentHeader")
                    if header is None:
                        logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'")
                        continue
                    logging.debug(header.prettify())
                    header_text = header.text

                    forms = block.find_all("div", class_="form-group")
                    for form in forms:
                        logging.debug(form.prettify())
                        divs = form.find_all("div")
                        if len(divs) == 2:
                            name_div, link_div = divs
                        else:
                            logging.info("number of divs in form group != 2, don't know what to do.")
                            continue

                        name = sanitize_filepath(name_div.text)
                        link = link_div.find("a")
                        if link is None:
                            continue
                        link = link.attrs["href"]
                        url = urljoin(self.url, link)

                        if url in self.discovered:
                            logging.info(f"{ url } already discovered, skipping it")
                            continue
                        self.discovered.add(url)
                        subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered))

            # # contents for videos with chapters
            # menu_div = self.html.find("div", class_="il-mainbar-slates")
            # if menu_div is not None:
            #
            #     blocks = menu_div.find_all("li")
            #     logging.info(f"found { len(blocks) } blocks in menu")
            #     for block in blocks:
            #         logging.debug(block.prettify())
            #         a_element = block.find("a")
            #         if a_element is None:
            #             continue
            #         href = a_element.attrs["href"]
            #         logging.info(href)

            # find the il-card thumbnail videos
            for item in content.find_all("div", class_=["il-card", "thumbnail"]):
                captions = item.find_all("div", class_="caption")
                if not captions:
                    logging.debug("no captions found")
                    continue

                if len(captions) < 3:
                    logging.debug(f"too few captions found: { len(captions) }")
                    continue

                heading = captions[0].find("a")
                if heading is None:
                    logging.debug(f"No <a> found in { captions[0].prettify() }")
                    continue

                name = heading.text

                logging.debug(f"Found Heading: { name }")

                name = sanitize_filepath(name + ".mp4")

                link = captions[2].find("a")
                if link is None:
                    logging.debug(f"No <a> found in { captions[2].prettify() }")
                    continue

                url = link.attrs["href"]

                if url in self.discovered:
                    logging.info(f"{ url } already discovered, skipping it")
                    continue
                self.discovered.add(url)
                subitems.append(CrawlObject(self.path / name, url, self.session, discovered=self.discovered))

        for item in subitems:
            item.process()

    def save(self) -> None:
        """
        Makes a GET request to the URL of this object and saves the response
        to the path of this object.

        If needed, the directories are created.
        """

        response = self.request_get(stream=True)

        if not self.path.parent.is_dir():
            self.path.parent.mkdir(parents=True, exist_ok=True)

        logging.info(f"Writing to file '{ self.path }'.")
        with open(self.path, "wb+") as f:
            for chunk in response.iter_content(chunk_size=10*(2**10)):
                f.write(chunk)

    def update_required(self) -> bool:
        """
        Tries to compare the file from the URL to the file on the filesystem;
        currently this only checks whether the file exists locally.

        Returns True if the file needs an update.
        """

        if not self.path.is_file():
            logging.info(f"Update required for '{ self.path }' because the file does not exist.")
            return True

        logging.info(f"No update required for '{ self.path }'.")
        return False


@dataclass
class Synchronizer:
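    """Logs into ILIAS and synchronizes a list of (local path, ILIAS URL)
    pairs by crawling each URL with a CrawlObject."""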

    username: str
    password: str
    directories: List[Tuple[str, str]] = field(default_factory=list)

    def login(self):
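        """Create a requests session and log into ILIAS.

        The login form and its action URL are scraped from the login page
        instead of being hard coded, since the action URL keeps changing.
        """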

        self._session = requests.Session()

        # fill the session with the correct cookies
        login_page = self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en")

        login_page = BeautifulSoup(login_page.content, "html.parser")

        login_form = login_page.find("form", attrs={"name": "formlogin"})

        if login_form is None:
            logging.error("Login failed, login form not found!")
            raise LoginFailed("login form not found")

        logging.debug(login_form)

        login_url = login_form.attrs.get("action")

        if login_url is None:
            logging.error("Could not find the action URL in the login form!")
            raise LoginFailed("no action URL in the login form")

        logging.debug(f"Login URL: {login_url}")

        login_data = {
            "username": self.username,
            "password": self.password,
            "cmd[doStandardAuthentication]": "Anmelden"
        }

        # do the actual login
        self._session.post(f"{ BASE_URL }/{ login_url }",
                           data=login_data)

    def synchronize(self):
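        """Log in and process every configured (local path, URL) pair."""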

        self.login()

        for path, url in self.directories:
            obj = CrawlObject(Path(path), url, self._session)
            obj.process()