+ +
...
+
...
+ + TreeBuilder constructs as: + +
+ ... +
...
+
...
+ """ + main = next(document.iter("main")) + + # extract the brief italic description from just below the show title + hero = next( + el for el in main.iter("div") + if "title-hero" in el.attrib.get("class", "").split() + ) + [hero_sub_el] = ( + el for el in hero.iter("p") + if "title-hero__subinfo" in el.attrib.get("class", "").split() + ) + sub_info = hero_sub_el.text.strip() + + """ + post_hero = next( + el for el in main.iter("div") + if {"position-relative", "bg-color-silver-ultralight"} <= set((el.attrib.get("class") or "").split()) + ) + [contents] = (el for el in post_hero if "contents" in el.attrib.get("class", "")) + + # look for a music page section and check that the nav menu agrees + flat = itertools.chain.from_iterable(contents) # disregard div.row groupings + """ + """ + anchor = next( + (el for el in main.iter("span") if el.attrib.get("id") == "music"), + None + ) + music_section = anchor is not None + """ + + def get_pdp_sections(tree: Element, headers: Collection[str]) -> Iterator[Optional[Element]]: + # Extract page sections by header text. Iterating over all the div elements could be very + # expensive, so we try to only do it once. + elements = {} + for el in tree.iter("div"): + if "pdp-section" in (el.attrib.get("class") or "").split(): + if el and el[0]: + header = el[0][0].text.strip() # h3 + if header in headers: + assert header not in elements + elements[header] = el + if len(elements) == len(headers): + break + return (elements.get(header) for header in headers) + + def get_inline_modals(tree: Element) -> dict[str, Element]: + # Extract inline modals by the text of the button that opens them + return { + div.attrib["button-text"].lower(): div + for div in tree.iter("div") + if div.attrib.get("is") == "InlineModal" + } + + [music_section, materials_section] = get_pdp_sections(main, ["Music", "Licensing & Materials"]) + assert (music_section is not None) == music_nav + + if music_nav: + # pdp-section div immediately follows corresponding anchor + assert "pdp-section" in music_section.attrib["class"].split() + [_, content] = [e for e in music_section if e.tag == "div"] + rhs = content[-1] + classifications = content[-1] + + music_modals = get_inline_modals(music_section) + #assert music_modals.keys() <= {"musical numbers"} TODO + + def look_up(header: str) -> Optional[str]: + labels = [el for el in content.iter("strong") if (el.text or "").strip() == header] + if labels: + [label] = labels + return label.tail.strip() + else: + return None + + musical_style = look_up("Musical Style") + orch_size = look_up("Orchestra Size") + + if musical_style == "N/A (Not a musical)": + assert orch_size is None + elif orch_size is None: + pass + else: + materials_modals = get_inline_modals(materials_section) + assert materials_modals.keys() <= { + "piano only", "full package rentals", "title specific notes" + } + + if orch_size == "Piano Only": + assert "full package rentals" not in materials_modals + modal = materials_modals["piano only"] + elif orch_size in ["Small/Combo", "Medium", "Large", "X-Large"]: + modal = materials_modals["full package rentals"] + else: + raise NotImplementedError(f"orchestra size {orch_size}") + return modal + else: + # no music section; should be a play + assert re.search(r"\bplay\b", sub_info, re.IGNORECASE) + + return None + + +def lineify_orchestration_modal(modal: Element) -> list[str]: + def clean_whitespace(text: str) -> str: + return re.sub(r"\s+", " ", text.strip()) + + def textify(el: Element) -> Iterator[str]: + if el.tag in ["p", "br", "div", "li", "ul"]: + yield "\n" + else: + assert el.tag in ["a", "em", "span", "strong"] + if el.text and el.text.strip(): + yield clean_whitespace(el.text) + for sub in el: + yield from textify(sub) + if el.tail and el.tail.strip(): + yield clean_whitespace(el.tail) + if el.tag in ["p", "div", "li", "ul"]: + yield "\n" + + return [ + " ".join(parts) + for (is_br, parts) in groupby(textify(modal), key = lambda s: s == "\n") + if not is_br + ] diff --git a/pithub/html.py b/pithub/html.py new file mode 100644 index 0000000..7745243 --- /dev/null +++ b/pithub/html.py @@ -0,0 +1,48 @@ +from html.parser import HTMLParser +from xml.etree.ElementTree import Element, TreeBuilder + + +class _AutoClosingTreeBuilder(TreeBuilder): + _AUTO_CLOSING_TAGS = {"br", "img"} + + def __init__(self, *args, **kwargs) -> None: + self._tag_stack = [] + super().__init__(*args, **kwargs) + + def _auto_close(self): + if self._tag_stack and self._tag_stack[-1] in self._AUTO_CLOSING_TAGS: + self.end(self._tag_stack[-1]) + + def start(self, tag: str, attrs: list[tuple[str, str]]) -> None: + self._auto_close() + super().start(tag, attrs) + self._tag_stack.append(tag) + + def end(self, tag: str) -> None: + if tag not in self._AUTO_CLOSING_TAGS: + self._auto_close() + super().end(tag) + self._tag_stack.pop() + + +class _HtmlTreeBuilderDriver(HTMLParser): + """Adapts the HTMLParser callback interface to etree's TreeBuilder for an etree-like HTML parser.""" + + def __init__(self, tree_builder: TreeBuilder, **kwargs) -> None: + self.tree_builder = tree_builder + super().__init__(**kwargs) + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str]]) -> None: + self.tree_builder.start(tag, dict(attrs)) + + def handle_endtag(self, tag: str) -> None: + self.tree_builder.end(tag) + + def handle_data(self, data: str) -> None: + self.tree_builder.data(data) + + +def parse_html(content: str) -> Element: + tree_builder = _AutoClosingTreeBuilder() + _HtmlTreeBuilderDriver(tree_builder).feed(content) + return tree_builder.close() diff --git a/pithub/mti.py b/pithub/mti.py new file mode 100644 index 0000000..a1290d5 --- /dev/null +++ b/pithub/mti.py @@ -0,0 +1,697 @@ +from collections import deque +from dataclasses import dataclass, replace +from datetime import date +from enum import Enum +from itertools import groupby +from pathlib import PurePosixPath +from typing import Callable, Iterable, Optional +from urllib.parse import parse_qs, urlencode, urlparse +from urllib.request import Request +from warnings import warn +from xml.etree.ElementTree import Element +import itertools +import json +import re + +from .common import Location, parse_date +from .html import parse_html +from .orchestration import ( + deduplicate_keyboard_parts, Instrument, KeyboardPart, Orchestration, parse_instrument, + parse_instrument_list, Part, PartInstruments, parts_to_instruments, PART_NAMES, + PERCUSSION_PART_NAMES, +) + + +@dataclass +class MaprefreshParams: + @dataclass + class SortSpec: + Key = Enum("Key", ["ORG_NAME"]) + + key: Key + ascending: bool = True + + show_id: Optional[int] = None + area: Optional[int] = None + page: Optional[int] = None # zero-based + sort_spec: Optional[SortSpec] = None + + def query_string_params(self) -> dict[str, list[str]]: + params = {} + if self.show_id is not None: + params["field_production_show_target_id"] = [str(self.show_id)] + if self.area is not None: + params["field_production_address_administrative_area"] = [str(self.area)] + if self.page is not None: + params["page"] = [str(self.page)] + if self.sort_spec: + params["order"] = ["field_production_org_name"] + params["sort"] = ["asc" if self.sort_spec.ascending else "desc"] + return params + + def to_request(self) -> Request: + qs = urlencode(self.query_string_params(), doseq = True) + return Request("https://www.mtishows.com/maprefresh?" + qs) + + +@dataclass(frozen = True) +class Production: + show_title: str + organization_name: str + opening: date + closing: Optional[date] + location: Location + + +def _zip_productions(map_view_production: "MapViewProduction", table_production: "TableProduction") -> Production: + [org_name] = {map_view_production.organization_name, table_production.organization_name} + [opening] = {map_view_production.opening, table_production.opening} + [closing] = {map_view_production.closing, table_production.closing} + + if map_view_production.venue is not None: + assert map_view_production.venue == table_production.venue + + return Production(org_name, opening, closing, map_view_production.location) + + +@dataclass(frozen = True) +class MapViewProduction: + show_title: str + opening: date + closing: Optional[date] + organization_name: str + venue: Optional[str] + location: Location + + +@dataclass(frozen = True) +class TableProduction: + show_title: str + organization_name: str + opening: date + closing: Optional[date] + venue: str + + +@dataclass +class MaprefreshData: + show_ids: list[int] + map_view_productions: list[MapViewProduction] + table_productions: list[TableProduction] + page_count: int + + +_Requester = Callable[[Request], str] + + +def fetch_maprefresh_page(params: MaprefreshParams, requester: _Requester) -> MaprefreshData: + body = requester(params.to_request()) + return parse_maprefresh(parse_html(body), params) + + +def _walk_pages(initial_params: MaprefreshParams, requester: _Requester) -> list[MaprefreshData]: + start_page = initial_params.page or 0 # server-side default of 0 + initial = fetch_maprefresh_page(initial_params, requester) + out = [initial] + for page_num in range(start_page + 1, initial.page_count): + curr_params = replace(initial_params, page = page_num) + maprefresh = fetch_maprefresh_page(curr_params, requester) + assert maprefresh.page_count == initial.page_count + out.append(maprefresh) + + if len(out) > 1: + # There should be minimal if any overlap between page contents. + for attr_name in ["map_view_productions", "table_productions"]: + production_count = sum(len(getattr(page, attr_name)) for page in out) + if production_count: + distinct_productions = set(itertools.chain.from_iterable( + getattr(page, attr_name) for page in out + )) + assert len(distinct_productions) / production_count > 0.97 + + return out + + +def fetch_all_productions(show_id: int, requester: _Requester, area: Optional[int] = None) -> list[Production]: + """ + When a particular ordering is not imposed by the request, there seem to be pagination problems; + primarily a production near the end of page n reappears near the beginning of page n + 1. This + is mitigated for the table view by specifying an order, but these request parameters don't seem + to affect the map view at all, so the pagination issue is unavoidable. + + The workaround is to use a sort order and the table view to get an authoritative list of + productions, and then fill in missing map productions by using more targeted queries whose + results fit on a single page. + """ + params = MaprefreshParams( + show_id, + area, + sort_spec = MaprefreshParams.SortSpec(MaprefreshParams.SortSpec.Key.ORG_NAME), + ) + pages = _walk_pages(params, requester) + + # TODO clean up + map_orgs = { + production.organization_name + for page in pages + for production in page.map_view_productions + } + table_orgs = { + production.organization_name + for page in pages + for production in page.table_productions + } + if map_orgs - table_orgs: + print("Resolving table shuffle") + reverse_params = replace( + params, + sort_spec = replace(params.sort_spec, ascending = False), + ) + pages.extend(_walk_pages(reverse_params, requester)) + + map_orgs = { + production.organization_name + for page in pages + for production in page.map_view_productions + } + table_orgs = { + production.organization_name + for page in pages + for production in page.table_productions + } + + map_missing = table_orgs - map_orgs + if map_missing: + print("missing from map") + for org_name in map_missing: + print(f" {org_name!r}") + + table_missing = map_orgs - table_orgs + if table_missing: + print("missing from table") + for org_name in table_missing: + print(f" {org_name!r}") + raise AssertionError() + + map_productions = list(itertools.chain.from_iterable(p.map_view_productions for p in pages)) + table_productions = list(itertools.chain.from_iterable(p.table_productions for p in pages)) + + """ + Because we can perturb the response ordering, the table is most authoritative, but only the map + data has precise location. Possible solutions: + + (1) get a clear set of productions from the table source and make separate geographically + targeted requests for the missing map data + + (2) geocode the address data in the tables + """ + # TODO merge sources, implementing one of the above solutions + table_count = len(set(table_productions)) + map_count = len(set(map_productions)) + if table_count < map_count: + warn(f"{map_count - table_count} more distinct map entries than distinct table entries") + elif map_count < table_count: + warn(f"{table_count - map_count} more distinct table entries than distinct map entries") + + return [ + Production( + production.show_title, + production.organization_name, + production.opening, + production.closing, + production.location + ) + for production in map_productions + ] + + +def get_productions(maprefresh: MaprefreshData) -> list[Production]: + def sort_key(production: MapViewProduction | TableProduction) -> str: + return (production.organization_name, production.opening, production.venue) + + from collections import Counter + mct = Counter(maprefresh.map_view_productions) + mdups = [k for (k, v) in mct.items() if v > 1] + if mdups: + print(" map dupes:") + for k in mdups: + idxs = [i for (i, v) in enumerate(maprefresh.map_view_productions) if k == v] + print(f" {k.organization_name} @{k.venue} {idxs}") + + tct = Counter(maprefresh.table_productions) + tdups = [k for (k, v) in tct.items() if v > 1] + if tdups: + print(" table dupes:") + for k in tdups: + idxs = [i for (i, v) in enumerate(maprefresh.table_productions) if k == v] + print(f" {k.organization_name} @{k.venue} {idxs}") + + map_keys = {p.organization_name for p in maprefresh.map_view_productions} + table_keys = {p.organization_name for p in maprefresh.table_productions} + if map_keys - table_keys: + print(" stray map keys:") + for k in map_keys - table_keys: + print(f" {k}") + if table_keys - map_keys: + print(" stray table keys:") + for k in table_keys - map_keys: + print(f" {k}") + + assert len(maprefresh.map_view_productions) == len(maprefresh.table_productions) + return list(itertools.starmap( + _zip_productions, + zip( + sorted(maprefresh.map_view_productions, key = sort_key), + sorted(maprefresh.table_productions, key = sort_key) + ) + )) + + +def _map_point_to_production(point_doc: dict) -> MapViewProduction: + (longitude, latitude) = point_doc["coordinates"] + location = Location(latitude, longitude) + + bubble_tree = parse_html("" + point_doc["properties"]["description"] + "") + [show_title, *rest] = [t.strip() for t in bubble_tree.itertext() if not t.isspace()] + + opening = None + closing = None + org_name = None + venue = None + + to_seen = False + url_seen = False + for part in rest: + if date := parse_date(part): + if not opening: + opening = date + else: + assert not closing + closing = date + elif part == "to": + assert opening and not closing + to_seen = True + elif re.search("ttps?://", part): + assert not url_seen + url_seen = True + else: + # org name or venue + if org_name: + assert not venue + venue = part + else: + org_name = part + + assert opening + if to_seen: + assert closing + assert org_name + + return MapViewProduction(show_title, opening, closing, org_name, venue, location) + + +def _table_row_to_production(tr: Element) -> TableProduction: + [title_td, _, org_td, _, dates_td, venue_td] = tr + + title = title_td.text.strip() + + [raw] = [v for v in org_td.itertext() if not v.isspace()] + org_name = raw.strip() + + dates_parts = [v.strip() for v in dates_td.itertext() if not v.isspace()] + if len(dates_parts) == 1: + # no closing date + [opening_str] = dates_parts + closing_str = None + else: + [opening_str, to, closing_str] = dates_parts + assert to == "to" + + opening = parse_date(opening_str) + closing = closing_str and parse_date(closing_str) + + venue = venue_td.text.strip() + return TableProduction(title, org_name, opening, closing, venue) + + +def parse_maprefresh(page_root: Element, params: MaprefreshParams) -> MaprefreshData: + assert page_root.tag == "html" + head = page_root[0] + assert head.tag == "head" + [inject_expr] = [ + m[1] for el in head.iter("script") + if (m := re.match(r"\s*jQuery\.extend\(Drupal\.settings, *(.+)\);\s*$", el.text or "")) + ] + + map_data = json.loads(inject_expr)["geofieldMap"]["production-listing-refresh-page-2"]["data"] + if map_data == []: + map_view_productions = [] + elif map_data["type"] == "Point": + map_view_productions = [_map_point_to_production(map_data)] + else: + assert map_data["type"] == "GeometryCollection" + map_view_productions = list(map(_map_point_to_production, map_data["geometries"])) + + block_system_main = next(e for e in page_root.iter("div") if e.attrib.get("id") == "block-system-main") + [content_div] = block_system_main + [view_div] = content_div + + item_list = next( + (el for el in view_div if "item-list" in el.attrib.get("class", "").split()), + None + ) + if item_list is not None: + base_qs_params = {k: v for (k, v) in params.query_string_params().items() if k != "page"} + + def page_num_for_url(url: str) -> int: + parsed = urlparse(ul[-1][0].attrib["href"]) + assert not parsed.netloc + assert parsed.path == "/maprefresh" + assert not parsed.params + + qs_params = parse_qs(parsed.query) + [page_num_str] = qs_params.pop("page") + assert qs_params == base_qs_params + return int(page_num_str) + + [ul] = item_list + [pager_current] = (el for el in ul if "pager-current" in el.attrib["class"].split()) + page_num = int(pager_current.text) - 1 + + # On the last page there's no pager-last button so we take the page count from the current + # page number. + pager_last_els = [el for el in ul if "pager-last" in el.attrib["class"].split()] + if pager_last_els: + [pager_last] = pager_last_els + page_count = page_num_for_url(pager_last[0].attrib["href"]) + 1 + else: + page_count = page_num + 1 + else: + page_num = 0 + page_count = 1 + assert page_num == (params.page or 0) + + [view_filters] = (el for el in view_div if "view-filters" in el.attrib.get("class", "").split()) + [attachment_div] = (el for el in view_div if "attachment" in el.attrib.get("class", "").split()) + + show_select = next( + el for el in view_filters.iter("select") + if el.attrib["id"] == "edit-field-production-show-target-id" + ) + [blank, *options] = show_select + assert not blank.attrib["value"] + show_ids = [int(option.attrib["value"]) for option in options] + + [view] = attachment_div + + els = [el for el in view if "view-content" in el.attrib.get("class", "").split()] + if els: + [view_content] = els + [table] = view_content + [thead, tbody] = table + [tr] = thead + assert [th[0].text for th in tr] == ["Title", "Address", "Org Name", "Website", "Dates", "Venue"] + table_productions = list(map(_table_row_to_production, tbody)) + else: + [view_empty] = view + assert "view-empty" in view_empty.attrib["class"] + table_productions = [] + + if params.page: + # shouldn't get empty results from a request beyond the first page; possible pagination + # handling error + assert map_view_productions and table_productions + + # Pagination size is 200 productions; see if it looks like we may have missed the page buttons. + if page_count == 1 and len(map_view_productions) == 200: + title = map_view_productions[0].show_title + warn(f"200-show single page result for {title}; possible missed pagination") + + return MaprefreshData(show_ids, map_view_productions, table_productions, page_count) + + +def parse_show_index(page_root: Element) -> Iterable[str]: + """ + Parser for /shows/all. + + This gives URL IDs for shows but not numerical IDs, so is not useful for production scraping + except as a consistency check on the full show list. It also seems to be missing some shows + present in the Maprefresh. Returns URL IDs as strings. Shows are at + https://www.mtishows.com/. + """ + main_content = next( + e for e in page_root.iter("div") + if "main-content" in (e.attrib.get("class") or "").split() + ) + section = main_content[0] + block_system_main = next(e for e in section.iter("div") if e.attrib.get("id") == "block-system-main") + [content_div] = block_system_main + + [alpha_nav, *alpha_containers] = content_div + assert alpha_nav.attrib["id"] == "show-links" + assert len(alpha_containers) >= 25 + + item_sets = [ + list(map(_id_for_item_div, _alpha_container_items(container))) + for container in alpha_containers + ] + assert all(item_sets) + return itertools.chain.from_iterable(item_sets) + + +def _id_for_item_div(item_div: Element) -> str: + [_, link] = item_div + url = urlparse(link.attrib["href"]) + [id_] = PurePosixPath(url.path).relative_to("/").parts + return id_ + + +def _alpha_container_items(container: Element) -> list[Element]: + [h1, *items, top_link] = container + assert h1.tag == "h1" + assert top_link.text == "Back to top" + return items + + +def build_materials_request(show_id: int) -> Request: + return Request(f"https://www.mtishows.com/colorbox/mti_resources/show_materials/{show_id}") + + +def _merge_instruments(part_instrs: list[Instrument], double_instrs: PartInstruments) -> PartInstruments: + if part_instrs == [Instrument.GUITAR] and any("GUITAR" in i.name for i in double_instrs.required): + required = double_instrs.required + else: + required = [*part_instrs, *double_instrs.required] + return replace(double_instrs, required = required) + + +def _parse_part(instr_text: str, doubling_text: Optional[str], quantity_text: str) -> list[Part]: + quantity = int(quantity_text) + + ignore_texts = { + "ACCOMP. AND SFX INSTRUCTIONS", + "ADD.GUITAR PARTS FOR #9 & #11", + "ERRATA LIST", + "FULL SCORE", + "OPT. ORCHESTRATION MATERIAL", + "OPTIONAL MARCHING BAND PARTS", + } + ignore_patterns = [ + r"\d+PC FULL SCORE VOL \d", + ] + if not doubling_text: + if instr_text in ignore_texts: + return [] + if any(re.match(patt + "$", instr_text) for patt in ignore_patterns): + return [] + + dealted = re.match(r"((ALT|FEMALE VERS): )?(?P
.+)$", instr_text)["main"] + m = re.match(r"(OPT: )?(.+)$", dealted) + optional = bool(m[1]) + deopted = m[2] + if ": " in deopted: + [part_name, extra_str] = deopted.split(": ") + assert part_name + extra_part_instrs = list(itertools.chain.from_iterable( + map( + parse_instrument, + itertools.chain.from_iterable( + [f"{m[1]} {m[3]}", f"{m[2]} {m[3]}"] + if (m := re.match(r"([^ ]+) & ([^ ]+) ([^ ]+)$", p)) + else [p] + for p in extra_str.split(", ") + ) + ) + )) + elif part := KeyboardPart.parse(deopted.removesuffix(" ACT 1")): + return [replace(part, optional = optional, players = quantity)] + else: + part_name = deopted + extra_part_instrs = [] + + part_match = re.match( + ( + r"(ON ?STAGE )?(?P.+?)( ?\d| 1 ?(?P& ?2)?| [A-C]| (?PA-B)?)?" + + r"( \(DOUBLES (?P.+)\))?$" + ), + part_name, + ) + + players = None + if doubling_text: + m = re.match(r"(\(OPTIONAL\) *, )?(.+)$", doubling_text) + if m[1]: + optional = True + deopted_doubling = m[2] + + if m := re.match(r"\((\d) PLAYERS? (REQUIRED|MINIMUM)\)$", deopted_doubling): + double_instrs = PartInstruments(False, [], []) + players = int(m[1]) + elif re.match(r"OPT: SUB FOR \w+( \d)?$", deopted_doubling): + double_instrs = PartInstruments(False, [], []) + optional = True + elif part_match["instr"].lower() in PERCUSSION_PART_NAMES: + # future enhancement: percussion instrument parsing + double_instrs = PartInstruments(False, [], []) + else: + double_instrs = parse_instrument_list(deopted_doubling, part_match["instr"]) + else: + double_instrs = PartInstruments(False, [], []) + if part_match["dbl"]: + [doubled] = parse_instrument(part_match["dbl"]) + double_instrs.required.append(doubled) + + part_instrs = [ + *( + [] if part_match["instr"].lower() in PART_NAMES + else parts_to_instruments(part_match["instr"]) + ), + *extra_part_instrs, + ] + merged = _merge_instruments(part_instrs, double_instrs) + required = merged.required + optional_instrs = merged.optional + if part_match["multi"] or part_match["multi2"]: + if part_match["multi"]: + multis = [1, 2] + else: + multis = ["A", "B"] + + if quantity == 1: + players = 2 + else: + assert players is None and quantity == 2 + instr = part_match["instr"].removesuffix("S") + return [ + Part( + f"{instr} {multi}", + list(dict.fromkeys(itertools.chain(extra_part_instrs, required))), + optional_instruments = optional_instrs, + optional = optional, + ) + for multi in multis + ] + + deduped_required = list(dict.fromkeys(required)) + derived_players = quantity if players is None else players + + parts = [Part( + part_name, + deduped_required, + optional = optional, + players = derived_players, + optional_instruments = optional_instrs, + )] + return [p.maybe_convert_down() for p in parts] + + +def get_orchestrations(materials_json: list) -> list[Orchestration]: + [cmd_obj] = [obj for obj in materials_json if obj["command"] == "insert" and obj["method"] == "html"] + + # e.g. All Together Now! + if not cmd_obj["data"]: + # TODO validation against other show info? + return [] + + tree = parse_html("" + cmd_obj["data"] + "") + html_parts = deque(tree) + + main_header = html_parts.popleft() + assert main_header.tag == "h1" and main_header.text.startswith("Materials for ") + + # The first table lists materials such as libretti and piano-conductor scores. From what I can + # tell none of these are meant to be played in the orchestra during a production, with the + # exception of piano-conductor parts when also listed in the orchestration table. + aux_table = html_parts.popleft() + assert aux_table.tag == "table" + + if len(aux_table) == 1: + assert all(el.tag == "th" for el in aux_table[0]) + # TODO validation that no-orchestration shows are JR/KIDS or unreleased + return [] + + [thead, tbody] = aux_table + assert thead.tag == "thead" + assert [el.text for el in thead.iter("th")] == ["Resource", "Quantity"] + assert tbody.tag == "tbody" + + def deact_key(tr: Element) -> str: + m = re.match(r"(.+?)( ACT ?\d)?(/KEYBOARD3)?$", tr[0].text.strip()) + return m[1] + (m[3] or "") + + resource_parts = [] + for (deacted, group_iter) in groupby(tbody, deact_key): + group = list(group_iter) + [count] = {int(tr[1].text) for tr in group} + if part := KeyboardPart.parse(deacted): + # More than one piano-conductor part may be provided, but only one will be used in + # performance. + if count > 1: + part.players = count + elif part.players is None: + part.players = 1 + resource_parts.append(part) + else: + assert ( + deacted.lower() == "piano vocal score" + or not any( + part in deacted.lower() for part in ["keyboard", "piano", "synth"] + ) + ) + + if resource_parts: + assert len(resource_parts) <= 2 + + # Some shows have both a standard and an alternate orchestration. + orchestrations = [] + while html_parts: + h3 = html_parts.popleft() + assert h3.tag == "h3" and "orchestration" in h3.text.lower() + + table = html_parts.popleft() + assert table.tag == "table" + [thead, tbody] = table + assert thead.tag == "thead" + assert [el.text for el in thead.iter("th")] == ["Instrumentation", "Doubling", "Quantity"] + + assert tbody.tag == "tbody" + orchestration_parts = [] + for (deacted, group_iter) in groupby(tbody, deact_key): + group = list(group_iter) + [doubling_val] = {tr[1].text for tr in group} + [count] = {tr[2].text for tr in group} + orchestration_parts.extend( + _parse_part(deacted.strip(), doubling_val and doubling_val.strip(), count.strip()) + ) + + joined = resource_parts + orchestration_parts + all_parts = deduplicate_keyboard_parts(joined) + + # one orchestration seems to have all parts marked optional by mistake + if all(p.optional for p in all_parts): + for p in all_parts: + p.optional = False + + orchestrations.append(Orchestration(h3.text, all_parts)) + + return orchestrations diff --git a/pithub/orchestration.py b/pithub/orchestration.py new file mode 100644 index 0000000..e4a54bf --- /dev/null +++ b/pithub/orchestration.py @@ -0,0 +1,1121 @@ +from collections import Counter, defaultdict, deque +from dataclasses import asdict, dataclass, replace +from enum import Enum +from itertools import groupby, pairwise +from os import environ +from typing import Collection, Iterable, Iterator, Optional, Union +import dataclasses +import itertools +import re + +from pithub.common import throwing_factory + + +def parse_orchestrations(lines: list[str]) -> Iterator["Orchestration"]: + remaining = [l.strip() for l in lines if not l.isspace()] + while remaining: + if (res := _pop_uninteresting_section(remaining)) is not None: + remaining = res + else: + result = Orchestration.pop_from_lines(remaining) + if isinstance(result, Orchestration.ParseFailure): + raise RuntimeError(f"Orchestration parsing stuck at line: {result.stuck_line!r}") + else: + (orchestration, remaining) = result + yield orchestration + + +def _fix_bass_part(parts: list["Part"]) -> None: + # Does "bass" mean upright or bass guitar? We should be able to find out heuristically. + Kind = Enum("Kind", ["BASS", "STRING", "OTHER"]) + + def classify(part: Part) -> Kind: + if part.name == "Bass": + return Kind.BASS + elif {*part.required_instruments, *part.optional_instruments} & STRING_INSTRUMENTS: + return Kind.STRING + else: + return Kind.OTHER + + groups = [(kind, list(parts)) for (kind, parts) in groupby(parts, classify)] + bass_idxs = [i for (i, (kind, _)) in enumerate(groups) if kind is Kind.BASS] + if bass_idxs: + [bass_idx] = bass_idxs + [bass_part] = groups[bass_idx][1] + adjacent_kinds = [ + groups[i][0] for i in [bass_idx - 1, bass_idx + 1] + if 0 <= i < len(groups) + ] + if not bass_part.required_instruments and not bass_part.optional_instruments: + if Kind.STRING in adjacent_kinds: + bass_part.required_instruments.insert(0, Instrument.UPRIGHT_BASS) + elif bass_part._double: + # We don't want to miss e.g. a guitar part calling for acoustic guitar implicitly by + # merely stating it doubles electric guitar. + assert {Instrument.BASS_GUITAR, Instrument.UPRIGHT_BASS} <= { + *bass_part.required_instruments, *bass_part.optional_instruments, + } + + +@dataclass +class Orchestration: + @dataclass + class ParseFailure: + stuck_line: str + + description: str + parts: list["Part"] + + @classmethod + def pop_from_lines(cls, lines: list[str]) -> tuple["Orchestration", list[str]]|ParseFailure: + queue = deque(lines) + if "orchestration" in queue[0].lower() or queue[0].endswith(":"): + title_line = queue.popleft() + title = title_line.removesuffix(":") + assert "orchestration" in title.lower() or re.search(r"\d-Piece\b", title) + else: + title = None + + parts = [] + optional = False + check_piano = False + while queue: + if queue[0] == "Optional Orchestra Parts:": + queue.popleft() + optional = True + elif (m := re.match(r"\(([^\d]+)\)$", queue[0])) and m[1] in _PART_NOTES: + queue.popleft() + elif re.match(r"\(Note: .+\)$", queue[0]): + queue.popleft() + else: + if environ.get("LIVE_DEBUG"): + result = Part.parse_from_line(queue[0]) + else: + try: + result = Part.parse_from_line(queue[0]) + except Exception: + raise RuntimeError(f"Error parsing line: {queue[0]!r}") + + if result is None: + break + else: + queue.popleft() + if isinstance(result, list): + for v in result: + part = v.part if isinstance(v, _ActPart) else v + part.optional |= optional + parts.append(v) + else: + assert isinstance(result, IgnoredPart) + if result is IgnoredPart.REHEARSAL_PIANO: + check_piano = True + + if parts: + zipped_parts = [] + key = lambda v: ( + isinstance(v, _ActPart), + v.part if isinstance(v, _ActPart) else v + ) + for ((act, part), group) in groupby(parts, key): + if act: + assert len(list(group)) > 1 + zipped_parts.append(part) + else: + zipped_parts.extend(group) + assert all(isinstance(p, Part) for p in zipped_parts) + + for part in zipped_parts: + if Instrument.KEYBOARD in part.instruments(): + assert isinstance(part, KeyboardPart) + # TODO clean this up + converted = [p.maybe_convert_down() for p in zipped_parts] + + if check_piano: + # We ignore rehearsal-specific parts, but if we see one we expect a real piano + # performance part. + assert any(isinstance(p, KeyboardPart) for p in converted) + + fixed_parts = deduplicate_keyboard_parts(converted) + _fix_bass_part(fixed_parts) + + return (cls(title, fixed_parts), list(queue)) + else: + return cls.ParseFailure(queue[0]) + + +@dataclass +class _ActPart: + """ + Sometimes two parts are listed when one logical part is physically separated by act (in + practice, large keyboard parts). We parse these separately but then zip them up, so this tags + them internally in the parser interface. + """ + part: "Part" + + +def parts_to_instruments(parts: str) -> list["Instrument"]: + ret = list(itertools.chain.from_iterable( + parse_instrument(re.sub(r" \d$", "", subpart.removeprefix("AKA "))) + for subpart in re.split("/|, | & ", parts) + )) + assert all(isinstance(v, Instrument) for v in ret) + return ret + + +def _is_coverage_note(detail: str) -> bool: + num_patt = r"([1-9]*[0-9]a?)" + token_patt = fr"{num_patt}(-{num_patt})?" + return bool(re.match(fr"Nos\. {token_patt}(, {token_patt})*$", detail)) + + +_PART_PATTERN = ( + r"(?P\d+)(?P Optional)?" + + r" +(?P(?P[A-Za-z &]+?([ /-][A-Za-z1-2]+){,2}?)( +(?P\d+(&\d+)*)| [A-D])?)" + + r" *(\(((?POptional)|(?P[^)]+))\))?" + + r"( +[–—-] +(?P.+?)(?P \(optional\))?| +\((?Prehearsal|performance)\))?$" +) + + +@dataclass +class Part: + """ + Generally this corresponds one-to-one with an entry in an orchestration list (where the entry + represents something actually played during a performance), but entries like "Percussion 1 & 2" + are expanded into two instances. + """ + name: Optional[str] + required_instruments: list[Union["Instrument", "Choice"]] + players: Optional[int] = None # None is module internal use only + optional: bool = False + role: Optional["AdditionalRole"] = None + optional_instruments: list["Instrument"] = dataclasses.field(default_factory = list) + _double: bool = False # module internal use + + def __post_init__(self) -> None: + # remove duplicate instruments from each list + self.required_instruments = list(dict.fromkeys(self.required_instruments)) + self.optional_instruments = list(dict.fromkeys(self.optional_instruments)) + + for v in self.instruments(): + # TODO re-enable these checks when test serialization nonsense is fixed + """ + if isinstance(v, Choice): + assert all(isinstance(opt, Instrument) for opt in v.choices) + else: + assert isinstance(v, Instrument) + """ + + @classmethod + def parse_from_line(cls, line: str) -> Optional[Union[list[Union["Part", "_ActPart"]], "IgnoredPart"]]: + # TODO much of this logic is currently Concord-specific and should be factored out into + # concord module + if re.match(r"\d (Logo Pack|Pre-Show Music Lead Sheets)$", line): + return IgnoredPart.OTHER + elif match := re.match(_PART_PATTERN, line): + is_optional = bool(match["opt"] or match["opt2"] or match["opt3"]) + + player_count = int(match["count"]) + if match["paren"] and (m := re.match(r"(\d+) players?(,? divisi)?$", match["paren"], re.IGNORECASE)): + player_count = int(m[1]) + explicit_player_count = True + elif match["paren"] == "one or more players": + player_count = 1 + explicit_player_count = True + else: + explicit_player_count = False + + if match["nums"]: + ordinals = list(map(int, match["nums"].split("&"))) + else: + ordinals = [] + + rps = set(filter(None, [match["paren"], match["rp"]])) & {"rehearsal", "performance"} + if match["part"] == "Full Score": + assert not match["opt"] + assert re.match(r"Act \d \(stick conductor\)$", match["list"]) + return IgnoredPart.CONDUCTOR + elif match["part"] in {"Libretto-Vocal Book", "Vocal Book", "Libretti"}: + return IgnoredPart.LIBRETTO + elif match["instr"] == "Piano-Vocal" and rps <= {"rehearsal"}: + # TODO try converting instr check to assertion to tighten rp checking + assert not match["opt"] + return IgnoredPart.REHEARSAL_PIANO + elif part := KeyboardPart.parse(match["part"]): + # ignore part count as materials often include extras of these + if is_optional: + part = replace(part, optional = True) + if match["list"]: + assert match["list"].lower() in { + "doubling celeste", + "intended to supplement a small string section", + "primarily accordion", + "replaces violin, viola & cello", + } + include = True + if match["paren"]: + act_part = bool(re.match(r"Act \d$", match["paren"])) + if match["paren"] == "rehearsal & stick conductor": + include = False + else: + assert ( + match["paren"] == "rehearsal & performance" + or act_part and match["rp"] == "performance" + ) + else: + act_part = False + if include: + return [_ActPart(part) if act_part else part] + else: + return [] + elif player_count == 1: + if match["paren"] and not explicit_player_count: + assert match["paren"] == "** FOLEY ARTIST FISH" + optional_instr = [] + if match["list"] and not _is_coverage_note(match["list"]): + assert not is_optional + if match["list"] == "optional part": + list_text = "" + is_optional = True + elif match["list"].startswith("(optional)"): + list_text = match["list"].removeprefix("(optional)").strip() + is_optional = True + else: + list_text = match["list"] + if list_text.startswith("primarily "): + primarily = list_text.removeprefix("primarily ") + else: + primarily = None + if list_text: + if match["instr"].lower() in PART_NAMES: + context_instr = match["instr"] + else: + context_instr = parts_to_instruments(match["instr"])[0].name # Concord 93490 + res = parse_instrument_list(primarily or list_text, context_instr) + else: + res = PartInstruments(False, [], []) + required = res.required + optional_instr = res.optional + if match["instr"].lower() not in PART_NAMES: + from_part = parts_to_instruments(match["instr"]) + if from_part == [Instrument.KEYBOARD]: + required = from_part + primarily = None + optional_instr = [] + elif Instrument.GUITAR in from_part: + has_specific_instr = any( + "GUITAR" in i.name for i in required if i is not Instrument.GUITAR + ) + if not res.double and has_specific_instr: + required = [ + i for i in dict.fromkeys(itertools.chain(from_part, required)) + if i is not Instrument.GUITAR + ] + else: + required = list(dict.fromkeys(itertools.chain(from_part, required))) + else: + required = list(dict.fromkeys(itertools.chain(from_part, required))) + double = res.double + if primarily: + assert set(parse_instrument(primarily)) <= {*required, *optional_instr} + else: + if match["instr"].lower() in PART_NAMES: + # non-instrument part with no instrument information + required = [] + else: + required = parts_to_instruments(match["part"]) + double = False + return [ + cls( + name = match["part"], + required_instruments = required, + players = 1, + optional_instruments = optional_instr, + optional = is_optional, + _double = double, + ) + ] + elif player_count > 1 and match["part"].endswith("s"): + assert not is_optional + assert not match["paren"] and not match["list"] + clean_part = match["part"].removesuffix("s") + required = parse_instrument(clean_part) + return [cls(name = clean_part, required_instruments = required, players = player_count)] + else: + assert not is_optional + if match["instr"].lower() in PART_NAMES: + assert player_count == 2 + if len(ordinals) == 2: + if match["list"]: + sublists = match["list"].split(". ") + assert len(sublists) == 2 + players = { + (m := re.match(r"(.+?) Player: (.+)$", sublist))[1]: + parse_instrument_list(m[2], match["instr"]) + for sublist in sublists + } + optional = match["paren"] and match["paren"].removesuffix(" Player is optional") + else: + # non-instrument part with no instrument information + players = { + "{} {}".format(match["instr"], ordinal): PartInstruments(False, [], []) + for ordinal in ordinals + } + optional = None + if optional: + assert optional in players + return [ + cls( + name = "{} {}".format(match["instr"], ordinal), + required_instruments = instruments.required, + players = 1, + optional_instruments = instruments.optional, + optional = optional is not None and name == optional, + _double = instruments.double, + ) + for (ordinal, (name, instruments)) in zip(ordinals, players.items()) + ] + else: + assert not ordinals + return [cls(match["part"], required_instruments = [], players = player_count)] + elif player_count == len(ordinals) == 2: + assert not match["paren"] + if match["list"]: + # TODO is this code dead? + raise RuntimeError("expected dead code; pipe double indication for bass part munging") + assert "dbl" in match["list"] + double = parse_instrument(match["list"].removeprefix("Both dbl. ")) + else: + double = [] + return [ + cls( + name = "{} {}".format(match["instr"], ordinal), + required_instruments = [*parse_instrument(match["instr"]), *double], + players = 1, + _double = False, + ) + for ordinal in ordinals + ] + else: + assert not match["list"] or match["list"].lower() == "divisi" + assert not match["rp"] + return [cls( + name = match["part"], + required_instruments = parts_to_instruments(match["instr"]), + players = player_count, + )] + else: + return None + + def instruments(self) -> Iterator[Union["Instrument", "Choice"]]: + return itertools.chain(self.required_instruments, self.optional_instruments) + + def maybe_convert_down(self) -> "Part": + if not isinstance(self, KeyboardPart) and set(self.instruments()) == {Instrument.KEYBOARD}: + raise RuntimeError("obsolete") + if ordinals := re.findall(r"\d+", self.name): + [ordinal] = ordinals + else: + ordinal = None + + kind = ( + KeyboardPart.Kind.PRIMARY if ordinal in [None, "1"] and any( + keyword in self.name.lower() + for keyword in ["piano", "keyboard", "synthesizer"] + ) + else KeyboardPart.Kind.AUXILIARY + ) + return KeyboardPart( + **asdict(self), + ordinal = int(ordinal) if ordinal else None, + ) + else: + return self + + +def deduplicate_keyboard_parts(parts: Iterable[Part]) -> list[Part]: + """ + Keyboard 1, Piano + Piano-Conductor, Synthesizer + KEYBOARD1 - CONDUCTOR SCORE, KEYBOARD 1 (!) + """ + pc_parts = [ + part for part in parts + if isinstance(part, KeyboardPart) and part.role is AdditionalRole.CONDUCTOR + ] + if pc_parts: + pc_part = max(pc_parts, key = lambda p: len(p.name)) + if pc_part.ordinal is None: + conflicts = [ + part for part in parts if ( + isinstance(part, KeyboardPart) + and part.role is None + and set(part.instrument_names) & {"piano", "keyboard"} + and part.ordinal in [None, 1] + ) + ] + else: + assert set(pc_part.instrument_names) <= {"piano", "keyboard"} + conflicts = [ + part for part in parts if ( + part is not pc_part and isinstance(part, KeyboardPart) + and part.instrument_names == ["keyboard"] and part.ordinal == pc_part.ordinal + ) + ] + else: + pc_part = None + conflicts = [] + + return [ + part.convert_up() if isinstance(part, KeyboardPart) else part + for part in parts + if ( + part is pc_part + or (isinstance(part, KeyboardPart) and part.role is None and part not in conflicts) + or not isinstance(part, KeyboardPart) + ) + ] + + +@dataclass +class KeyboardPart(Part): + Kind = Enum("Kind", ["PRIMARY", "AUXILIARY"]) + + # Due to fields in the supertype with defaults these must have defaults, but we enforce that + # they're always explicitly specified. + instrument_names: list[str] = dataclasses.field(default_factory = throwing_factory) + ordinal: Optional[int] = dataclasses.field(default_factory = throwing_factory) + + @classmethod + def parse(cls, part_name: str) -> Optional["KeyboardPart"]: + patt = ( + r"(?Pkeyboard|piano|(string )?synth(esizer)?)(?Ps)? ?(?P\d)?" + + r" ?-? ?(?Pconductor(('s)? score)?)?" + + r"(/(aka )?(?Pkeyboard|synthesizer) ?(?P\d)?)?$" + ) + if m := re.match(patt, part_name.lower()): + base = re.sub(r"\bsynth\b", "synthesizer", m["base"]) + + if m["ord"] or m["ord2"]: + [ordinal] = set(filter(None, [m["ord"], m["ord2"]])) + else: + ordinal = None + + if m["pl"]: + # plural part name suggests >1 player + assert not (ordinal or m["cond"] or m["base2"]) + players = 2 + else: + players = None + + return KeyboardPart( + part_name, + [Instrument.KEYBOARD], + players, + role = AdditionalRole.CONDUCTOR if m["cond"] else None, + instrument_names = [n for n in [base, m["base2"]] if n], + ordinal = int(ordinal) if ordinal else None, + ) + elif m := re.match(r"(organ|piano)[/-](dulcimer|synthesizer|celeste)$", part_name.lower()): + return KeyboardPart( + part_name, + [Instrument.KEYBOARD], + instrument_names = [m[1], m[2]], + ordinal = None, + ) + else: + return None + + def convert_up(self) -> Part: + generic_field_names = {field.name for field in dataclasses.fields(Part)} + params = { + name: value + for (name, value) in asdict(self).items() + if name in generic_field_names + } + return Part(**params) + + +@dataclass +class PartInstruments: + # whether these instruments are from a list marked as "doubling" + double: bool = False + + required: list[Union["Instrument", "Choice"]] = ( + dataclasses.field(default_factory = list) + ) + optional: list[Union["Instrument", "Choice"]] = ( + dataclasses.field(default_factory = list) + ) + + def merge(self, other: "PartInstruments") -> "PartInstruments": + [double] = {self.double, other.double} + return PartInstruments( + double = double, + required = self.required + other.required, + optional = self.optional + other.optional, + ) + + +def parse_instrument_list(list_: str, part_instr: str) -> PartInstruments: + def parse_list_part(list_part: str) -> list[Union[Instrument, Choice]]: + if m := re.match(r"(?P.+) \(or (?P.+)\)$", list_part, re.IGNORECASE): + # This could be an instrument choice, or for percussion a note suggesting a way to + # emulate the instrument; in the latter case we ignore the note. + [a] = parse_instrument(m["a"], part_instr) + try: + [b] = parse_instrument(m["b"], part_instr) + except ParseError: + pass + else: + return [Choice((a, b))] + elif m := re.match(r"(?P.+) DOUBLES (?P.+)$", list_part): + return parse_instrument(m["a"], part_instr) + parse_instrument(m["b"], part_instr) + return parse_instrument(list_part, part_instr) + + def split_simple_list(simple_list: str) -> Iterator[str]: + if re.search(r"\([^)]*$", simple_list): + simple_list += ")" + + # split into parenthesized and non-parenthesized parts + paren_tok = re.split(r"(?=\()|(?<=\)(?!$))", simple_list) + + # split only unparenthesized parts as instrument lists + split = list(itertools.chain.from_iterable( + re.split(r" *, (?![^(]*\))| (?:&|and) +(?!.*,)", tok) if tok[0] != "(" else [tok] + for tok in paren_tok + )) + + # combine parenthesized parts with preceding list elements + for (a, b, c) in zip(itertools.chain([None], split), split, itertools.chain(split[1:], [None])): + if b and b[0] == "(": + assert not a or a[0] != "(" and not c or c[0] != "(" + yield "".join(filter(None, [a, b, c])) + elif (not a or a[0] != "(") and (not c or c[0] != "("): + yield b + + def parse_simple_list(simple_list: str) -> PartInstruments: + # match a comma not contained within parentheses, or &/and appearing after any commas + required = [] + optional = [] + for part in split_simple_list(simple_list): + deopt_patt = r"(?P\(?(optional|opt\.)\)? *)?(?P.+?)(?P \((optional|opt\.)\))?$" + m = re.match(deopt_patt, part.strip(), re.IGNORECASE) + if part_instr.lower() not in PERCUSSION_PART_NAMES or m["part"] not in _PERCUSSION_WTF: + (optional if m["opt"] or m["opt2"] else required).extend(parse_list_part(m["part"])) + return PartInstruments(False, required, optional) + + (dbl, dedoubled) = re.match(r"(dbl\. |doubles |doubling )?(.+)$", list_, re.IGNORECASE).groups() + + if m := re.match(r"(.+)\. (.+) and (.+) are double lined for (.+)$", dedoubled): + [denoted, *note_instr_names] = m.groups() + else: + denoted = dedoubled + note_instr_names = [] + + simple_lists = denoted.split(" & optional ") + if len(simple_lists) == 2: + (main, optional) = map(parse_simple_list, simple_lists) + assert not main.optional and not optional.optional + result = PartInstruments(bool(dbl), main.required, optional.required) + else: + [simple_list] = simple_lists + result = replace(parse_simple_list(simple_list), double = bool(dbl)) + + for note_instr_name in note_instr_names: + [note_instr] = parse_instrument(note_instr_name) + assert note_instr in result.required + return result + + +class ParseError(Exception): + pass + + +def parse_instrument(text: str, context_instr: Optional[str] = None) -> list["Instrument"]: + def to_identifier(name: str) -> str: + identifier = re.sub( + r"\s|[-']", "_", + name.upper().replace(" - ", " ").replace("’", "").replace('"', "") + ) + if identifier[0].isdigit(): + identifier = "_" + identifier + return identifier + + normalized = text.lower() + if context_instr and context_instr.lower() == "guitar": + normalized = re.sub(r"( +|-)(strat(ocaster)?|gretsch|telecaster)$", "", normalized) + normalized = re.sub(r"(?<=\b[a-g])(b|[- ]flat)\b", "♭", normalized) + normalized = re.sub(r"\bacous\.", "acoustic", normalized) + normalized = re.sub(r"\bbari\b", "baritone", normalized) + normalized = re.sub(r"\bcym\b\.?", "cymbal", normalized) + normalized = re.sub(r"\belec\.", "electric", normalized) + normalized = re.sub(r"\bglock\.", "glockenspiel", normalized) + normalized = re.sub(r"\bgtr\.", "guitar", normalized) + normalized = re.sub(r"\borch\b", "orchestra", normalized) + normalized = re.sub(r"\bsax\b", "saxophone", normalized) + normalized = re.sub(r"\bsop\. *", "soprano ", normalized) + normalized = re.sub(r"\bstrg\b", "string", normalized) + normalized = re.sub(r"\bsus\.", "suspended", normalized) + normalized = re.sub(r"\bsynth\b", "synthesizer", normalized) + normalized = re.sub(r"\bten\. *", "tenor ", normalized) + + m = re.match(r"(?P.+?)( *\((?P.+)\))?$", normalized) + name = m["name"].lower() + if context_instr: + if context_instr.lower() in PERCUSSION_PART_NAMES: # "2 Timpani" + name = re.sub(r"^\d+ ", "", name) + name = name.removeprefix("loud ") + if name == "bass": + assert not m["note"] + return [Instrument.BASS_DRUM] + elif context_instr.lower() == "guitar": + name = name.removeprefix("6-string ") + + @dataclass + class Form: + value: str + using_note: bool = False + + forms = [ + Form(name), + Form(name[:-1] if name.endswith("s") else name + "s"), + ] + if context_instr: + forms.append(Form(name + " " + context_instr)) + if m["note"]: + forms.append(Form(m["note"] + " " + name, True)) + + for (i, form) in enumerate(forms): + identifier = to_identifier(form.value) + parsed = _INSTRUMENT_TYPOS.get(identifier) + if not parsed: + try: + parsed = Instrument[identifier] + except KeyError as key_error: + if i == 0: + orig_err = key_error + if parsed is Instrument.DRUM_KIT and m["note"]: + return list(itertools.chain.from_iterable( + parse_instrument(part, context_instr) + for part in re.split(r", | (?:and|&) ", m["note"]) + )) + elif parsed: + if m["note"]: + note_okay = ( + form.using_note + or m["note"] in _INSTRUMENT_NOTES.get(parsed, []) + or re.match(r"\d+(-\d+)?(, \d+(-\d+)?)*$", m["note"]) + ) + if not note_okay: + raise ValueError(f"Unrecognized note {m['note']!r} for {parsed.name}") + return [parsed] + raise ParseError(f"Unhandled instrument {text!r}") from orig_err + + +IgnoredPart = Enum("IgnoredPart", ["CONDUCTOR", "LIBRETTO", "REHEARSAL_PIANO", "OTHER"]) + + +@dataclass(frozen = True) +class Choice: + """For use where the orchestration indicates any of several instruments is sufficient.""" + choices: tuple["Instrument", ...] + + +_IGNORED_HEADER_PATTS = [ + "additional materials?:?", + "delivered digitally:", + "digital download", + "optional keyboard patches/programming:", +] + + +def _is_ignored_header(line: str) -> bool: + return bool(any( + re.match(patt + "$", line, re.IGNORECASE) + for patt in _IGNORED_HEADER_PATTS + )) + + +def _may_be_orchestration_header(line: str) -> bool: + return "orchestration" in line.lower() or line.endswith(":") + + +def _ignorable(line: str) -> bool: + if line.startswith("Note:") and line.endswith(".") or line == "Logo": + return True + if _may_be_orchestration_header(line): + return False + + return not re.match(_PART_PATTERN, line) + + +def _pop_uninteresting_section(lines: list[str]) -> Optional[list[str]]: + if _is_ignored_header(lines[0]): + end = next( + ( + i for (i, line) in enumerate(lines[1:], start = 1) + if not _ignorable(line) or _is_ignored_header(line) + ), + len(lines) + ) + assert end <= 4 # at most 2 lines of section text + return lines[end:] + elif _ignorable(lines[0]): + return lines[1:] + else: + return None + + +# There are special considerations for parsing percussion detail sections because of how diverse the +# instruments/sounds called for can be. +PERCUSSION_PART_NAMES = { + "drums & percussion", + "drums", + "drums/percussion", + "percussion", +} + + +# Many parts are named after an instrument, which is an implicit requirement for the part. In +# contrast, parts with these names don't have a clearly implied instrument, and what instruments are +# called for is dictated by a separate instrument list. +PART_NAMES = { + *PERCUSSION_PART_NAMES, + "bass", + "drums", + "flexbo", + "reed", + "woodwind", + "woodwinds", +} + + +Instrument = Enum( + "Instrument", + [ + "ACCORDION", + "ACME_SIREN", + "ACOUSTIC_BASS_GUITAR", + "ACOUSTIC_ELECTRIC_GUITAR", + "ACOUSTIC_GUITAR", + "ALTO_FLUTE", + "ALTO_HORN", + "ALTO_RECORDER", + "ALTO_SAXOPHONE", + "ARCHTOP_GUITAR", + "A_CLARINET", + "BAMBOO_FLUTE", + "BANJO", + "BARITONE_SAXOPHONE", + "BASSOON", + "BASS_CLARINET", + "BASS_DRUM", + "BASS_GUITAR", + "BASS_OBOE", + "BASS_SAXOPHONE", + "BASS_TROMBONE", + "BELL_CYMBAL", + "BELL_TREE", + "BIRD_WHISTLE", + "BONGO_DRUMS", + "BOWED_CYMBAL", + "CABASA", + "CASTANETS", + "CELLO", + "CHINA_CYMBAL", + "CLARINET", + "CLAVES", + "CONCERTINA", + "CONGAS", + "CORNET", + "COWBELL", + "CRASH_CYMBAL", + "CROTALES", + "CYMBALS", + "DAF", + "DJEMBE", + "DRUM_KIT", + "DULCIMER", + "EGG_SHAKER", + "ELECTRIC_GUITAR", + "ENGLISH_HORN", + "EUPHONIUM", + "E♭_CLARINET", + "FIELD_DRUM", + "FINGER_CYMBALS", + "FLEXATONE", + "FLOOR_TOM", + "FLUTE", + "FLÜGELHORN", + "FRAME_DRUM", + "FRENCH_HORN", + "GLOCKENSPIEL", + "GOBLET_DRUM", + "GONG", + "GUITAR", # acoustic/electric unknown + "GÜIRO", + "HARMONICA", + "HARMONIUM", + "HARP", + "HI_HAT", + "JINGLE_BELLS", + "KEYBOARD", + "MANDOLIN", + "MARACAS", + "MARIMBA", + "MARK_TREE", + "MELODICA", + "OBOE", + "OBOE_D_AMORE", + "PEDAL_STEEL_GUITAR", + "PIATTI", + "PICCOLO", + "PICCOLO_TRUMPET", + "RATCHET", + "RECORDER", # no obvious default size + "RESONATOR_GUITAR", + "RIDE_CYMBAL", + "SANDPAPER_BLOCKS", + "SEMI_ACOUSTIC_GUITAR", + "SHIPS_BELL", + "SLIDE_WHISTLE", + "SLIT_DRUM", + "SNARE_DRUM", + "SOPRANO_CORNET", + "SOPRANO_RECORDER", + "SOPRANO_SAXOPHONE", + "SPLASH_CYMBAL", + "STEELPAN", + "SUSPENDED_CYMBAL", + "TAIKO", + "TAMBOURINE", + "TEMPLE_BLOCKS", + "TENOR_SAXOPHONE", + "TIMPANI", + "TOM_DRUM", + "TRAIN_WHISTLE", + "TRIANGLE", + "TROMBONE", + "TRUMPET", + "TUBA", + "TUBULAR_BELLS", + "UKELELE", + "UPRIGHT_BASS", + "VIBRAPHONE", + "VIBRASLAP", + "VIOLA", + "VIOLIN", + "WHIP", + "WHISTLE", + "WIND_CHIMES", + "WOOD_BLOCK", + "XYLOPHONE", + "_12_STRING_ACOUSTIC_GUITAR", + "_12_STRING_ELECTRIC_GUITAR", + "_1_SHOT_SHAKER", + ] +) + + +_INSTRUMENT_TYPOS = { + "ACOUSTIC_BASS": Instrument.UPRIGHT_BASS, + "ACOUSTIC_NYLON_STRING_GUITAR": Instrument.ACOUSTIC_GUITAR, + "ACOUSTIC_PIANO": Instrument.KEYBOARD, + "ACOUSTIC_STEEL_STRING_GUITAR": Instrument.ACOUSTIC_GUITAR, + "ACOUSTIC_WITH_AMP_GUITAR": Instrument.ACOUSTIC_ELECTRIC_GUITAR, + "AFRICAN_DRUM": Instrument.DJEMBE, + "ALTO_RECORDER_IN_F": Instrument.ALTO_RECORDER, + "BELLS": Instrument.GLOCKENSPIEL, + "BIG_BAMBOO_FLUTE": Instrument.BAMBOO_FLUTE, + "BIG_SHAKER": Instrument.EGG_SHAKER, + "BONGOS": Instrument.BONGO_DRUMS, + "B♭_CLARINET": Instrument.CLARINET, + "CHIMES": Instrument.TUBULAR_BELLS, + "CHINESE_BELL_TREE": Instrument.BELL_TREE, + "CL": Instrument.CLARINET, # :( + "CLAR": Instrument.CLARINET, + "CLAR.": Instrument.CLARINET, + "COMEDY_SIREN": Instrument.ACME_SIREN, + "CONCERT_BASS_DRUM": Instrument.BASS_DRUM, + "COR_ANGLAIS": Instrument.ENGLISH_HORN, + "COWBELLS": Instrument.COWBELL, + "COW_BELL": Instrument.COWBELL, + "C_FLUTE": Instrument.FLUTE, + "DARBUKA": Instrument.GOBLET_DRUM, + "DISCO_WHISTLE": Instrument.WHISTLE, + "DOBRO": Instrument.RESONATOR_GUITAR, + "DOUBLE_BASS": Instrument.UPRIGHT_BASS, + "DRUMS": Instrument.DRUM_KIT, + "DRUM_SET": Instrument.DRUM_KIT, + "ELECTRIC_BASS": Instrument.BASS_GUITAR, + "ELECTRIC_DROP_D_GUITAR": Instrument.ELECTRIC_GUITAR, + "ELECTRIC_PIANO": Instrument.KEYBOARD, + "ELECTRIC_SLIDE_GUITAR": Instrument.ELECTRIC_GUITAR, + "ELECTRONIC_KIT": Instrument.DRUM_KIT, + "ENG._HORN": Instrument.ENGLISH_HORN, + "FENDER_BASS": Instrument.BASS_GUITAR, + "FENDER_RHODES": Instrument.KEYBOARD, + "FIDDLE": Instrument.VIOLIN, + "FL": Instrument.FLUTE, + "FLEXITONE": Instrument.FLEXATONE, + "FLUGELHORN": Instrument.FLÜGELHORN, + "GOURD": Instrument.GÜIRO, + "GRAN_CASSA": Instrument.BASS_DRUM, + "GUIRO": Instrument.GÜIRO, + "G_FLUTE": Instrument.ALTO_FLUTE, + "HIGH_HAT": Instrument.HI_HAT, + "HOLLOW_BODY_GUITAR": Instrument.SEMI_ACOUSTIC_GUITAR, + "HORN": Instrument.FRENCH_HORN, + "ICE_BELL": Instrument.BELL_CYMBAL, + "KIT": Instrument.DRUM_KIT, + "LES_PAUL_GUITAR": Instrument.ELECTRIC_GUITAR, + "MARCHING_SNARE": Instrument.SNARE_DRUM, + "MOUTH_SIREN": Instrument.ACME_SIREN, + "NON_WESTERN_CHIME": Instrument.TUBULAR_BELLS, + "NYLON_ACOUSTIC_GUITAR": Instrument.ACOUSTIC_GUITAR, + "NYLON_STRING_ACOUSTIC_GUITAR": Instrument.ACOUSTIC_GUITAR, + "NYLON_STRING_GUITAR": Instrument.ACOUSTIC_GUITAR, + "ORCHESTRA_BELLS": Instrument.GLOCKENSPIEL, + "ORGAN": Instrument.KEYBOARD, + "PC": Instrument.PICCOLO, # yikes + "PIANO": Instrument.KEYBOARD, + "PICC.": Instrument.PICCOLO, + "PICCOLO_SNARE": Instrument.SNARE_DRUM, + "PIZZA_DRUM": Instrument.BASS_DRUM, + "RACHET": Instrument.RATCHET, + "RIQ": Instrument.DAF, + "SANDPAPER": Instrument.SANDPAPER_BLOCKS, + "SCRAPER": Instrument.GÜIRO, + "SHAKER": Instrument.EGG_SHAKER, + "SIREN": Instrument.ACME_SIREN, + "SIREN_WHISTLE": Instrument.ACME_SIREN, + "SLAPSTICK": Instrument.WHIP, + "SLEIGHBELLS": Instrument.JINGLE_BELLS, + "SLEIGH_BELLS": Instrument.JINGLE_BELLS, + "SMALL_&_LARGE_TRIANGLE": Instrument.TRIANGLE, + "SMALL_BAMBOO_FLUTE": Instrument.BAMBOO_FLUTE, + "SMALL_SHAKER": Instrument.EGG_SHAKER, + "SMALL_TOM": Instrument.TOM_DRUM, + "SMALL_TRIANGLE": Instrument.TRIANGLE, + "SMALL_WOOD_BLOCK": Instrument.WOOD_BLOCK, + "SNARE": Instrument.SNARE_DRUM, + "SOCK_CYMBAL": Instrument.HI_HAT, + "SOLID_BODY_ELECTRIC_GUITAR": Instrument.ELECTRIC_GUITAR, + "STEEL_ACOUSTIC_GUITAR": Instrument.ACOUSTIC_GUITAR, + "STEEL_DRUMS": Instrument.STEELPAN, + "STEEL_STRING_ACOUSTIC": Instrument.ACOUSTIC_GUITAR, + "STEEL_STRING_GUITAR": Instrument.ACOUSTIC_GUITAR, + "STRATOCASTER": Instrument.ELECTRIC_GUITAR, + "STRING_SYNTHESIZER": Instrument.KEYBOARD, + "SUSPENDED_CYMBAL": Instrument.CYMBALS, + "SYNTHESIZER": Instrument.KEYBOARD, + "TAM_TAM": Instrument.SLIT_DRUM, + "TELECASTER": Instrument.ELECTRIC_GUITAR, + "TENOR_HORN": Instrument.ALTO_HORN, + "TENOR_TROMBONE": Instrument.TROMBONE, + "TIC_TOC_BLOCKS": Instrument.WOOD_BLOCK, + "TOMS": Instrument.TOM_DRUM, + "TOM_TOMS": Instrument.TOM_DRUM, + "TRAP_SET": Instrument.DRUM_KIT, + "TWELVE_STRING_GUITAR": Instrument._12_STRING_ACOUSTIC_GUITAR, + "TYMPANI": Instrument.TIMPANI, + "T_BLOX": Instrument.TEMPLE_BLOCKS, + "VIBES": Instrument.VIBRAPHONE, + "VIOLONCELLO": Instrument.CELLO, + "WOODBLOCK": Instrument.WOOD_BLOCK, + "_5_STRING_BASS": Instrument.BASS_GUITAR, + "_5_STRING_ELECTRIC_BASS": Instrument.BASS_GUITAR, + "_5_STRING_FRETLESS_BASS": Instrument.BASS_GUITAR, +} + + +STRING_INSTRUMENTS = { + Instrument.VIOLIN, + Instrument.VIOLA, + Instrument.CELLO, + Instrument.UPRIGHT_BASS, +} + + +# instruments that could be implied by a "Bass" part +BASS_INSTRUMENTS = { + Instrument.ACOUSTIC_BASS_GUITAR, + Instrument.BASS_GUITAR, + Instrument.UPRIGHT_BASS, +} + + +_PERCUSSION_WTF = { + "ANVIL", + "Brushes", + "Drum Sticks", + "Electronic Drum Pad", + "FOOT ON HARDWOOD", + "Hand Drums", + "Handclap", + "Huge Electronic Hits", # ?? + "Industrial Sounds", + "METALLIC PERCUSSION", + "Mallet KAT", + "OCTOPAD-STADIUM HITS", + "Oriental Drum (deep)", + "PAD", + "POP GUN", + "Pop Cork Gun (or similar)", + "Popgun", + "Rubber Udders", + "SWIZZLE", + "Storm Drums", + "WHIZZER WHISTLE", + '"Noisy Things" (Clanky Noisemakers)', +} + + +_PART_NOTES = { + "played by actors, if possible, for certain numbers only", +} + + +_INSTRUMENT_NOTES = { + Instrument.BAMBOO_FLUTE: { + "f", + "g", + }, + Instrument.BASS_DRUM: { + "large drum with ominous, bass drum quality", + }, + Instrument.COWBELL: { + "high, medium, low", + }, + Instrument.CYMBALS: { + "various suspended, splash, chip, ride", + }, + Instrument.FIELD_DRUM: { + "or snare drum w/o snares", + }, + Instrument.GLOCKENSPIEL: { + "bells", + }, + Instrument.TEMPLE_BLOCKS: { + "3 pitches", + "5 pitches", + }, + Instrument.TOM_DRUM: { + "3", + }, + Instrument.TUBULAR_BELLS: { + "b♭, e♭", + "e", + }, + Instrument.WHIP: { + "whip", + }, + Instrument.WOOD_BLOCK: { + "2 pitches", + "hi & low", + "high and low", + }, +} + + +AdditionalRole = Enum("AdditionalRole", ["CONDUCTOR"]) diff --git a/test.py b/test.py new file mode 100644 index 0000000..97c1f9b --- /dev/null +++ b/test.py @@ -0,0 +1,426 @@ +from dataclasses import dataclass +from difflib import unified_diff +from itertools import groupby, islice +from pathlib import Path +from sys import argv +from time import sleep +from typing import Iterable, Iterator +from urllib.request import urlopen +import dataclasses +import itertools +import pickle + +from pithub.common import decode_response +from pithub.concord import ( + Location, fetch_productions, build_show_p_request, get_orchestration_modal, + lineify_orchestration_modal, +) +from pithub.html import parse_html +from pithub.orchestration import Choice, Instrument, Orchestration, parse_orchestrations + + +def format_orch(orch: Orchestration, indent: int) -> list[str]: + def format_instr(val: Instrument | Choice) -> str: + if isinstance(val, Instrument): + return val.name + else: + return " | ".join(instr.name for instr in val.choices) + + out = [] + out.append(" " * indent + str(orch.description)) + for part in orch.parts: + out.append( + " " * (indent + 1) + + f"{part.name} {part.players}p" + + (" (opt)" if part.optional else "") + ) + for instr in part.required_instruments: + out.append(" " * (indent + 2) + format_instr(instr)) + for instr in part.optional_instruments: + out.append(" " * (indent + 2) + f"{format_instr(instr)} (opt)") + if part.role: + out.append(" " * (indent + 2) + f"+{part.role.name}") + return out + + +def print_orch(orch: Orchestration, indent: int): + for line in format_orch(orch, indent): + print(line) + + +def deser_instr_list(l) -> None: + for (i, v) in enumerate(l): + if isinstance(v, str): + l[i] = Instrument[v] + else: + assert isinstance(v, Choice) + l[i] = Choice(tuple(Instrument[n] for n in v.choices)) + + +def ser_instr_list(l) -> None: + for (i, v) in enumerate(l): + if isinstance(v, Instrument): + l[i] = v.name + else: + assert isinstance(v, Choice) + l[i] = Choice(tuple(p.name for p in v.choices)) + + +def ser_instr(v): + if isinstance(v, Instrument): + return v.name + else: + assert isinstance(v, Choice) + return Choice(tuple(p.name for p in v.choices)) + + +def ser_instr_list_2(l) -> list: + out = [] + for v in l: + if isinstance(v, Instrument): + out.append(v.name) + else: + assert isinstance(v, Choice) + out.append(Choice([p.name for p in v.choices])) + return out + + +def dedup_stream(it: Iterable) -> Iterator: + seen = set() + for val in it: + if val not in seen: + yield val + seen.add(val) + + +def do_concord(): + test_db_path = Path("test_db") + test_db_path.mkdir(exist_ok = True) + + def extract_id(path): + return int(path.name.split("_")[0]) + test_db_ids = map(extract_id, test_db_path.iterdir()) + + prods = fetch_productions( + Location(45.1288, -92.9073), + Location(44.7389, -93.6969), + ) + geo_ids = (v for (v, _) in groupby(prods, lambda p: p.show_id)) + + all_ids = dedup_stream(itertools.chain(test_db_ids, geo_ids)) + for (i, show_id) in islice(enumerate(all_ids), 34, None): + print(f"{i}: {show_id}") + modal_p = test_db_path.joinpath(f"{show_id}_modal.pickle") + orchs_p = test_db_path.joinpath(f"{show_id}_orchs.pickle") + maybe_install = False + if orchs_p.exists(): + with modal_p.open("rb") as f: + modal = pickle.load(f) + with orchs_p.open("rb") as f: + exp_orchs = pickle.load(f) + for orch in exp_orchs: + for p in orch.parts: + deser_instr_list(p.required_instruments) + deser_instr_list(p.optional_instruments) + lines = lineify_orchestration_modal(modal) + act_orchs = list(parse_orchestrations(lines)) + if act_orchs == exp_orchs: + print("pass") + else: + print("mismatch!") + assert len(act_orchs) == len(exp_orchs) + for (exp, act) in zip(exp_orchs, act_orchs): + for line in unified_diff(format_orch(exp, 0), format_orch(act, 0), lineterm = ""): + print(" " + line) + maybe_install = True + orchs = act_orchs + else: + with decode_response(urlopen(build_show_p_request(show_id))) as resp: + resp_data = resp.read() + document = parse_html(resp_data) + modal = get_orchestration_modal(document) + if modal: + lines = lineify_orchestration_modal(modal) + orchs = list(parse_orchestrations(lines)) + for orch in orchs: + print_orch(orch, indent = 1) + maybe_install = True + else: + print("n/a") + sleep(3) + + if maybe_install: + done = False + while True: + if (response := input("(y/full): ")) == "y": + with modal_p.open("wb") as f: + pickle.dump(modal, f) + orchs = [ + dataclasses.replace( + orch, + parts = [ + dataclasses.replace( + part, + required_instruments = list(map(ser_instr, part.required_instruments)), + optional_instruments = list(map(ser_instr, part.optional_instruments)), + ) + for part in orch.parts + ] + ) + for orch in orchs + ] + with orchs_p.open("wb") as f: + pickle.dump(orchs, f) + break + elif response == "full": + for orch in orchs: + print_orch(orch, indent = 0) + else: + done = True + break + if done: + break + + +def great_circle_miles(a: Location, b: Location) -> float: + from math import asin, cos, radians, sin, sqrt + + # haversine formula + r1 = radians(a.latitude) + r2 = radians(b.latitude) + l1 = radians(a.longitude) + l2 = radians(b.longitude) + return 2 * 3957 * asin(sqrt( + sin((r2 - r1) / 2) ** 2 + + cos(r1) * cos(r2) * sin((l2 - l1) / 2) ** 2 + )) + + +def do_mti(skip: int): + from urllib.request import urlopen + import json + + from pithub.common import decode_response + from pithub.html import parse_html + from pithub.mti import ( + build_maprefresh_request, build_materials_request, get_all_shows, get_orchestrations, + get_productions, parse_maprefresh + ) + + test_db_path = Path("mti_test_db") + test_db_path.mkdir(exist_ok = True) + + with decode_response(urlopen(build_maprefresh_request())) as resp: + data = resp.read() + + root = parse_html(data) + + def full_show_ids(): + def extract_id(path): + return int(path.name.split("_")[0]) + + #yield from map(extract_id, test_db_path.iterdir()) + yield from get_all_shows(root) + + def get_show_name(materials_json): + [cmd_obj] = [obj for obj in materials_json if obj["command"] == "insert" and obj["method"] == "html"] + if cmd_obj["data"]: + tree = parse_html("" + cmd_obj["data"] + "") + return tree[0].text.removeprefix("Materials for ") + else: + return None + + olds_warehouse_loc = Location(43.0779, -89.37308) + for (i, show_id) in islice(enumerate(dedup_stream(full_show_ids())), skip, None): + print(f"{i} ({show_id})") + req = build_maprefresh_request(show_id) + print(f" {req.full_url}") + with decode_response(urlopen(req)) as resp: + data = resp.read() + sleep(3) + + doc = parse_html(data) + maprefresh = parse_maprefresh(doc, show_id) + print(f" {len(maprefresh.map_view_productions)}/{len(maprefresh.table_productions)}") + #productions = get_productions(maprefresh) + agg = parse_maprefresh(doc, show_id) + for page_no in range(1, maprefresh.page_count): + req = build_maprefresh_request(show_id, page_no) + print(f" {req.full_url}") + with decode_response(urlopen(req)) as resp: + data = resp.read() + sleep(3) + maprefresh = parse_maprefresh(parse_html(data), show_id) + print(f" p1: {len(maprefresh.map_view_productions)}/{len(maprefresh.table_productions)}") + assert maprefresh.page_num == page_no + agg.map_view_productions.extend(maprefresh.map_view_productions) + agg.table_productions.extend(maprefresh.table_productions) + #productions.extend(get_productions(maprefresh)) + productions = get_productions(agg) + + if not any(great_circle_miles(p.location, olds_warehouse_loc) < 15 for p in productions): + print(" geo skipping") + continue + + json_p = test_db_path.joinpath(f"{show_id}_res.json") + orchs_p = test_db_path.joinpath(f"{show_id}_orchs.pickle") + + if orchs_p.exists(): + with json_p.open("r") as f: + materials_json = json.load(f) + + if (show_name := get_show_name(materials_json)): + print(f"{i} {show_name} ({show_id})") + + with orchs_p.open("rb") as f: + exp_orchs = pickle.load(f) + for orch in exp_orchs: + for p in orch.parts: + deser_instr_list(p.required_instruments) + deser_instr_list(p.optional_instruments) + act_orchs = get_orchestrations(materials_json) + if act_orchs == exp_orchs: + print("pass") + maybe_install = False + else: + print("mismatch!") + assert len(act_orchs) == len(exp_orchs) + for (exp, act) in zip(exp_orchs, act_orchs): + for line in unified_diff(format_orch(exp, 0), format_orch(act, 0), lineterm = ""): + print(" " + line) + maybe_install = True + orchs = act_orchs + else: + print(f"{i}: parsing ", end = "", flush = True) + with decode_response(urlopen(build_materials_request(show_id))) as resp: + materials_json = json.load(resp) + + if (show_name := get_show_name(materials_json)): + print(show_name) + + orchs = get_orchestrations(materials_json) + for orch in orchs: + print_orch(orch, indent = 1) + maybe_install = True + + if maybe_install: + done = False + while True: + if (response := input("(y/full): ")) == "y": + with json_p.open("w") as f: + json.dump(materials_json, f) + orchs = [ + dataclasses.replace( + orch, + parts = [ + dataclasses.replace( + part, + required_instruments = list(map(ser_instr, part.required_instruments)), + optional_instruments = list(map(ser_instr, part.optional_instruments)), + ) + for part in orch.parts + ] + ) + for orch in orchs + ] + with orchs_p.open("wb") as f: + pickle.dump(orchs, f) + sleep(3) + break + elif response == "full": + for orch in orchs: + print_orch(orch, indent = 0) + else: + done = True + break + if done: + break + + +if __name__ == "__main__": + if argv[1:] == ["alt"]: + from datetime import date, timedelta + import json + + from pithub.mti import ( + fetch_all_productions, fetch_maprefresh_page, MaprefreshParams, Production, + build_materials_request, get_orchestrations + ) + + def requester(req): + with decode_response(urlopen(req)) as resp: + data = resp.read() + sleep(3) + return data + + def is_upcoming(production) -> bool: + default_closing = production.opening + timedelta(days = 21) + if date.today() < production.opening: + return True + if production.closing: + if production.closing - production.opening > timedelta(days = 25): + return date.today() < default_closing + else: + return date.today() < production.closing + else: + return date.today() < default_closing + + @dataclass + class TaggedProduction: + show_id: int + production: Production + + def get_instr_codes(orch) -> Iterator[str]: + for part in orch.parts: + main_instruments = set(itertools.chain.from_iterable( + v.choices if isinstance(v, Choice) else [v] + for v in part.required_instruments + )) + assert all(isinstance(v, Instrument) for v in main_instruments) + if Instrument.VIOLIN in main_instruments: + yield "vln" + if Instrument.VIOLA in main_instruments: + yield "vla" + if Instrument.TROMBONE in main_instruments: + yield "tbn" + + olds_warehouse_loc = Location(43.0779, -89.37308) + show_ids = fetch_maprefresh_page(MaprefreshParams(), requester).show_ids + productions = [] + show_codes = {} + for (i, show_id) in islice(enumerate(show_ids), 0, None): + print(f"\rLoading show {show_id:<10} {i}/{len(show_ids)}", end = "") + new = [ + TaggedProduction(show_id, p) + for p in set(fetch_all_productions(show_id, requester, area = 48)) + if is_upcoming(p) and great_circle_miles(p.location, olds_warehouse_loc) < 15 + ] + if new: + data = requester(build_materials_request(show_id)) + materials_json = json.loads(data) + orchestrations = get_orchestrations(materials_json) + codes = set(itertools.chain.from_iterable( + map(get_instr_codes, orchestrations) + )) + if codes: + show_codes[show_id] = codes + productions.extend(new) + print() + + if productions: + show_len = max(len(tp.production.show_title) for tp in productions) + org_len = max(len(tp.production.organization_name) for tp in productions) + productions.sort(key = lambda tp: tp.production.opening) + for tp in productions: + p = tp.production + dates = f"{p.opening} - {p.closing}" if p.closing else str(p.opening) + code_str = " ".join(sorted(show_codes[tp.show_id])) + print(f"{dates:<23}: {p.show_title:<{show_len}} @ {p.organization_name:<{org_len}} ({code_str})") + else: + print("no results") + else: + if len(argv) == 2: + skip = int(argv[1]) + else: + skip = 0 + do_mti(skip)