From 42525c218d5940ff4b5d1df1694af94bdbe0bd22 Mon Sep 17 00:00:00 2001 From: Jakob Cornell Date: Tue, 25 Jun 2019 00:48:44 -0500 Subject: [PATCH] Finish Allergic Cliffs level 4 and test program --- common.py | 15 +++- puzzles/allergic_cliffs/4-2.py | 53 ----------- puzzles/allergic_cliffs/4.py | 86 ------------------ puzzles/allergic_cliffs/common.py | 72 ++++++++++++++- puzzles/allergic_cliffs/l4.py | 135 +++++++++++++++++++++++++++++ puzzles/allergic_cliffs/test-l4.py | 16 ++++ ui.py | 2 +- 7 files changed, 236 insertions(+), 143 deletions(-) delete mode 100644 puzzles/allergic_cliffs/4-2.py delete mode 100644 puzzles/allergic_cliffs/4.py create mode 100644 puzzles/allergic_cliffs/l4.py create mode 100644 puzzles/allergic_cliffs/test-l4.py diff --git a/common.py b/common.py index f618b16..cc39254 100644 --- a/common.py +++ b/common.py @@ -1,4 +1,6 @@ from enum import Enum +import random +import itertools class Zoombini: class Property(Enum): @@ -39,5 +41,16 @@ def save(troupe, path): pickle.dump(troupe, f) def random_zoombini(): - import random return Zoombini({random.choice(list(t)) for t in Zoombini.PROPERTIES}) + +def game_rand_troupe(): + all_zs = [] + for attrs in map(frozenset, itertools.product(*Zoombini.PROPERTIES)): + all_zs.append(Zoombini(attrs)) + all_zs.append(Zoombini(attrs)) + troupe = set() + for _ in range(16): + up = random.choice(all_zs) + troupe.add(up) + all_zs.remove(up) + return troupe diff --git a/puzzles/allergic_cliffs/4-2.py b/puzzles/allergic_cliffs/4-2.py deleted file mode 100644 index 33fdade..0000000 --- a/puzzles/allergic_cliffs/4-2.py +++ /dev/null @@ -1,53 +0,0 @@ -import sys - -from zoombinis import common, ui -from zoombinis.puzzles.allergic_cliffs.common import * - -import pathlib -import random - -try: - [file_name] = sys.argv[1:] - troupe = common.load(pathlib.Path(file_name)) -except: - print("args: ", file = sys.stderr) - raise - -class Instance: - def __init__(self, zoombinis, io_agent): - self.io_agent = io_agent - self.waiting = set(zoombinis) - self.across = {cliff: set() for cliff in Cliff} - - def next_pair(self): - import itertools - def key(pair): - cliff, zoombini = pair - pool = self.across[cliff] if self.across[cliff] else self.waiting - {zoombini} - sim_score = sum(len(zoombini.attrs & z.attrs) for z in pool) / len(pool) - free_ratio = (8 - len(self.across[cliff])) / (8 - len(self.across[cliff.other()])) - return sim_score * free_ratio ** (1/10) - return max(itertools.product(Cliff, self.waiting), key = key) - - def send(self, zoombini, cliff): - send_fmt = "Send {} across the {} cliff." - self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower())) - ans = self.io_agent.choose("Did it work?", ["y", "n"]) - if ans == 'n': - cliff = cliff.other() - self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower())) - self.io_agent.wait() - self.across[cliff].add(zoombini) - self.waiting.remove(zoombini) - - def run(self): - first = max(self.waiting, key = lambda z: sum(len(z.attrs & oz.attrs) for oz in self.waiting) / len(self.waiting)) - self.send(first, random.choice(list(Cliff))) - while not any(len(s) == 8 for s in self.across.values()): - (c, z) = self.next_pair() - self.send(z, c) - [full] = [c for c in Cliff if len(self.across[c]) == 8] - for z in list(self.waiting): - self.send(z, full.other()) - -Instance(troupe, ui.Agent(sys.stdin, sys.stderr)).run() diff --git a/puzzles/allergic_cliffs/4.py b/puzzles/allergic_cliffs/4.py deleted file mode 100644 index 3da700d..0000000 --- a/puzzles/allergic_cliffs/4.py +++ /dev/null @@ -1,86 +0,0 @@ -import sys - -from zoombinis import common, ui -from zoombinis.puzzles.allergic_cliffs.common import * - -import pathlib -import random - -try: - [file_name] = sys.argv[1:] - troupe = common.load(pathlib.Path(file_name)) -except: - print("args: ", file = sys.stderr) - raise - -class Instance: - def __init__(self, zoombinis, io_agent): - self.io_agent = io_agent - self.waiting = set(zoombinis) - self.across = {cliff: set() for cliff in Cliff} - self.cand_rules = {cliff: set(common.ALL_ATTRS) for cliff in Cliff} - - self.pos_cliff = None - self.pos_cand_rules = None - self.pos_rules = None - - def choose_zoombini(self): - if self.pos_rules is not None: - return next(iter(self.waiting)) - elif self.pos_cliff is None: - key = lambda z: ( - len(z.attrs & self.cand_rules[Cliff.UPPER]) - * len(z.attrs & self.cand_rules[Cliff.LOWER]) - ) - return max(self.waiting, key = key) - else: - return next(iter(self.waiting)) - - def choose_cliff(self, zoombini): - self.check_solve() - if self.pos_rules is None: - def key(cliff): - pool = self.across[cliff] if self.across[cliff] else self.waiting - {zoombini} - # assume that if there's a cliff with no Zoombinis across there are still others waiting to cross - return sum(len(zoombini.attrs & z.attrs) for z in pool) / len(pool) - return max(Cliff, key = key) - else: - return self.pos_cliff if zoombini.attrs & self.pos_rules else self.pos_cliff.other() - - def check_solve(self): - if self.pos_cliff is None: - drained = [cliff for cliff in Cliff if len(self.cand_rules[cliff]) < 3] - if drained: - [neg_cliff] = drained - self.pos_cliff = neg_cliff.other() - self.pos_cand_rules = self.cand_rules[self.pos_cliff] - self.check_solve() - elif len(self.pos_cand_rules) == 3: - self.pos_rules = frozenset(self.pos_cand_rules) - - def send(self, zoombini, cliff): - send_fmt = "Send {} across the {} cliff." - self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower())) - ans = self.io_agent.choose("Did it work?", ["y", "n"]) - if ans == 'n': - cliff = cliff.other() - self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower())) - self.io_agent.wait() - self.across[cliff].add(zoombini) - if self.pos_cliff is None: - for attr in zoombini.attrs: - self.cand_rules[cliff.other()].discard(attr) - elif self.pos_rules is None and cliff == self.pos_cliff.other(): - for attr in zoombini.attrs: - self.pos_cand_rules.discard(attr) - - def run(self): - while self.waiting: - z = self.choose_zoombini() - c = self.choose_cliff(z) - self.send(z, c) - self.waiting.remove(z) - print(self.cand_rules) - print(self.pos_cliff) - -Instance(troupe, ui.Agent(sys.stdin, sys.stderr)).run() diff --git a/puzzles/allergic_cliffs/common.py b/puzzles/allergic_cliffs/common.py index 4ae751b..200983d 100644 --- a/puzzles/allergic_cliffs/common.py +++ b/puzzles/allergic_cliffs/common.py @@ -1,8 +1,76 @@ -import zoombinis.common as comm +from zoombinis import common +import zoombinis.ui from enum import Enum -Zoombini = comm.Zoombini # for pickle issue workaround +Zoombini = common.Zoombini # for pickle issue workaround Cliff = Enum('Cliff', ['UPPER', 'LOWER']) Cliff.other = lambda self: {Cliff.UPPER: Cliff.LOWER, Cliff.LOWER: Cliff.UPPER}[self] + +def assign(zoombini, rule): + (cliff, attrs) = rule + return cliff if zoombini.attrs & attrs else cliff.other() + +class Partition(dict): + @staticmethod + def from_(troupe, rule): + part = Partition() + count = {cliff: 0 for cliff in Cliff} + for z in troupe: + cliff = assign(z, rule) + count[cliff] += 1 + if count[cliff] > 8: + return None + part[z] = cliff + return part + + def __hash__(self): + return hash(frozenset(self.items())) + +class CliGameIface: + def __init__(self, cli_agent): + self.agent = cli_agent + + def send(self, zoombini, cliff): + self.agent.print("Send {} across the {} bridge.".format(zoombini, cliff.name.lower())) + ans = self.agent.choose("Did it work?", ["y", "n"]) + return ans == "y" + + def resend(self, cliff): + self.agent.print("Resend the Zoombini across the {} bridge.".format(cliff.name.lower())) + self.agent.wait() + +class MockIface: + @staticmethod + def random_rule(zoombinis): + import random, itertools + + def assign(zoombini, rule): + (cliff, attrs) = rule + return cliff if zoombini.attrs & attrs else cliff.other() + def check_split(rule): + cts = {cliff: 0 for cliff in Cliff} + for z in zoombinis: + cts[assign(z, rule)] += 1 + return all(v == 8 for v in cts.values()) + + cands = frozenset( + rule + for rule in itertools.product( + Cliff, + map(frozenset, itertools.combinations(common.ALL_ATTRS, 3)) + ) + if check_split(rule) + ) + return random.choice(list(cands)) + + def __init__(self, rule): + (self.cliff, self.attrs) = rule + + def send(self, zoombini, cliff): + actual = self.cliff if zoombini.attrs & self.attrs else self.cliff.other() + return cliff == actual + + def resend(self, cliff): + pass diff --git a/puzzles/allergic_cliffs/l4.py b/puzzles/allergic_cliffs/l4.py new file mode 100644 index 0000000..798a7b9 --- /dev/null +++ b/puzzles/allergic_cliffs/l4.py @@ -0,0 +1,135 @@ +import sys +import itertools +import pathlib +import random + +from zoombinis import common, ui +from zoombinis.puzzles.allergic_cliffs.common import * + +''' +This algorithm precomputes all possible allergy rules that result in an 8-8 split between the cliffs (which the game seems to guarantee). +These rules are then pared down as Zoombinis are assigned to their cliffs. +The proportion of remaining rules that place a Zoombini on a certain cliff is used as a probability that it belongs on that cliff. + +Until the allergy rule is determined, the algorithm uses two strategies to choose the order of Zoombinis: + - the Zoombini with the highest expected reduction in feasible rules is sent first (when there are 5 or 6 bridge pegs left) + - the Zoombini with the highest probability of belonging on a cliff is sent first (4 or fewer pegs) +Each Zoombini is sent to the cliff with the higher computed probability. + +The exception is if one cliff is saturated (has 8 Zoombinis across already). +In this event, the remaining Zoombinis are sent in arbitrary order to the other cliff. + +According to automated simulation with randomized input, the success rate for this algorithm is about 0.9996. +''' + +class Instance: + def __init__(self, zoombinis, iface): + self.iface = iface + self.waiting = set(zoombinis) + self.across = {cliff: set() for cliff in Cliff} + self.strikes = 0 + self.rule = None + self.full_cliff = None + + def check_split(rule): + cts = {cliff: 0 for cliff in Cliff} + for z in zoombinis: + cts[Instance.assign(z, rule)] += 1 + return all(v == 8 for v in cts.values()) + + self.cand_rules = frozenset( + rule + for rule in itertools.product( + Cliff, + map(frozenset, itertools.combinations(common.ALL_ATTRS, 3)) + ) + if check_split(rule) + ) + + @staticmethod + def assign(zoombini, rule): + (cliff, attrs) = rule + return cliff if zoombini.attrs & attrs else cliff.other() + + def filtered_rules(self, rules, across): + def check(rule): + (cliff, attrs) = rule + return ( + all(z.attrs & attrs for z in across[cliff]) + and not any(z.attrs & attrs for z in across[cliff.other()]) + ) + return {r for r in rules if check(r)} + + def update_state(self): + if self.rule is None: + self.cand_rules = self.filtered_rules(self.cand_rules, self.across) + if len(self.cand_rules) == 1: + [self.rule] = self.cand_rules + if self.full_cliff is None: + full = [c for c in Cliff if len(self.across[c]) == 8] + if full: + [self.full_cliff] = full + + def cliff_scores(self, z): + cts = {cliff: 0 for cliff in Cliff} + for rule in self.cand_rules: + cts[Instance.assign(z, rule)] += 1 + return cts + + def next_pair(self): + if self.rule is not None: + (cliff, attrs) = self.rule + z = self.waiting.pop() + c = cliff if z.attrs & attrs else cliff.other() + return (c, z) + elif self.full_cliff is not None: + z = self.waiting.pop() + c = self.full_cliff.other() + return (c, z) + elif self.strikes < 2: + def key(z): + # score is expected number of candidate rules after processing Zoombini + cliff_scores = self.cliff_scores(z) + def get_across(cliff): + import copy + across = copy.deepcopy(self.across) + across[cliff].add(z) + return across + return sum(len(self.filtered_rules(self.cand_rules, get_across(c))) * cliff_scores[c] for c in Cliff) + z = min(self.waiting, key = key) + cliff_scores = self.cliff_scores(z) + c = max(Cliff, key = lambda c: cliff_scores[c]) + self.waiting.remove(z) + return (c, z) + else: + scores = {z: self.cliff_scores(z) for z in self.waiting} + def key(pair): + (c, z) = pair + return scores[z][c] + (c, z) = max(itertools.product(Cliff, self.waiting), key = key) + self.waiting.remove(z) + return (c, z) + + def send(self, zoombini, cliff): + accept = self.iface.send(zoombini, cliff) + if not accept: + self.strikes += 1 + cliff = cliff.other() + self.iface.resend(cliff) + self.across[cliff].add(zoombini) + + def run(self): + while self.waiting and self.strikes < 6: + (c, z) = self.next_pair() + self.send(z, c) + self.update_state() + return self.strikes < 6 + +if __name__ == '__main__': + try: + [file_name] = sys.argv[1:] + troupe = common.load(pathlib.Path(file_name)) + except: + print("args: ", file = sys.stderr) + + Instance(troupe, CliGameIface(ui.CliAgent(sys.stdin, sys.stderr))).run() diff --git a/puzzles/allergic_cliffs/test-l4.py b/puzzles/allergic_cliffs/test-l4.py new file mode 100644 index 0000000..baa06a7 --- /dev/null +++ b/puzzles/allergic_cliffs/test-l4.py @@ -0,0 +1,16 @@ +from zoombinis import common +from zoombinis.puzzles.allergic_cliffs.common import * +from zoombinis.puzzles.allergic_cliffs.l4 import * + +def run(): + troupe = common.game_rand_troupe() + iface = MockIface(MockIface.random_rule(troupe)) + return Instance(troupe, iface).run() + +runs = 0 +wins = 0 +while True: + won = run() + runs += 1 + wins += won + print("Won {:4}/{:4} ({})".format(wins, runs, wins/runs)) diff --git a/ui.py b/ui.py index 1e6ee59..afefa3a 100644 --- a/ui.py +++ b/ui.py @@ -1,4 +1,4 @@ -class Agent: +class CliAgent: def __init__(self, in_stream, out_stream): self.in_stream = in_stream self.out_stream = out_stream -- 2.30.2