From d0f43fc01c4c4d412e8d92020b6ade8669826794 Mon Sep 17 00:00:00 2001 From: Jakob Cornell Date: Fri, 15 Nov 2019 00:34:35 -0600 Subject: [PATCH] WIP --- main.py | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ oauth.py | 65 ++++++++++++++++++++++++++++++++ util.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+) create mode 100644 main.py create mode 100644 oauth.py create mode 100644 util.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..f46f06f --- /dev/null +++ b/main.py @@ -0,0 +1,112 @@ +# Copyright 2019 Jakob Cornell + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. 
"""Mirror the attachments of a Blackboard content page into a local tree.

Usage: ``python main.py <blackboard-url>``.  The URL must carry ``course_id``
and ``content_id`` query parameters.  The destination root is read from the
``base_path`` key of ``config.toml`` in the current directory.
"""

import filecmp
import json
import shutil
import sys
import tempfile
import urllib.parse
import urllib.request
from collections import deque
from pathlib import Path

HOST = 'blackboard.oberlin.edu'
API_PATH = '/learn/api/public/v1'

# Characters that may not appear in a Windows file name, keyed by ordinal
# because ``str.translate`` looks characters up by code point.
# https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
_BAD_CHARS = dict.fromkeys((*range(0, 31 + 1), *map(ord, r'<>:"/\|?*')))


def get_uri(path):
    """Return the absolute API URI for *path* (which must start with ``/``)."""
    return 'https://' + HOST + API_PATH + path


def walk_pages(uri):
    """Yield every entry of ``results`` from a paged API listing.

    Follows ``paging.nextPage`` until the response no longer contains one.
    """
    while True:
        with urllib.request.urlopen(uri) as resp:
            data = json.load(resp)
        yield from data['results']
        next_page = data.get('paging', {}).get('nextPage')
        if not next_page:
            return
        # urljoin leaves an absolute ``nextPage`` untouched and resolves a
        # host-relative one against the page that was just fetched.
        uri = urllib.parse.urljoin(uri, next_page)


def get_path(course, leaf_id):
    """Return the titles from the root content item down to *leaf_id*.

    Each element has the form ``'<title> (<id>)'``.  The chain is discovered
    by following ``parentId`` upwards until an item has no parent.
    """
    path = deque()
    id_ = leaf_id
    while True:
        # Request the item currently being visited (``id_``), not the leaf,
        # otherwise the walk never moves up and never terminates.
        uri = get_uri('/courses/{}/contents/{}'.format(course, id_))
        with urllib.request.urlopen(uri) as resp:
            data = json.load(resp)
        path.appendleft('{} ({})'.format(data['title'], id_))
        if 'parentId' in data:
            id_ = data['parentId']
        else:
            return list(path)


def sanitize(seg):
    """Return *seg* with characters that Windows forbids in names removed."""
    return seg.translate(_BAD_CHARS)


# parse directory contents into mirror state for individual item
def read_metadata(path):
    """Unfinished stub: currently builds an empty dict and returns ``None``."""
    # TODO: work in progress in the original patch; no caller exists yet.
    info = {}


def main(argv=None):
    """Download every attachment below the content page named by the URL.

    *argv* defaults to ``sys.argv[1:]`` and must hold exactly one URL.

    Raises:
        OSError: ``config.toml`` cannot be opened.
        KeyError: ``base_path`` is missing from the configuration.
        AssertionError: the wrong number of arguments was given.
        ValueError: the URL lacks ``course_id`` or ``content_id``.
    """
    # Third-party dependency; imported here so the helpers above stay
    # importable on systems where it is not installed.
    import toml

    try:
        with open('config.toml') as f:
            config = toml.load(f)
    except OSError:
        print("Cannot open configuration file `config.toml`:", file=sys.stderr)
        raise
    if 'base_path' not in config:
        print("`base_path` not in config file", file=sys.stderr)
        raise KeyError('base_path')

    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) != 1:
        print("Please pass the Blackboard URL to sync as an argument", file=sys.stderr)
        raise AssertionError()

    url = args[0]
    try:
        params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
    except ValueError:
        print("That URL doesn't look right:", file=sys.stderr)
        raise

    if 'course_id' not in params or 'content_id' not in params:
        print("That URL doesn't look right.", file=sys.stderr)
        raise ValueError()
    # parse_qs maps each key to a *list* of values; use the first of each.
    course = params['course_id'][0]
    page = params['content_id'][0]
    base_path = Path(config['base_path'], *map(sanitize, get_path(course, page)))

    children_uri = get_uri('/courses/{}/contents/{}/children'.format(course, page))
    for item in walk_pages(children_uri):
        attachments_uri = get_uri(
            '/courses/{}/contents/{}/attachments'.format(course, item['id'])
        )
        for attachment in walk_pages(attachments_uri):
            dir_ = base_path.joinpath(sanitize('{} ({})'.format(item['title'], item['id'])))
            orig_name = sanitize('{} ({})'.format(attachment['fileName'], attachment['id']))
            dir_.mkdir(parents=True, exist_ok=True)

            download_uri = get_uri(
                '/courses/{}/contents/{}/attachments/{}/download'.format(
                    course, item['id'], attachment['id']
                )
            )
            # Download next to the final location so the later move stays on
            # one filesystem; delete=False because the file outlives the handle.
            temp = tempfile.NamedTemporaryFile(prefix=orig_name, dir=str(dir_), delete=False)
            temp_path = Path(temp.name)
            try:
                with temp, urllib.request.urlopen(download_uri) as resp:
                    shutil.copyfileobj(resp, temp)
            except Exception:
                # Do not leave a partial download behind.
                temp_path.unlink()
                raise

            orig = dir_.joinpath(orig_name)
            if not orig.exists():
                shutil.move(str(temp_path), str(orig))
            elif filecmp.cmp(str(temp_path), str(orig), shallow=False):
                # Unchanged on the server: the fresh copy is redundant.
                temp_path.unlink()
            else:
                print(
                    "{} differs from the downloaded copy; new download kept as {}".format(
                        orig, temp_path
                    ),
                    file=sys.stderr,
                )


if __name__ == '__main__':
    main()

# ---------------------------------------------------------------------------
# Patch metadata retained from the original text (start of the next file):
# diff --git a/oauth.py b/oauth.py
# new file mode 100644
# index 0000000..6aeb982
# --- /dev/null
# +++ b/oauth.py
# @@ -0,0 +1,65 @@
#
# Copyright 2019 Jakob Cornell
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Obtain an OAuth authorization code via a throw-away local HTTP listener."""

import http.server
import pathlib
import sys
import urllib.parse
import webbrowser
from queue import Queue
from threading import Thread

ADDRESS = 'localhost'
PORT = 1081


def parse_auth_code(request_path):
    """Return the first ``code`` query value of *request_path*, or ``None``.

    *request_path* is the request target of the redirect, e.g.
    ``'/?code=abc'``.
    """
    query = urllib.parse.urlparse(request_path).query
    values = urllib.parse.parse_qs(query).get('code')
    return values[0] if values else None


def _get_handler(queue):
    """Build a request-handler class that reports each GET on *queue*.

    ``HTTPServer`` instantiates the handler class itself, so the queue is
    handed over through a closure rather than a constructor argument.
    """

    class AuthCodeRequestHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.end_headers()
            # Echo the request target; 'replace' avoids a UnicodeEncodeError
            # if the target contains non-ASCII characters.
            self.wfile.write(self.path.encode('ascii', 'replace'))
            # Every GET wakes the waiter (as before); the payload is now the
            # authorization code, or None when the request carried none.
            queue.put(parse_auth_code(self.path))

        def log_message(self, *_):
            # Silence the default per-request logging to stderr.
            pass

    return AuthCodeRequestHandler


def get_auth_code(bb_host, api_path, client_id):
    """Send the user through the authorization page and wait for the redirect.

    Args:
        bb_host: host name of the Blackboard instance.
        api_path: path object exposing ``joinpath`` (e.g. ``PurePosixPath``)
            for the API root.
        client_id: OAuth client (application) identifier.

    Returns:
        The ``code`` value from the first request received on the redirect
        URI, or ``None`` if that request had no ``code`` parameter.
    """
    redirect_uri = 'http://' + ADDRESS + ':' + str(PORT) + '/'
    params = {
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'client_id': client_id,
    }
    user_url = urllib.parse.urlunparse(urllib.parse.ParseResult(
        scheme='https',
        netloc=bb_host,
        path=str(api_path.joinpath('oauth2/authorizationcode')),
        params='',
        query=urllib.parse.urlencode(params),
        fragment='',
    ))

    queue = Queue()
    # Bind and listen *before* opening the browser so the redirect cannot
    # arrive while nothing is accepting connections.
    server = http.server.HTTPServer((ADDRESS, PORT), _get_handler(queue))
    worker = Thread(target=server.serve_forever, daemon=True)
    worker.start()
    try:
        webbrowser.open(user_url)
        return queue.get()
    finally:
        server.shutdown()
        server.server_close()  # release the listening socket
        worker.join()


if __name__ == '__main__':
    # The original module-level call omitted the required client_id argument.
    if len(sys.argv) != 2:
        print("Please pass the OAuth client ID as an argument", file=sys.stderr)
        raise SystemExit(2)
    print(get_auth_code(
        'oberlin-test.blackboard.com',
        pathlib.PurePosixPath('/learn/api/public/v1'),
        sys.argv[1],
    ))

# ---------------------------------------------------------------------------
# Patch metadata retained from the original text (start of the next file):
# diff --git a/util.py b/util.py
# new file mode 100644
# index 0000000..9ce2138
# --- /dev/null
# +++ b/util.py
# @@ -0,0 +1,94 @@
#
# Copyright 2018, Anders Cornell and Jakob Cornell
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from collections import namedtuple
import itertools
import logging
import pathlib
import re
import urllib.parse

LOGGER_NAME = 'bb_sync_api'

# Trailing "(<id>,<version>)" or "(<version>)" marker of a file stem.  The
# comma sits outside the ``id`` group so the captured id does not include it.
_VERSION_MARKER_RE = re.compile(r'\((?:(?P<id>[\d_]+),)?(?P<version>\d+)\)$')


def resolve(target, curr_url):
    """Resolve the link *target* found on the page *curr_url*.

    A target that already has a scheme is returned unchanged; anything else
    (host-relative, scheme-relative or document-relative) is joined onto
    *curr_url* with ``urllib.parse.urljoin``.
    """
    if urllib.parse.urlparse(target).scheme:
        return target
    return urllib.parse.urljoin(curr_url, target)


def extract_ids(url):
    """Return ``{'course': ..., 'content': ...}`` from the query of *url*.

    Leading and trailing underscores are stripped from both ids.

    Raises:
        KeyError: ``course_id`` or ``content_id`` is absent.
        ValueError: either parameter occurs more than once.
    """
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
    [course_id] = qs['course_id']
    [content_id] = qs['content_id']
    return {
        'course': course_id.strip('_'),
        'content': content_id.strip('_'),
    }


def clean_win_path(seg):
    """Return *seg* with characters Windows forbids in names turned to spaces."""
    # https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
    bad = {*range(0, 31 + 1), *map(ord, r'<>:"/\|?*')}
    return seg.translate({ch: ' ' for ch in bad})


def _split_name(name):
    """Split *name* into ``(stem, suffixes)`` using ``pathlib`` suffix rules.

    NOTE(review): ``Path.suffixes`` treats every dot after the first
    character as a suffix boundary, so a dot inside the descriptive part of a
    name shortens the stem - confirm that is acceptable for real file names.
    """
    suff_len = sum(len(s) for s in pathlib.Path(name).suffixes)
    cut = len(name) - suff_len
    return (name[:cut], name[cut:])


def content_path(course_path, segments):
    """Map ``(id, name)`` *segments* onto a directory path below *course_path*.

    At each level an existing child whose name ends in ``(<id>)`` is reused,
    so a directory keeps its place even if its title changed; otherwise a new
    ``'<name>(<id>)'`` component is appended.

    Raises:
        ValueError: more than one existing child matches the same id.
    """
    path = course_path
    for (id_, name) in segments:
        if path.exists():
            marker = re.compile(r'\({}\)$'.format(re.escape(id_)))
            cands = [child for child in path.iterdir() if marker.search(child.name)]
            if cands:
                [path] = cands
                continue
        path = path.joinpath(clean_win_path('{}({})'.format(name, id_)))
    return path


_BB_ID_STREAM_NAME = '8f3b98ea-e227-478f-bb58-5c31db476409'

ParseResult = namedtuple('ParseResult', ['id_', 'version'])


def _parse_path(path):
    """Return the ``ParseResult`` encoded in *path*, or ``None``.

    The stem must end in ``(<id>,<version>)`` or ``(<version>)``.  When a
    sibling ``<name>:<_BB_ID_STREAM_NAME>`` exists, the id is read from it
    instead of from the name; otherwise the name must carry the id.
    ``version`` is returned as the matched digit string.
    """
    (stem, _) = _split_name(path.name)
    match = _VERSION_MARKER_RE.search(stem)
    if match is None:
        return None
    stream_path = path.with_name(path.name + ':' + _BB_ID_STREAM_NAME)
    if stream_path.exists():
        with stream_path.open() as f:
            id_ = f.read()
    else:
        id_ = match.group('id')
        assert id_ is not None, 'no id in name or id stream: {}'.format(path)
    return ParseResult(id_=id_, version=match.group('version'))


def unparse_path(parse_result):
    """Placeholder: the original patch declared this function without a body."""
    # TODO: intended behaviour is not defined anywhere in the patch.
    raise NotImplementedError('unparse_path')


def get_latest_versions(content_path):
    """Return ``{id: version}`` with the highest version found per id.

    *content_path* is a directory; entries whose names carry no version
    marker are ignored.  Versions are compared numerically (so ``'10'`` beats
    ``'9'``) but returned as the strings found in the file names.
    """
    results = {}
    for path in content_path.iterdir():
        result = _parse_path(path)
        if result is None:
            continue
        best = results.get(result.id_)
        if best is None or int(best) < int(result.version):
            results[result.id_] = result.version
    return results

# --
# 2.30.2