Finish Allergic Cliffs level 4 and test program
authorJakob Cornell <jakob@jcornell.net>
Tue, 25 Jun 2019 05:48:44 +0000 (00:48 -0500)
committerJakob Cornell <jakob@jcornell.net>
Tue, 25 Jun 2019 05:48:44 +0000 (00:48 -0500)
common.py
puzzles/allergic_cliffs/4-2.py [deleted file]
puzzles/allergic_cliffs/4.py [deleted file]
puzzles/allergic_cliffs/common.py
puzzles/allergic_cliffs/l4.py [new file with mode: 0644]
puzzles/allergic_cliffs/test-l4.py [new file with mode: 0644]
ui.py

index f618b16ffb7183fc1d0544e939311059b5999798..cc39254223e6a5c14bc101daa13a2ff6e3c1ef00 100644 (file)
--- a/common.py
+++ b/common.py
@@ -1,4 +1,6 @@
 from enum import Enum
+import random
+import itertools
 
 class Zoombini:
        class Property(Enum):
@@ -39,5 +41,16 @@ def save(troupe, path):
                pickle.dump(troupe, f)
 
 def random_zoombini():
-       import random
        return Zoombini({random.choice(list(t)) for t in Zoombini.PROPERTIES})
+
+def game_rand_troupe():
+       all_zs = []
+       for attrs in map(frozenset, itertools.product(*Zoombini.PROPERTIES)):
+               all_zs.append(Zoombini(attrs))
+               all_zs.append(Zoombini(attrs))
+       troupe = set()
+       for _ in range(16):
+               up = random.choice(all_zs)
+               troupe.add(up)
+               all_zs.remove(up)
+       return troupe
diff --git a/puzzles/allergic_cliffs/4-2.py b/puzzles/allergic_cliffs/4-2.py
deleted file mode 100644 (file)
index 33fdade..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-import sys
-
-from zoombinis import common, ui
-from zoombinis.puzzles.allergic_cliffs.common import *
-
-import pathlib
-import random
-
-try:
-       [file_name] = sys.argv[1:]
-       troupe = common.load(pathlib.Path(file_name))
-except:
-       print("args: <troupe-file>", file = sys.stderr)
-       raise
-
-class Instance:
-       def __init__(self, zoombinis, io_agent):
-               self.io_agent = io_agent
-               self.waiting = set(zoombinis)
-               self.across = {cliff: set() for cliff in Cliff}
-
-       def next_pair(self):
-               import itertools
-               def key(pair):
-                       cliff, zoombini = pair
-                       pool = self.across[cliff] if self.across[cliff] else self.waiting - {zoombini}
-                       sim_score = sum(len(zoombini.attrs & z.attrs) for z in pool) / len(pool)
-                       free_ratio = (8 - len(self.across[cliff])) / (8 - len(self.across[cliff.other()]))
-                       return sim_score * free_ratio ** (1/10)
-               return max(itertools.product(Cliff, self.waiting), key = key)
-
-       def send(self, zoombini, cliff):
-               send_fmt = "Send {} across the {} cliff."
-               self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower()))
-               ans = self.io_agent.choose("Did it work?", ["y", "n"])
-               if ans == 'n':
-                       cliff = cliff.other()
-                       self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower()))
-                       self.io_agent.wait()
-               self.across[cliff].add(zoombini)
-               self.waiting.remove(zoombini)
-
-       def run(self):
-               first = max(self.waiting, key = lambda z: sum(len(z.attrs & oz.attrs) for oz in self.waiting) / len(self.waiting))
-               self.send(first, random.choice(list(Cliff)))
-               while not any(len(s) == 8 for s in self.across.values()):
-                       (c, z) = self.next_pair()
-                       self.send(z, c)
-               [full] = [c for c in Cliff if len(self.across[c]) == 8]
-               for z in list(self.waiting):
-                       self.send(z, full.other())
-
-Instance(troupe, ui.Agent(sys.stdin, sys.stderr)).run()
diff --git a/puzzles/allergic_cliffs/4.py b/puzzles/allergic_cliffs/4.py
deleted file mode 100644 (file)
index 3da700d..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-import sys
-
-from zoombinis import common, ui
-from zoombinis.puzzles.allergic_cliffs.common import *
-
-import pathlib
-import random
-
-try:
-       [file_name] = sys.argv[1:]
-       troupe = common.load(pathlib.Path(file_name))
-except:
-       print("args: <troupe-file>", file = sys.stderr)
-       raise
-
-class Instance:
-       def __init__(self, zoombinis, io_agent):
-               self.io_agent = io_agent
-               self.waiting = set(zoombinis)
-               self.across = {cliff: set() for cliff in Cliff}
-               self.cand_rules = {cliff: set(common.ALL_ATTRS) for cliff in Cliff}
-
-               self.pos_cliff = None
-               self.pos_cand_rules = None
-               self.pos_rules = None
-
-       def choose_zoombini(self):
-               if self.pos_rules is not None:
-                       return next(iter(self.waiting))
-               elif self.pos_cliff is None:
-                       key = lambda z: (
-                               len(z.attrs & self.cand_rules[Cliff.UPPER])
-                               * len(z.attrs & self.cand_rules[Cliff.LOWER])
-                       )
-                       return max(self.waiting, key = key)
-               else:
-                       return next(iter(self.waiting))
-
-       def choose_cliff(self, zoombini):
-               self.check_solve()
-               if self.pos_rules is None:
-                       def key(cliff):
-                               pool = self.across[cliff] if self.across[cliff] else self.waiting - {zoombini}
-                               # assume that if there's a cliff with no Zoombinis across there are still others waiting to cross
-                               return sum(len(zoombini.attrs & z.attrs) for z in pool) / len(pool)
-                       return max(Cliff, key = key)
-               else:
-                       return self.pos_cliff if zoombini.attrs & self.pos_rules else self.pos_cliff.other()
-
-       def check_solve(self):
-               if self.pos_cliff is None:
-                       drained = [cliff for cliff in Cliff if len(self.cand_rules[cliff]) < 3]
-                       if drained:
-                               [neg_cliff] = drained
-                               self.pos_cliff = neg_cliff.other()
-                               self.pos_cand_rules = self.cand_rules[self.pos_cliff]
-                               self.check_solve()
-               elif len(self.pos_cand_rules) == 3:
-                       self.pos_rules = frozenset(self.pos_cand_rules)
-
-       def send(self, zoombini, cliff):
-               send_fmt = "Send {} across the {} cliff."
-               self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower()))
-               ans = self.io_agent.choose("Did it work?", ["y", "n"])
-               if ans == 'n':
-                       cliff = cliff.other()
-                       self.io_agent.print(send_fmt.format(zoombini, cliff.name.lower()))
-                       self.io_agent.wait()
-               self.across[cliff].add(zoombini)
-               if self.pos_cliff is None:
-                       for attr in zoombini.attrs:
-                               self.cand_rules[cliff.other()].discard(attr)
-               elif self.pos_rules is None and cliff == self.pos_cliff.other():
-                       for attr in zoombini.attrs:
-                               self.pos_cand_rules.discard(attr)
-
-       def run(self):
-               while self.waiting:
-                       z = self.choose_zoombini()
-                       c = self.choose_cliff(z)
-                       self.send(z, c)
-                       self.waiting.remove(z)
-                       print(self.cand_rules)
-                       print(self.pos_cliff)
-
-Instance(troupe, ui.Agent(sys.stdin, sys.stderr)).run()
index 4ae751b3f60b9c9088a70ea52fb1b95e1e5535b8..200983dd3dd21693812e14ec9d62a5ca6b647132 100644 (file)
@@ -1,8 +1,76 @@
-import zoombinis.common as comm
+from zoombinis import common
+import zoombinis.ui
 
 from enum import Enum
 
-Zoombini = comm.Zoombini # for pickle issue workaround
+Zoombini = common.Zoombini # for pickle issue workaround
 
 Cliff = Enum('Cliff', ['UPPER', 'LOWER'])
 Cliff.other = lambda self: {Cliff.UPPER: Cliff.LOWER, Cliff.LOWER: Cliff.UPPER}[self]
+
+def assign(zoombini, rule):
+       (cliff, attrs) = rule
+       return cliff if zoombini.attrs & attrs else cliff.other()
+
+class Partition(dict):
+       @staticmethod
+       def from_(troupe, rule):
+               part = Partition()
+               count = {cliff: 0 for cliff in Cliff}
+               for z in troupe:
+                       cliff = assign(z, rule)
+                       count[cliff] += 1
+                       if count[cliff] > 8:
+                               return None
+                       part[z] = cliff
+               return part
+
+       def __hash__(self):
+               return hash(frozenset(self.items()))
+
+class CliGameIface:
+       def __init__(self, cli_agent):
+               self.agent = cli_agent
+
+       def send(self, zoombini, cliff):
+               self.agent.print("Send {} across the {} bridge.".format(zoombini, cliff.name.lower()))
+               ans = self.agent.choose("Did it work?", ["y", "n"])
+               return ans == "y"
+
+       def resend(self, cliff):
+               self.agent.print("Resend the Zoombini across the {} bridge.".format(cliff.name.lower()))
+               self.agent.wait()
+
+class MockIface:
+       @staticmethod
+       def random_rule(zoombinis):
+               import random, itertools
+
+               def assign(zoombini, rule):
+                       (cliff, attrs) = rule
+                       return cliff if zoombini.attrs & attrs else cliff.other()
+               def check_split(rule):
+                       cts = {cliff: 0 for cliff in Cliff}
+                       for z in zoombinis:
+                               cts[assign(z, rule)] += 1
+                       return all(v == 8 for v in cts.values())
+
+               cands = frozenset(
+                       rule
+                       for rule in itertools.product(
+                               Cliff,
+                               map(frozenset, itertools.combinations(common.ALL_ATTRS, 3))
+                       )
+                       if check_split(rule)
+               )
+               return random.choice(list(cands))
+
+       def __init__(self, rule):
+               (self.cliff, self.attrs) = rule
+
+       def send(self, zoombini, cliff):
+               actual = self.cliff if zoombini.attrs & self.attrs else self.cliff.other()
+               return cliff == actual
+
+       def resend(self, cliff):
+               pass
diff --git a/puzzles/allergic_cliffs/l4.py b/puzzles/allergic_cliffs/l4.py
new file mode 100644 (file)
index 0000000..798a7b9
--- /dev/null
@@ -0,0 +1,135 @@
+import sys
+import itertools
+import pathlib
+import random
+
+from zoombinis import common, ui
+from zoombinis.puzzles.allergic_cliffs.common import *
+
+'''
+This algorithm precomputes all possible allergy rules that result in an 8-8 split between the cliffs (which the game seems to guarantee).
+These rules are then pared down as Zoombinis are assigned to their cliffs.
+The proportion of remaining rules that place a Zoombini on a certain cliff is used as a probability that it belongs on that cliff.
+
+Until the allergy rule is determined, the algorithm uses two strategies to choose the order of Zoombinis:
+       - the Zoombini with the highest expected reduction in feasible rules is sent first (when there are 5 or 6 bridge pegs left)
+       - the Zoombini with the highest probability of belonging on a cliff is sent first (4 or fewer pegs)
+Each Zoombini is sent to the cliff with the higher computed probability.
+
+The exception is if one cliff is saturated (has 8 Zoombinis across already).
+In this event, the remaining Zoombinis are sent in arbitrary order to the other cliff.
+
+According to automated simulation with randomized input, the success rate for this algorithm is about 0.9996.
+'''
+
+class Instance:
+       def __init__(self, zoombinis, iface):
+               self.iface = iface
+               self.waiting = set(zoombinis)
+               self.across = {cliff: set() for cliff in Cliff}
+               self.strikes = 0
+               self.rule = None
+               self.full_cliff = None
+
+               def check_split(rule):
+                       cts = {cliff: 0 for cliff in Cliff}
+                       for z in zoombinis:
+                               cts[Instance.assign(z, rule)] += 1
+                       return all(v == 8 for v in cts.values())
+
+               self.cand_rules = frozenset(
+                       rule
+                       for rule in itertools.product(
+                               Cliff,
+                               map(frozenset, itertools.combinations(common.ALL_ATTRS, 3))
+                       )
+                       if check_split(rule)
+               )
+
+       @staticmethod
+       def assign(zoombini, rule):
+               (cliff, attrs) = rule
+               return cliff if zoombini.attrs & attrs else cliff.other()
+
+       def filtered_rules(self, rules, across):
+               def check(rule):
+                       (cliff, attrs) = rule
+                       return (
+                               all(z.attrs & attrs for z in across[cliff])
+                               and not any(z.attrs & attrs for z in across[cliff.other()])
+                       )
+               return {r for r in rules if check(r)}
+
+       def update_state(self):
+               if self.rule is None:
+                       self.cand_rules = self.filtered_rules(self.cand_rules, self.across)
+                       if len(self.cand_rules) == 1:
+                               [self.rule] = self.cand_rules
+               if self.full_cliff is None:
+                       full = [c for c in Cliff if len(self.across[c]) == 8]
+                       if full:
+                               [self.full_cliff] = full
+
+       def cliff_scores(self, z):
+               cts = {cliff: 0 for cliff in Cliff}
+               for rule in self.cand_rules:
+                       cts[Instance.assign(z, rule)] += 1
+               return cts
+
+       def next_pair(self):
+               if self.rule is not None:
+                       (cliff, attrs) = self.rule
+                       z = self.waiting.pop()
+                       c = cliff if z.attrs & attrs else cliff.other()
+                       return (c, z)
+               elif self.full_cliff is not None:
+                       z = self.waiting.pop()
+                       c = self.full_cliff.other()
+                       return (c, z)
+               elif self.strikes < 2:
+                       def key(z):
+                               # score is expected number of candidate rules after processing Zoombini
+                               cliff_scores = self.cliff_scores(z)
+                               def get_across(cliff):
+                                       import copy
+                                       across = copy.deepcopy(self.across)
+                                       across[cliff].add(z)
+                                       return across
+                               return sum(len(self.filtered_rules(self.cand_rules, get_across(c))) * cliff_scores[c] for c in Cliff)
+                       z = min(self.waiting, key = key)
+                       cliff_scores = self.cliff_scores(z)
+                       c = max(Cliff, key = lambda c: cliff_scores[c])
+                       self.waiting.remove(z)
+                       return (c, z)
+               else:
+                       scores = {z: self.cliff_scores(z) for z in self.waiting}
+                       def key(pair):
+                               (c, z) = pair
+                               return scores[z][c]
+                       (c, z) = max(itertools.product(Cliff, self.waiting), key = key)
+                       self.waiting.remove(z)
+                       return (c, z)
+
+       def send(self, zoombini, cliff):
+               accept = self.iface.send(zoombini, cliff)
+               if not accept:
+                       self.strikes += 1
+                       cliff = cliff.other()
+                       self.iface.resend(cliff)
+               self.across[cliff].add(zoombini)
+
+       def run(self):
+               while self.waiting and self.strikes < 6:
+                       (c, z) = self.next_pair()
+                       self.send(z, c)
+                       self.update_state()
+               return self.strikes < 6
+
+if __name__ == '__main__':
+       try:
+               [file_name] = sys.argv[1:]
+               troupe = common.load(pathlib.Path(file_name))
+       except:
+               print("args: <troupe-file>", file = sys.stderr)
+
+       Instance(troupe, CliGameIface(ui.CliAgent(sys.stdin, sys.stderr))).run()
diff --git a/puzzles/allergic_cliffs/test-l4.py b/puzzles/allergic_cliffs/test-l4.py
new file mode 100644 (file)
index 0000000..baa06a7
--- /dev/null
@@ -0,0 +1,16 @@
+from zoombinis import common
+from zoombinis.puzzles.allergic_cliffs.common import *
+from zoombinis.puzzles.allergic_cliffs.l4 import *
+
+def run():
+       troupe = common.game_rand_troupe()
+       iface = MockIface(MockIface.random_rule(troupe))
+       return Instance(troupe, iface).run()
+
+runs = 0
+wins = 0
+while True:
+       won = run()
+       runs += 1
+       wins += won
+       print("Won {:4}/{:4} ({})".format(wins, runs, wins/runs))
diff --git a/ui.py b/ui.py
index 1e6ee59b899573c719a79fabb05227b278a5624a..afefa3aa3104e553e92b27806b0ee778791d7d61 100644 (file)
--- a/ui.py
+++ b/ui.py
@@ -1,4 +1,4 @@
-class Agent:
+class CliAgent:
        def __init__(self, in_stream, out_stream):
                self.in_stream = in_stream
                self.out_stream = out_stream