# Start the server with: # # uvicorn tools.gh_moderator:app --reload --port 59523 # # pip install openai pygithub fastapi uvicorn hypy_utils import hashlib import hmac import json import unicodedata from datetime import datetime from pathlib import Path import openai import tomllib as toml from fastapi import FastAPI, Request, Response from github import Github from hypy_utils import write, json_stringify from hypy_utils.logging_utils import setup_logger from openai.openai_object import OpenAIObject from hyfetch.color_util import printc log = setup_logger() def read_config(): with open(Path.home() / ".config/gh_moderator.toml", "rb") as f: return toml.load(f) app = FastAPI() config = read_config() webhook_secret = bytes(config["webhook_secret"], "utf-8") gh = Github(per_page=100, login_or_token=config["gh_token"]) me = gh.get_user() repo = gh.get_repo(config["gh_repo"]) printc(f"&a[+] Logged in as {me.login}") harm_classifier_url, harm_classifier_token = config["harm_classifier_url"], config["harm_classifier_token"] script_path = Path(__file__).parent supported_events = ["issue_comment", "issues", "pull_request", "pull_request_review_comment"] ai_notice = f"If you think this is a false-positive, please contact the owner of this repo." openai.organization = config['OpenAI']['org'] openai.api_key = config['OpenAI']['key'] openai_model = config['OpenAI']['model'] def get_content(event: str, obj: dict) -> str: # Get the content of the event match event: case "issue_comment" | "pull_request_review_comment": return obj["comment"]["body"] case "issues": return obj["issue"]["title"] + "\n\n" + obj["issue"]["body"] case "pull_request": return obj["pull_request"]["title"] + "\n\n" + obj["pull_request"]["body"] def redact(event: str, obj: dict, id: str, reason: str): """ Redact the event """ printc(f"&c[!] Redacting {event} {id} for {reason}.") tail = f"\n\n> Reason: {reason}\n> {ai_notice}" redact_notice = f"[Redacted by [AI Content Moderator]({me.html_url})]{tail}" match event: case "issue_comment": # Redact the comment comment = repo.get_issue(obj["issue"]["number"]).get_comment(obj["comment"]["id"]) comment.edit(body=redact_notice) case "pull_request_review_comment": # Redact the comment comment = repo.get_pull(obj["pull_request"]["number"]).get_review_comment(obj["comment"]["id"]) comment.edit(body=redact_notice) case "issues" | "pull_request": # Close the issue iss = repo.get_issue(obj["issue"]["number"]) iss.edit(title="[Redacted]", body=redact_notice, state="closed") iss.create_comment(f"Issue closed for potentially offensive content.{tail}") iss.lock("spam") async def process_event(event: str, obj: dict, id: str): # Preliminary checks if event not in supported_events: printc(f"&7[-] Unknown event: {event}") return if obj['repository']['full_name'] != repo.full_name: printc(f"&7[-] Unknown repository: {obj['repository']['full_name']}") return blacklist_users = {v for v in (script_path / "blacklist_users.csv").read_text().split("\n") if v} actor = obj["sender"]["login"] if actor == me.login: printc(f"&7[-] Ignoring event by myself: {id} {event} by {actor}") return printc(f"&e[+] Received event: {id} {event} by {actor}") if actor in blacklist_users: redact(event, obj, id, "User is blacklisted") return # Normalize content content = unicodedata.normalize("NFKC", get_content(event, obj)) # Ask OpenAI to predict if it's offensive res: OpenAIObject = openai.Moderation.create(content, openai_model).results[0] write(f"moderator-data/openai/{id}.json", json_stringify(res)) if res.flagged: printc(f"\n&c[!] AI classified {event} {id} by {actor} as offensive !!!\n> Content: {content}\n\n") reason = " | ".join(f"{k} {res.category_scores.get(k) * 100:.0f}%" for k, v in res.categories.items() if v) redact(event, obj, id, f"Flagged by OpenAI : {reason}") return printc(f"&a[~] AI classified {event} {id} by {actor} as safe.") @app.post("/") async def handle_webhook(request: Request, response: Response): # Read headers event = request.headers.get("X-GitHub-Event") signature = request.headers.get("X-Hub-Signature") # Verify the signature body = await request.body() if not verify_signature(signature, body): response.status_code = 401 return {"message": "Invalid signature"} # Parse the event body obj = json.loads(body.decode()) # Log the request id = datetime.now().isoformat() write(f"moderator-data/webhook/{id}-{event}.json", json_stringify(obj, indent=4)) await process_event(event, obj, id) return {"message": "OK"} # Helper function to verify the signature def verify_signature(signature: str, payload: bytes) -> bool: hash_type, signature = signature.split("=") digest = hmac.new(webhook_secret, msg=payload, digestmod=getattr(hashlib, hash_type)).hexdigest() return hmac.compare_digest(digest, signature)