<< BACK TO RS001 LOG #!/usr/bin/env python3 """ q2n - QEC to NNTP sync This script syncs QEC logs to NNTP. * everything configurable via `Config` * has a throttler so we don't accidentally submit too much at a time * has a dry-run for submission * it remembers what has already been submitted so they don't get submitted again * TODO: - read from argv or a config file - set up a cron job - put it on tildegit (once my application issue get sorted out) """ from dataclasses import dataclass import datetime as dt import io import logging import os import pickle import pwd import random import subprocess as sp import time import typing as t _LOGGER = logging.getLogger(__name__) Path = str User = str NntpArticleBody = str LogEntryHash = str @dataclass class Config: listing_dir: str listing_filename: str nntp_group: str nntp_server: str max_submission: int submission_store_dir: Path @classmethod def create(cls): return Config( listing_dir="/var/gopher/", listing_filename="listing.gophermap", nntp_server="localhost", # TODO: find more appropriate one nntp_group="cosmic.worldbuilding", max_submission=5, submission_store_dir="/var/tmp/q2n", ) @dataclass class Ship: name: str owner: User @dataclass class LogEntry: ship: Ship author: User title: str file_name: str class LogIterator(t.Protocol): def __call__(self) -> t.List[LogEntry]: ... class SubmitCondition(t.Protocol): def __call__(self, log_entry: LogEntry) -> bool: ... class LogSubmitter(t.Protocol): def __call__(self, log: LogEntry) -> None: ... @dataclass class Utils: config: Config def ship_owner(self, ship_name: str) -> User: return self._get_path_user( f"{self.config.listing_dir}/{ship_name}" ) def read_log_content(self, log: LogEntry) -> str: return self._read_log_entry( f"{self.config.listing_dir}/{log.ship.name}/{log.file_name}" ) @staticmethod def _read_log_entry(path: str) -> str: with open(path, "r", encoding="utf-8") as f: return f.read() @staticmethod def _get_path_user(fp: str) -> User: st = os.stat(fp) return pwd.getpwuid(st.st_uid).pw_name @dataclass class SubmittedLogsStore: store_dir: str def __post_init__(self): import subprocess as sp sp.check_call( f"mkdir -p {self.store_dir}", shell=True ) def record_submission(self, log: LogEntry): with open(f"{self.store_dir}/{self.checksum(log)}", "wb") as f: pickle.dump(log, f) def load_submitted_logs(self) -> t.List[LogEntryHash]: return os.listdir(self.store_dir) @staticmethod def checksum(log: LogEntry) -> LogEntryHash: import hashlib checked_str = f"{log.ship.name}{log.file_name}" return hashlib.md5(checked_str.encode("utf-8")).hexdigest() # Throttles log entries to submit. Just in case there's a bug. # Usually we'd limit logs to submit to a small number, and maybe also # send out some alert. SubmissionThrottle = t.Callable[[t.List[LogEntry]], t.List[LogEntry]] @dataclass class ListingFileLogIterator(LogIterator): listing_dir: str listing_filename: str utils: Utils def __call__(self) -> t.List[LogEntry]: with open( f"{self.listing_dir}/{self.listing_filename}", "r", encoding="utf-8" ) as f: entries = f.readlines() return [self._parse(ent) for ent in entries] def _parse(self, entry: str) -> LogEntry: """Parse a listing file entry into a `LogEntry` An entry looks like this: 0betsy - About QEC /betsy/qec.txt I.e. 0<ship> - <title><TAB><file_path> Note: * <file_path> is rooted at /var/gohper, i.e., where the listing file resides. """ import re res = re.match(r"^0(.+?) - (.+)\t(.+)##content##quot;, entry) if not res: raise ValueError(f"Cannot parse: {entry}") # It's more robust to use the file path (/ship/fn.txt) to obtain ship's # name, rather than res.group(1). This is b/c there're duplicated # entries in the listing: # 0Polonia - 24131 /Polonia-II/24131.txt # 0Polonia-II - 24131 /Polonia-II/24131.txt title = res.group(2) log_path = res.group(3) ship, log_fn = self._parse_log_file_name(log_path) ship_owner = self.utils.ship_owner(ship) return LogEntry( ship=Ship(name=ship, owner=ship_owner), author=ship_owner, title=title, file_name=log_fn, ) @staticmethod def _parse_log_file_name(ship_and_file: str) -> t.Tuple[str, str]: "/<ship>/file.txt -> (<ship>, file.txt)" return t.cast( t.Tuple[str, str], tuple(x for x in ship_and_file.split("/") if x), ) @dataclass class SubmitConditionImpl(SubmitCondition): submission_store: SubmittedLogsStore def __call__(self, log_entry: LogEntry) -> bool: return ( self.submission_store.checksum(log_entry) not in self.submission_store.load_submitted_logs() ) @dataclass class NntpLogSubmitter(LogSubmitter): @dataclass class NntpLogFormat: subject: str body: str from_: str submission_store: SubmittedLogsStore read_log_entry: t.Callable[[LogEntry], NntpArticleBody] nntp_group: str nntp_server: str dry_run: bool = False def __call__(self, log: LogEntry) -> None: self.nntp_submit(log) self.submission_store.record_submission(log) def add_envelope(self, article: str, log: LogEntry) -> str: return f"""\ TIMESTAMP: {int(time.time())} SGT AUTHOR: {log.author} ORIGINATING SHIP: {log.ship.name} QEC GATEWAY: QG-{random.randint(0, 31)} {article} """ def nntp_submit(self, log: LogEntry) -> None: import nntplib as nn s = nn.NNTP(self.nntp_server, readermode=True) article_body = self.read_log_entry(log) article_body = self.add_envelope(article_body, log) msg = f"""\ Newsgroups: {self.nntp_group} Subject: [QEC] {log.title} From: {log.author} "{log.author}@cosmic.voyage" {article_body} """ f = io.BytesIO(msg.encode("utf-8")) f.seek(0) _LOGGER.info(f"About to submit log:\n{msg}") if not self.dry_run: s.post(f) @dataclass class SubmissionThrottler: max_submission: int def __call__(self, logs: t.List[LogEntry]) -> t.List[LogEntry]: return logs[0:self.max_submission] def main(): logging.basicConfig() logging.root.setLevel(logging.INFO) config = Config.create() _LOGGER.info(f"Running with config: {config}") utils = Utils(config=config) iterate_logs = ListingFileLogIterator( listing_dir=config.listing_dir, listing_filename=config.listing_filename, utils=utils, ) throttler = SubmissionThrottler(config.max_submission) submission_store = SubmittedLogsStore(store_dir=config.submission_store_dir) should_submit = SubmitConditionImpl(submission_store=submission_store) submit_log = NntpLogSubmitter( submission_store=submission_store, read_log_entry=utils.read_log_content, nntp_group=config.nntp_group, nntp_server=config.nntp_server, dry_run=True, # TODO remove ) logs_to_submit = [log for log in iterate_logs() if should_submit(log)] ### # FOR TEST: remove - randomly choose one log ### logs_to_submit = logs_to_submit[random.randint(0, len(logs_to_submit)-2):][0:] logs_to_submit = throttler(logs_to_submit) _LOGGER.info(f"Submitting {len(logs_to_submit)} logs...") for log in logs_to_submit: submit_log(log) if __name__ == "__main__": main()