diff --git a/config b/config index aa9ac2b..9814436 100644 --- a/config +++ b/config @@ -1,2 +1,3 @@ -CONTEST_URL=https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com +# CONTEST_URL=https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com +CONTEST_URL=http://localhost:8000 ID="icfp@zeuxis.de DHf1KQyE3vCvMqPaA4LLlw" diff --git a/server/server.py b/server/server.py new file mode 100755 index 0000000..acd8c95 --- /dev/null +++ b/server/server.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 + +import itertools +import random +import subprocess +from typing import List + +import fastapi +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +api = fastapi.FastAPI() + + +class Problem(BaseModel): + rooms: int + copies: int + + +PROBLEMS = { + "probatio": Problem(rooms=3, copies=1), + "primus": Problem(rooms=6, copies=1), + "secundus": Problem(rooms=12, copies=1), + "tertius": Problem(rooms=18, copies=1), + "quartus": Problem(rooms=24, copies=1), + "quintus": Problem(rooms=30, copies=1), + "aleph": Problem(rooms=6, copies=2), + "beth": Problem(rooms=12, copies=2), + "gimel": Problem(rooms=18, copies=2), + "daleth": Problem(rooms=24, copies=2), + "he": Problem(rooms=30, copies=2), + "vau": Problem(rooms=6, copies=3), + "zain": Problem(rooms=12, copies=3), + "hhet": Problem(rooms=18, copies=3), + "teth": Problem(rooms=24, copies=3), + "iod": Problem(rooms=30, copies=3), +} + +CURRENT_SELECTION = None + + +class Connection(BaseModel): + class Port(BaseModel): + room: int + door: int + + src: Port = Field(alias="from") + trg: Port = Field(alias="to") + + +class Map(BaseModel): + rooms: List[int] + startingRoom: int + connections: List[Connection] + + +class Library: + class Room: + def __init__(self, label): + self.label = label + self.doors = 6 * [None] + + def __getitem__(self, idx): + return self.doors[idx] + + def __setitem__(self, idx, trg): + self.doors[idx] = trg + + def __repr__(self): + return f"{self.label}: {self.doors}" + + def __init__(self, problem): + self.problem = problem + self.count = 0 + + while True: + start = random.randrange(4) + self.rooms = [ + self.Room(lbl) for lbl in itertools.islice(itertools.cycle(range(4)), start, start + problem.rooms) + ] + + doors = [(r, d) for r in range(len(self.rooms)) for d in range(6)] + while doors: + i1 = random.randrange(len(doors)) + i2 = random.randrange(len(doors)) + r1, d1 = doors[i1] + r2, d2 = doors[i2] + + self.rooms[r1][d1] = r2 + self.rooms[r2][d2] = r1 + + del doors[i1] + if i1 < i2: + del doors[i2 - 1] + elif i1 > i2: + del doors[i2] + + self.start = 0 + + # is connected? + visited = set() + queue = [self.start] + while queue: + src = queue.pop(0) + + for d, trg in enumerate(self.rooms[src]): + if self.rooms[src][d] not in visited: + visited.add(trg) + queue.append(self.rooms[src][d]) + + if len(visited) != len(self.rooms): + continue + + # make copies + for c in range(1, problem.copies): + for i in range(problem.rooms): + room = self.Room(self.rooms[i].label) + self.rooms.append(room) + for d, trg in enumerate(self.rooms[i].doors): + room[d] = trg + c * problem.rooms + + # connect copies + if problem.copies > 1: + for _ in range(problem.rooms * problem.copies // 2): + src = random.randrange(problem.rooms) + src_d = random.randrange(6) + cs = list(range(problem.copies)) + random.shuffle(cs) + c0 = cs[0] * problem.rooms + c1 = cs[1] * problem.rooms + + trg = self.rooms[src][src_d] % problem.rooms + trg_d = next( + i for i, t in enumerate(self.rooms[trg].doors) if t % problem.rooms == src % problem.rooms + ) + + self.rooms[c0 + src][src_d] = c1 + trg + self.rooms[c1 + trg][trg_d] = c0 + src + break + + def dump(self): + for ri, r in enumerate(self.rooms): + print(f" {ri:<2}: {r.label} {r.doors}") + print() + + def explore(self, paths: List[str]) -> List[List[int]]: + self.count += 1 + + results = [] + for path in paths: + pos = self.start + marking = False + marks = [r.label for r in self.rooms] + result = [self.rooms[pos].label] + + for p in path: + match p: + case d if "0" <= p < "6": + if marking: + marks[pos] = int(d) % 4 + else: + pos = self.rooms[pos][int(d)] + result.append(marks[pos]) + case "[": + marking = True + case "]": + marking = False + case _: + pass + results.append(result) + self.count += 1 + + return results + + def guess(self, map): + imap = len(self.rooms) * [None] + try: + assert len(map.rooms) == len(map.rooms), "number of rooms" + irooms = [self.Room(lbl) for lbl in map.rooms] + + assert map.startingRoom in range(len(imap)), "starting room is not a room" + + for i, conn in enumerate(map.connections): + # print(conn) + assert conn.src.room in range(len(imap)), f"{i}.from.room is not a room" + assert conn.src.door in range(6), f"{i}.from.room is not a door" + assert conn.trg.room in range(len(imap)), f"{i}.to.room is not a room" + assert conn.trg.door in range(6), f"{i}.to.room is not a door" + + irooms[conn.src.room][conn.src.door] = conn.trg.room + irooms[conn.trg.room][conn.trg.door] = conn.src.room + + for i, r in enumerate(irooms): + for d, trg in enumerate(r.doors): + assert trg is not None, f"door {d} of room {i} missing" + # self.dump() + # for r in irooms: + # print(r) + + queue = [map.startingRoom] + imap[map.startingRoom] = self.start + visited = set() + + while queue: + isrc = queue.pop(0) + src = imap[isrc] + iroom = irooms[isrc] + + assert iroom.label == self.rooms[src].label, f"wrong label {isrc} {src}" + visited.add(isrc) + + for d, itrg in enumerate(iroom.doors): + if imap[itrg] is None: + imap[itrg] = self.rooms[src][d] + queue.append(itrg) + assert imap[itrg] == self.rooms[src][d], f"wrong connection {isrc} {src} [{d}] {itrg} {imap[itrg]}" + + assert len(visited) == len(self.rooms), "not all rooms visited" + + except AssertionError as exc: + print(exc) + return False + return True + + +class Select(BaseModel): + id: str + problemName: str + + class Problem(BaseModel): + problem: str + size: int + copies: int + + class Response(BaseModel): + problemName: str + + +@api.get(path="/select") +def select_get() -> List[Select.Problem]: + return [ + Select.Problem(problem=pname, size=prob.rooms * prob.copies, copies=prob.copies) + for pname, prob in PROBLEMS.items() + ] + return PROBLEMS + + +@api.post(path="/select") +def select_post(select: Select) -> Select.Response: + global CURRENT_SELECTION + + if select.problemName not in PROBLEMS: + return JSONResponse({"detail": "problem not found"}, status_code=404) + + CURRENT_SELECTION = Library(PROBLEMS[select.problemName]) + CURRENT_SELECTION.dump() + return Select.Response(problemName=select.problemName) + + +class Explore(BaseModel): + id: str + plans: List[str] + + class Response(BaseModel): + results: List[List[int]] + queryCount: int + + +@api.post(path="/explore") +def explore(explore: Explore) -> Explore.Response: + if not CURRENT_SELECTION: + return JSONResponse({"detailno problem selected"}, status_code=409) + + results = CURRENT_SELECTION.explore(explore.plans) + return Explore.Response(results=results, queryCount=CURRENT_SELECTION.count) + + +class Guess(BaseModel): + id: str + map: Map + + class Response(BaseModel): + correct: bool + + +@api.post(path="/guess") +def guess(guess: Guess) -> Guess.Response: + global CURRENT_SELECTION + + if not CURRENT_SELECTION: + return JSONResponse({"detailno problem selected"}, status_code=409) + + ok = CURRENT_SELECTION.guess(guess.map) + CURRENT_SELECTION = None + return Guess.Response(correct=ok) + + +if __name__ == "__main__": + subprocess.run(["fastapi", "dev", __file__])