465 lines
14 KiB
Python
465 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import itertools
|
|
import functools
|
|
import json
|
|
import os.path
|
|
import sys
|
|
import random
|
|
from collections import defaultdict
|
|
|
|
import api
|
|
import graph
|
|
|
|
|
|
class ExploreError(Exception):
|
|
pass
|
|
|
|
|
|
@functools.total_ordering
|
|
class Path:
|
|
def __init__(self, path=""):
|
|
if isinstance(path, list):
|
|
path = "".join(str(p) for p in path)
|
|
self.path = path
|
|
|
|
def __repr__(self):
|
|
return "." + self.path if self.path else "."
|
|
|
|
def __bool__(self):
|
|
return bool(self.path)
|
|
|
|
def __eq__(self, other):
|
|
return self.path == other.path
|
|
|
|
def __lt__(self, other):
|
|
return (len(self.path), self.path) < (len(other.path), other.path)
|
|
|
|
def __hash__(self):
|
|
return self.path.__hash__()
|
|
|
|
def __format__(self, spec):
|
|
return str(self).__format__(spec)
|
|
|
|
def __add__(self, other):
|
|
return Path(self.path + other.path)
|
|
|
|
def __len__(self):
|
|
return len(self.path)
|
|
|
|
def __getitem__(self, idx):
|
|
return Path(self.path.__getitem__(idx))
|
|
|
|
def last(self):
|
|
return Path(self.path[:-1]), int(self.path[-1])
|
|
|
|
def shorten(self, path, pmerge):
|
|
if self.path.startswith(pmerge.path):
|
|
return Path(path.path + self.path[len(pmerge.path) :])
|
|
else:
|
|
return self
|
|
|
|
|
|
DOORS = [Path(str(i)) for i in range(6)]
|
|
|
|
|
|
class Explore:
|
|
def __init__(self, problem, probes):
|
|
self.problem = problem
|
|
self.probes = probes
|
|
self.rooms = {}
|
|
self.room_ids = defaultdict(lambda: set())
|
|
self.neighbors = {}
|
|
|
|
self.unification_id = {}
|
|
|
|
self.score = 0
|
|
|
|
def _path(self, path):
|
|
while True:
|
|
repeat = False
|
|
for i in range(1, len(path) + 1):
|
|
if path[:i] in self.unification_id:
|
|
path = self.unification_id[path[:i]] + path[i:]
|
|
repeat = True
|
|
break
|
|
if not repeat:
|
|
break
|
|
|
|
return path
|
|
|
|
def explore(self, paths, probes=None, mark=Path()):
|
|
probes = probes or self.probes
|
|
num_marks = mark.path.count("[")
|
|
|
|
expaths = []
|
|
for path in paths:
|
|
path = self._path(path)
|
|
expaths.extend(mark + path + probe for probe in probes)
|
|
|
|
print("explore", paths)
|
|
response = api.explore([p.path for p in expaths])
|
|
results = response["results"]
|
|
self.score = response["queryCount"]
|
|
|
|
for path, presults in zip(paths, itertools.batched(results, len(probes))):
|
|
prefix_len = len(mark) - 2 * num_marks + len(path)
|
|
print("id", path, results)
|
|
self.update(path, [res[prefix_len:] for res in presults])
|
|
|
|
def _add_room(self, path, results):
|
|
label = results[0][0]
|
|
assert all(result[0] == label for result in results)
|
|
|
|
probe_ids = [r for res in (rs[1:] for rs in results) for r in res]
|
|
room_id = str(label) + "".join(str(p) for p in probe_ids)
|
|
# print("add room", path, results, room_id)
|
|
|
|
if path in self.rooms:
|
|
rid = self.rooms[path]
|
|
if rid != room_id:
|
|
raise ExploreError(f"expected match room at {path}: {rid} != {room_id}")
|
|
else:
|
|
self.rooms[path] = room_id
|
|
self.room_ids[room_id].add(path)
|
|
self.neighbors[path] = [(path + d, None) for d in DOORS]
|
|
return room_id
|
|
|
|
def update(self, path, results):
|
|
"""
|
|
path: path to update
|
|
probe: prob result from path.
|
|
"""
|
|
path = self._path(path)
|
|
print("update", path, results)
|
|
room_id = self._add_room(path, results)
|
|
|
|
# update penultimate room
|
|
if path:
|
|
pl, d = path.last()
|
|
pl = self._path(pl)
|
|
if pl in self.rooms:
|
|
# print("penult", path, pl, d, pl in self.rooms)
|
|
# self.dump()
|
|
|
|
p, rid = self.neighbors[pl][d]
|
|
assert rid is None or rid == room_id, f"penultimate {path} {pl}: {rid} != {room_id}"
|
|
self.neighbors[pl][d] = (p, room_id)
|
|
|
|
# self.dump()
|
|
|
|
def update_path(self, path, door, result0, result1):
|
|
path = self._path(path)
|
|
|
|
room0 = path
|
|
room1 = path + Path([door])
|
|
room_id0 = self._add_room(room0, [result0[:1], result0]) # noqa
|
|
room_id1 = self._add_room(room1, [result1[1:2], result1[1:]])
|
|
|
|
p, rid = self.neighbors[room0][door]
|
|
assert rid is None or rid == room_id1
|
|
self.neighbors[room0][door] = (p, room_id1)
|
|
|
|
def dump(self):
|
|
print(f"{self.problem} rooms: {len(self.rooms)}")
|
|
for r, rid in self.rooms.items():
|
|
print(f" {r:<10}: {rid} {self.neighbors[r]}")
|
|
print()
|
|
# for rid, paths in self.room_ids.items():
|
|
# print(f" {rid}: {paths}")
|
|
# print()
|
|
# for p, pu in self.unification_id.items():
|
|
# print(f" {p}: {pu}")
|
|
# print()
|
|
|
|
def unify(self, path1, path2):
|
|
"""
|
|
try to unify rooms at paths p1, p2
|
|
return unified rooms
|
|
"""
|
|
print("unify", path1, path2)
|
|
if path1 == path2:
|
|
return
|
|
|
|
assert path1 in self.rooms, f"room '{path1}' not explored"
|
|
assert path2 in self.rooms, f"room '{path2}' not explored"
|
|
|
|
if self.rooms[path1] != self.rooms[path2]:
|
|
raise ExploreError(f"ids of '{path1}'({self.rooms[path1]}) and '{path2}'({self.rooms[path2]}) do not match")
|
|
|
|
path = min(path1, path2)
|
|
pmerge = max(path1, path2)
|
|
|
|
# unify paths
|
|
self.unification_id[pmerge] = path
|
|
|
|
merged_neighbors = []
|
|
for n, ((p, rid), (pm, rmid)) in enumerate(zip(self.neighbors[path], self.neighbors[pmerge])):
|
|
if rid and rmid and rid != rmid:
|
|
raise ExploreError(f"neighbor {n} of '{path}'({rid}) and '{pmerge}'({rmid}) do not match")
|
|
merged_neighbors.append((self._path(p), rid or rmid))
|
|
|
|
# fix rooms
|
|
del self.rooms[pmerge]
|
|
del self.neighbors[pmerge]
|
|
|
|
for p in list(self.rooms.keys()):
|
|
pm = self._path(p)
|
|
if p != pm:
|
|
rid = self.rooms[p]
|
|
del self.rooms[p]
|
|
self.rooms[pm] = rid
|
|
ns = self.neighbors[p]
|
|
del self.neighbors[p]
|
|
self.neighbors[pm] = ns
|
|
|
|
for i, ps in self.room_ids.items():
|
|
self.room_ids[i] = {self._path(p) for p in ps if p != pmerge}
|
|
|
|
self.neighbors[path] = merged_neighbors
|
|
for p, ns in self.neighbors.items():
|
|
new = []
|
|
for np, rid in ns:
|
|
np_ = self._path(np)
|
|
new.append((np_, rid))
|
|
if rid:
|
|
try:
|
|
assert np_ in self.rooms, f"unify: path {np} {np_} of {(p, ns)} not in rooms"
|
|
except AssertionError as exc:
|
|
self.dump()
|
|
raise exc
|
|
self.neighbors[p] = new
|
|
|
|
def unify_all(self):
|
|
while True:
|
|
repeat = False
|
|
for rid, paths in self.room_ids.items():
|
|
if len(paths) > 1:
|
|
paths = list(paths)
|
|
# print("unify", paths[0], paths[1])
|
|
self.unify(paths[0], paths[1])
|
|
repeat = True
|
|
break
|
|
if not repeat:
|
|
break
|
|
|
|
def unexplored(self):
|
|
unexplored = []
|
|
for path, ns in self.neighbors.items():
|
|
for d, (p, rid) in enumerate(ns):
|
|
if not rid:
|
|
unexplored.append((d, path))
|
|
if unexplored:
|
|
yield unexplored
|
|
|
|
def is_explored(self):
|
|
return next(self.unexplored(), None) is None and all(len(p) == 1 for p in self.room_ids.values())
|
|
|
|
def layout(self, orig=None, start=Path()):
|
|
aedi = graph.Aedificium(self.problem)
|
|
|
|
ids = {}
|
|
for i, p in enumerate(self.rooms.keys()):
|
|
ids[p] = i
|
|
aedi.add_node(str(p))
|
|
|
|
connected = set()
|
|
connections = []
|
|
connect_errors = []
|
|
for path, ns in self.neighbors.items():
|
|
src_path = path
|
|
for src_door, (trg_path, _) in enumerate(ns):
|
|
src = (src_path, src_door)
|
|
trg_door = next(
|
|
(j for j, (p, rid) in enumerate(self.neighbors[trg_path]) if p == src_path),
|
|
None,
|
|
)
|
|
if trg_door is None:
|
|
# raise Exception(f"backlink not found: {(src, trg_path, self.neighbors[trg_path])}")
|
|
print(f"backlink not found: {(src, trg_path, self.neighbors[trg_path])}")
|
|
connect_errors.append((src[0], src[1], trg_path))
|
|
trg_door = 0
|
|
trg = (trg_path, trg_door)
|
|
|
|
if (src, trg) in connected or (trg, src) in connected:
|
|
continue
|
|
connected.add((src, trg))
|
|
connections.append(
|
|
{
|
|
"from": {"room": ids[src[0]], "door": src[1]},
|
|
"to": {"room": ids[trg[0]], "door": trg[1]},
|
|
}
|
|
)
|
|
aedi.add_edge(src[0], src[1], trg[0], trg[1])
|
|
|
|
if connect_errors:
|
|
# try to fix connection issues
|
|
updated = False
|
|
src = {src_path: (trg_path, src_door) for src_path, src_door, trg_path in connect_errors}
|
|
trg = {trg_path: (src_path, src_door) for src_path, src_door, trg_path in connect_errors}
|
|
|
|
for trg_path, (src_path, src_door) in trg.items():
|
|
if trg_path in src:
|
|
self.neighbors[trg_path][src_door] = (src_path, "**FIXED**")
|
|
updated = True
|
|
|
|
assert updated, "could not fix connections"
|
|
self.dump()
|
|
return self.layout(orig=orig, start=start)
|
|
|
|
if orig:
|
|
rooms = orig
|
|
else:
|
|
rooms = {}
|
|
for path, rid in self.rooms.items():
|
|
rooms[path] = rid[0]
|
|
|
|
layout = {
|
|
"rooms": [int(rooms[p]) for p in ids.keys()],
|
|
"startingRoom": ids[start],
|
|
"connections": connections,
|
|
}
|
|
|
|
aedi.render()
|
|
return layout
|
|
|
|
def returnfrom(self, path):
|
|
queue = [(path, Path())]
|
|
while queue:
|
|
p, ret = queue.pop(0)
|
|
if p == Path():
|
|
return ret
|
|
|
|
for d, (q, _) in enumerate(self.neighbors[p]):
|
|
queue.append((q, ret + Path([d])))
|
|
|
|
assert False, "return path not found"
|
|
|
|
def walk(self, path, start=Path()):
|
|
here = start
|
|
for d in map(int, path.path):
|
|
here = self.neighbors[here][d][0]
|
|
return here
|
|
|
|
|
|
def get_mark(rid, mask):
|
|
return Path("[" + str(int(rid[0]) ^ mask) + "]")
|
|
|
|
|
|
def apply_mask(rid, mask):
|
|
return "".join(map(lambda d: str(int(d) & ~mask), rid))
|
|
|
|
|
|
def mark_solve(problem, nrooms):
|
|
probes = [d + d + d + d + d + d for d in DOORS[:2]]
|
|
api.select(problem)
|
|
|
|
masks = [0, 1, 2]
|
|
exs = []
|
|
loop = Path()
|
|
mark_loop = Path()
|
|
|
|
for mask in masks:
|
|
print("exploration mask", mask)
|
|
if mask != 0:
|
|
for p, rid in exs[-1].rooms.items():
|
|
q = exs[-1].returnfrom(p)
|
|
loop = loop + p + q
|
|
mark_loop = mark_loop + p + get_mark(rid, mask) + q
|
|
|
|
print("marked", mark_loop, loop)
|
|
assert exs[-1].walk(loop) == Path(), f"loop doesn't close {exs[-1].walk(loop)}"
|
|
|
|
ex = Explore(problem, probes)
|
|
exs.append(ex)
|
|
|
|
ex.explore([Path()], mark=mark_loop)
|
|
ex.dump()
|
|
|
|
while True:
|
|
unexplored = next(ex.unexplored(), None)
|
|
if unexplored is None:
|
|
break
|
|
|
|
paths = []
|
|
for door, path in unexplored:
|
|
paths.append(path + Path([door]))
|
|
print("explore", unexplored)
|
|
ex.explore(paths, mark=mark_loop)
|
|
ex.unify_all()
|
|
ex.dump()
|
|
|
|
assert ex.is_explored(), "not fully explored"
|
|
if nrooms % len(ex.rooms) != 0:
|
|
raise ExploreError(f"not all rooms could be identifed {len(ex.rooms)}/{nrooms}")
|
|
|
|
if len(ex.rooms) == nrooms:
|
|
print("found all rooms")
|
|
break
|
|
|
|
if len(ex.rooms) != nrooms:
|
|
raise ExploreError(f"not all rooms could be identifed {len(ex.rooms)}/{nrooms}")
|
|
for e in exs:
|
|
e.dump()
|
|
|
|
# get old markings
|
|
orig = {}
|
|
for path in ex.rooms.keys():
|
|
p = exs[0].walk(path)
|
|
rid = exs[0].rooms[p]
|
|
orig[path] = rid[0]
|
|
print(path, p, rid)
|
|
print(orig)
|
|
|
|
# try to identify original start
|
|
start = apply_mask(exs[0].rooms[Path()], mask)
|
|
starts = []
|
|
all_masked = []
|
|
for p, rid in ex.rooms.items():
|
|
mrid = apply_mask(rid, mask)
|
|
all_masked.append(mrid)
|
|
if mrid == start:
|
|
starts.append(p)
|
|
print("start", start, starts, all_masked)
|
|
assert len(starts) > 0
|
|
# try to reach . from there
|
|
start = None
|
|
for s in starts:
|
|
t = ex.walk(loop, start=s)
|
|
print(f"walk {s} => {t}")
|
|
if t == Path():
|
|
start = s
|
|
print("use as start", start)
|
|
assert start is not None
|
|
|
|
layout = ex.layout(orig=orig, start=starts[0])
|
|
guess_ok = api.guess(layout)
|
|
print("guess", guess_ok)
|
|
print("score", sum(e.score for e in exs))
|
|
return guess_ok
|
|
|
|
|
|
if __name__ == "__main__":
|
|
problem = sys.argv[1]
|
|
|
|
with open(os.path.join("..", "problems.json")) as h:
|
|
problems = json.loads(h.read())
|
|
|
|
problems = {p["problem"]: {"size": p["size"], "idx": i} for i, p in enumerate(problems)}
|
|
|
|
if problem not in problems:
|
|
raise ExploreError(f"unknown problem {problem}")
|
|
|
|
ok = False
|
|
try:
|
|
ok = mark_solve(problem, problems[problem]["size"])
|
|
api.clean_explore_cache()
|
|
except ExploreError as exc:
|
|
api.clean_explore_cache()
|
|
raise exc
|
|
except Exception as exc:
|
|
api.write_explore_cache()
|
|
raise exc
|
|
|
|
sys.exit(0 if ok else 1)
|