--- /dev/null
+# Copyright 2019 Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import urllib.request
+from pathlib import Path
+import tempfile
+from collections import deque
+import shutil
+import filecmp
+import json
+
+import toml
+
+HOST = 'blackboard.oberlin.edu'
+API_PATH = '/learn/api/public/v1'
+
+def get_uri(path):
+ return 'https://' + HOST + API_PATH + path
+
+def walk_pages(uri):
+ while True:
+ with urllib.request.urlopen(uri) as resp:
+ data = json.load(resp)
+ yield from data['results']
+ if 'paging' in data and 'nextPage' in data['paging']:
+ uri = data['paging']['nextPage']
+ else:
+ return
+
+def get_path(course, leaf_id):
+ path = deque()
+ id_ = leaf_id
+ while True:
+ with urllib.request.urlopen(get_uri('/courses/{}/contents/{}'.format(course, leaf_id))) as resp:
+ data = json.load(resp)
+ path.appendleft('{} ({})'.format(data['title'], id_))
+ if 'parentId' in data:
+ id_ = data['parentId']
+ else:
+ return list(path)
+
+# https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
+def sanitize(seg):
+ bad = {*map(chr, range(0, 31 + 1)), *r'<>:"/\|?*'}
+ return seg.translate({ch: None for ch in bad})
+
+# parse directory contents into mirror state for individual item
+def read_metadata(path):
+ info = {}
+
+
+try:
+ with open('config.toml') as f:
+ config = toml.load(f)
+except OSError:
+ print("Cannot open configuration file `config.toml`:", file = sys.stderr)
+ raise
+if 'base_path' not in config:
+ print("`base_path` not in config file")
+ raise KeyError()
+
+import sys
+
+args = sys.argv[1:]
+if len(args) != 1:
+ print("Please pass the Blackboard URL to sync as an argument", file = sys.stderr)
+ raise AssertionError()
+
+url = args[0]
+try:
+ params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+except ValueError:
+ print("That URL doesn't look right:", file = sys.stderr)
+ raise
+
+if 'course_id' not in params or 'content_id' not in params:
+ print("That URL doesn't look right.", file = sys.stderr)
+ raise ValueError()
+course = params['course_id']
+page = params['content_id']
+base_path = Path(config['base_path'], *map(sanitize, get_path(page)))
+
+for item in walk_pages(get_uri('/courses/{}/contents/{}/children'.format(course, page))):
+ for attachment in walk_pages(get_uri('/courses/{}/contents/{}/attachments'.format(course, item['id']))):
+ dir_ = base_path.joinpath(sanitize('{} ({})'.format(item['title'], item['id'])))
+ orig_name = sanitize('{} ({})'.format(attachment['fileName'], attachment['id']))
+ dir_.mkdir(parents = True, exist_ok = True)
+ with tempfile.NamedTemporaryFile(prefix = orig_name, dir = str(dir_), delete = False) as temp:
+ with urllib.request.urlopen(
+ '/courses/{}/contents/{}/attachments/{}/download'.format(course, item['id'], attachment['id'])
+ ) as resp:
+ shutil.copyfileobj(resp, temp)
+ temp_name = temp.name
+ orig = dir_.joinpath(orig_name)
+ temp = dir_.joinpath(temp_name)
+ if dir_.joinpath(name).exists():
+ if not filecmp.cmp(str(dir_.joinpath(temp_name)), str(dir_.joinpath(name)), shallow = False):
+ print("")
+ else:
+ shutil.move(str(temp), str(orig))
--- /dev/null
+# Copyright 2019 Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import http.server
+from queue import Queue
+from threading import Thread
+import urllib.parse
+import webbrowser
+
+ADDRESS = 'localhost'
+PORT = 1081
+
+def _get_handler(queue):
+ # Why does your API require me to use such hacks?
+
+ class AuthCodeRequestHandler(http.server.BaseHTTPRequestHandler):
+ def do_GET(self):
+ self.send_response(200)
+ self.end_headers()
+ self.wfile.write(self.path.encode('ascii'))
+ queue.put(None)
+
+ def log_message(self, *_):
+ pass
+
+ return AuthCodeRequestHandler
+
+def get_auth_code(bb_host, api_path, client_id):
+ redirect_uri = 'http://' + ADDRESS + ':' + str(PORT) + '/'
+ params = {
+ 'redirect_uri': redirect_uri,
+ 'response_type': 'code',
+ 'client_id': client_id,
+ }
+ qs = urllib.parse.urlencode(params)
+ user_url = urllib.parse.urlunparse(urllib.parse.ParseResult(
+ scheme = 'https',
+ netloc = bb_host,
+ path = str(api_path.joinpath('oauth2/authorizationcode')),
+ query = qs,
+ params = '',
+ fragment = '',
+ ))
+ webbrowser.open(user_url)
+
+ queue = Queue()
+ server = http.server.HTTPServer((ADDRESS, PORT), _get_handler(queue))
+ Thread(target = lambda: server.serve_forever()).start()
+ queue.get()
+ server.shutdown()
+
+import pathlib
+get_auth_code('oberlin-test.blackboard.com', pathlib.PurePosixPath('/learn/api/public/v1'))
--- /dev/null
+# Copyright 2018, Anders Cornell and Jakob Cornell
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from collections import namedtuple
+import itertools
+import logging
+import pathlib
+import re
+import urllib.parse
+
+LOGGER_NAME = 'bb_sync_api'
+
+def resolve(target, curr_url):
+ # I hope I do this like a browser...
+ parsed = urllib.parse.urlparse(target)
+ if parsed.scheme:
+ return target
+ elif target.startswith('/'):
+ curr = urllib.parse.urlparse(curr_url)
+ return curr.scheme + '://' + curr.netloc + target
+ else:
+ raise NotImplementedError("relative URI")
+
+def extract_ids(url):
+ qs = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
+ [course_id] = qs['course_id']
+ [content_id] = qs['content_id']
+ return {
+ 'course': course_id.strip('_'),
+ 'content': content_id.strip('_'),
+ }
+
+def clean_win_path(seg):
+ # https://docs.microsoft.com/en-us/windows/desktop/FileIO/naming-a-file
+ bad = {*range(0, 31 + 1), *map(ord, r'<>:"/\|?*')}
+ return seg.translate({ch: ' ' for ch in bad})
+
+def _split_name(name):
+ suff_len = sum(len(s) for s in pathlib.Path(name).suffixes)
+ stem = name[slice(None, len(name) - suff_len)]
+ suff = name[slice(len(name) - suff_len, None)]
+ return (stem, suff)
+
+def content_path(course_path, segments):
+ path = course_path
+ for (id_, name) in segments:
+ if path.exists():
+ cands = [child for child in path.iterdir() if re.search(r'\({}\)$'.format(re.escape(id_)), child.name)]
+ if cands:
+ [path] = cands
+ continue
+ path = path.joinpath(clean_win_path('{}({})'.format(name, id_)))
+ return path
+
+_BB_ID_STREAM_NAME = '8f3b98ea-e227-478f-bb58-5c31db476409'
+
+ParseResult = namedtuple('ParseResult', ['id_', 'version'])
+
+def _parse_path(path):
+ (stem, _) = _split_name(path.name)
+ match = re.search(r'\((?P<id>[\d_]+,)?(?P<version>\d+)\)$', stem)
+ if match:
+ stream_path = path.with_name(path.name + ':' + _BB_ID_STREAM_NAME)
+ if stream_path.exists():
+ with stream_path.open() as f:
+ id_ = f.read()
+ else:
+ id_ = match.group('id')
+ assert id_ is not None
+ return ParseResult(id_ = id_, version = match.group('version'))
+ else:
+ return None
+
+def unparse_path(parse_result):
+
+def get_latest_versions(content_path):
+ results = {}
+ for path in content_path.iterdir():
+ result = _parse_path(path)
+ if result and (result.id_ not in results or results[result._id] < result.version):
+ results[result.id_] = result.version
+ return results