<< BACK TO RS001 LOG

#!/usr/bin/env python3

"""
q2n - QEC to NNTP sync

This script syncs QEC logs to NNTP.

* everything configurable via `Config`
* has a throttler so we don't accidentally submit too much at a time
* has a dry-run for submission
* it remembers what has already been submitted so they don't get submitted
  again
* TODO:
    - read from argv or a config file
    - set up a cron job
    - put it on tildegit (once my application issue gets sorted out)
"""

from dataclasses import dataclass
import datetime as dt
import io
import logging
import os
import pickle
import pwd
import random
import subprocess as sp
import time
import typing as t


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# The aliases below are all plain `str`; they exist only to make the
# signatures in this file self-describing.
Path = str  # a filesystem path
User = str  # a user name, as returned by `pwd.getpwuid(...).pw_name`
NntpArticleBody = str  # text of a log, used as the body of an NNTP article
LogEntryHash = str  # hex digest produced by `SubmittedLogsStore.checksum`


@dataclass
class Config:
    """Settings for one q2n run.

    Every value is currently hard-coded in `create`.
    """

    # Directory holding the listing file and one sub-directory per ship.
    listing_dir: str
    # Name of the listing file inside `listing_dir`.
    listing_filename: str
    # Newsgroup the articles are posted to.
    nntp_group: str
    # Host name of the NNTP server used for posting.
    nntp_server: str
    # Upper bound on the number of logs submitted in a single run.
    max_submission: int
    # Directory in which already-submitted logs are recorded.
    submission_store_dir: Path

    @classmethod
    def create(cls) -> "Config":
        """Return the default configuration.

        The instance is built through `cls`, so calling `create` on a
        subclass yields an instance of that subclass rather than a plain
        `Config`.
        """
        return cls(
            listing_dir="/var/gopher/",
            listing_filename="listing.gophermap",
            nntp_server="localhost",
            # TODO: find more appropriate one
            nntp_group="cosmic.worldbuilding",
            max_submission=5,
            submission_store_dir="/var/tmp/q2n",
        )


@dataclass
class Ship:
    """A ship whose logs appear in the listing."""

    # Name of the ship; also the name of its directory under the listing
    # directory.
    name: str
    # User that owns the ship's directory.
    owner: User


@dataclass
class LogEntry:
    """One log, as described by a line of the listing file."""

    # Ship the log belongs to.
    ship: Ship
    # User credited with the log; the listing parser uses the ship's owner.
    author: User
    # Title of the log as given in the listing.
    title: str
    # File name of the log inside the ship's directory.
    file_name: str


class LogIterator(t.Protocol):
    """Callable that produces the log entries to consider for submission."""

    def __call__(self) -> t.List[LogEntry]:
        """Return the available log entries."""
        ...


class SubmitCondition(t.Protocol):
    """Predicate deciding whether a log entry should be submitted."""

    def __call__(self, log_entry: LogEntry) -> bool:
        """Return True if `log_entry` should be submitted."""
        ...


class LogSubmitter(t.Protocol):
    """Callable that submits a single log entry."""

    def __call__(self, log: LogEntry) -> None:
        """Submit `log`."""
        ...


@dataclass
class Utils:
    """Filesystem helpers that resolve paths against `config.listing_dir`."""

    config: Config

    def ship_owner(self, ship_name: str) -> User:
        """Return the name of the user owning the ship's directory."""
        ship_dir = f"{self.config.listing_dir}/{ship_name}"
        return self._get_path_user(ship_dir)

    def read_log_content(self, log: LogEntry) -> str:
        """Return the text of `log`, read from its ship's directory."""
        root = self.config.listing_dir
        return self._read_log_entry(f"{root}/{log.ship.name}/{log.file_name}")

    @staticmethod
    def _read_log_entry(path: str) -> str:
        """Read the file at `path` as UTF-8 text and return its content."""
        with open(path, encoding="utf-8") as handle:
            content = handle.read()
        return content

    @staticmethod
    def _get_path_user(fp: str) -> User:
        """Return the user name belonging to the uid that owns `fp`."""
        owner_uid = os.stat(fp).st_uid
        return pwd.getpwuid(owner_uid).pw_name

@dataclass
class SubmittedLogsStore:
    """Directory-backed record of the logs that were already submitted.

    Each recorded log is one file in `store_dir`, named after the log's
    checksum. The file names alone are what `load_submitted_logs` reports.
    """

    store_dir: str

    def __post_init__(self):
        """Create `store_dir`, including missing parents, if it is absent.

        Raises:
            OSError: if the directory cannot be created.
        """
        # `os.makedirs` instead of shelling out to `mkdir -p`: the path is
        # never interpreted by a shell, so spaces or shell metacharacters in
        # `store_dir` can neither split the path nor inject commands.
        os.makedirs(self.store_dir, exist_ok=True)

    def record_submission(self, log: LogEntry):
        """Record `log` as submitted.

        Writes a file named after the log's checksum; the pickled entry is
        stored as its content, but nothing in this class reads it back.
        """
        marker_path = os.path.join(self.store_dir, self.checksum(log))
        with open(marker_path, "wb") as f:
            pickle.dump(log, f)

    def load_submitted_logs(self) -> t.List[LogEntryHash]:
        """Return the checksums of all recorded logs (the store's file names)."""
        return os.listdir(self.store_dir)

    @staticmethod
    def checksum(log: LogEntry) -> LogEntryHash:
        """Return the MD5 hex digest of the ship name followed by the file name.

        The digest only serves as an identifier for the log, not as a
        security measure.
        """
        import hashlib
        checked_str = f"{log.ship.name}{log.file_name}"
        return hashlib.md5(checked_str.encode("utf-8")).hexdigest()


# Throttles the log entries to submit, just in case there's a bug: it takes
# the candidate entries and returns the list that may actually be submitted.
# Usually we'd limit the logs to submit to a small number, and maybe also
# send out some alert.
SubmissionThrottle = t.Callable[[t.List[LogEntry]], t.List[LogEntry]]


@dataclass
class ListingFileLogIterator(LogIterator):
    """Produces log entries by parsing the listing file.

    The listing file is `<listing_dir>/<listing_filename>`; `utils` is used
    to look up the owner of each ship.
    """

    listing_dir: str
    listing_filename: str
    utils: Utils

    def __call__(self) -> t.List[LogEntry]:
        """Parse the listing file into log entries.

        Returns:
            One `LogEntry` per non-blank line, in file order. Lines that
            point at the same file yield equal entries; they are not
            de-duplicated here.

        Raises:
            ValueError: if a non-blank line is not a valid listing entry.
        """
        with open(
            f"{self.listing_dir}/{self.listing_filename}",
            "r",
            encoding="utf-8",
        ) as f:
            # Blank lines carry no entry; skipping them keeps e.g. a trailing
            # empty line from aborting the whole run.
            entries = [line for line in f if line.strip()]
        return [self._parse(ent) for ent in entries]

    def _parse(self, entry: str) -> LogEntry:
        """Parse a listing file entry into a `LogEntry`

        An entry looks like this:
            0betsy - About QEC	/betsy/qec.txt

        I.e.
            0<ship> - <title><TAB><file_path>

        Note:
        * <file_path> is rooted at /var/gopher, i.e., where the listing
          file resides.

        Raises:
            ValueError: if `entry` does not have the format above, or if
                <file_path> is not of the form /<ship>/<file>.
        """
        import re
        res = re.match(r"^0(.+?) - (.+)\t(.+)$", entry)
        if not res:
            raise ValueError(f"Cannot parse: {entry}")

        # It's more robust to use the file path (/ship/fn.txt) to obtain ship's
        # name, rather than res.group(1). This is b/c there're duplicated
        # entries in the listing:
        #   0Polonia - 24131	/Polonia-II/24131.txt
        #   0Polonia-II - 24131	/Polonia-II/24131.txt
        title = res.group(2)
        log_path = res.group(3)
        ship, log_fn = self._parse_log_file_name(log_path)
        ship_owner = self.utils.ship_owner(ship)

        return LogEntry(
            ship=Ship(name=ship, owner=ship_owner),
            author=ship_owner,
            title=title,
            file_name=log_fn,
        )

    @staticmethod
    def _parse_log_file_name(ship_and_file: str) -> t.Tuple[str, str]:
        """/<ship>/file.txt -> (<ship>, file.txt)

        Raises:
            ValueError: if the path does not consist of exactly a ship
                directory and a file name.
        """
        parts = [part for part in ship_and_file.split("/") if part]
        # Check the shape here so the caller gets a message naming the
        # offending path instead of a bare tuple-unpacking error.
        if len(parts) != 2:
            raise ValueError(
                f"Expected a path like /<ship>/<file>, got: {ship_and_file!r}"
            )
        ship, file_name = parts
        return ship, file_name


@dataclass
class SubmitConditionImpl(SubmitCondition):
    """Accepts a log entry only if the store holds no record of it."""

    submission_store: SubmittedLogsStore

    def __call__(self, log_entry: LogEntry) -> bool:
        """Return True when `log_entry` is not among the recorded logs."""
        store = self.submission_store
        entry_hash = store.checksum(log_entry)
        recorded_hashes = store.load_submitted_logs()
        if entry_hash in recorded_hashes:
            return False
        return True


@dataclass
class NntpLogSubmitter(LogSubmitter):
    """Posts a log entry to an NNTP group and records it as submitted.

    With `dry_run` set, the article is built and logged, but no connection
    to the server is made and nothing is written to the submission store.
    """

    @dataclass
    class NntpLogFormat:
        # Not referenced by any method of this class.
        subject: str
        body: str
        from_: str

    # Store in which posted logs are recorded.
    submission_store: SubmittedLogsStore
    # Callable returning the text of a log entry.
    read_log_entry: t.Callable[[LogEntry], NntpArticleBody]
    # Newsgroup to post to.
    nntp_group: str
    # Host name of the NNTP server.
    nntp_server: str
    # When True, only log what would be posted.
    dry_run: bool = False

    def __call__(self, log: LogEntry) -> None:
        """Submit `log` and, unless this is a dry run, record it as submitted."""
        self.nntp_submit(log)
        # A dry run must leave no trace: recording the log here would make
        # later real runs skip an article that was never actually posted.
        if not self.dry_run:
            self.submission_store.record_submission(log)

    def add_envelope(self, article: str, log: LogEntry) -> str:
        """Return `article` prefixed with the in-universe transmission header.

        The header carries the current Unix time, the author, the ship name
        and a gateway number picked at random from 0 to 31.
        """
        return f"""\
TIMESTAMP: {int(time.time())} SGT
AUTHOR: {log.author}
ORIGINATING SHIP: {log.ship.name}
QEC GATEWAY: QG-{random.randint(0, 31)}

{article}
"""

    def _build_message(self, log: LogEntry) -> str:
        """Return the complete article (headers, blank line, body) for `log`."""
        article_body = self.read_log_entry(log)
        article_body = self.add_envelope(article_body, log)

        # `From` uses the `name <address>` form; an address given in double
        # quotes without angle brackets is not a valid mailbox.
        return f"""\
Newsgroups: {self.nntp_group}
Subject: [QEC] {log.title}
From: {log.author} <{log.author}@cosmic.voyage>

{article_body}
"""

    def nntp_submit(self, log: LogEntry) -> None:
        """Build the article for `log`, log it, and post it unless dry-running.

        The article is built before any connection is opened, so a failure
        to read the log never leaves a connection behind.
        """
        msg = self._build_message(log)
        _LOGGER.info(f"About to submit log:\n{msg}")
        if self.dry_run:
            return

        # Imported only when actually posting, so a dry run does not depend
        # on it. NOTE: `nntplib` was removed from the standard library in
        # Python 3.13.
        import nntplib as nn

        # The context manager closes the connection even if posting fails.
        with nn.NNTP(self.nntp_server, readermode=True) as server:
            server.post(io.BytesIO(msg.encode("utf-8")))


@dataclass
class SubmissionThrottler:
    """Caps a list of logs at `max_submission` entries."""

    max_submission: int

    def __call__(self, logs: t.List[LogEntry]) -> t.List[LogEntry]:
        """Return the leading `max_submission` entries of `logs`."""
        limit = self.max_submission
        return logs[:limit]


def main():
    """Run one sync pass.

    Reads the listing, keeps the logs that are not yet recorded as
    submitted, drops repeated entries for the same log, applies the
    throttle, and hands each remaining log to the submitter.
    """

    logging.basicConfig()
    logging.root.setLevel(logging.INFO)

    config = Config.create()
    _LOGGER.info(f"Running with config: {config}")

    utils = Utils(config=config)

    iterate_logs = ListingFileLogIterator(
        listing_dir=config.listing_dir,
        listing_filename=config.listing_filename,
        utils=utils,
    )
    throttler = SubmissionThrottler(config.max_submission)
    submission_store = SubmittedLogsStore(store_dir=config.submission_store_dir)
    should_submit = SubmitConditionImpl(submission_store=submission_store)
    submit_log = NntpLogSubmitter(
        submission_store=submission_store,
        read_log_entry=utils.read_log_content,
        nntp_group=config.nntp_group,
        nntp_server=config.nntp_server,
        dry_run=True,  # TODO remove
    )

    # The listing can name the same log file more than once (under different
    # ship labels), and all of those entries pass `should_submit` because
    # nothing is recorded until submission. Key the candidates by checksum so
    # each log is submitted at most once per run; the first occurrence wins
    # and listing order is kept.
    candidates = {}
    for log in iterate_logs():
        if should_submit(log):
            candidates.setdefault(submission_store.checksum(log), log)

    logs_to_submit = throttler(list(candidates.values()))
    _LOGGER.info(f"Submitting {len(logs_to_submit)} logs...")
    for log in logs_to_submit:
        submit_log(log)


if __name__ == "__main__":
    main()

</pre>
    </div>
    </body>
  </head>
</html>