WIP
author Jakob Cornell <jakob@jcornell.net>
Fri, 15 Nov 2019 06:34:35 +0000 (00:34 -0600)
committer Jakob Cornell <jakob@jcornell.net>
Fri, 15 Nov 2019 06:34:35 +0000 (00:34 -0600)
main.py [new file with mode: 0644]
oauth.py [new file with mode: 0644]
util.py [new file with mode: 0644]

diff --git a/main.py b/main.py
new file mode 100644 (file)
index 0000000..f46f06f
--- /dev/null
+++ b/main.py
@@ -0,0 +1,112 @@
+# Copyright 2019 Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import urllib.request
+from pathlib import Path
+import tempfile
+from collections import deque
+import shutil
+import filecmp
+import json
+
+import toml
+
+HOST = 'blackboard.oberlin.edu'
+API_PATH = '/learn/api/public/v1'
+
+def get_uri(path):
+       """Return the absolute HTTPS URI for `path` under the REST API root.
+
+       `path` is appended verbatim after `HOST` and `API_PATH`, so with the
+       values those constants have in this file it should begin with a slash.
+       """
+       return ''.join(('https://', HOST, API_PATH, path))
+
+def walk_pages(uri):
+       """Yield every entry of a paginated listing, following `nextPage` links.
+
+       Starting at `uri`, each response body is decoded as JSON and the entries
+       of its `results` array are yielded in order.  Iteration stops after the
+       first response that has no `paging.nextPage` entry.
+
+       Raises whatever `urllib.request.urlopen` or `json.load` raise, and
+       `KeyError` if a response has no `results` key.
+       """
+       from urllib.parse import urljoin
+
+       while True:
+               # The body is read completely before anything is yielded, so the
+               # connection is not held open while the consumer does its own work.
+               with urllib.request.urlopen(uri) as resp:
+                       data = json.load(resp)
+               yield from data['results']
+               paging = data.get('paging', {})
+               if 'nextPage' not in paging:
+                       return
+               # `nextPage` may be a server-relative reference, which `urlopen`
+               # cannot fetch as-is; resolve it against the page just read.
+               # Absolute links come out of `urljoin` unchanged.
+               uri = urljoin(uri, paging['nextPage'])
+
+def get_path(course, leaf_id):
+       """Return the chain of content items from the top level down to `leaf_id`.
+
+       Each element of the returned list is the string 'title (id)' for one
+       item; the first element is the ancestor that has no `parentId` and the
+       last is the item `leaf_id` itself.  One request is made per level.
+       """
+       path = deque()
+       id_ = leaf_id
+       while True:
+               # Request the item currently being visited, not the leaf again:
+               # otherwise `parentId` never changes and the loop cannot terminate.
+               with urllib.request.urlopen(get_uri('/courses/{}/contents/{}'.format(course, id_))) as resp:
+                       data = json.load(resp)
+               path.appendleft('{} ({})'.format(data['title'], id_))
+               if 'parentId' not in data:
+                       return list(path)
+               id_ = data['parentId']
+
+# https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
+def sanitize(seg):
+       """Return `seg` with characters that Windows forbids in file names removed.
+
+       Control characters 0-31 and the characters `<>:"/\\|?*` are dropped; all
+       other characters are kept as they are.
+       """
+       # `str.translate` looks characters up by code point, so the table has to
+       # be keyed by integers; a table keyed by one-character strings never
+       # matches anything.
+       bad = {*range(0, 31 + 1), *map(ord, r'<>:"/\|?*')}
+       return seg.translate({code: None for code in bad})
+
+# parse directory contents into mirror state for individual item
+def read_metadata(path):
+       # NOTE(review): unfinished stub -- `path` is never read and nothing is
+       # returned, so a caller currently gets None rather than `info`.
+       info = {}
+       
+
+# `sys` is needed by the error reporting below, so it has to be imported
+# before the first handler that prints to `sys.stderr` can run.
+import sys
+
+# Read the settings from `config.toml` in the current working directory.
+try:
+       with open('config.toml') as f:
+               config = toml.load(f)
+except OSError:
+       print("Cannot open configuration file `config.toml`:", file = sys.stderr)
+       raise
+if 'base_path' not in config:
+       print("`base_path` not in config file", file = sys.stderr)
+       raise KeyError('base_path')
+
+# Exactly one command line argument is expected: the page URL to sync.
+args = sys.argv[1:]
+if len(args) != 1:
+       print("Please pass the Blackboard URL to sync as an argument", file = sys.stderr)
+       raise AssertionError()
+
+# Imported explicitly: only `urllib.request` is imported at the top of this
+# file, and relying on it to pull in `urllib.parse` is an implementation detail.
+import urllib.parse
+
+url = args[0]
+try:
+       params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+except ValueError:
+       print("That URL doesn't look right:", file = sys.stderr)
+       raise
+
+# `parse_qs` maps every key to a *list* of values.  Require exactly one value
+# for each ID so that plain strings, not lists, end up in the request paths.
+course_ids = params.get('course_id', [])
+content_ids = params.get('content_id', [])
+if len(course_ids) != 1 or len(content_ids) != 1:
+       print("That URL doesn't look right.", file = sys.stderr)
+       raise ValueError()
+[course] = course_ids
+[page] = content_ids
+# Mirror the page under `base_path`, one directory level per ancestor item.
+base_path = Path(config['base_path'], *map(sanitize, get_path(course, page)))
+
+# Download every attachment of every child item of the requested page.
+for item in walk_pages(get_uri('/courses/{}/contents/{}/children'.format(course, page))):
+       # All attachments of one item share a directory named after the item.
+       dir_ = base_path.joinpath(sanitize('{} ({})'.format(item['title'], item['id'])))
+       for attachment in walk_pages(get_uri('/courses/{}/contents/{}/attachments'.format(course, item['id']))):
+               orig_name = sanitize('{} ({})'.format(attachment['fileName'], attachment['id']))
+               orig = dir_.joinpath(orig_name)
+               dir_.mkdir(parents = True, exist_ok = True)
+
+               # Download into a temporary file beside the destination first, so an
+               # existing local copy is never overwritten by a failed or differing
+               # download.
+               temp = tempfile.NamedTemporaryFile(prefix = orig_name, dir = str(dir_), delete = False)
+               temp_path = Path(temp.name)
+               try:
+                       with temp, urllib.request.urlopen(get_uri(
+                               '/courses/{}/contents/{}/attachments/{}/download'.format(course, item['id'], attachment['id'])
+                       )) as resp:
+                               shutil.copyfileobj(resp, temp)
+               except BaseException:
+                       # The `with` block has already closed `temp`; remove the partial
+                       # download so aborted runs do not litter the mirror.
+                       temp_path.unlink()
+                       raise
+
+               if not orig.exists():
+                       shutil.move(str(temp_path), str(orig))
+               elif filecmp.cmp(str(temp_path), str(orig), shallow = False):
+                       # Byte-identical to what is already mirrored: drop the new copy.
+                       temp_path.unlink()
+               else:
+                       # Keep both versions and leave the decision to the user.
+                       print(
+                               "Remote copy of `{}` differs from the local file; download kept as `{}`".format(orig, temp_path),
+                               file = sys.stderr,
+                       )
diff --git a/oauth.py b/oauth.py
new file mode 100644 (file)
index 0000000..6aeb982
--- /dev/null
+++ b/oauth.py
@@ -0,0 +1,65 @@
+# Copyright 2019 Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import http.server
+from queue import Queue
+from threading import Thread
+import urllib.parse
+import webbrowser
+
+ADDRESS = 'localhost'
+PORT = 1081
+
+def _get_handler(queue):
+       """Build a request handler class that reports OAuth redirects on `queue`.
+
+       For every GET request the returned class answers 200, echoes the request
+       path as plain text, and puts one value on `queue`: the first `code` query
+       parameter of the request, or None when the request carries no `code`.
+       """
+       # Why does your API require me to use such hacks?
+       # (The server is handed a handler *class*, not an instance, so the queue
+       # is passed in through this closure.)
+
+       class AuthCodeRequestHandler(http.server.BaseHTTPRequestHandler):
+               def do_GET(self):
+                       query = urllib.parse.urlparse(self.path).query
+                       codes = urllib.parse.parse_qs(query).get('code', [])
+                       self.send_response(200)
+                       # Declare the echo as plain text so a browser does not render
+                       # the request path as markup.
+                       self.send_header('Content-Type', 'text/plain; charset=utf-8')
+                       self.end_headers()
+                       # UTF-8 cannot fail on non-ASCII characters in the path.
+                       self.wfile.write(self.path.encode('utf-8'))
+                       queue.put(codes[0] if codes else None)
+
+               def log_message(self, *_):
+                       # Suppress the default per-request logging.
+                       pass
+
+       return AuthCodeRequestHandler
+
+def get_auth_code(bb_host, api_path, client_id):
+       """Send the user through the authorization page and wait for the redirect.
+
+       A browser is opened on the `oauth2/authorizationcode` endpoint below
+       `api_path` on `bb_host`, with a redirect URI pointing at a temporary HTTP
+       server on `ADDRESS`:`PORT`.  The call blocks until that server has
+       received one GET request.
+
+       `api_path` must offer `joinpath` (the caller in this file passes a
+       `pathlib.PurePosixPath`).
+
+       Returns the value the request handler built by `_get_handler` put on its
+       queue for that request.
+       """
+       redirect_uri = 'http://' + ADDRESS + ':' + str(PORT) + '/'
+       params = {
+               'redirect_uri': redirect_uri,
+               'response_type': 'code',
+               'client_id': client_id,
+       }
+       qs = urllib.parse.urlencode(params)
+       user_url = urllib.parse.urlunparse(urllib.parse.ParseResult(
+               scheme = 'https',
+               netloc = bb_host,
+               path = str(api_path.joinpath('oauth2/authorizationcode')),
+               query = qs,
+               params = '',
+               fragment = '',
+       ))
+
+       queue = Queue()
+       # Bind the listener before the browser is opened, so the redirect cannot
+       # arrive while nothing is listening yet.
+       server = http.server.HTTPServer((ADDRESS, PORT), _get_handler(queue))
+       worker = Thread(target = server.serve_forever)
+       worker.start()
+       try:
+               webbrowser.open(user_url)
+               code = queue.get()
+       finally:
+               # Stop the serving loop, wait for its thread, then release the
+               # listening socket so the port can be reused.
+               server.shutdown()
+               worker.join()
+               server.server_close()
+       return code
+
+import pathlib
+import sys
+
+# Manual smoke test against the test instance.  It is guarded so that importing
+# this module does not open a browser, and the client ID is taken from the
+# command line because `get_auth_code` requires one.
+if __name__ == '__main__':
+       if len(sys.argv) != 2:
+               print("Please pass the OAuth client ID as an argument", file = sys.stderr)
+               raise SystemExit(2)
+       get_auth_code('oberlin-test.blackboard.com', pathlib.PurePosixPath('/learn/api/public/v1'), sys.argv[1])
diff --git a/util.py b/util.py
new file mode 100644 (file)
index 0000000..9ce2138
--- /dev/null
+++ b/util.py
@@ -0,0 +1,94 @@
+# Copyright 2018, Anders Cornell and Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from collections import namedtuple
+import itertools
+import logging
+import pathlib
+import re
+import urllib.parse
+
+LOGGER_NAME = 'bb_sync_api'
+
+def resolve(target, curr_url):
+       """Resolve the link `target` found on the page at `curr_url`.
+
+       A `target` that has its own scheme is returned unchanged.  Anything else
+       (a path starting with `/`, a protocol-relative `//host/...` reference, or
+       a relative path) is resolved against `curr_url` with
+       `urllib.parse.urljoin`.
+       """
+       if urllib.parse.urlparse(target).scheme:
+               return target
+       # Prefixing scheme and host by hand turns `//host/path` into a path on the
+       # current host and cannot handle relative paths at all; `urljoin` covers
+       # both cases.
+       return urllib.parse.urljoin(curr_url, target)
+
+def extract_ids(url):
+       """Return the course and content IDs found in the query string of `url`.
+
+       The result is a dict with the keys 'course' and 'content', holding the
+       `course_id` and `content_id` parameters with leading and trailing
+       underscores stripped.  Raises `KeyError` when a parameter is missing and
+       `ValueError` when one does not occur exactly once.
+       """
+       fields = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+       ids = {}
+       for (label, key) in (('course', 'course_id'), ('content', 'content_id')):
+               # single-element unpacking insists on exactly one value
+               (value,) = fields[key]
+               ids[label] = value.strip('_')
+       return ids
+
+def clean_win_path(seg):
+       """Return `seg` with characters Windows forbids in names replaced by spaces.
+
+       Control characters 0-31 and the characters `<>:"/\\|?*` each become a
+       single space; everything else is kept.
+       """
+       # https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
+       forbidden = [chr(code) for code in range(0, 31 + 1)] + list(r'<>:"/\|?*')
+       table = str.maketrans({ch: ' ' for ch in forbidden})
+       return seg.translate(table)
+
+def _split_name(name):
+       """Split `name` into `(stem, suffixes)`.
+
+       The second element is the tail of `name` whose length equals the combined
+       length of `pathlib.Path(name).suffixes`; the first is everything before it.
+       """
+       suffix_total = sum(map(len, pathlib.Path(name).suffixes))
+       cut = len(name) - suffix_total
+       return (name[:cut], name[cut:])
+
+def content_path(course_path, segments):
+       """Map a chain of `(id, name)` segments to a path below `course_path`.
+
+       For each segment in turn: if the path built so far exists and has a child
+       whose name ends in `(<id>)`, that child is used; otherwise a new component
+       `<name>(<id>)`, cleaned with `clean_win_path`, is appended.  Raises
+       `ValueError` when more than one child matches the same ID.
+       """
+       current = course_path
+       for (id_, name) in segments:
+               matches = []
+               if current.exists():
+                       for child in current.iterdir():
+                               if re.search(r'\({}\)$'.format(re.escape(id_)), child.name):
+                                       matches.append(child)
+               if matches:
+                       # unpacking fails on purpose when the match is ambiguous
+                       [current] = matches
+               else:
+                       current = current.joinpath(clean_win_path('{}({})'.format(name, id_)))
+       return current
+
+_BB_ID_STREAM_NAME = '8f3b98ea-e227-478f-bb58-5c31db476409'
+
+ParseResult = namedtuple('ParseResult', ['id_', 'version'])
+
+def _parse_path(path):
+       """Extract the ID and version encoded in the name of `path`.
+
+       The stem of the file name (the name without its suffixes) must end in
+       `(<version>)` or `(<id>,<version>)`; otherwise None is returned.  When a
+       file named `<name>:<_BB_ID_STREAM_NAME>` exists beside `path`, its content
+       is used as the ID; otherwise the ID comes from the file name.
+
+       Returns a `ParseResult` whose `version` is the matched digit string.
+       Raises `AssertionError` when neither source supplies an ID.
+       """
+       (stem, _) = _split_name(path.name)
+       # The comma separating ID and version sits outside the `id` group so that
+       # it does not become part of the captured ID.
+       match = re.search(r'\((?:(?P<id>[\d_]+),)?(?P<version>\d+)\)$', stem)
+       if not match:
+               return None
+
+       stream_path = path.with_name(path.name + ':' + _BB_ID_STREAM_NAME)
+       if stream_path.exists():
+               with stream_path.open() as f:
+                       id_ = f.read()
+       else:
+               id_ = match.group('id')
+               if id_ is None:
+                       # Raised explicitly so the check also runs under `python -O`.
+                       raise AssertionError("no ID in file name or ID stream: {}".format(path))
+       return ParseResult(id_ = id_, version = match.group('version'))
+
+def unparse_path(parse_result):
+       """Placeholder for the inverse of `_parse_path`; not implemented yet.
+
+       The body exists so that the module can be parsed at all: a `def` without
+       a body is a syntax error for the whole file.
+       """
+       raise NotImplementedError("unparse_path")
+
+def get_latest_versions(content_path):
+       """Return the highest version present in a directory for every ID.
+
+       Each entry of the directory `content_path` is run through `_parse_path`;
+       entries it cannot parse are skipped.  The result maps each ID to the
+       version string of its newest entry.
+       """
+       results = {}
+       for path in content_path.iterdir():
+               result = _parse_path(path)
+               if result is None:
+                       continue
+               known = results.get(result.id_)
+               # Versions are digit strings; compare them as numbers so that '10'
+               # counts as newer than '9'.
+               if known is None or int(known) < int(result.version):
+                       results[result.id_] = result.version
+       return results