initial commit

Johannes Erwerle 2022-03-29 15:13:31 +02:00
commit 4742e417f7
7 changed files with 345 additions and 0 deletions

1
README.md Normal file

@@ -0,0 +1 @@
# Ilias Sync

1
ilias_sync2/__init__.py Normal file

@@ -0,0 +1 @@
from .main import CrawlObject, Synchronizer

282
ilias_sync2/main.py Normal file

@@ -0,0 +1,282 @@
import logging
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from .utils import ContentDisposition, heading_sanitization
BASE_URL = "https://ilias3.uni-stuttgart.de"
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
class NoIliasFilename(Exception):
pass
class FileDownloadError(Exception):
pass
class LoginFailed(Exception):
pass
class RequestError(Exception):
pass
class NoContentType(Exception):
pass
# propagate errors upward
# page:
#   fetch the page
#   parse the page - per link:
#     - find the name
#     - find the link
#     - create a new crawl object
#   handle the links recursively
# if the page is not HTML:
#   - compare the HEAD data against the file on the filesystem
#   - if missing or different, download and save it
@dataclass
class CrawlObject():
path: Path
url: str
session: requests.Session
    head: Optional[requests.Response] = None
items: List["CrawlObject"] = field(default_factory=list)
discovered: Set[str] = field(default_factory=set)
def request_head(self) -> requests.Response:
return self.request(method="HEAD")
def request_get(self) -> requests.Response:
return self.request(method="GET")
def request(self, method="GET") -> requests.Response:
response = self.session.request(method=method, url=self.url)
if response.status_code != requests.codes.ok:
raise RequestError(f"Error getting '{ method }' for { self.url }",
response)
return response
def parse(self) -> None:
pass
# self.get_content
def process(self):
if self.head is None:
self.head = self.request_head()
content_type = self.head.headers.get("Content-Type", None)
if content_type is None:
raise NoContentType(f"No Content-Type for { self.url }")
typestr = content_type.split(";")[0]
logging.debug(f"Content-Type is '{ typestr }'.")
if typestr == "text/html":
self.parse()
self.process_items()
else:
self.improve_name()
            if self.update_required():
self.save()
def improve_name(self):
"""
This method reads the header and tries to improve the filename
"""
disposition = self.head.headers.get("Content-Disposition")
if disposition is not None:
disposition_obj = ContentDisposition.from_str(disposition)
if disposition_obj.filename is not None:
logging.info(f"Improving filename from '{ self.path.name }' to '{ disposition_obj.filename }'")
self.path = self.path.parent / disposition_obj.filename
def process_items(self) -> None:
subitems = list()
self.page_request = self.request_get()
self.html = BeautifulSoup(self.page_request.text, "html.parser")
content = self.html.find(id="ilContentContainer")
if content is None:
logging.info("no ilias content block found, nothing to do.")
else:
for block in content.find_all("div", class_="ilContainerBlock"):
header = block.find("div", class_="ilContainerBlockHeader")
heading = header.find("h2").text
items = block.find_all("div", class_="il_ContainerListItem")
for item in items:
#logging.debug(item.prettify())
link = item.find("a")
if link is None:
continue
name = link.text
url = link.attrs["href"]
url = urljoin(self.url, url)
                    if url in self.discovered:
                        logging.info(f"{ url } already discovered, skipping it")
                        continue
                    self.discovered.add(url)
subitems.append(CrawlObject(self.path / heading_sanitization(heading) / name,
url,
self.session, discovered=self.discovered))
# 4 column video download like used by Introduction to Modern Cryptography
for block in content.find_all("tr"):
items = block.find_all("td")
if len(items) == 4:
_, name, _, download = items
link = download.find("a").attrs["href"]
name = name.find("a").text + ".mp4" # rough estimate that all files are mp4
url = urljoin(self.url, link)
path = self.path / name
if url in self.discovered:
logging.info(f"{ url } already discovered, skipping it")
continue
self.discovered.add(url)
subitems.append(CrawlObject(path, url, self.session, discovered=self.discovered))
# download things from exercise sections
exercise_div = content.find("div", class_="ilExcOverview")
if exercise_div is not None:
logging.debug(exercise_div.prettify())
containers = exercise_div.find_all("div", class_="il_VAccordionInnerContainer")
logging.info(f"found { len(containers) } containers.")
for block in containers:
                header = block.find("span", class_="ilAssignmentHeader")
                if header is None:
                    logging.warning(f"Found an assignment without a header in '{ self.path }', '{ self.url }'")
                    continue
                logging.debug(header.prettify())
header_text = header.text
forms = block.find_all("div", class_="form-group")
for form in forms:
logging.debug(form.prettify())
divs = form.find_all("div")
if len(divs) == 2:
name_div, link_div = divs
else:
logging.info("number of divs in formgroup != 2, dont know what to do.")
continue
name = name_div.text
link = link_div.find("a")
if link is None:
continue
link = link.attrs["href"]
url = urljoin(self.url, link)
if url in self.discovered:
logging.info(f"{ url } already discovered, skipping it")
continue
self.discovered.add(url)
subitems.append(CrawlObject(self.path / header_text / name, url, self.session, discovered=self.discovered))
# # contents for videos with chapters
# menu_div = self.html.find("div", class_="il-mainbar-slates")
# if menu_div is not None:
#
# blocks = menu_div.find_all("li")
# logging.info(f"found { len(blocks) } blocks in menu")
# for block in blocks:
# logging.debug(block.prettify())
# a_element = block.find("a")
# if a_element is None:
# continue
# href = a_element.attrs["href"]
# logging.info(href)
for item in subitems:
item.process()
def save(self) -> None:
"""
        makes a GET request to the URL of this object and saves the
        response body to the path of this object.
        If needed, the parent directories are created.
"""
content = self.request_get().content
if not self.path.parent.is_dir():
self.path.parent.mkdir(parents=True, exist_ok=True)
logging.info(f"Writing to file '{ self.path }'.")
with open(self.path, "wb+") as f:
f.write(content)
    def update_required(self) -> bool:
        """
        Checks whether the file from the URL needs to be downloaded.
        Currently only the existence of the file on the filesystem is
        checked; returns True if the file needs an update.
        """
if not self.path.is_file():
logging.info(f"Update required for '{ self.path }' because the file does not exist.")
return True
logging.info(f"No update required for '{ self.path }'.")
return False
@dataclass
class Synchronizer:
username: str
password: str
directories: List[Tuple[str, str]] = field(default_factory=list)
def login(self):
self._session = requests.Session()
# fill the session with the correct cookies
self._session.get(f"{ BASE_URL }/login.php?target=&client_id=Uni_Stuttgart&cmd=force_login&lang=en")
login_data = {
"username": self.username,
"password": self.password,
"cmd[doStandardAuthentication]": "Anmelden"
}
# do the actual login
self._session.post(f"{ BASE_URL }/ilias.php?lang=en&client_id=Uni_Stuttgart&cmd=post&cmdClass=ilstartupgui&cmdNode=123&baseClass=ilStartUpGUI&rtoken=", data=login_data)
def synchronize(self):
self.login()
for path, url in self.directories:
obj = CrawlObject(path, url, self._session)
obj.process()
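
For context, a minimal usage sketch of the Synchronizer defined above; the account name, password and goto URL are placeholders, not values from this commit:

from pathlib import Path
from ilias_sync2 import Synchronizer

sync = Synchronizer(
    username="st123456",   # placeholder account name
    password="secret",     # better read from a config file or keyring
    directories=[
        # (local target directory, ILIAS object URL) - both illustrative
        (Path("courses/crypto"),
         "https://ilias3.uni-stuttgart.de/goto.php?target=crs_123456"),
    ],
)
sync.synchronize()  # logs in, then crawls and downloads every directory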


@@ -0,0 +1,5 @@
from unittest import TestCase
class TestGetPageContent(TestCase):
def test_
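
The test body above is truncated in the commit. Purely as an illustration of how such a test could stub the HTTP layer, a hedged sketch with hypothetical names:

from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock
import requests
from ilias_sync2 import CrawlObject

class TestGetPageContentSketch(TestCase):
    def test_request_get_returns_response(self):
        # stub the session so no network access happens
        session = MagicMock(spec=requests.Session)
        response = MagicMock(status_code=requests.codes.ok)
        session.request.return_value = response
        obj = CrawlObject(Path("out"), "https://example.invalid/page", session)
        self.assertIs(obj.request_get(), response)
        session.request.assert_called_once_with(method="GET", url="https://example.invalid/page")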

37
ilias_sync2/utils.py Normal file

@@ -0,0 +1,37 @@
import re
from typing import Optional
from dataclasses import dataclass
@dataclass
class ContentDisposition:
method: str
filename: Optional[str] = None
@classmethod
def from_str(cls, disposition: str):
m = re.match("(?P<type>(inline)|(attachment))(;\\s*(filename=\"(?P<filename>.+)\"\\s*))?", disposition)
if m is None:
raise ValueError(f"Error while parsing disposition string '{ disposition }'.")
d = m.groupdict()
return cls(d["type"], d.get("filename", None))
def heading_sanitization(heading: str) -> str:
"""
    Sanitizes headings before they are used as path components.
    Currently only one rule is applied:
    - if the heading is "Inhalt" it is replaced by the empty string
    Otherwise the heading is returned as is.
"""
if heading == "Inhalt":
return ""
return heading
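
A quick illustration of the two helpers above (the header values are made up):

from ilias_sync2.utils import ContentDisposition, heading_sanitization

cd = ContentDisposition.from_str('attachment; filename="lecture01.pdf"')
assert cd.method == "attachment"
assert cd.filename == "lecture01.pdf"
assert ContentDisposition.from_str("inline").filename is None

assert heading_sanitization("Inhalt") == ""
assert heading_sanitization("Uebungen") == "Uebungen"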

19
setup.py Normal file

@@ -0,0 +1,19 @@
from setuptools import setup, find_packages
with open('README.md') as f:
readme = f.read()
setup(
name='ilias_sync2',
version="0.0.1",
packages=find_packages(),
url="",
author='Johannes Erwerle',
author_email='jo@swagsapce.org',
description="A library to synchronize the content of ILIAS to a directory",
include_package_data=True,
zip_safe=False,
long_description=readme,
install_requires=["bs4", "requests"],
long_description_content_type="text/markdown",
)