Includes client interfaces and parsers for shows, productions, and orchestrations.
--- /dev/null
+from cgi import parse_header
+from dataclasses import dataclass
+from datetime import date
+from io import TextIOBase, TextIOWrapper
+from typing import Iterable, Optional
+from urllib.response import addinfourl
+from xml.etree.ElementTree import Element
+import re
def parse_date(date_str: str) -> Optional[date]:
    """
    Parse a ``month/day/year`` string (all-numeric fields) into a date.

    Returns None when the text does not have that shape. A string that matches the shape but
    names an impossible calendar date still raises ValueError from the date constructor.
    """
    found = re.match(r"(\d+)/(\d+)/(\d+)$", date_str)
    if found is None:
        return None
    month, day, year = (int(group) for group in found.groups())
    return date(year, month, day)
@dataclass(frozen=True)
class Location:
    """Immutable geographic point given as a latitude/longitude pair."""

    latitude: float
    longitude: float
def decode_response(response: addinfourl, encoding: Optional[str] = None) -> TextIOBase:
    """
    Wrap a binary HTTP response in a text stream.

    When ``encoding`` is given it is used as-is. Otherwise the charset parameter of the response's
    Content-Type header is used, falling back to ASCII when the header or parameter is absent.

    The header is parsed with ``email.message.Message`` rather than ``cgi.parse_header``: the
    ``cgi`` module was deprecated in Python 3.11 and removed in 3.13.
    """
    # Imported locally so this function does not depend on the module-level import block.
    from email.message import Message

    if not encoding:
        encoding = "ascii"
        content_type_header = response.headers.get("Content-Type")
        if content_type_header:
            parsed = Message()
            parsed["Content-Type"] = content_type_header
            # Returns the (lower-cased, unquoted) charset, or the fallback when there is none.
            encoding = parsed.get_content_charset(failobj = encoding)
    return TextIOWrapper(response, encoding = encoding)
def build_element(
    tag: str,
    attrib: Optional[dict[str, Optional[str]]] = None,
    text: Optional[str] = None,
    children: Iterable[Element] = (),
) -> Element:
    """
    Build an Element with the given tag, attributes, text and child elements in one call.

    ``attrib`` and ``children`` default to "none"; the defaults are immutable so no state can be
    shared between calls (the previous ``{}`` / ``[]`` defaults were shared objects).
    """
    e = Element(tag, {} if attrib is None else attrib)
    e.text = text
    for child in children:
        e.append(child)
    return e
def throwing_factory():
    """
    Default factory for dataclass fields that are required but must still declare a default.

    Always raises RuntimeError, so omitting such a field fails at construction time.
    """
    message = "Required argument not passed"
    raise RuntimeError(message)
--- /dev/null
+from dataclasses import dataclass
+from datetime import date
+from itertools import groupby
+from typing import Collection, Iterator, Optional
+from urllib.parse import urlencode
+from urllib.request import Request, urlopen
+from xml.etree.ElementTree import Element, TreeBuilder
+import itertools
+import json
+import re
+from .common import build_element, decode_response, parse_date
+from .html import parse_html
# NOTE(review): a bare annotation-only class has no constructor that accepts the coordinates; a
# dataclass decorator appears to have been lost. Frozen is chosen to keep instances hashable and
# to match the other value types in this code base — confirm.
@dataclass(frozen = True)
class Location:
    """Geographic point (latitude/longitude) used as a bounding-box corner."""
    latitude: float
    longitude: float
# fetch_productions builds instances positionally (show_id, producer, opening, closing), which
# requires a generated __init__; without the dataclass decorator that call raises TypeError.
@dataclass(frozen = True)
class Production:
    """One listed production of a show: who produces it and when it runs."""
    show_id: int
    producer: str
    opening: date
    closing: date
def fetch_productions(ne_corner: Location, sw_corner: Location) -> Iterator[Production]:
    """
    Yield a Production for each data row of the paginated listing for the given bounding box.

    A first request is parsed for the table's column headers. After that one request is issued per
    page number (counting from 1) until the pagination row shows there is no further page.
    """
    # NOTE(review): the URL literals and User-Agent values in this function are empty strings in
    # this view; they look redacted — confirm the real values.
    request = Request(
        "",
        headers = {"User-Agent": ""},
    )
    with decode_response(urlopen(request)) as resp:
        # NOTE(review): the right-hand side is missing in this view (presumably the response text
        # is read here); the statement is incomplete as written.
        resp_data =
    root = parse_html(resp_data)
    [thead] = root.iter("thead")
    [tr] = thead
    assert all(el.tag == "th" for el in tr)
    # Column titles come from the first <span> inside each header cell.
    col_headers = [next(th.iter("span")).text.strip() for th in tr]
    charset = "utf-8"
    for page_num in itertools.count(start = 1):
        # The search parameters travel as a JSON document inside a single form field.
        request = Request(
            "",
            data = urlencode({
                "mapParams": json.dumps({
                    "neLat": ne_corner.latitude,
                    "neLon": ne_corner.longitude,
                    "swLat": sw_corner.latitude,
                    "swLon": sw_corner.longitude,
                    "page": page_num,
                    "sortBy": "", # default: order lexically by show title
                    "sortDes": False,
                    "showHistorical": True, # don't know what this does
                })
            }).encode(charset),
            headers = {"User-Agent": ""},
        )
        with decode_response(urlopen(request)) as resp:
            # NOTE(review): right-hand side missing in this view, as above.
            resp_data =
        # The response is wrapped in a synthetic <doc> root before parsing.
        root = parse_html("<doc>" + resp_data + "</doc>")
        rows = list(root)
        assert all(el.tag == "tr" for el in rows)
        # The final row carries the pagination controls; everything before it is data.
        [*data_rows, pagination_row] = rows
        assert pagination_row.attrib["class"] == "tfoot"
        for row in data_rows:
            assert len(row) == len(col_headers) and all(el.tag == "td" for el in row)
            # Pair each cell with its column title so cells can be looked up by name.
            zipped = list(zip(col_headers, row))
            [title_td] = (td for (header, td) in zipped if header == "Title")
            [a] = title_td
            assert a.tag == "a"
            # NOTE(review): this pattern has an unbalanced ")" — its beginning appears to be
            # missing in this view, so the regex does not compile as written.
            show_id = int(re.match(r"\d+)/", a.attrib["href"])[1])
            [producer_td] = (td for (header, td) in zipped if header == "Producer")
            # An Element is falsy when it has no child elements: these cells must be text-only.
            assert not producer_td
            producer = producer_td.text.strip()
            [opening_td] = (td for (header, td) in zipped if header == "Opening")
            assert not opening_td
            opening = parse_date(opening_td.text.strip())
            [closing_td] = (td for (header, td) in zipped if header == "Closing")
            assert not closing_td
            closing = parse_date(closing_td.text.strip())
            yield Production(show_id, producer, opening, closing)
        [td] = pagination_row
        # Truthy only when the pagination cell has child elements (i.e. page links exist).
        if td:
            def page_num_for(li: Element) -> int:
                return int(li[0][0].text)
            lis = list(td.iter("li"))
            has_next = any("next" in li[0][0].text.strip().lower() for li in lis)
            [current_li] = (li for li in lis if li.attrib.get("class") == "current")
            # Sanity check: the page the server reports matches the page that was requested.
            assert page_num_for(current_li) == page_num
            if not has_next:
                # With no "next" link the current page must be the highest numbered one.
                assert page_num == max(page_num_for(li) for li in lis if li[0][0].text.strip().isdigit())
                break
        else:
            # single page of results, no pagination
            assert page_num == 1
            [cell] = pagination_row
            assert cell.tag == "tr" and (not cell.text or cell.text.isspace())
            break
def build_show_p_request(show_id: int) -> Request:
    """Build the Request for the page of the show with the given numeric id."""
    # NOTE(review): the URL literal contains only the show id and the User-Agent is empty in this
    # view; the URL prefix looks redacted — confirm the real values.
    return Request(
        f"{show_id}",
        headers = {"User-Agent": ""},
    )
def get_orchestration_modal(document: Element) -> Optional[Element]:
    """
    Locate the inline modal element that describes a show's orchestration materials.

    Returns the "piano only" or "full package rentals" modal from the "Licensing & Materials"
    section, chosen by the orchestra size listed in the "Music" section. Returns None when the
    page has no music section, and also (implicitly) when the show is marked as not a musical or
    lists no orchestra size. Raises NotImplementedError for an unrecognised orchestra size.
    """
    # look for "MUSIC" in the show nav bar below the hero
    title_nav = next(
        nav for nav in document.iter("nav")
        if "title-nav" in nav.attrib.get("class", "").split()
    )
    music_nav = any(a.text.strip() == "Music" for a in title_nav[1].iter("a"))
    """
    There's invalid markup here, so what Firefox corrects to:
    <main id="main-content" class="main-content">
    <div id="modal-estimator" ...>...</div>
    <div class="title-hero js-title-hero">...</div>
    <div class="position-relative bg-color-silver-ultralight">...</div>
    TreeBuilder constructs as:
    <main id="main-content" class="main-content>
    <div id="modal-estimator" ...>
    ...
    <div class="title-hero js-title-hero">...</div>
    <div class="position-relative bg-color-silver-ultralight">...</div>
    """
    main = next(document.iter("main"))
    # extract the brief italic description from just below the show title
    hero = next(
        el for el in main.iter("div")
        if "title-hero" in el.attrib.get("class", "").split()
    )
    [hero_sub_el] = (
        el for el in hero.iter("p")
        if "title-hero__subinfo" in el.attrib.get("class", "").split()
    )
    sub_info = hero_sub_el.text.strip()
    # The two string literals below hold disabled code; they are no-op expression statements.
    """
    post_hero = next(
    el for el in main.iter("div")
    if {"position-relative", "bg-color-silver-ultralight"} <= set((el.attrib.get("class") or "").split())
    )
    [contents] = (el for el in post_hero if "contents" in el.attrib.get("class", ""))
    # look for a music page section and check that the nav menu agrees
    flat = itertools.chain.from_iterable(contents) # disregard div.row groupings
    """
    """
    anchor = next(
    (el for el in main.iter("span") if el.attrib.get("id") == "music"),
    None
    )
    music_section = anchor is not None
    """
    def get_pdp_sections(tree: Element, headers: Collection[str]) -> Iterator[Optional[Element]]:
        # Extract page sections by header text. Iterating over all the div elements could be very
        # expensive, so we try to only do it once.
        elements = {}
        for el in tree.iter("div"):
            if "pdp-section" in (el.attrib.get("class") or "").split():
                if el and el[0]:
                    header = el[0][0].text.strip() # h3
                    if header in headers:
                        assert header not in elements
                        elements[header] = el
                        # Stop scanning as soon as every requested section has been found.
                        if len(elements) == len(headers):
                            break
        # One entry per requested header, in request order; None for sections not found.
        return (elements.get(header) for header in headers)
    def get_inline_modals(tree: Element) -> dict[str, Element]:
        # Extract inline modals by the text of the button that opens them
        return {
            div.attrib["button-text"].lower(): div
            for div in tree.iter("div")
            if div.attrib.get("is") == "InlineModal"
        }
    [music_section, materials_section] = get_pdp_sections(main, ["Music", "Licensing & Materials"])
    # The nav bar and the page body must agree on whether a music section exists.
    assert (music_section is not None) == music_nav
    if music_nav:
        # pdp-section div immediately follows corresponding anchor
        assert "pdp-section" in music_section.attrib["class"].split()
        [_, content] = [e for e in music_section if e.tag == "div"]
        # NOTE(review): rhs, classifications and music_modals are assigned but not used below.
        rhs = content[-1]
        classifications = content[-1]
        music_modals = get_inline_modals(music_section)
        #assert music_modals.keys() <= {"musical numbers"} TODO
        def look_up(header: str) -> Optional[str]:
            # The value is the text that follows the <strong> label with the given caption.
            labels = [el for el in content.iter("strong") if (el.text or "").strip() == header]
            if labels:
                [label] = labels
                return label.tail.strip()
            else:
                return None
        musical_style = look_up("Musical Style")
        orch_size = look_up("Orchestra Size")
        if musical_style == "N/A (Not a musical)":
            assert orch_size is None
        elif orch_size is None:
            pass
        else:
            materials_modals = get_inline_modals(materials_section)
            assert materials_modals.keys() <= {
                "piano only", "full package rentals", "title specific notes"
            }
            if orch_size == "Piano Only":
                assert "full package rentals" not in materials_modals
                modal = materials_modals["piano only"]
            elif orch_size in ["Small/Combo", "Medium", "Large", "X-Large"]:
                modal = materials_modals["full package rentals"]
            else:
                raise NotImplementedError(f"orchestra size {orch_size}")
            return modal
    else:
        # no music section; should be a play
        # NOTE(review): the start of this statement is missing in this view (it looks like a
        # regex search for the word "play" in sub_info); the line is incomplete as written.
        assert"\bplay\b", sub_info, re.IGNORECASE)
        return None
def lineify_orchestration_modal(modal: Element) -> list[str]:
    """
    Flatten a modal's element tree into a list of text lines.

    Block-level tags (p, div, li, ul) and <br> introduce line breaks; the inline tags a, em, span
    and strong do not. Any other tag fails an assertion. Whitespace runs inside each text fragment
    are collapsed to one space and the fragments of a line are joined with single spaces.
    """
    line_break = "\n"
    block_tags = ("p", "div", "li", "ul")
    inline_tags = ("a", "em", "span", "strong")

    def squeeze(text: str) -> str:
        return re.sub(r"\s+", " ", text.strip())

    def fragments(node: Element) -> Iterator[str]:
        # A break precedes every block element and every <br>.
        if node.tag == "br" or node.tag in block_tags:
            yield line_break
        else:
            assert node.tag in inline_tags
        if node.text and node.text.strip():
            yield squeeze(node.text)
        for child in node:
            yield from fragments(child)
        if node.tail and node.tail.strip():
            yield squeeze(node.tail)
        # Only block elements (not <br>) are followed by a break as well.
        if node.tag in block_tags:
            yield line_break

    lines = []
    for (is_break, run) in groupby(fragments(modal), key = lambda piece: piece == line_break):
        if is_break:
            continue
        lines.append(" ".join(run))
    return lines
--- /dev/null
+from html.parser import HTMLParser
+from xml.etree.ElementTree import Element, TreeBuilder
+class _AutoClosingTreeBuilder(TreeBuilder):
+ _AUTO_CLOSING_TAGS = {"br", "img"}
+ def __init__(self, *args, **kwargs) -> None:
+ self._tag_stack = []
+ super().__init__(*args, **kwargs)
+ def _auto_close(self):
+ if self._tag_stack and self._tag_stack[-1] in self._AUTO_CLOSING_TAGS:
+ self.end(self._tag_stack[-1])
+ def start(self, tag: str, attrs: list[tuple[str, str]]) -> None:
+ self._auto_close()
+ super().start(tag, attrs)
+ self._tag_stack.append(tag)
+ def end(self, tag: str) -> None:
+ if tag not in self._AUTO_CLOSING_TAGS:
+ self._auto_close()
+ super().end(tag)
+ self._tag_stack.pop()
+class _HtmlTreeBuilderDriver(HTMLParser):
+ """Adapts the HTMLParser callback interface to etree's TreeBuilder for an etree-like HTML parser."""
+ def __init__(self, tree_builder: TreeBuilder, **kwargs) -> None:
+ self.tree_builder = tree_builder
+ super().__init__(**kwargs)
+ def handle_starttag(self, tag: str, attrs: list[tuple[str, str]]) -> None:
+ self.tree_builder.start(tag, dict(attrs))
+ def handle_endtag(self, tag: str) -> None:
+ self.tree_builder.end(tag)
+ def handle_data(self, data: str) -> None:
def parse_html(content: str) -> Element:
    """Parse an HTML string into an element tree and return its root element."""
    builder = _AutoClosingTreeBuilder()
    driver = _HtmlTreeBuilderDriver(builder)
    driver.feed(content)
    return builder.close()
--- /dev/null
+from collections import deque
+from dataclasses import dataclass, replace
+from datetime import date
+from enum import Enum
+from itertools import groupby
+from pathlib import PurePosixPath
+from typing import Callable, Iterable, Optional
+from urllib.parse import parse_qs, urlencode, urlparse
+from urllib.request import Request
+from warnings import warn
+from xml.etree.ElementTree import Element
+import itertools
+import json
+import re
+from .common import Location, parse_date
+from .html import parse_html
+from .orchestration import (
+ deduplicate_keyboard_parts, Instrument, KeyboardPart, Orchestration, parse_instrument,
+ parse_instrument_list, Part, PartInstruments, parts_to_instruments, PART_NAMES,
# The class is built with keyword arguments and copied with dataclasses.replace() elsewhere in
# this file, both of which need the dataclass decorator that was missing here.
@dataclass
class MaprefreshParams:
    """
    Filter, paging and ordering parameters of a maprefresh request.

    Every field is optional; unset (None) fields are left out of the query string.
    """
    @dataclass
    class SortSpec:
        """Ordering requested from the server: a sort key plus a direction."""
        Key = Enum("Key", ["ORG_NAME"])
        key: Key
        ascending: bool = True
    show_id: Optional[int] = None
    area: Optional[int] = None
    page: Optional[int] = None # zero-based
    sort_spec: Optional[SortSpec] = None
    def query_string_params(self) -> dict[str, list[str]]:
        """Return the query-string parameters as a name -> list-of-values mapping."""
        params = {}
        if self.show_id is not None:
            params["field_production_show_target_id"] = [str(self.show_id)]
        if self.area is not None:
            params["field_production_address_administrative_area"] = [str(self.area)]
        # Restored: this condition and value were truncated. Compare against None so that page 0
        # is still sent explicitly.
        if self.page is not None:
            params["page"] = [str(self.page)]
        if self.sort_spec:
            # ORG_NAME is the only sort key, so the order field is fixed.
            params["order"] = ["field_production_org_name"]
            params["sort"] = ["asc" if self.sort_spec.ascending else "desc"]
        return params
    def to_request(self) -> Request:
        """Build the Request for these parameters."""
        qs = urlencode(self.query_string_params(), doseq = True)
        # NOTE(review): the URL prefix is an empty string in this view and looks redacted; the
        # request cannot be built until the real base URL is restored.
        return Request("" + qs)
@dataclass(frozen=True)
class Production:
    """Merged, immutable record of one production of a show."""

    show_title: str
    organization_name: str
    opening: date
    closing: Optional[date]  # None when no closing date is listed
    location: Location
def _zip_productions(map_view_production: "MapViewProduction", table_production: "TableProduction") -> Production:
    """
    Merge the map-view and table records of one production into a single Production.

    Organization name, opening and closing must be identical in both records; the single-element
    unpacking raises ValueError when they differ. The venue is compared only when the map view
    has one. Title and location are taken from the map-view record.
    """
    [org_name] = {map_view_production.organization_name, table_production.organization_name}
    [opening] = {map_view_production.opening, table_production.opening}
    [closing] = {map_view_production.closing, table_production.closing}
    if map_view_production.venue is not None:
        assert map_view_production.venue == table_production.venue
    # Production has five fields; the title was previously omitted, so the call raised TypeError.
    return Production(
        map_view_production.show_title,
        org_name,
        opening,
        closing,
        map_view_production.location,
    )
@dataclass(frozen=True)
class MapViewProduction:
    """Immutable record of a production as extracted from a map point."""

    show_title: str
    opening: date
    closing: Optional[date]  # None when no closing date is listed
    organization_name: str
    venue: Optional[str]  # None when the map description lists no venue
    location: Location
@dataclass(frozen=True)
class TableProduction:
    """Immutable record of a production as extracted from a table row."""

    show_title: str
    organization_name: str
    opening: date
    closing: Optional[date]  # None when the row lists no closing date
    venue: str
# parse_maprefresh builds this positionally (show_ids, map_view_productions, table_productions,
# page_count); without the dataclass decorator there is no __init__ accepting those arguments.
@dataclass
class MaprefreshData:
    """Everything extracted from one maprefresh page."""
    show_ids: list[int]
    map_view_productions: list[MapViewProduction]
    table_productions: list[TableProduction]
    page_count: int
# Callable that takes a Request and returns a str; callers in this file feed the result to
# parse_html, so it is treated as the response body text.
_Requester = Callable[[Request], str]
def fetch_maprefresh_page(params: MaprefreshParams, requester: _Requester) -> MaprefreshData:
    """Request one maprefresh page through ``requester`` and parse the returned HTML."""
    request = params.to_request()
    page_root = parse_html(requester(request))
    return parse_maprefresh(page_root, params)
def _walk_pages(initial_params: MaprefreshParams, requester: _Requester) -> list[MaprefreshData]:
    """
    Fetch the page selected by ``initial_params`` and every following page.

    The page count is taken from the first response, and each later page must report the same
    count. When more than one page was fetched, an assertion checks that the pages' productions
    are almost entirely distinct (more than 97%).
    """
    # Restored: the left operand was truncated. An unset page means the first page.
    start_page = initial_params.page or 0 # server-side default of 0
    initial = fetch_maprefresh_page(initial_params, requester)
    out = [initial]
    for page_num in range(start_page + 1, initial.page_count):
        curr_params = replace(initial_params, page = page_num)
        maprefresh = fetch_maprefresh_page(curr_params, requester)
        assert maprefresh.page_count == initial.page_count
        out.append(maprefresh)
    if len(out) > 1:
        # There should be minimal if any overlap between page contents.
        for attr_name in ["map_view_productions", "table_productions"]:
            production_count = sum(len(getattr(page, attr_name)) for page in out)
            if production_count:
                distinct_productions = set(itertools.chain.from_iterable(
                    getattr(page, attr_name) for page in out
                ))
                assert len(distinct_productions) / production_count > 0.97
    return out
def fetch_all_productions(show_id: int, requester: _Requester, area: Optional[int] = None) -> list[Production]:
    """
    Fetch every page of productions for a show (optionally limited to one area).

    When a particular ordering is not imposed by the request, there seem to be pagination problems;
    primarily a production near the end of page n reappears near the beginning of page n + 1. This
    is mitigated for the table view by specifying an order, but these request parameters don't seem
    to affect the map view at all, so the pagination issue is unavoidable.
    The workaround is to use a sort order and the table view to get an authoritative list of
    productions, and then fill in missing map productions by using more targeted queries whose
    results fit on a single page.

    As written, the returned list is built from the map-view entries only. Organizations that
    appear in the map view but not in the table raise AssertionError; a mismatch in the number of
    distinct entries only produces a warning.
    """
    params = MaprefreshParams(
        show_id,
        area,
        sort_spec = MaprefreshParams.SortSpec(MaprefreshParams.SortSpec.Key.ORG_NAME),
    )
    pages = _walk_pages(params, requester)
    # TODO clean up
    map_orgs = {
        production.organization_name
        for page in pages
        for production in page.map_view_productions
    }
    table_orgs = {
        production.organization_name
        for page in pages
        for production in page.table_productions
    }
    if map_orgs - table_orgs:
        # Some organizations are missing from the table: walk the pages again in descending
        # order and recompute both sets over the combined results.
        print("Resolving table shuffle")
        reverse_params = replace(
            params,
            sort_spec = replace(params.sort_spec, ascending = False),
        )
        pages.extend(_walk_pages(reverse_params, requester))
        map_orgs = {
            production.organization_name
            for page in pages
            for production in page.map_view_productions
        }
        table_orgs = {
            production.organization_name
            for page in pages
            for production in page.table_productions
        }
    map_missing = table_orgs - map_orgs
    if map_missing:
        # Reported only; entries missing from the map do not abort the fetch.
        print("missing from map")
        for org_name in map_missing:
            print(f" {org_name!r}")
    table_missing = map_orgs - table_orgs
    if table_missing:
        print("missing from table")
        for org_name in table_missing:
            print(f" {org_name!r}")
        raise AssertionError()
    map_productions = list(itertools.chain.from_iterable(p.map_view_productions for p in pages))
    table_productions = list(itertools.chain.from_iterable(p.table_productions for p in pages))
    """
    Because we can perturb the response ordering, the table is most authoritative, but only the map
    data has precise location. Possible solutions:
    (1) get a clear set of productions from the table source and make separate geographically
    targeted requests for the missing map data
    (2) geocode the address data in the tables
    """
    # TODO merge sources, implementing one of the above solutions
    table_count = len(set(table_productions))
    map_count = len(set(map_productions))
    if table_count < map_count:
        warn(f"{map_count - table_count} more distinct map entries than distinct table entries")
    elif map_count < table_count:
        warn(f"{table_count - map_count} more distinct table entries than distinct map entries")
    # NOTE(review): map_productions is not de-duplicated here, so entries repeated across pages
    # (including the reverse walk above) appear more than once in the result.
    return [
        Production(
            production.show_title,
            production.organization_name,
            production.opening,
            production.closing,
            production.location
        )
        for production in map_productions
    ]
def get_productions(maprefresh: MaprefreshData) -> list[Production]:
    """
    Pair up the map-view and table productions of one page and merge each pair.

    Both lists are sorted by (organization name, opening, venue) and zipped positionally, so they
    must have the same length. Duplicates and organizations present in only one of the two lists
    are printed as diagnostics before the length check.
    """
    def sort_key(production: MapViewProduction | TableProduction) -> tuple:
        # Map-view venues may be None, and None cannot be ordered against str; sort it as "".
        return (production.organization_name, production.opening, production.venue or "")
    from collections import Counter
    mct = Counter(maprefresh.map_view_productions)
    mdups = [k for (k, v) in mct.items() if v > 1]
    if mdups:
        print(" map dupes:")
        for k in mdups:
            idxs = [i for (i, v) in enumerate(maprefresh.map_view_productions) if k == v]
            print(f" {k.organization_name} @{k.venue} {idxs}")
    tct = Counter(maprefresh.table_productions)
    tdups = [k for (k, v) in tct.items() if v > 1]
    if tdups:
        print(" table dupes:")
        for k in tdups:
            idxs = [i for (i, v) in enumerate(maprefresh.table_productions) if k == v]
            print(f" {k.organization_name} @{k.venue} {idxs}")
    map_keys = {p.organization_name for p in maprefresh.map_view_productions}
    table_keys = {p.organization_name for p in maprefresh.table_productions}
    if map_keys - table_keys:
        print(" stray map keys:")
        for k in map_keys - table_keys:
            print(f" {k}")
    if table_keys - map_keys:
        print(" stray table keys:")
        for k in table_keys - map_keys:
            print(f" {k}")
    assert len(maprefresh.map_view_productions) == len(maprefresh.table_productions)
    return list(itertools.starmap(
        _zip_productions,
        zip(
            sorted(maprefresh.map_view_productions, key = sort_key),
            sorted(maprefresh.table_productions, key = sort_key)
        )
    ))
def _map_point_to_production(point_doc: dict) -> MapViewProduction:
    """
    Convert one point of the map data into a MapViewProduction.

    The coordinates are stored as (longitude, latitude). The point's HTML description is split
    into its non-blank text fragments: the first is the show title, and the rest are classified
    as dates, the word "to", a URL, the organization name, or the venue.
    """
    (longitude, latitude) = point_doc["coordinates"]
    location = Location(latitude, longitude)
    bubble_tree = parse_html("<root>" + point_doc["properties"]["description"] + "</root>")
    [show_title, *rest] = [t.strip() for t in bubble_tree.itertext() if not t.isspace()]
    opening = None
    closing = None
    org_name = None
    venue = None
    to_seen = False
    url_seen = False
    for part in rest:
        # Named parsed_date so the imported ``date`` class is not shadowed.
        if parsed_date := parse_date(part):
            # The first date is the opening; a second one is the closing.
            if not opening:
                opening = parsed_date
            else:
                assert not closing
                closing = parsed_date
        elif part == "to":
            assert opening and not closing
            to_seen = True
        # Restored: the start of this condition was truncated; only the tail of an
        # "https?://" pattern was left. Reconstructed as a URL-prefix match — confirm.
        elif re.match(r"https?://", part):
            assert not url_seen
            url_seen = True
        else:
            # org name or venue
            if org_name:
                assert not venue
                venue = part
            else:
                org_name = part
    assert opening
    if to_seen:
        assert closing
    assert org_name
    return MapViewProduction(show_title, opening, closing, org_name, venue, location)
def _table_row_to_production(tr: Element) -> TableProduction:
    """
    Convert a six-cell table row (title, address, org name, website, dates, venue) into a
    TableProduction. The dates cell holds either one date or "<opening> to <closing>".
    """
    title_td, _, org_td, _, dates_td, venue_td = tr
    show_title = title_td.text.strip()
    [org_text] = (text for text in org_td.itertext() if not text.isspace())
    organization = org_text.strip()
    date_tokens = [text.strip() for text in dates_td.itertext() if not text.isspace()]
    if len(date_tokens) == 1:
        # no closing date
        (opening_text,) = date_tokens
        closing_text = None
    else:
        opening_text, separator, closing_text = date_tokens
        assert separator == "to"
    opening = parse_date(opening_text)
    closing = closing_text and parse_date(closing_text)
    return TableProduction(show_title, organization, opening, closing, venue_td.text.strip())
def parse_maprefresh(page_root: Element, params: MaprefreshParams) -> MaprefreshData:
    """
    Extract map points, table rows, selectable show ids and the page count from a maprefresh page.

    ``params`` must be the parameters the page was requested with: they are used to validate the
    pager links and the reported page number. Structural expectations about the page are
    enforced with assertions and single-element unpacking.
    """
    assert page_root.tag == "html"
    head = page_root[0]
    assert head.tag == "head"
    # The map data is embedded as JSON in the argument of a jQuery.extend(Drupal.settings, ...)
    # call inside a <script> element.
    [inject_expr] = [
        m[1] for el in head.iter("script")
        if (m := re.match(r"\s*jQuery\.extend\(Drupal\.settings, *(.+)\);\s*$", el.text or ""))
    ]
    map_data = json.loads(inject_expr)["geofieldMap"]["production-listing-refresh-page-2"]["data"]
    # The data is [] for no points, a single Point, or a GeometryCollection of points.
    if map_data == []:
        map_view_productions = []
    elif map_data["type"] == "Point":
        map_view_productions = [_map_point_to_production(map_data)]
    else:
        assert map_data["type"] == "GeometryCollection"
        map_view_productions = list(map(_map_point_to_production, map_data["geometries"]))
    block_system_main = next(e for e in page_root.iter("div") if e.attrib.get("id") == "block-system-main")
    [content_div] = block_system_main
    [view_div] = content_div
    item_list = next(
        (el for el in view_div if "item-list" in el.attrib.get("class", "").split()),
        None
    )
    if item_list is not None:
        base_qs_params = {k: v for (k, v) in params.query_string_params().items() if k != "page"}
        def page_num_for_url(url: str) -> int:
            # Parse the URL that was passed in; the argument used to be ignored in favour of a
            # fixed lookup of the pager's last link.
            parsed = urlparse(url)
            assert not parsed.netloc
            assert parsed.path == "/maprefresh"
            assert not parsed.params
            qs_params = parse_qs(parsed.query)
            [page_num_str] = qs_params.pop("page")
            # Apart from the page number the link must carry exactly the request's parameters.
            assert qs_params == base_qs_params
            return int(page_num_str)
        [ul] = item_list
        [pager_current] = (el for el in ul if "pager-current" in el.attrib["class"].split())
        # The pager displays one-based numbers; page numbers here are zero-based.
        page_num = int(pager_current.text) - 1
        # On the last page there's no pager-last button so we take the page count from the current
        # page number.
        pager_last_els = [el for el in ul if "pager-last" in el.attrib["class"].split()]
        if pager_last_els:
            [pager_last] = pager_last_els
            page_count = page_num_for_url(pager_last[0].attrib["href"]) + 1
        else:
            page_count = page_num + 1
    else:
        page_num = 0
        page_count = 1
    # Restored: the operand was truncated. An unset page means the first page.
    assert page_num == (params.page or 0)
    [view_filters] = (el for el in view_div if "view-filters" in el.attrib.get("class", "").split())
    [attachment_div] = (el for el in view_div if "attachment" in el.attrib.get("class", "").split())
    show_select = next(
        el for el in view_filters.iter("select")
        if el.attrib["id"] == "edit-field-production-show-target-id"
    )
    # The first option is the empty placeholder; the rest carry numeric show ids.
    [blank, *options] = show_select
    assert not blank.attrib["value"]
    show_ids = [int(option.attrib["value"]) for option in options]
    [view] = attachment_div
    els = [el for el in view if "view-content" in el.attrib.get("class", "").split()]
    if els:
        [view_content] = els
        [table] = view_content
        [thead, tbody] = table
        [tr] = thead
        assert [th[0].text for th in tr] == ["Title", "Address", "Org Name", "Website", "Dates", "Venue"]
        table_productions = list(map(_table_row_to_production, tbody))
    else:
        [view_empty] = view
        assert "view-empty" in view_empty.attrib["class"]
        table_productions = []
    # Restored: the condition was truncated. Per the comment below it applies to requests
    # beyond the first page.
    if params.page:
        # shouldn't get empty results from a request beyond the first page; possible pagination
        # handling error
        assert map_view_productions and table_productions
    # Pagination size is 200 productions; see if it looks like we may have missed the page buttons.
    if page_count == 1 and len(map_view_productions) == 200:
        title = map_view_productions[0].show_title
        warn(f"200-show single page result for {title}; possible missed pagination")
    return MaprefreshData(show_ids, map_view_productions, table_productions, page_count)
def parse_show_index(page_root: Element) -> Iterable[str]:
    """
    Parser for /shows/all.

    Returns the URL IDs of all listed shows as strings. The index exposes URL IDs but not
    numerical IDs, so it is not useful for production scraping except as a consistency check on
    the full show list. It also seems to be missing some shows present in the Maprefresh.
    """
    def is_main_content(div: Element) -> bool:
        return "main-content" in (div.attrib.get("class") or "").split()

    main_content = next(filter(is_main_content, page_root.iter("div")))
    section = main_content[0]
    block_system_main = next(
        div for div in section.iter("div")
        if div.attrib.get("id") == "block-system-main"
    )
    [content_div] = block_system_main
    # First child is the alphabet navigation; each remaining child lists one letter's shows.
    alpha_nav, *alpha_containers = content_div
    assert alpha_nav.attrib["id"] == "show-links"
    assert len(alpha_containers) >= 25
    item_sets = []
    for container in alpha_containers:
        item_sets.append([_id_for_item_div(item) for item in _alpha_container_items(container)])
    # Every letter container is expected to list at least one show.
    assert all(item_sets)
    return itertools.chain.from_iterable(item_sets)
+def _id_for_item_div(item_div: Element) -> str:
+ [_, link] = item_div
+ url = urlparse(link.attrib["href"])
+ [id_] = PurePosixPath(url.path).relative_to("/").parts
+ return id_
+def _alpha_container_items(container: Element) -> list[Element]:
+ [h1, *items, top_link] = container
+ assert h1.tag == "h1"
+ assert top_link.text == "Back to top"
+ return items
def build_materials_request(show_id: int) -> Request:
    """Build the Request for the materials listing of the show with the given numeric id."""
    # NOTE(review): the URL literal contains only the show id in this view; the URL prefix looks
    # redacted — confirm the real value.
    return Request(f"{show_id}")
def _merge_instruments(part_instrs: list[Instrument], double_instrs: PartInstruments) -> PartInstruments:
    """
    Combine the instruments implied by a part's name with those from its doubling list.

    The part's own instruments are placed in front of the required doubling instruments. The one
    exception is a part that is just a guitar whose doublings already name a guitar instrument;
    the doubling list is then kept as it is.
    """
    # Restored: the membership test's right operand was truncated. Reconstructed as the
    # instrument's ``name`` — this assumes Instrument members expose ``.name`` (e.g. an Enum);
    # confirm against the Instrument definition.
    if part_instrs == [Instrument.GUITAR] and any("GUITAR" in i.name for i in double_instrs.required):
        required = double_instrs.required
    else:
        required = [*part_instrs, *double_instrs.required]
    return replace(double_instrs, required = required)
def _parse_part(instr_text: str, doubling_text: Optional[str], quantity_text: str) -> list[Part]:
    """
    Parse one row of an orchestration table into zero or more parts.

    Takes the row's instrumentation text, its doubling text (may be empty/None) and its quantity.
    Returns an empty list for rows that are not playable parts, a single keyboard part when the
    instrumentation is recognised as one, two parts for "1 & 2" / "A-B" style rows, and otherwise
    one part.
    """
    quantity = int(quantity_text)
    # Known rows that are materials rather than parts; only skipped when there is no doubling.
    ignore_texts = {
        "ADD.GUITAR PARTS FOR #9 & #11",
    }
    ignore_patterns = [
        r"\d+PC FULL SCORE VOL \d",
    ]
    if not doubling_text:
        if instr_text in ignore_texts:
            return []
        if any(re.match(patt + "$", instr_text) for patt in ignore_patterns):
            return []
    # Strip an "ALT: " / "FEMALE VERS: " prefix, then an "OPT: " prefix (which marks the part
    # as optional).
    dealted = re.match(r"((ALT|FEMALE VERS): )?(?P<main>.+)$", instr_text)["main"]
    m = re.match(r"(OPT: )?(.+)$", dealted)
    optional = bool(m[1])
    deopted = m[2]
    if ": " in deopted:
        # "<part name>: <instrument>, <instrument>, ..." — extra instruments after the colon.
        [part_name, extra_str] = deopted.split(": ")
        assert part_name
        extra_part_instrs = list(itertools.chain.from_iterable(
            map(
                parse_instrument,
                itertools.chain.from_iterable(
                    # "X & Y Z" is expanded to "X Z" and "Y Z" before parsing.
                    [f"{m[1]} {m[3]}", f"{m[2]} {m[3]}"]
                    if (m := re.match(r"([^ ]+) & ([^ ]+) ([^ ]+)$", p))
                    else [p]
                    for p in extra_str.split(", ")
                )
            )
        ))
    elif part := KeyboardPart.parse(deopted.removesuffix(" ACT 1")):
        return [replace(part, optional = optional, players = quantity)]
    else:
        part_name = deopted
        extra_part_instrs = []
    # Split the part name into an optional "ON STAGE" prefix, the instrument name, an optional
    # number/letter suffix (with "1 & 2" / "A-B" captured as multi/multi2) and an optional
    # "(DOUBLES ...)" note.
    part_match = re.match(
        (
            r"(ON ?STAGE )?(?P<instr>.+?)( ?\d| 1 ?(?P<multi>& ?2)?| [A-C]| (?P<multi2>A-B)?)?"
            + r"( \(DOUBLES (?P<dbl>.+)\))?$"
        ),
        part_name,
    )
    players = None
    if doubling_text:
        m = re.match(r"(\(OPTIONAL\) *, )?(.+)$", doubling_text)
        if m[1]:
            optional = True
        deopted_doubling = m[2]
        if m := re.match(r"\((\d) PLAYERS? (REQUIRED|MINIMUM)\)$", deopted_doubling):
            # The doubling column holds a player count rather than instruments.
            double_instrs = PartInstruments(False, [], [])
            players = int(m[1])
        elif re.match(r"OPT: SUB FOR \w+( \d)?$", deopted_doubling):
            double_instrs = PartInstruments(False, [], [])
            optional = True
        # NOTE(review): PERCUSSION_PART_NAMES is not among the names visible in this file's
        # (truncated) import list — confirm it is imported.
        elif part_match["instr"].lower() in PERCUSSION_PART_NAMES:
            # future enhancement: percussion instrument parsing
            double_instrs = PartInstruments(False, [], [])
        else:
            double_instrs = parse_instrument_list(deopted_doubling, part_match["instr"])
    else:
        double_instrs = PartInstruments(False, [], [])
    if part_match["dbl"]:
        [doubled] = parse_instrument(part_match["dbl"])
        double_instrs.required.append(doubled)
    part_instrs = [
        *(
            [] if part_match["instr"].lower() in PART_NAMES
            else parts_to_instruments(part_match["instr"])
        ),
        *extra_part_instrs,
    ]
    merged = _merge_instruments(part_instrs, double_instrs)
    required = merged.required
    optional_instrs = merged.optional
    if part_match["multi"] or part_match["multi2"]:
        # A combined row ("1 & 2" or "A-B") is split into two separately named parts.
        if part_match["multi"]:
            multis = [1, 2]
        else:
            multis = ["A", "B"]
        if quantity == 1:
            players = 2
        else:
            assert players is None and quantity == 2
        instr = part_match["instr"].removesuffix("S")
        return [
            Part(
                f"{instr} {multi}",
                # dict.fromkeys removes duplicates while keeping first-seen order.
                list(dict.fromkeys(itertools.chain(extra_part_instrs, required))),
                optional_instruments = optional_instrs,
                optional = optional,
            )
            for multi in multis
        ]
    deduped_required = list(dict.fromkeys(required))
    # An explicit player count from the doubling column wins over the row's quantity.
    derived_players = quantity if players is None else players
    parts = [Part(
        part_name,
        deduped_required,
        optional = optional,
        players = derived_players,
        optional_instruments = optional_instrs,
    )]
    return [p.maybe_convert_down() for p in parts]
def get_orchestrations(materials_json: list) -> list[Orchestration]:
    """
    Build the orchestrations described by a materials response.

    ``materials_json`` is a list of command objects; exactly one must be the HTML "insert"
    command, whose "data" holds the materials markup. Returns an empty list when that markup is
    empty or when the first table has a header row only. Otherwise one Orchestration is returned
    for each heading/table pair that follows the first table.
    """
    [cmd_obj] = [obj for obj in materials_json if obj["command"] == "insert" and obj["method"] == "html"]
    # e.g. All Together Now!
    if not cmd_obj["data"]:
        # TODO validation against other show info?
        return []
    tree = parse_html("<root>" + cmd_obj["data"] + "</root>")
    # Top-level elements are consumed from the left, one at a time.
    html_parts = deque(tree)
    main_header = html_parts.popleft()
    assert main_header.tag == "h1" and main_header.text.startswith("Materials for ")
    # The first table lists materials such as libretti and piano-conductor scores. From what I can
    # tell none of these are meant to be played in the orchestra during a production, with the
    # exception of piano-conductor parts when also listed in the orchestration table.
    aux_table = html_parts.popleft()
    assert aux_table.tag == "table"
    if len(aux_table) == 1:
        assert all(el.tag == "th" for el in aux_table[0])
        # TODO validation that no-orchestration shows are JR/KIDS or unreleased
        return []
    [thead, tbody] = aux_table
    assert thead.tag == "thead"
    assert [el.text for el in thead.iter("th")] == ["Resource", "Quantity"]
    assert tbody.tag == "tbody"
    def deact_key(tr: Element) -> str:
        # Grouping key: the row's first cell with any " ACT <n>" marker removed, so that
        # consecutive per-act rows of the same part are grouped together.
        m = re.match(r"(.+?)( ACT ?\d)?(/KEYBOARD3)?$", tr[0].text.strip())
        return m[1] + (m[3] or "")
    resource_parts = []
    for (deacted, group_iter) in groupby(tbody, deact_key):
        group = list(group_iter)
        # All rows of a group must agree on the quantity.
        [count] = {int(tr[1].text) for tr in group}
        if part := KeyboardPart.parse(deacted):
            # More than one piano-conductor part may be provided, but only one will be used in
            # performance.
            if count > 1:
                part.players = count
            elif part.players is None:
                part.players = 1
            resource_parts.append(part)
        else:
            # Anything not parsed as a keyboard part must not look like one, except the vocal
            # score.
            assert (
                deacted.lower() == "piano vocal score"
                or not any(
                    part in deacted.lower() for part in ["keyboard", "piano", "synth"]
                )
            )
    if resource_parts:
        assert len(resource_parts) <= 2
    # Some shows have both a standard and an alternate orchestration.
    orchestrations = []
    while html_parts:
        # Each orchestration is an <h3> heading followed by its table.
        h3 = html_parts.popleft()
        assert h3.tag == "h3" and "orchestration" in h3.text.lower()
        table = html_parts.popleft()
        assert table.tag == "table"
        [thead, tbody] = table
        assert thead.tag == "thead"
        assert [el.text for el in thead.iter("th")] == ["Instrumentation", "Doubling", "Quantity"]
        assert tbody.tag == "tbody"
        orchestration_parts = []
        for (deacted, group_iter) in groupby(tbody, deact_key):
            group = list(group_iter)
            # Rows of a group must agree on both the doubling text and the quantity.
            [doubling_val] = {tr[1].text for tr in group}
            [count] = {tr[2].text for tr in group}
            orchestration_parts.extend(
                _parse_part(deacted.strip(), doubling_val and doubling_val.strip(), count.strip())
            )
        joined = resource_parts + orchestration_parts
        all_parts = deduplicate_keyboard_parts(joined)
        # one orchestration seems to have all parts marked optional by mistake
        if all(p.optional for p in all_parts):
            for p in all_parts:
                p.optional = False
        orchestrations.append(Orchestration(h3.text, all_parts))
    return orchestrations
--- /dev/null
+from collections import Counter, defaultdict, deque
+from dataclasses import asdict, dataclass, replace
+from enum import Enum
+from itertools import groupby, pairwise
+from os import environ
+from typing import Collection, Iterable, Iterator, Optional, Union
+import dataclasses
+import itertools
+import re
+from pithub.common import throwing_factory
def parse_orchestrations(lines: list[str]) -> Iterator["Orchestration"]:
    """
    Lazily yield each orchestration found in `lines`.

    Whitespace-only lines are dropped and the remaining lines are stripped. Leading lines that
    `_pop_uninteresting_section` recognizes are discarded; otherwise one orchestration is parsed
    off the front of the remaining lines.

    Raises RuntimeError when `Orchestration.pop_from_lines` reports a parse failure.
    """
    pending = [raw.strip() for raw in lines if not raw.isspace()]
    while pending:
        skipped = _pop_uninteresting_section(pending)
        if skipped is not None:
            pending = skipped
            continue
        outcome = Orchestration.pop_from_lines(pending)
        if isinstance(outcome, Orchestration.ParseFailure):
            raise RuntimeError(f"Orchestration parsing stuck at line: {outcome.stuck_line!r}")
        (orchestration, pending) = outcome
        yield orchestration
def _fix_bass_part(parts: list["Part"]) -> None:
    """
    Resolve, in place, what a part named exactly "Bass" refers to.

    Consecutive parts are grouped into bass / string / other runs. At most one "Bass" part is
    expected (more than one fails the unpacking below). If that part lists no instruments and sits
    directly next to a run of string parts, upright bass is inserted as its first required
    instrument. If it does list instruments and they came from a "doubling" list, both bass
    guitar and upright bass must be among them.
    """
    # Does "bass" mean upright or bass guitar? We should be able to find out heuristically.
    Kind = Enum("Kind", ["BASS", "STRING", "OTHER"])

    def classify(part: "Part") -> Kind:
        if part.name == "Bass":
            return Kind.BASS
        elif {*part.required_instruments, *part.optional_instruments} & STRING_INSTRUMENTS:
            return Kind.STRING
        else:
            return Kind.OTHER

    groups = [(kind, list(run)) for (kind, run) in groupby(parts, classify)]
    bass_idxs = [i for (i, (kind, _)) in enumerate(groups) if kind is Kind.BASS]
    if not bass_idxs:
        return
    [bass_idx] = bass_idxs
    [bass_part] = groups[bass_idx][1]
    adjacent_kinds = [
        groups[i][0] for i in [bass_idx - 1, bass_idx + 1]
        if 0 <= i < len(groups)
    ]
    if not bass_part.required_instruments and not bass_part.optional_instruments:
        if Kind.STRING in adjacent_kinds:
            bass_part.required_instruments.insert(0, Instrument.UPRIGHT_BASS)
    elif bass_part._double:
        # We don't want to miss e.g. a guitar part calling for acoustic guitar implicitly by
        # merely stating it doubles electric guitar.
        assert {Instrument.BASS_GUITAR, Instrument.UPRIGHT_BASS} <= {
            *bass_part.required_instruments, *bass_part.optional_instruments,
        }
+class Orchestration:
+ @dataclass
+ class ParseFailure:
+ stuck_line: str
+ description: str
+ parts: list["Part"]
+ @classmethod
+ def pop_from_lines(cls, lines: list[str]) -> tuple["Orchestration", list[str]]|ParseFailure:
+ queue = deque(lines)
+ if "orchestration" in queue[0].lower() or queue[0].endswith(":"):
+ title_line = queue.popleft()
+ title = title_line.removesuffix(":")
+ assert "orchestration" in title.lower() or"\d-Piece\b", title)
+ else:
+ title = None
+ parts = []
+ optional = False
+ check_piano = False
+ while queue:
+ if queue[0] == "Optional Orchestra Parts:":
+ queue.popleft()
+ optional = True
+ elif (m := re.match(r"\(([^\d]+)\)$", queue[0])) and m[1] in _PART_NOTES:
+ queue.popleft()
+ elif re.match(r"\(Note: .+\)$", queue[0]):
+ queue.popleft()
+ else:
+ if environ.get("LIVE_DEBUG"):
+ result = Part.parse_from_line(queue[0])
+ else:
+ try:
+ result = Part.parse_from_line(queue[0])
+ except Exception:
+ raise RuntimeError(f"Error parsing line: {queue[0]!r}")
+ if result is None:
+ break
+ else:
+ queue.popleft()
+ if isinstance(result, list):
+ for v in result:
+ part = v.part if isinstance(v, _ActPart) else v
+ part.optional |= optional
+ parts.append(v)
+ else:
+ assert isinstance(result, IgnoredPart)
+ if result is IgnoredPart.REHEARSAL_PIANO:
+ check_piano = True
+ if parts:
+ zipped_parts = []
+ key = lambda v: (
+ isinstance(v, _ActPart),
+ v.part if isinstance(v, _ActPart) else v
+ )
+ for ((act, part), group) in groupby(parts, key):
+ if act:
+ assert len(list(group)) > 1
+ zipped_parts.append(part)
+ else:
+ zipped_parts.extend(group)
+ assert all(isinstance(p, Part) for p in zipped_parts)
+ for part in zipped_parts:
+ if Instrument.KEYBOARD in part.instruments():
+ assert isinstance(part, KeyboardPart)
+ # TODO clean this up
+ converted = [p.maybe_convert_down() for p in zipped_parts]
+ if check_piano:
+ # We ignore rehearsal-specific parts, but if we see one we expect a real piano
+ # performance part.
+ assert any(isinstance(p, KeyboardPart) for p in converted)
+ fixed_parts = deduplicate_keyboard_parts(converted)
+ _fix_bass_part(fixed_parts)
+ return (cls(title, fixed_parts), list(queue))
+ else:
+ return cls.ParseFailure(queue[0])
@dataclass
class _ActPart:
    """
    Sometimes two parts are listed when one logical part is physically separated by act (in
    practice, large keyboard parts). We parse these separately but then zip them up, so this tags
    them internally in the parser interface.
    """
    part: "Part"
def parts_to_instruments(parts: str) -> list["Instrument"]:
    """
    Turn a part name into the instruments it names.

    The name is split on "/", ", " and " & "; each piece loses a leading "AKA " and a trailing
    single-digit ordinal before being handed to `parse_instrument`.
    """
    instruments = []
    for subpart in re.split("/|, | & ", parts):
        cleaned = re.sub(r" \d$", "", subpart.removeprefix("AKA "))
        instruments.extend(parse_instrument(cleaned))
    assert all(isinstance(v, Instrument) for v in instruments)
    return instruments
def _is_coverage_note(detail: str) -> bool:
    """
    Report whether `detail` is a list of numbers and number ranges, e.g. "Nos. 1-5, 7a".
    """
    number = r"([1-9]*[0-9]a?)"
    span = number + "(-" + number + ")?"
    pattern = r"Nos\. " + span + "(, " + span + ")*$"
    return re.match(pattern, detail) is not None
# Matches one entry of an orchestration list. Named groups: `count` (leading number), `opt` /
# `opt2` / `opt3` (optional markers), `part` (full part name), `instr` (part name without
# ordinals), `nums` (ordinals such as "1&2"), `paren` (parenthesized note), `list` (text after a
# dash) and `rp` (a trailing "(rehearsal)" or "(performance)").
_PART_PATTERN = (
    r"(?P<count>\d+)(?P<opt> Optional)?"
    + r" +(?P<part>(?P<instr>[A-Za-z &]+?([ /-][A-Za-z1-2]+){,2}?)( +(?P<nums>\d+(&\d+)*)| [A-D])?)"
    + r" *(\(((?P<opt2>Optional)|(?P<paren>[^)]+))\))?"
    + r"( +[–—-] +(?P<list>.+?)(?P<opt3> \(optional\))?| +\((?P<rp>rehearsal|performance)\))?$"
)
@dataclass
class Part:
    """
    Generally this corresponds one-to-one with an entry in an orchestration list (where the entry
    represents something actually played during a performance), but entries like "Percussion 1 & 2"
    are expanded into two instances.
    """
    name: Optional[str]
    required_instruments: list[Union["Instrument", "Choice"]]
    players: Optional[int] = None # None is module internal use only
    optional: bool = False
    role: Optional["AdditionalRole"] = None
    optional_instruments: list["Instrument"] = dataclasses.field(default_factory = list)
    _double: bool = False # module internal use

    def __post_init__(self) -> None:
        # remove duplicate instruments from each list (first occurrence wins, order is kept)
        self.required_instruments = list(dict.fromkeys(self.required_instruments))
        self.optional_instruments = list(dict.fromkeys(self.optional_instruments))
        # TODO re-enable these checks when test serialization nonsense is fixed
        # for v in self.instruments():
        #     if isinstance(v, Choice):
        #         assert all(isinstance(opt, Instrument) for opt in v.choices)
        #     else:
        #         assert isinstance(v, Instrument)

    @classmethod
    def parse_from_line(cls, line: str) -> Optional[Union[list[Union["Part", "_ActPart"]], "IgnoredPart"]]:
        """
        Parse a single line of an orchestration list.

        Returns None when the line is not a part entry, an `IgnoredPart` member for recognized
        materials that are not performed, or a list of parts otherwise. The list may be empty
        (a keyboard book marked "rehearsal & stick conductor") and its entries may be wrapped in
        `_ActPart` for per-act keyboard books. Annotations the parser does not know trip
        assertions rather than being guessed at.
        """
        # TODO much of this logic is currently Concord-specific and should be factored out into
        # concord module
        if re.match(r"\d (Logo Pack|Pre-Show Music Lead Sheets)$", line):
            return IgnoredPart.OTHER
        match = re.match(_PART_PATTERN, line)
        if not match:
            return None

        is_optional = bool(match["opt"] or match["opt2"] or match["opt3"])
        player_count = int(match["count"])
        # A parenthesized player count overrides the leading number.
        if match["paren"] and (m := re.match(r"(\d+) players?(,? divisi)?$", match["paren"], re.IGNORECASE)):
            player_count = int(m[1])
            explicit_player_count = True
        elif match["paren"] == "one or more players":
            player_count = 1
            explicit_player_count = True
        else:
            explicit_player_count = False
        if match["nums"]:
            ordinals = list(map(int, match["nums"].split("&")))
        else:
            ordinals = []
        rps = set(filter(None, [match["paren"], match["rp"]])) & {"rehearsal", "performance"}

        if match["part"] == "Full Score":
            assert not match["opt"]
            assert re.match(r"Act \d \(stick conductor\)$", match["list"])
            return IgnoredPart.CONDUCTOR
        elif match["part"] in {"Libretto-Vocal Book", "Vocal Book", "Libretti"}:
            return IgnoredPart.LIBRETTO
        elif match["instr"] == "Piano-Vocal" and rps <= {"rehearsal"}:
            # TODO try converting instr check to assertion to tighten rp checking
            assert not match["opt"]
            return IgnoredPart.REHEARSAL_PIANO
        elif part := KeyboardPart.parse(match["part"]):
            # ignore part count as materials often include extras of these
            if is_optional:
                part = replace(part, optional = True)
            if match["list"]:
                assert match["list"].lower() in {
                    "doubling celeste",
                    "intended to supplement a small string section",
                    "primarily accordion",
                    "replaces violin, viola & cello",
                }
            include = True
            if match["paren"]:
                act_part = bool(re.match(r"Act \d$", match["paren"]))
                if match["paren"] == "rehearsal & stick conductor":
                    include = False
                else:
                    assert (
                        match["paren"] == "rehearsal & performance"
                        or act_part and match["rp"] == "performance"
                    )
            else:
                act_part = False
            if include:
                return [_ActPart(part) if act_part else part]
            else:
                return []
        elif player_count == 1:
            if match["paren"] and not explicit_player_count:
                assert match["paren"] == "** FOLEY ARTIST FISH"
            optional_instr = []
            if match["list"] and not _is_coverage_note(match["list"]):
                assert not is_optional
                if match["list"] == "optional part":
                    list_text = ""
                    is_optional = True
                elif match["list"].startswith("(optional)"):
                    list_text = match["list"].removeprefix("(optional)").strip()
                    is_optional = True
                else:
                    list_text = match["list"]
                if list_text.startswith("primarily "):
                    primarily = list_text.removeprefix("primarily ")
                else:
                    primarily = None
                if list_text:
                    if match["instr"].lower() in PART_NAMES:
                        context_instr = match["instr"]
                    else:
                        context_instr = parts_to_instruments(match["instr"])[0].name # Concord 93490
                    res = parse_instrument_list(primarily or list_text, context_instr)
                else:
                    res = PartInstruments(False, [], [])
                required = res.required
                optional_instr = res.optional
                if match["instr"].lower() not in PART_NAMES:
                    # The part name itself names instruments; merge them with the listed ones.
                    from_part = parts_to_instruments(match["instr"])
                    if from_part == [Instrument.KEYBOARD]:
                        required = from_part
                        primarily = None
                        optional_instr = []
                    elif Instrument.GUITAR in from_part:
                        has_specific_instr = any(
                            "GUITAR" in i.name for i in required if i is not Instrument.GUITAR
                        )
                        if not res.double and has_specific_instr:
                            # a specific guitar is listed, so drop the generic one
                            required = [
                                i for i in dict.fromkeys(itertools.chain(from_part, required))
                                if i is not Instrument.GUITAR
                            ]
                        else:
                            required = list(dict.fromkeys(itertools.chain(from_part, required)))
                    else:
                        required = list(dict.fromkeys(itertools.chain(from_part, required)))
                double = res.double
                if primarily:
                    assert set(parse_instrument(primarily)) <= {*required, *optional_instr}
            else:
                if match["instr"].lower() in PART_NAMES:
                    # non-instrument part with no instrument information
                    required = []
                else:
                    required = parts_to_instruments(match["part"])
                double = False
            return [
                cls(
                    name = match["part"],
                    required_instruments = required,
                    players = 1,
                    optional_instruments = optional_instr,
                    optional = is_optional,
                    _double = double,
                )
            ]
        elif player_count > 1 and match["part"].endswith("s"):
            assert not is_optional
            assert not match["paren"] and not match["list"]
            clean_part = match["part"].removesuffix("s")
            required = parse_instrument(clean_part)
            return [cls(name = clean_part, required_instruments = required, players = player_count)]
        else:
            assert not is_optional
            if match["instr"].lower() in PART_NAMES:
                assert player_count == 2
                if len(ordinals) == 2:
                    if match["list"]:
                        sublists = match["list"].split(". ")
                        assert len(sublists) == 2
                        players = {}
                        for sublist in sublists:
                            m = re.match(r"(.+?) Player: (.+)$", sublist)
                            players[m[1]] = parse_instrument_list(m[2], match["instr"])
                        optional = match["paren"] and match["paren"].removesuffix(" Player is optional")
                    else:
                        # non-instrument part with no instrument information
                        players = {
                            "{} {}".format(match["instr"], ordinal): PartInstruments(False, [], [])
                            for ordinal in ordinals
                        }
                        optional = None
                    if optional:
                        assert optional in players
                    return [
                        cls(
                            name = "{} {}".format(match["instr"], ordinal),
                            required_instruments = instruments.required,
                            players = 1,
                            optional_instruments = instruments.optional,
                            optional = optional is not None and name == optional,
                            _double = instruments.double,
                        )
                        for (ordinal, (name, instruments)) in zip(ordinals, players.items())
                    ]
                else:
                    assert not ordinals
                    return [cls(match["part"], required_instruments = [], players = player_count)]
            elif player_count == len(ordinals) == 2:
                assert not match["paren"]
                if match["list"]:
                    # TODO is this code dead?
                    raise RuntimeError("expected dead code; pipe double indication for bass part munging")
                    assert "dbl" in match["list"]
                    double = parse_instrument(match["list"].removeprefix("Both dbl. "))
                else:
                    double = []
                return [
                    cls(
                        name = "{} {}".format(match["instr"], ordinal),
                        required_instruments = [*parse_instrument(match["instr"]), *double],
                        players = 1,
                        _double = False,
                    )
                    for ordinal in ordinals
                ]
            else:
                assert not match["list"] or match["list"].lower() == "divisi"
                assert not match["rp"]
                return [cls(
                    name = match["part"],
                    required_instruments = parts_to_instruments(match["instr"]),
                    players = player_count,
                )]

    def instruments(self) -> Iterator[Union["Instrument", "Choice"]]:
        """Iterate over the required instruments followed by the optional ones."""
        return itertools.chain(self.required_instruments, self.optional_instruments)

    def maybe_convert_down(self) -> "Part":
        """
        Return this part unchanged.

        A plain `Part` whose only instrument is the keyboard is no longer expected here and raises
        RuntimeError("obsolete"); the conversion code that used to follow the raise was
        unreachable and has been removed.
        """
        if not isinstance(self, KeyboardPart) and set(self.instruments()) == {Instrument.KEYBOARD}:
            raise RuntimeError("obsolete")
        return self
def deduplicate_keyboard_parts(parts: Iterable[Part]) -> list[Part]:
    """
    Drop keyboard parts that duplicate the conductor's keyboard part.

    Of the keyboard parts carrying the conductor role, only the one with the longest name is
    kept. Role-less keyboard parts that conflict with it are dropped: when it has no ordinal,
    those naming piano or keyboard with ordinal None or 1; otherwise those named just "keyboard"
    with the same ordinal. Surviving keyboard parts are returned converted to plain `Part`s via
    `convert_up`; all other parts pass through untouched and order is preserved.
    """
    # Materialize once: the input is traversed several times, which would silently exhaust a
    # generator after the first pass.
    parts = list(parts)
    pc_parts = [
        part for part in parts
        if isinstance(part, KeyboardPart) and part.role is AdditionalRole.CONDUCTOR
    ]
    if pc_parts:
        pc_part = max(pc_parts, key = lambda p: len(p.name))
        if pc_part.ordinal is None:
            conflicts = [
                part for part in parts if (
                    isinstance(part, KeyboardPart)
                    and part.role is None
                    and set(part.instrument_names) & {"piano", "keyboard"}
                    and part.ordinal in [None, 1]
                )
            ]
        else:
            assert set(pc_part.instrument_names) <= {"piano", "keyboard"}
            conflicts = [
                part for part in parts if (
                    part is not pc_part and isinstance(part, KeyboardPart)
                    and part.instrument_names == ["keyboard"] and part.ordinal == pc_part.ordinal
                )
            ]
    else:
        pc_part = None
        conflicts = []
    return [
        part.convert_up() if isinstance(part, KeyboardPart) else part
        for part in parts
        if (
            part is pc_part
            or (isinstance(part, KeyboardPart) and part.role is None and part not in conflicts)
            or not isinstance(part, KeyboardPart)
        )
    ]
@dataclass
class KeyboardPart(Part):
    """
    A part played on a keyboard instrument, keeping the instrument names and ordinal parsed from
    the part name so that duplicate keyboard books can be recognized later.
    """
    Kind = Enum("Kind", ["PRIMARY", "AUXILIARY"])
    # Due to fields in the supertype with defaults these must have defaults, but we enforce that
    # they're always explicitly specified.
    instrument_names: list[str] = dataclasses.field(default_factory = throwing_factory)
    ordinal: Optional[int] = dataclasses.field(default_factory = throwing_factory)

    @classmethod
    def parse(cls, part_name: str) -> Optional["KeyboardPart"]:
        """
        Build a keyboard part from a part name, or return None if the name is not a keyboard part.

        Matching is case-insensitive. A plural name ("Keyboards") yields `players = 2`; otherwise
        `players` is left as None. A "conductor" suffix sets the conductor role.
        """
        patt = (
            r"(?P<base>keyboard|piano|(string )?synth(esizer)?)(?P<pl>s)? ?(?P<ord>\d)?"
            + r" ?-? ?(?P<cond>conductor(('s)? score)?)?"
            + r"(/(aka )?(?P<base2>keyboard|synthesizer) ?(?P<ord2>\d)?)?$"
        )
        if m := re.match(patt, part_name.lower()):
            base = re.sub(r"\bsynth\b", "synthesizer", m["base"])
            if m["ord"] or m["ord2"]:
                # when both names carry an ordinal they must agree
                [ordinal] = set(filter(None, [m["ord"], m["ord2"]]))
            else:
                ordinal = None
            if m["pl"]:
                # plural part name suggests >1 player
                assert not (ordinal or m["cond"] or m["base2"])
                players = 2
            else:
                players = None
            return KeyboardPart(
                part_name,
                [Instrument.KEYBOARD],
                players,
                role = AdditionalRole.CONDUCTOR if m["cond"] else None,
                instrument_names = [n for n in [base, m["base2"]] if n],
                ordinal = int(ordinal) if ordinal else None,
            )
        elif m := re.match(r"(organ|piano)[/-](dulcimer|synthesizer|celeste)$", part_name.lower()):
            return KeyboardPart(
                part_name,
                [Instrument.KEYBOARD],
                instrument_names = [m[1], m[2]],
                ordinal = None,
            )
        else:
            return None

    def convert_up(self) -> Part:
        """Return a plain `Part` carrying only the fields declared on `Part`."""
        # Read the attributes directly instead of going through `asdict`, which recurses into
        # nested dataclass values and would hand `Part` dicts in their place.
        params = {
            field.name: getattr(self, field.name)
            for field in dataclasses.fields(Part)
        }
        return Part(**params)
@dataclass
class PartInstruments:
    """The required and optional instruments parsed from one instrument list."""
    # whether these instruments are from a list marked as "doubling"
    double: bool = False
    required: list[Union["Instrument", "Choice"]] = (
        dataclasses.field(default_factory = list)
    )
    optional: list[Union["Instrument", "Choice"]] = (
        dataclasses.field(default_factory = list)
    )

    def merge(self, other: "PartInstruments") -> "PartInstruments":
        """
        Concatenate two instrument sets into a new one.

        Both must agree on `double`; a mismatch raises ValueError from the unpacking below.
        """
        [double] = {self.double, other.double}
        return PartInstruments(
            double = double,
            required = self.required + other.required,
            optional = self.optional + other.optional,
        )
def parse_instrument_list(list_: str, part_instr: str) -> PartInstruments:
    """
    Parse the instrument list that follows a part name.

    `list_` may start with a doubling marker ("dbl. ", "doubles ", "doubling "), may contain an
    " & optional " separator between required and optional instruments, and may end with a
    "... are double lined for ..." note whose instruments must already be in the required list.
    `part_instr` is the owning part's name and is passed on as context to `parse_instrument`.
    """
    def parse_list_part(list_part: str) -> list[Union[Instrument, Choice]]:
        if m := re.match(r"(?P<a>.+) \(or (?P<b>.+)\)$", list_part, re.IGNORECASE):
            # This could be an instrument choice, or for percussion a note suggesting a way to
            # emulate the instrument; in the latter case we ignore the note.
            [a] = parse_instrument(m["a"], part_instr)
            try:
                [b] = parse_instrument(m["b"], part_instr)
            except ParseError:
                pass
            else:
                return [Choice((a, b))]
        elif m := re.match(r"(?P<a>.+) DOUBLES (?P<b>.+)$", list_part):
            return parse_instrument(m["a"], part_instr) + parse_instrument(m["b"], part_instr)
        return parse_instrument(list_part, part_instr)

    def split_simple_list(simple_list: str) -> Iterator[str]:
        # close a parenthesis left open at the end of the list
        if re.search(r"\([^)]*$", simple_list):
            simple_list += ")"
        # split into parenthesized and non-parenthesized parts
        paren_tok = re.split(r"(?=\()|(?<=\)(?!$))", simple_list)
        # split only unparenthesized parts as instrument lists: on a comma not contained within
        # parentheses, or on &/and appearing after any commas. `startswith` rather than `tok[0]`
        # because the split above yields an empty token when the list begins with "(".
        split = list(itertools.chain.from_iterable(
            [tok] if tok.startswith("(") else re.split(r" *, (?![^(]*\))| (?:&|and) +(?!.*,)", tok)
            for tok in paren_tok
        ))
        # combine parenthesized parts with preceding list elements
        for (a, b, c) in zip(itertools.chain([None], split), split, itertools.chain(split[1:], [None])):
            if b and b[0] == "(":
                # Parenthesized so that each neighbour is checked on its own; unparenthesized,
                # `and` bound tighter than `or` and `c[0]` could be evaluated with c being None.
                assert (not a or a[0] != "(") and (not c or c[0] != "(")
                yield "".join(filter(None, [a, b, c]))
            elif (not a or a[0] != "(") and (not c or c[0] != "("):
                yield b

    deopt_patt = r"(?P<opt>\(?(optional|opt\.)\)? *)?(?P<part>.+?)(?P<opt2> \((optional|opt\.)\))?$"

    def parse_simple_list(simple_list: str) -> PartInstruments:
        required = []
        optional = []
        for part in split_simple_list(simple_list):
            m = re.match(deopt_patt, part.strip(), re.IGNORECASE)
            if part_instr.lower() not in PERCUSSION_PART_NAMES or m["part"] not in _PERCUSSION_WTF:
                (optional if m["opt"] or m["opt2"] else required).extend(parse_list_part(m["part"]))
        return PartInstruments(False, required, optional)

    (dbl, dedoubled) = re.match(r"(dbl\. |doubles |doubling )?(.+)$", list_, re.IGNORECASE).groups()
    if m := re.match(r"(.+)\. (.+) and (.+) are double lined for (.+)$", dedoubled):
        [denoted, *note_instr_names] = m.groups()
    else:
        denoted = dedoubled
        note_instr_names = []
    simple_lists = denoted.split(" & optional ")
    if len(simple_lists) == 2:
        (main, optional) = map(parse_simple_list, simple_lists)
        assert not main.optional and not optional.optional
        result = PartInstruments(bool(dbl), main.required, optional.required)
    else:
        [simple_list] = simple_lists
        result = replace(parse_simple_list(simple_list), double = bool(dbl))
    for note_instr_name in note_instr_names:
        [note_instr] = parse_instrument(note_instr_name)
        assert note_instr in result.required
    return result
class ParseError(Exception):
    """Raised by `parse_instrument` when the text matches no known instrument."""
def parse_instrument(text: str, context_instr: Optional[str] = None) -> list["Instrument"]:
    """
    Map free-form instrument text to `Instrument` members.

    The text is lower-cased, common abbreviations are expanded, and several candidate spellings
    (as written, singular/plural flipped, suffixed with `context_instr`, prefixed with the
    parenthesized note) are tried against `_INSTRUMENT_TYPOS` and then the `Instrument` enum.

    Raises ParseError when no candidate matches, and ValueError when the instrument is
    recognized but its parenthesized note is not.
    """
    def to_identifier(name: str) -> str:
        identifier = re.sub(
            r"\s|[-']", "_",
            name.upper().replace(" - ", " ").replace("’", "").replace('"', "")
        )
        # `[:1]` rather than `[0]` so an empty candidate is simply a miss, not an IndexError
        if identifier[:1].isdigit():
            identifier = "_" + identifier
        return identifier

    normalized = text.lower()
    if context_instr and context_instr.lower() == "guitar":
        normalized = re.sub(r"( +|-)(strat(ocaster)?|gretsch|telecaster)$", "", normalized)
    # Applied in order. The first replacement is the musical flat sign, restored from a
    # mis-decoded byte sequence.
    substitutions = [
        (r"(?<=\b[a-g])(b|[- ]flat)\b", "♭"),
        (r"\bacous\.", "acoustic"),
        (r"\bbari\b", "baritone"),
        (r"\bcym\b\.?", "cymbal"),
        (r"\belec\.", "electric"),
        (r"\bglock\.", "glockenspiel"),
        (r"\bgtr\.", "guitar"),
        (r"\borch\b", "orchestra"),
        (r"\bsax\b", "saxophone"),
        (r"\bsop\. *", "soprano "),
        (r"\bstrg\b", "string"),
        (r"\bsus\.", "suspended"),
        (r"\bsynth\b", "synthesizer"),
        (r"\bten\. *", "tenor "),
    ]
    for (patt, repl) in substitutions:
        normalized = re.sub(patt, repl, normalized)

    m = re.match(r"(?P<name>.+?)( *\((?P<note>.+)\))?$", normalized)
    name = m["name"].lower()
    if context_instr:
        if context_instr.lower() in PERCUSSION_PART_NAMES: # "2 Timpani"
            name = re.sub(r"^\d+ ", "", name)
            name = name.removeprefix("loud ")
            if name == "bass":
                assert not m["note"]
                return [Instrument.BASS_DRUM]
        elif context_instr.lower() == "guitar":
            name = name.removeprefix("6-string ")

    # candidate spellings as (text, whether the parenthesized note was folded into it)
    forms = [
        (name, False),
        (name[:-1] if name.endswith("s") else name + "s", False),
    ]
    if context_instr:
        forms.append((name + " " + context_instr, False))
    if m["note"]:
        forms.append((m["note"] + " " + name, True))

    orig_err = None
    for (i, (value, using_note)) in enumerate(forms):
        identifier = to_identifier(value)
        parsed = _INSTRUMENT_TYPOS.get(identifier)
        if not parsed:
            try:
                parsed = Instrument[identifier]
            except KeyError as key_error:
                if i == 0:
                    # keep the miss on the spelling as written for the final error's cause
                    orig_err = key_error
        if parsed is Instrument.DRUM_KIT and m["note"]:
            # a drum kit's note lists its components; parse those instead
            return list(itertools.chain.from_iterable(
                parse_instrument(part, context_instr)
                for part in re.split(r", | (?:and|&) ", m["note"])
            ))
        elif parsed:
            if m["note"]:
                note_okay = (
                    using_note
                    or m["note"] in _INSTRUMENT_NOTES.get(parsed, [])
                    or re.match(r"\d+(-\d+)?(, \d+(-\d+)?)*$", m["note"])
                )
                if not note_okay:
                    raise ValueError(f"Unrecognized note {m['note']!r} for {parsed.name}")
            return [parsed]
    raise ParseError(f"Unhandled instrument {text!r}") from orig_err
class IgnoredPart(Enum):
    """Kinds of listed material that are recognized by the parser but not performed."""
    CONDUCTOR = 1
    LIBRETTO = 2
    REHEARSAL_PIANO = 3
    OTHER = 4
@dataclass(frozen = True)
class Choice:
    """
    A set of alternatives: the orchestration accepts any one of the listed instruments.
    """
    choices: tuple["Instrument", ...]
+ "additional materials?:?",
+ "delivered digitally:",
+ "digital download",
+ "optional keyboard patches/programming:",
# Report whether `line`, taken as a whole (each pattern gets "$" appended) and ignoring case,
# matches one of the ignored-header patterns.
# NOTE(review): the `for patt in ...` clause that should feed `any(...)` is missing from this
# copy of the function, so `patt` is undefined here. Presumably it iterated over the header
# pattern strings listed just above this function; confirm against the original file.
+ def _is_ignored_header(line: str) -> bool:
+ return bool(any(
+ re.match(patt + "$", line, re.IGNORECASE)
+ ))
def _may_be_orchestration_header(line: str) -> bool:
    """Report whether `line` ends with a colon or mentions "orchestration" (in any case)."""
    if line.endswith(":"):
        return True
    return "orchestration" in line.lower()
def _ignorable(line: str) -> bool:
    """
    Report whether `line` can be discarded while looking for orchestrations.

    "Logo" and sentences of the form "Note: ... ." always can. Otherwise a line can be discarded
    only if it is neither a possible orchestration header nor a part entry.
    """
    if line == "Logo":
        return True
    if line.startswith("Note:") and line.endswith("."):
        return True
    if _may_be_orchestration_header(line):
        return False
    return re.match(_PART_PATTERN, line) is None
def _pop_uninteresting_section(lines: list[str]) -> Optional[list[str]]:
    """
    Strip one uninteresting section or line from the front of `lines`.

    Returns the remaining lines, or None when the first line is neither an ignored header nor
    ignorable. After an ignored header, everything up to the next ignored header or
    non-ignorable line is dropped with it.
    """
    first = lines[0]
    if _is_ignored_header(first):
        end = len(lines)
        for (idx, line) in enumerate(lines[1:], start = 1):
            if not _ignorable(line) or _is_ignored_header(line):
                end = idx
                break
        # sanity check: the dropped section, header included, spans at most 4 lines
        assert end <= 4
        return lines[end:]
    if _ignorable(first):
        return lines[1:]
    return None
+# There are special considerations for parsing percussion detail sections because of how diverse the
+# instruments/sounds called for can be.
+ "drums & percussion",
+ "drums",
+ "drums/percussion",
+ "percussion",
+# Many parts are named after an instrument, which is an implicit requirement for the part. In
+# contrast, parts with these names don't have a clearly implied instrument, and what instruments are
+# called for is dictated by a separate instrument list.
+ "bass",
+ "drums",
+ "flexbo",
+ "reed",
+ "woodwind",
+ "woodwinds",
+Instrument = Enum(
+ "Instrument",
+ [
+ "BANJO",
+ "CELLO",
+ "DAF",
+ "FLUTE",
+ "GONG",
+ "GUITAR", # acoustic/electric unknown
+ "GÃœIRO",
+ "HARP",
+ "HI_HAT",
+ "OBOE",
+ "RECORDER", # no obvious default size
+ "TAIKO",
+ "TUBA",
+ "VIOLA",
+ "WHIP",
+ ]
+ "AFRICAN_DRUM": Instrument.DJEMBE,
+ "BIG_SHAKER": Instrument.EGG_SHAKER,
+ "BONGOS": Instrument.BONGO_DRUMS,
+ "Bâ™_CLARINET": Instrument.CLARINET,
+ "CL": Instrument.CLARINET, # :(
+ "CLAR": Instrument.CLARINET,
+ "CLAR.": Instrument.CLARINET,
+ "COWBELLS": Instrument.COWBELL,
+ "COW_BELL": Instrument.COWBELL,
+ "C_FLUTE": Instrument.FLUTE,
+ "DARBUKA": Instrument.GOBLET_DRUM,
+ "DRUMS": Instrument.DRUM_KIT,
+ "DRUM_SET": Instrument.DRUM_KIT,
+ "ENG._HORN": Instrument.ENGLISH_HORN,
+ "FIDDLE": Instrument.VIOLIN,
+ "FL": Instrument.FLUTE,
+ "GOURD": Instrument.GÃœIRO,
+ "GRAN_CASSA": Instrument.BASS_DRUM,
+ "GUIRO": Instrument.GÃœIRO,
+ "G_FLUTE": Instrument.ALTO_FLUTE,
+ "HIGH_HAT": Instrument.HI_HAT,
+ "HORN": Instrument.FRENCH_HORN,
+ "ICE_BELL": Instrument.BELL_CYMBAL,
+ "KIT": Instrument.DRUM_KIT,
+ "ORGAN": Instrument.KEYBOARD,
+ "PC": Instrument.PICCOLO, # yikes
+ "PIANO": Instrument.KEYBOARD,
+ "PICC.": Instrument.PICCOLO,
+ "PIZZA_DRUM": Instrument.BASS_DRUM,
+ "RACHET": Instrument.RATCHET,
+ "RIQ": Instrument.DAF,
+ "SCRAPER": Instrument.GÃœIRO,
+ "SHAKER": Instrument.EGG_SHAKER,
+ "SIREN": Instrument.ACME_SIREN,
+ "SLAPSTICK": Instrument.WHIP,
+ "SMALL_TOM": Instrument.TOM_DRUM,
+ "SNARE": Instrument.SNARE_DRUM,
+ "SOCK_CYMBAL": Instrument.HI_HAT,
+ "TAM_TAM": Instrument.SLIT_DRUM,
+ "TENOR_HORN": Instrument.ALTO_HORN,
+ "TOMS": Instrument.TOM_DRUM,
+ "TOM_TOMS": Instrument.TOM_DRUM,
+ "TRAP_SET": Instrument.DRUM_KIT,
+ "TYMPANI": Instrument.TIMPANI,
+ "T_BLOX": Instrument.TEMPLE_BLOCKS,
+ "VIBES": Instrument.VIBRAPHONE,
+ "VIOLONCELLO": Instrument.CELLO,
+ "_5_STRING_BASS": Instrument.BASS_GUITAR,
+ Instrument.VIOLIN,
+ Instrument.VIOLA,
+ Instrument.CELLO,
+ Instrument.UPRIGHT_BASS,
+# instruments that could be implied by a "Bass" part
+ Instrument.BASS_GUITAR,
+ Instrument.UPRIGHT_BASS,
+ "ANVIL",
+ "Brushes",
+ "Drum Sticks",
+ "Electronic Drum Pad",
+ "Hand Drums",
+ "Handclap",
+ "Huge Electronic Hits", # ??
+ "Industrial Sounds",
+ "Mallet KAT",
+ "Oriental Drum (deep)",
+ "PAD",
+ "POP GUN",
+ "Pop Cork Gun (or similar)",
+ "Popgun",
+ "Rubber Udders",
+ "Storm Drums",
+ '"Noisy Things" (Clanky Noisemakers)',
+ "played by actors, if possible, for certain numbers only",
+ Instrument.BAMBOO_FLUTE: {
+ "f",
+ "g",
+ },
+ Instrument.BASS_DRUM: {
+ "large drum with ominous, bass drum quality",
+ },
+ Instrument.COWBELL: {
+ "high, medium, low",
+ },
+ Instrument.CYMBALS: {
+ "various suspended, splash, chip, ride",
+ },
+ Instrument.FIELD_DRUM: {
+ "or snare drum w/o snares",
+ },
+ Instrument.GLOCKENSPIEL: {
+ "bells",
+ },
+ Instrument.TEMPLE_BLOCKS: {
+ "3 pitches",
+ "5 pitches",
+ },
+ Instrument.TOM_DRUM: {
+ "3",
+ },
+ Instrument.TUBULAR_BELLS: {
+ "bâ™, eâ™",
+ "e",
+ },
+ Instrument.WHIP: {
+ "whip",
+ },
+ Instrument.WOOD_BLOCK: {
+ "2 pitches",
+ "hi & low",
+ "high and low",
+ },
class AdditionalRole(Enum):
    """A duty a part's player takes on in addition to playing."""
    CONDUCTOR = 1
--- /dev/null
+from dataclasses import dataclass
+from difflib import unified_diff
+from itertools import groupby, islice
+from pathlib import Path
+from sys import argv
+from time import sleep
+from typing import Iterable, Iterator
+from urllib.request import urlopen
+import dataclasses
+import itertools
+import pickle
+from pithub.common import decode_response
+from pithub.concord import (
+ Location, fetch_productions, build_show_p_request, get_orchestration_modal,
+ lineify_orchestration_modal,
+from pithub.html import parse_html
+from pithub.orchestration import Choice, Instrument, Orchestration, parse_orchestrations
def format_orch(orch: "Orchestration", indent: int) -> list[str]:
    """
    Render an orchestration as indented text lines.

    The description is indented by `indent`, each part by one more level, and each part's
    instruments and role by two more. Optional parts and instruments are suffixed with " (opt)";
    a role is rendered as "+NAME".
    """
    def format_instr(val: "Instrument | Choice") -> str:
        if isinstance(val, Instrument):
            return val.name
        else:
            return " | ".join(instr.name for instr in val.choices)

    out = []
    out.append(" " * indent + str(orch.description))
    for part in orch.parts:
        out.append(
            " " * (indent + 1)
            + f"{part.name} {part.players}p"
            + (" (opt)" if part.optional else "")
        )
        for instr in part.required_instruments:
            out.append(" " * (indent + 2) + format_instr(instr))
        for instr in part.optional_instruments:
            out.append(" " * (indent + 2) + f"{format_instr(instr)} (opt)")
        if part.role:
            out.append(" " * (indent + 2) + f"+{part.role.name}")
    return out
def print_orch(orch: Orchestration, indent: int):
    """Print the lines `format_orch` produces for `orch`, one per output line."""
    rendered = format_orch(orch, indent)
    for text in rendered:
        print(text)
def deser_instr_list(l) -> None:
    """
    Replace, in place, instrument names in `l` with `Instrument` members.

    A string entry becomes the member of that name; any other entry must be a `Choice` of names
    and becomes a `Choice` of the corresponding members.
    """
    for (index, entry) in enumerate(l):
        if not isinstance(entry, str):
            assert isinstance(entry, Choice)
            l[index] = Choice(tuple(Instrument[n] for n in entry.choices))
            continue
        l[index] = Instrument[entry]
def ser_instr_list(l) -> None:
    """
    Replace, in place, `Instrument` members in `l` with their names.

    An `Instrument` entry becomes its name; any other entry must be a `Choice` and becomes a
    `Choice` of the names of its members. This is the inverse of `deser_instr_list`.
    """
    for (i, v) in enumerate(l):
        if isinstance(v, Instrument):
            l[i] = v.name
        else:
            assert isinstance(v, Choice)
            l[i] = Choice(tuple(p.name for p in v.choices))
def ser_instr(v):
    """
    Return the serializable form of one instrument entry.

    An `Instrument` becomes its name; any other value must be a `Choice` and becomes a `Choice`
    of the names of its members.
    """
    if isinstance(v, Instrument):
        return v.name
    else:
        assert isinstance(v, Choice)
        return Choice(tuple(p.name for p in v.choices))
def ser_instr_list_2(l) -> list:
    """
    Return a new list holding the serializable form of each instrument entry in `l`.

    An `Instrument` becomes its name; any other entry must be a `Choice` and becomes a `Choice`
    of names. The names are stored as a tuple, matching `ser_instr` and `ser_instr_list`, so the
    result compares equal to theirs and stays hashable.
    """
    out = []
    for v in l:
        if isinstance(v, Instrument):
            out.append(v.name)
        else:
            assert isinstance(v, Choice)
            out.append(Choice(tuple(p.name for p in v.choices)))
    return out
def dedup_stream(it: Iterable) -> Iterator:
    """
    Lazily yield the values of `it` in order, skipping any value already yielded.

    Values must be hashable.
    """
    emitted = set()
    for item in it:
        if item in emitted:
            continue
        yield item
        emitted.add(item)
def do_concord(skip: int = 34):
    """
    Interactively run the orchestration parser against Concord shows.

    Show ids come from the local "test_db" directory first, then from productions fetched for a
    fixed bounding box; the first `skip` ids are passed over. For a show with stored
    expectations, the stored modal is re-parsed and compared against them, printing a diff on
    mismatch. For any other show, the page is fetched and parsed and the result printed. After a
    mismatch or a fresh parse the user is prompted: "y" stores the modal and parsed
    orchestrations, "full" prints the orchestrations and prompts again, and anything else stops
    the whole run.
    """
    test_db_path = Path("test_db")
    test_db_path.mkdir(exist_ok = True)

    def extract_id(path):
        # test DB files are named "<show id>_<kind>.pickle"
        return int(path.name.split("_")[0])

    test_db_ids = map(extract_id, test_db_path.iterdir())
    prods = fetch_productions(
        Location(45.1288, -92.9073),
        Location(44.7389, -93.6969),
    )
    geo_ids = (v for (v, _) in groupby(prods, lambda p: p.show_id))
    all_ids = dedup_stream(itertools.chain(test_db_ids, geo_ids))
    for (i, show_id) in islice(enumerate(all_ids), skip, None):
        print(f"{i}: {show_id}")
        modal_p = test_db_path.joinpath(f"{show_id}_modal.pickle")
        orchs_p = test_db_path.joinpath(f"{show_id}_orchs.pickle")
        maybe_install = False
        if orchs_p.exists():
            # NOTE: pickle.load executes arbitrary code from the file; these files are written
            # by this function, so only point it at a test DB you trust.
            with modal_p.open("rb") as f:
                modal = pickle.load(f)
            with orchs_p.open("rb") as f:
                exp_orchs = pickle.load(f)
            # instruments are stored by name; turn them back into enum members
            for orch in exp_orchs:
                for p in orch.parts:
                    deser_instr_list(p.required_instruments)
                    deser_instr_list(p.optional_instruments)
            lines = lineify_orchestration_modal(modal)
            act_orchs = list(parse_orchestrations(lines))
            if act_orchs == exp_orchs:
                print("pass")
            else:
                print("mismatch!")
                assert len(act_orchs) == len(exp_orchs)
                for (exp, act) in zip(exp_orchs, act_orchs):
                    for line in unified_diff(format_orch(exp, 0), format_orch(act, 0), lineterm = ""):
                        print(" " + line)
                maybe_install = True
                orchs = act_orchs
        else:
            with decode_response(urlopen(build_show_p_request(show_id))) as resp:
                resp_data = resp.read()
            document = parse_html(resp_data)
            modal = get_orchestration_modal(document)
            if modal:
                lines = lineify_orchestration_modal(modal)
                orchs = list(parse_orchestrations(lines))
                for orch in orchs:
                    print_orch(orch, indent = 1)
                maybe_install = True
            else:
                print("n/a")
            # pause after each network fetch
            sleep(3)
        if maybe_install:
            done = False
            while True:
                if (response := input("(y/full): ")) == "y":
                    with modal_p.open("wb") as f:
                        pickle.dump(modal, f)
                    # store instruments by name rather than as enum members
                    orchs = [
                        dataclasses.replace(
                            orch,
                            parts = [
                                dataclasses.replace(
                                    part,
                                    required_instruments = list(map(ser_instr, part.required_instruments)),
                                    optional_instruments = list(map(ser_instr, part.optional_instruments)),
                                )
                                for part in orch.parts
                            ]
                        )
                        for orch in orchs
                    ]
                    with orchs_p.open("wb") as f:
                        pickle.dump(orchs, f)
                    break
                elif response == "full":
                    for orch in orchs:
                        print_orch(orch, indent = 0)
                else:
                    done = True
                    break
            if done:
                break
def great_circle_miles(a: "Location", b: "Location", radius_miles: float = 3957) -> float:
    """Return the great-circle distance between *a* and *b* in miles.

    Uses the haversine formula on a sphere of radius *radius_miles*.
    Coordinates are read from the ``latitude`` and ``longitude`` attributes
    of each argument and converted from degrees to radians.

    Args:
        a: First point.
        b: Second point.
        radius_miles: Sphere radius; the default (3957) is the value this
            function has always used.

    Returns:
        The distance along the sphere's surface, in the unit of
        *radius_miles*.
    """
    from math import asin, cos, radians, sin, sqrt

    lat_a = radians(a.latitude)
    lat_b = radians(b.latitude)
    d_lat = lat_b - lat_a
    d_lon = radians(b.longitude) - radians(a.longitude)

    # haversine formula
    h = sin(d_lat / 2) ** 2 + cos(lat_a) * cos(lat_b) * sin(d_lon / 2) ** 2
    # For (nearly) antipodal points rounding can leave h a hair above 1,
    # which would make asin raise "math domain error"; clamp to [0, 1].
    h = min(1.0, max(0.0, h))
    return 2 * radius_miles * asin(sqrt(h))
+def do_mti(skip: int):  # Interactive check of get_orchestrations against JSON/pickle files cached under mti_test_db/, for shows with a production within 15 miles of a fixed point; skip = number of leading show ids to pass over.
+ from urllib.request import urlopen
+ import json
+ from pithub.common import decode_response
+ from pithub.html import parse_html
+ from pithub.mti import (
+ build_maprefresh_request, build_materials_request, get_all_shows, get_orchestrations,
+ get_productions, parse_maprefresh
+ )
+ test_db_path = Path("mti_test_db")
+ test_db_path.mkdir(exist_ok = True)  # create the cache directory on first run
+ with decode_response(urlopen(build_maprefresh_request())) as resp:  # request built without a show id; its parsed page supplies the show list below
+ data =  # NOTE(review): the right-hand side is missing from this text -- TODO restore
+ root = parse_html(data)
+ def full_show_ids():  # generator of show ids taken from the page parsed above
+ def extract_id(path):  # only referenced by the commented-out line below
+ return int("_")[0])  # NOTE(review): expression is truncated in this text -- TODO restore
+ #yield from map(extract_id, test_db_path.iterdir())
+ yield from get_all_shows(root)
+ def get_show_name(materials_json):  # returns the show title from the materials response, or None when its "data" is empty
+ [cmd_obj] = [obj for obj in materials_json if obj["command"] == "insert" and obj["method"] == "html"]  # exactly one matching object is required; the unpacking raises ValueError otherwise
+ if cmd_obj["data"]:
+ tree = parse_html("<root>" + cmd_obj["data"] + "</root>")  # wrap the fragment in a single root element before parsing
+ return tree[0].text.removeprefix("Materials for ")
+ else:
+ return None
+ olds_warehouse_loc = Location(43.0779, -89.37308)  # reference point for the 15-mile filter below
+ for (i, show_id) in islice(enumerate(dedup_stream(full_show_ids())), skip, None):  # pass over the first `skip` ids; i keeps its overall position
+ print(f"{i} ({show_id})")
+ req = build_maprefresh_request(show_id)
+ print(f" {req.full_url}")
+ with decode_response(urlopen(req)) as resp:
+ data =  # NOTE(review): the right-hand side is missing from this text -- TODO restore
+ sleep(3)  # 3-second pause after the request; presumably throttling
+ doc = parse_html(data)
+ maprefresh = parse_maprefresh(doc, show_id)
+ print(f" {len(maprefresh.map_view_productions)}/{len(maprefresh.table_productions)}")  # map-view count / table count
+ #productions = get_productions(maprefresh)
+ agg = parse_maprefresh(doc, show_id)  # second parse of the same document, used as the accumulator for all pages (presumably a separate object from maprefresh -- verify)
+ for page_no in range(1, maprefresh.page_count):  # pages 1 .. page_count - 1; the request above presumably returned page 0 -- verify
+ req = build_maprefresh_request(show_id, page_no)
+ print(f" {req.full_url}")
+ with decode_response(urlopen(req)) as resp:
+ data =  # NOTE(review): the right-hand side is missing from this text -- TODO restore
+ sleep(3)
+ maprefresh = parse_maprefresh(parse_html(data), show_id)
+ print(f" p1: {len(maprefresh.map_view_productions)}/{len(maprefresh.table_productions)}")  # the "p1" label is a fixed string, not the current page_no
+ assert maprefresh.page_num == page_no  # the response must be for the page that was requested
+ agg.map_view_productions.extend(maprefresh.map_view_productions)
+ agg.table_productions.extend(maprefresh.table_productions)
+ #productions.extend(get_productions(maprefresh))
+ productions = get_productions(agg)
+ if not any(great_circle_miles(p.location, olds_warehouse_loc) < 15 for p in productions):  # skip shows with no production within 15 miles of the reference point
+ print(" geo skipping")
+ continue
+ json_p = test_db_path.joinpath(f"{show_id}_res.json")
+ orchs_p = test_db_path.joinpath(f"{show_id}_orchs.pickle")
+ if orchs_p.exists():  # a cached expectation exists: recompute from the cached JSON and compare
+ with"r") as f:  # NOTE(review): the object being opened is missing from this text -- TODO restore
+ materials_json = json.load(f)
+ if (show_name := get_show_name(materials_json)):
+ print(f"{i} {show_name} ({show_id})")
+ with"rb") as f:  # NOTE(review): the object being opened is missing from this text -- TODO restore
+ exp_orchs = pickle.load(f)  # pickle can run arbitrary code on load; only use with trusted cache files
+ for orch in exp_orchs:
+ for p in  # NOTE(review): the iterable is missing from this text -- TODO restore
+ deser_instr_list(p.required_instruments)  # result unused, so this presumably converts the list in place -- verify
+ deser_instr_list(p.optional_instruments)
+ act_orchs = get_orchestrations(materials_json)
+ if act_orchs == exp_orchs:
+ print("pass")
+ maybe_install = False
+ else:
+ print("mismatch!")
+ assert len(act_orchs) == len(exp_orchs)  # the pairwise diff below relies on equal lengths
+ for (exp, act) in zip(exp_orchs, act_orchs):
+ for line in unified_diff(format_orch(exp, 0), format_orch(act, 0), lineterm = ""):  # assuming difflib.unified_diff: lineterm = "" stops it appending newlines to its control lines
+ print(" " + line)
+ maybe_install = True  # offer to overwrite the cache with the new result
+ orchs = act_orchs
+ else:  # NOTE(review): indentation is lost; this presumably pairs with "if orchs_p.exists()" (nothing cached yet) -- verify
+ print(f"{i}: parsing ", end = "", flush = True)  # no newline, so the show name printed below continues this line
+ with decode_response(urlopen(build_materials_request(show_id))) as resp:
+ materials_json = json.load(resp)  # parsed straight from the decoded response stream
+ if (show_name := get_show_name(materials_json)):
+ print(show_name)
+ orchs = get_orchestrations(materials_json)
+ for orch in orchs:
+ print_orch(orch, indent = 1)
+ maybe_install = True
+ if maybe_install:
+ done = False
+ while True:
+ if (response := input("(y/full): ")) == "y":  # "y" saves and leaves the prompt, "full" prints every orchestration and asks again, anything else sets done
+ with"w") as f:  # NOTE(review): the object being opened is missing from this text -- TODO restore
+ json.dump(materials_json, f)
+ orchs = [  # rebuild each orchestration with its parts' instrument lists passed through ser_instr before pickling
+ dataclasses.replace(
+ orch,
+ parts = [
+ dataclasses.replace(
+ part,
+ required_instruments = list(map(ser_instr, part.required_instruments)),
+ optional_instruments = list(map(ser_instr, part.optional_instruments)),
+ )
+ for part in  # NOTE(review): the iterable is missing from this text -- TODO restore
+ ]
+ )
+ for orch in orchs
+ ]
+ with"wb") as f:  # NOTE(review): the object being opened is missing from this text -- TODO restore
+ pickle.dump(orchs, f)
+ sleep(3)
+ break
+ elif response == "full":
+ for orch in orchs:
+ print_orch(orch, indent = 0)
+ else:
+ done = True
+ break
+ if done:  # NOTE(review): presumably ends the loop over show ids -- nesting unclear in this text
+ break
+if __name__ == "__main__":  # Script entry point: the single argument "alt" lists upcoming productions within 15 miles of a fixed point with instrument codes; otherwise do_mti runs with an optional numeric skip.
+ if argv[1:] == ["alt"]:
+ from datetime import date, timedelta
+ import json
+ from pithub.mti import (
+ fetch_all_productions, fetch_maprefresh_page, MaprefreshParams, Production,
+ build_materials_request, get_orchestrations
+ )
+ def requester(req):  # fetch one request, pause 3 seconds, and return the data that was read
+ with decode_response(urlopen(req)) as resp:
+ data =  # NOTE(review): the right-hand side is missing from this text -- TODO restore
+ sleep(3)
+ return data
+ def is_upcoming(production) -> bool:  # date test on opening/closing; see the notes on the truncated comparisons below
+ default_closing = production.opening + timedelta(days = 21)  # fallback closing date: 21 days after opening
+ if < production.opening:  # NOTE(review): the left-hand operand is missing from this text (also on the three "return <" lines below) -- TODO restore
+ return True
+ if production.closing:
+ if production.closing - production.opening > timedelta(days = 25):  # runs longer than 25 days are judged against default_closing instead of closing
+ return < default_closing
+ else:
+ return < production.closing
+ else:
+ return < default_closing
+ @dataclass
+ class TaggedProduction:  # pairs a production with the show id it was fetched for
+ show_id: int
+ production: Production
+ def get_instr_codes(orch) -> Iterator[str]:  # yields "vln" / "vla" / "tbn" when violin / viola / trombone is among a part's required instruments
+ for part in  # NOTE(review): the iterable is missing from this text -- TODO restore
+ main_instruments = set(itertools.chain.from_iterable(  # flatten: a Choice contributes all of its choices, any other entry just itself
+ v.choices if isinstance(v, Choice) else [v]
+ for v in part.required_instruments
+ ))
+ assert all(isinstance(v, Instrument) for v in main_instruments)  # after flattening, only Instrument values may remain
+ if Instrument.VIOLIN in main_instruments:
+ yield "vln"
+ if Instrument.VIOLA in main_instruments:
+ yield "vla"
+ if Instrument.TROMBONE in main_instruments:
+ yield "tbn"
+ olds_warehouse_loc = Location(43.0779, -89.37308)  # reference point for the 15-mile filter below
+ show_ids = fetch_maprefresh_page(MaprefreshParams(), requester).show_ids
+ productions = []
+ show_codes = {}  # show_id -> set of instrument codes
+ for (i, show_id) in islice(enumerate(show_ids), 0, None):  # start offset 0, i.e. nothing is skipped
+ print(f"\rLoading show {show_id:<10} {i}/{len(show_ids)}", end = "")  # "\r" rewrites the same terminal line as a progress display
+ new = [
+ TaggedProduction(show_id, p)
+ for p in set(fetch_all_productions(show_id, requester, area = 48))  # set() drops duplicate productions; the meaning of area = 48 is not visible here
+ if is_upcoming(p) and great_circle_miles(p.location, olds_warehouse_loc) < 15  # keep upcoming productions within 15 miles of the reference point
+ ]
+ if new:  # materials are only fetched for shows that have a matching production
+ data = requester(build_materials_request(show_id))
+ materials_json = json.loads(data)
+ orchestrations = get_orchestrations(materials_json)
+ codes = set(itertools.chain.from_iterable(  # union of the codes from every orchestration of the show
+ map(get_instr_codes, orchestrations)
+ ))
+ if codes:
+ show_codes[show_id] = codes  # stored only when at least one code was found
+ productions.extend(new)  # NOTE(review): nesting is lost in this text; if this runs when codes is empty, show_codes[tp.show_id] below raises KeyError -- verify
+ print()  # end the "\r" progress line
+ if productions:
+ show_len = max(len(tp.production.show_title) for tp in productions)  # column widths for aligned output
+ org_len = max(len(tp.production.organization_name) for tp in productions)
+ productions.sort(key = lambda tp: tp.production.opening)  # earliest opening first
+ for tp in productions:
+ p = tp.production
+ dates = f"{p.opening} - {p.closing}" if p.closing else str(p.opening)
+ code_str = " ".join(sorted(show_codes[tp.show_id]))
+ print(f"{dates:<23}: {p.show_title:<{show_len}} @ {p.organization_name:<{org_len}} ({code_str})")
+ else:
+ print("no results")
+ else:
+ if len(argv) == 2:  # optional single argument: number of shows to skip
+ skip = int(argv[1])
+ else:
+ skip = 0
+ do_mti(skip)