""" 해커톤 투표 앱 — DB 없이 단일 JSON. - 참가자: 이름 선택 → 본인 팀 자동 → 본인 팀 제외 3분야 투표 - 진행자: ?mode=admin&token=XXX 로 결과 확인 - 시상: ?mode=ceremony&token=XXX """ import csv import io import json import os import random as _rand import threading from datetime import datetime import socket from io import BytesIO from pathlib import Path import qrcode import streamlit as st DATA_PATH = os.environ.get( "DATA_PATH", str(Path(__file__).parent / "hackathon.json") ) ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "change-me") CATEGORIES = [ ("fun_team", "🎉 재미상", "손선풍기 5개"), ("polish_team", "🏆 완성도상", "양우산 5개"), ("utility_team", "🛠 실용성상", "팜레스트 5개"), ] PRIZE_PRIORITY = ["utility_team", "polish_team", "fun_team"] _lock = threading.RLock() def _empty_state(): return { "people": [], "settings": {"voting_open": True, "current_stage": "intro"}, "titles": {}, "tie_breaks": {}, "votes": [], "topics": {"categories": []}, } def load_data(): """매 호출 디스크 read. 호스트 편집 즉시 반영.""" with _lock: p = Path(DATA_PATH) if not p.exists(): return _empty_state() try: data = json.loads(p.read_text(encoding="utf-8")) except json.JSONDecodeError: return _empty_state() # 누락 키 채움 base = _empty_state() base.update(data) for k, v in _empty_state().items(): base.setdefault(k, v) # 한 단계 deep merge — 기존 데이터에 누락된 nested 키 보강 for nested_key in ("settings", "topics"): for k, default_v in _empty_state()[nested_key].items(): base[nested_key].setdefault(k, default_v) return base def save_data(data): """원자적 write — tmp 파일 + rename.""" with _lock: tmp = DATA_PATH + ".tmp" Path(tmp).write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8" ) os.replace(tmp, DATA_PATH) def update_data(fn): """load → modify → save 원자적 묶음.""" with _lock: data = load_data() result = fn(data) save_data(data) return result def get_participants(): return {p["name"]: p["team"] for p in load_data().get("people", [])} def get_teams(): parts = get_participants() return sorted(set(parts.values())) if parts else [f"팀{i}" for i in range(1, 8)] def get_titles(): return load_data().get("titles", {}) def set_title(team, title): def _fn(data): data["titles"][team] = title.strip() update_data(_fn) def is_voting_open(): return load_data().get("settings", {}).get("voting_open", True) def set_voting_open(flag): def _fn(data): data["settings"]["voting_open"] = bool(flag) update_data(_fn) VALID_STAGES = ("intro", "topics", "vote") def get_stage(): return load_data().get("settings", {}).get("current_stage", "intro") def set_stage(stage): if stage not in VALID_STAGES: raise ValueError(f"invalid stage: {stage!r}") def _fn(data): data["settings"]["current_stage"] = stage if stage == "vote": data["settings"]["voting_open"] = True update_data(_fn) def can_accept_votes(data): s = data.get("settings", {}) return s.get("current_stage") == "vote" and s.get("voting_open", False) def get_topics(): return load_data().get("topics", {}).get("categories", []) def update_topics(categories): def _fn(data): data.setdefault("topics", {}) data["topics"]["categories"] = categories update_data(_fn) def get_tie_breaks(): return load_data().get("tie_breaks", {}) def save_tie_break(category, winner, method): def _fn(data): data["tie_breaks"][category] = { "winner_team": winner, "method": method, "decided_at": datetime.now().isoformat(timespec="seconds"), } update_data(_fn) def clear_tie_break(category): def _fn(data): data["tie_breaks"].pop(category, None) update_data(_fn) def insert_vote(voter_name, employee_id, voter_team, picks): """UNIQUE on voter_name. 이미 있으면 ValueError.""" def _fn(data): existing = {v["voter_name"] for v in data["votes"]} if voter_name in existing: raise ValueError("DUPLICATE_VOTER") data["votes"].append( { "voter_name": voter_name, "employee_id": employee_id, "voter_team": voter_team, "fun_team": picks["fun_team"], "polish_team": picks["polish_team"], "utility_team": picks["utility_team"], "created_at": datetime.now().isoformat(timespec="seconds"), } ) update_data(_fn) def list_votes(): return load_data().get("votes", []) def clear_votes(): def _fn(data): data["votes"] = [] update_data(_fn) def fmt_team(team, titles): t = titles.get(team, "") return f"{team} — {t}" if t else team def make_qr_png(url: str, box_size: int = 20) -> bytes: img = qrcode.make(url, box_size=box_size, border=2) buf = BytesIO() img.save(buf, format="PNG") return buf.getvalue() def _detect_lan_ip() -> str: """LAN IP 자동 감지. 실패 시 'localhost'.""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "localhost" def compute_vote_url() -> str: data = load_data() base = ( data.get("settings", {}).get("public_base_url") or os.environ.get("PUBLIC_BASE_URL") or f"http://{_detect_lan_ip()}:8501" ) return f"{base.rstrip('/')}/?mode=vote" def compute_winners(): """우선순위 기반 1팀 1상 + 동률 처리.""" data = load_data() votes = data.get("votes", []) tie_decisions = data.get("tie_breaks", {}) rankings = {} for col, _, _ in CATEGORIES: counts = {} for v in votes: t = v.get(col) if t: counts[t] = counts.get(t, 0) + 1 rankings[col] = sorted(counts.items(), key=lambda x: (-x[1], x[0])) winners = {} excluded = set() for col in PRIZE_PRIORITY: rows = rankings[col] filtered = [(t, c) for t, c in rows if t not in excluded] if not filtered: winners[col] = {"status": "empty"} continue top_votes = filtered[0][1] tied = [t for t, c in filtered if c == top_votes] if len(tied) > 1: decision = tie_decisions.get(col) if decision and decision.get("winner_team") in tied: winner = decision["winner_team"] method = decision.get("method", "") else: winners[col] = { "status": "tie", "tied": tied, "votes": top_votes, } continue else: winner = filtered[0][0] method = "" runner_votes = next((c for t, c in filtered if t != winner), 0) winners[col] = { "status": "ok", "team": winner, "votes": top_votes, "diff": top_votes - runner_votes, "method": method, } excluded.add(winner) return winners, rankings def archive_results(): """결과 timestamped JSON. 모든 winners ok일 때만.""" winners, _ = compute_winners() if any(w.get("status") != "ok" for w in winners.values()): return None titles = get_titles() data = { "timestamp": datetime.now().isoformat(timespec="seconds"), "results": [], } for col, label, prize in CATEGORIES: w = winners.get(col) if w and w.get("status") == "ok": data["results"].append( { "category": label, "prize": prize, "team": w["team"], "title": titles.get(w["team"], ""), "votes": w["votes"], "diff_2nd": w["diff"], "method": w.get("method") or "majority", } ) archive_dir = os.path.dirname(DATA_PATH) or "." ts = datetime.now().strftime("%Y%m%d_%H%M%S") path = os.path.join(archive_dir, f"results_{ts}.json") Path(path).write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8" ) return path # --- UI --- def render_voter(): if not can_accept_votes(load_data()): st.title("🗳 해커톤 투표") st.info("⏳ 지금은 투표 시간이 아닙니다. 진행자가 투표 stage로 전환할 때까지 기다려주세요.") return st.title("🗳 해커톤 투표") if not is_voting_open(): st.error("⏹️ 투표가 마감되었습니다. 시상식을 기다려주세요.") return with st.expander("📖 투표 방식 / 수상 결정 (꼭 읽어주세요)", expanded=True): st.markdown( """ **투표 방식** - 본인 이름 선택 + **본인 사번 입력** - 본인 팀 자동 매핑 - 본인 팀 **제외**, 다른 6팀 중 각 분야별 1팀씩 투표 (총 3표) - **한 번만 제출 가능** (이름 unique) - 사번은 사칭 의심 시 추적용으로 기록됨 **수상 결정 — 1팀 1상 한정** - 우선순위: 🛠 **실용성상 (팜레스트)** > 🏆 **완성도상 (양우산)** > 🎉 **재미상 (손선풍기)** - 상위상 수상 팀은 후순위 상에서 자동 제외 - 한 팀이 모든 분야 1위여도 **가장 비싼 상 1개**만 받음 **시상 발표 순서** - 🎉 재미상 → 🏆 완성도상 → 🛠 실용성상 (긴장감 build-up) """ ) PARTS = get_participants() TEAMS = get_teams() if not PARTS: st.error("참가자 명단이 없습니다. `assign_teams.py` 먼저 실행하세요.") return voted_set = {v["voter_name"] for v in list_votes()} def fmt_name(n): return f"{n} ✅ (이미 투표함)" if n in voted_set else n name = st.selectbox( "본인 이름", options=sorted(PARTS.keys()), index=None, placeholder="이름 선택", format_func=fmt_name, ) employee_id = st.text_input( "사번", placeholder="본인 사번만 (전화번호/주민번호 입력 금지)", help="사칭 추적용으로 기록됨. 본인 사번 외 입력하지 마세요.", ) if not name: st.info("이름을 선택하면 본인 팀이 자동 표시됩니다.") return if name in voted_set: st.error( f"❌ {name}님은 이미 투표하셨습니다. 한 번만 가능합니다. " "다른 사람으로 잘못 누른 경우 진행자에게 문의하세요." ) return if not employee_id.strip(): st.warning("사번 입력 후 진행하세요.") return my_team = PARTS[name] titles = get_titles() st.info(f"본인 팀: **{fmt_team(my_team, titles)}**") candidates = [t for t in TEAMS if t != my_team] with st.form("vote", clear_on_submit=False): st.divider() picks = {} for col, label, _ in CATEGORIES: picks[col] = st.radio( label, candidates, index=None, key=col, format_func=lambda t: fmt_team(t, titles), ) submitted = st.form_submit_button("제출") if submitted: if any(picks.get(col) is None for col, _, _ in CATEGORIES): st.error("3분야 모두 선택하세요.") return try: insert_vote(name, employee_id.strip(), my_team, picks) st.success(f"✅ {name}님 ({my_team}) 투표 완료. 창 닫아도 됩니다.") st.balloons() except ValueError: st.error(f"❌ '{name}' 이미 투표함. 진행자에게 문의하세요.") def render_admin(): token = st.query_params.get("token", "") if token != ADMIN_TOKEN: st.error("권한 없음. ?mode=admin&token=... 형식 필요.") return st.title("🔐 진행자 콘솔") with st.expander("🔗 다른 페이지 URL"): st.markdown( f""" - 👥 **참가자 투표**: [/](/) - 🎉 **시상식 (큰 화면)**: [/?mode=ceremony&token=...](?mode=ceremony&token={ADMIN_TOKEN}) - 📦 **JSON 원본 조회**: [/?mode=raw&token=...](?mode=raw&token={ADMIN_TOKEN}) 호스트에서 LAN IP 포함 모든 URL 보기: ```bash ./show-urls.sh ``` """ ) with st.expander("💾 데이터 백업 (hackathon.json 다운로드)"): try: raw_bytes = Path(DATA_PATH).read_bytes() ts = datetime.now().strftime("%Y%m%d_%H%M%S") st.download_button( "📥 hackathon.json 다운로드", raw_bytes, file_name=f"hackathon_{ts}.json", mime="application/json", ) st.caption(f"파일 경로: `{DATA_PATH}` ({len(raw_bytes):,} bytes)") except FileNotFoundError: st.warning(f"파일 없음: {DATA_PATH}") voting_open = is_voting_open() cur_label = "🟢 투표 진행 중" if voting_open else "🔴 투표 마감됨" st.markdown(f"### 투표 상태: {cur_label}") cc1, cc2 = st.columns(2) with cc1: if voting_open and st.button("🛑 투표 마감 (시상 시작 전 필수)", type="primary"): set_voting_open(False) st.rerun() with cc2: if not voting_open and st.button("🔓 투표 재개"): set_voting_open(True) st.rerun() st.divider() PARTS = get_participants() TEAMS = get_teams() votes = list_votes() total = len(votes) expected = len(PARTS) if PARTS else None col_a, col_b, col_c = st.columns(3) col_a.metric("투표 참여자", f"{total}명") col_b.metric("팀 수", f"{len(TEAMS)}팀") if expected: pct = int(100 * total / expected) col_c.metric("참여율", f"{pct}% ({total}/{expected})") if expected and total < expected: voted = {v["voter_name"] for v in votes} not_voted = [n for n in PARTS.keys() if n not in voted] with st.expander(f"⏳ 미투표자 ({len(not_voted)}명)"): for n in sorted(not_voted): st.write(f"- {n} ({PARTS[n]})") st.divider() st.subheader("📝 팀별 결과물 제목 입력") titles = get_titles() with st.form("titles_form"): new_titles = {} for team in TEAMS: new_titles[team] = st.text_input( team, value=titles.get(team, ""), placeholder="예: 슬랙 멘션 자동 분류기", key=f"title_{team}", ) if st.form_submit_button("제목 저장"): for team, title in new_titles.items(): set_title(team, title) st.success("제목 저장 완료. 투표 페이지에 반영됨.") st.rerun() st.divider() st.subheader("📊 분야별 집계 (우선순위 적용 — 1팀 1상)") st.caption( "수상 결정 순서: 팜레스트(실용성) → 양우산(완성도) → 손선풍기(재미). " "이미 받은 팀은 후순위 상에서 제외." ) winners, rankings = compute_winners() awarded_teams = { w["team"] for w in winners.values() if w.get("status") == "ok" } public_lines = [] pending_ties = [ (col, label, prize) for col, label, prize in CATEGORIES if winners.get(col, {}).get("status") == "tie" ] if pending_ties: st.error( f"⚠️ 동률 미해결 {len(pending_ties)}건. 시상 전 추첨/선택 필요." ) for col, label, prize in CATEGORIES: rows = rankings[col] st.markdown(f"### {label} ({prize})") if not rows: st.caption("표 없음") continue result = winners.get(col, {}) status = result.get("status") if status == "empty": st.warning("후보 없음 (모두 우선순위 상 수상)") continue if status == "tie": tied = result["tied"] votes_n = result["votes"] tied_labels = ", ".join(fmt_team(t, titles) for t in tied) st.warning(f"🟰 동률 ({votes_n}표): {tied_labels}") ca, cb = st.columns([1, 2]) with ca: if st.button("🎲 즉석 추첨", key=f"draw_{col}"): chosen = _rand.choice(tied) save_tie_break(col, chosen, "random") st.rerun() with cb: manual = st.selectbox( "수동 선택", tied, index=None, key=f"manual_{col}", format_func=lambda t: fmt_team(t, titles), placeholder="팀 선택", ) if manual and st.button("확정", key=f"manual_btn_{col}"): save_tie_break(col, manual, "manual") st.rerun() continue winner_team = result["team"] winner_votes = result["votes"] diff = result["diff"] method = result.get("method", "") winner_label = fmt_team(winner_team, titles) method_tag = "" if method == "random": method_tag = " 🎲(추첨)" elif method == "manual": method_tag = " 🖊️(수동)" st.success( f"**우승: {winner_label}**{method_tag} — {winner_votes}표 " f"(2위와 {diff}표 차이)" ) if method: if st.button("동률 결정 취소 (재추첨)", key=f"clear_{col}"): clear_tie_break(col) st.rerun() public_lines.append( f"- {label} 우승: **{winner_label}** " f"({winner_votes}표, 2위와 {diff}표 차이)" ) with st.expander("전체 분포 — raw (제외 적용 전)"): for team, c in rows: marker = "" if team in awarded_teams and team != winner_team: marker = " 🚫상위상수상으로 제외" st.write(f"- {fmt_team(team, titles)}: {c}표{marker}") st.divider() st.subheader("🎤 시상식 발표용 (복사해서 화면 공유)") st.code("\n".join(public_lines), language="markdown") st.divider() with st.expander("🔍 감사 로그 (사칭 추적용)"): if not votes: st.caption("투표 없음") else: buf = io.StringIO() w = csv.writer(buf) w.writerow(["시각", "이름", "사번", "본인팀", "재미", "완성도", "실용성"]) for v in votes: w.writerow( [ v["created_at"], v["voter_name"], v.get("employee_id") or "", v["voter_team"], v["fun_team"], v["polish_team"], v["utility_team"], ] ) csv_data = "" + buf.getvalue() st.download_button( "CSV 내려받기", csv_data, file_name="audit.csv", mime="text/csv; charset=utf-8", ) st.dataframe( { "시각": [v["created_at"] for v in votes], "이름": [v["voter_name"] for v in votes], "사번": [v.get("employee_id") or "" for v in votes], "본인팀": [v["voter_team"] for v in votes], "재미": [v["fun_team"] for v in votes], "완성도": [v["polish_team"] for v in votes], "실용성": [v["utility_team"] for v in votes], }, use_container_width=True, hide_index=True, ) from collections import defaultdict by_emp = defaultdict(list) for v in votes: if v.get("employee_id"): by_emp[v["employee_id"]].append(v["voter_name"]) dups = {emp: names for emp, names in by_emp.items() if len(set(names)) > 1} if dups: st.error("⚠️ 같은 사번이 여러 이름으로 투표한 케이스:") for emp, names in dups.items(): st.write(f"- 사번 `{emp}`: {', '.join(names)}") st.divider() with st.expander("⚠️ 위험 작업"): if st.button("모든 투표 삭제 (되돌릴 수 없음)"): clear_votes() st.warning("전체 삭제됨. 새로고침하세요.") def render_ceremony(): token = st.query_params.get("token", "") if token != ADMIN_TOKEN: st.error("권한 없음. ?mode=ceremony&token=... 형식 필요.") return titles = get_titles() votes = list_votes() if not votes: st.warning("📊 아직 투표가 없습니다. 투표 종료 후 시상식을 시작하세요.") return if is_voting_open(): st.error( "⚠️ 투표가 아직 진행 중입니다. 어드민에서 '투표 마감' 후 시상하세요." ) return winners, _ = compute_winners() pending_count = sum( 1 for col, _, _ in CATEGORIES if winners.get(col, {}).get("status") == "tie" ) if pending_count > 0: st.warning("⏳ 시상 준비 중입니다. 잠시만 기다려주세요.") return if not st.session_state.get("ceremony_archived"): archived_path = archive_results() if archived_path: st.session_state.ceremony_archived = archived_path results = [] for col, label, prize in CATEGORIES: result = winners.get(col, {}) if result.get("status") == "ok": results.append( (label, prize, result["team"], result["votes"], result["diff"]) ) if "ceremony_step" not in st.session_state: st.session_state.ceremony_step = 0 if "ceremony_revealed" not in st.session_state: st.session_state.ceremony_revealed = False st.markdown( """ """, unsafe_allow_html=True, ) step = st.session_state.ceremony_step if step == 0: st.markdown('
🎉 해커톤 시상식 🎉
', unsafe_allow_html=True) st.markdown('
준비됐습니다
', unsafe_allow_html=True) if st.button("시작 →", use_container_width=True, type="primary"): st.session_state.ceremony_step = 1 st.session_state.ceremony_revealed = False st.rerun() elif step > len(results): st.markdown('
수고하셨습니다!
', unsafe_allow_html=True) st.balloons() st.snow() if st.button("처음으로", use_container_width=True): st.session_state.ceremony_step = 0 st.session_state.ceremony_revealed = False st.rerun() else: label, prize, winner, votes_n, diff = results[step - 1] st.markdown(f'
{label}
', unsafe_allow_html=True) st.markdown(f'
🎁 상품: {prize}
', unsafe_allow_html=True) if not st.session_state.ceremony_revealed: st.markdown('
🥁🥁🥁
', unsafe_allow_html=True) if st.button("우승팀 공개 →", use_container_width=True, type="primary"): st.session_state.ceremony_revealed = True st.rerun() else: st.balloons() winner_label = fmt_team(winner, titles) st.markdown( f'
🏆
{winner_label}
', unsafe_allow_html=True, ) st.markdown( f'
{votes_n}표 (2위와 {diff}표 차이)
', unsafe_allow_html=True, ) next_label = "다음 부문 →" if step < len(results) else "마무리 →" if st.button(next_label, use_container_width=True, type="primary"): st.session_state.ceremony_step = step + 1 st.session_state.ceremony_revealed = False st.rerun() def render_raw(): """JSON 원본 조회 — admin token 필요.""" token = st.query_params.get("token", "") if token != ADMIN_TOKEN: st.error("권한 없음. ?mode=raw&token=... 형식 필요.") return st.title("📦 hackathon.json 원본") try: raw_text = Path(DATA_PATH).read_text(encoding="utf-8") except FileNotFoundError: st.error(f"파일 없음: {DATA_PATH}") return ts = datetime.now().strftime("%Y%m%d_%H%M%S") col1, col2 = st.columns(2) with col1: st.download_button( "📥 다운로드", raw_text, file_name=f"hackathon_{ts}.json", mime="application/json", use_container_width=True, ) with col2: st.caption(f"`{DATA_PATH}` — {len(raw_text):,} bytes") try: st.json(json.loads(raw_text)) except json.JSONDecodeError: st.code(raw_text, language="json") def main(): st.set_page_config(page_title="해커톤 투표", page_icon="🗳", layout="wide") mode = st.query_params.get("mode", "vote") if mode == "admin": render_admin() elif mode == "ceremony": render_ceremony() elif mode == "raw": render_raw() else: render_voter() if __name__ == "__main__": main()