#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then
# sends them to webhooks. Intended to run using systemd timers.
import requests
import feedparser
import hashlib
import logging
import asyncio
from pathlib import Path
import json
import time
import os
import sys
import argparse
import re
from types import SimpleNamespace


class Discorss:
    """Fetch RSS/Atom feeds and publish the newest entry of each to a Discord webhook.

    State (per-feed hash history, last-update timestamp) is kept inside the
    JSON config file itself, which is rewritten after every run.
    """

    # Per-request timeout (seconds) for both the feed GET and the webhook POST.
    FEED_TIMEOUT_SECONDS = 15
    # How many recent post hashes to remember per feed (dedup window).
    HASH_HISTORY_LIMIT = 10
    APP_VERSION = "0.3rc1"

    def __init__(self, args=None):
        """Resolve config/log paths from CLI args, XDG env, or defaults.

        Args:
            args: namespace with ``dry_run``, ``config_file`` and ``log_file``
                attributes (as produced by ``main()``'s argparser); ``None``
                yields non-dry-run defaults.
        """
        if args is None:
            args = SimpleNamespace(
                dry_run=False,
                config_file=None,
                log_file=None,
            )
        self.DRY_RUN = args.dry_run
        self.config_dir = os.environ.get("XDG_CONFIG_HOME")
        home_dir = Path.home()
        if self.config_dir is None:
            default_config_file_path = str(home_dir) + "/.config/discorss/discorss.conf"
        else:
            default_config_file_path = self.config_dir + r"/discorss/discorss.conf"
        self.config_file_path = args.config_file or default_config_file_path
        # config_dir is re-derived from the final file path so a custom
        # --config-file also relocates the directory we may need to create.
        self.config_dir = str(Path(self.config_file_path).parent)
        default_log_file_path = "/var/log/discorss/app.log"
        self.log_file_path = args.log_file or default_log_file_path
        self.log_dir = str(Path(self.log_file_path).parent)
        # Yes, I know you "can't parse HTML with regex", but just watch me.
        # Strips simple <tag> / </tag> runs out of feed summaries.
        self.html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>")
        self.success_codes = [200, 201, 202, 203, 204, 205, 206]
        self.app_config = {}

    async def _fetch_feed(self, hook):
        """GET and parse one feed; raises requests.HTTPError on bad status."""
        response = await asyncio.to_thread(
            requests.get,
            hook["url"],
            headers={
                "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, {})".format(
                    self.APP_VERSION
                )
            },
            timeout=self.FEED_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return await asyncio.to_thread(feedparser.parse, response.content)

    async def _post_webhook(self, hook, webhook_string, custom_header):
        """POST the serialized embed payload to the feed's Discord webhook."""
        return await asyncio.to_thread(
            requests.post,
            hook["webhook"],
            data=webhook_string,
            headers=custom_header,
            timeout=self.FEED_TIMEOUT_SECONDS,
        )

    def _get_hash_history(self, hook):
        """Return the feed's saved hash list, migrating legacy formats.

        Older configs stored a single hash string under ``lasthash``; newer
        ones store a list. Anything non-string is silently dropped.
        """
        existing_hashes = hook.get("lasthash", [])
        if isinstance(existing_hashes, str):
            return [existing_hashes]
        if isinstance(existing_hashes, list):
            return [
                saved_hash
                for saved_hash in existing_hashes
                if isinstance(saved_hash, str)
            ]
        return []

    async def _process_feed(self, hook, last_check):
        """Fetch one feed, find its newest entry, and post it if unseen.

        Returns the new entry's hash on a successful post, or ``None`` when
        there is nothing to do (no entries, already posted, or POST failed).
        """
        self.logger.debug("Parsing feed %s...", hook["name"])
        feeds = await self._fetch_feed(hook)
        latest_post = None
        prev_best = 0
        bad_time = False
        self.logger.debug("About to sort through entries for feed %s ...", hook["name"])
        for feed in feeds["entries"]:
            try:
                published_time = time.mktime(feed["published_parsed"])
                published_time = published_time + hook["offset"]
            except KeyError:
                # Some feeds only provide an updated time.
                published_time = time.mktime(feed["updated_parsed"])
                bad_time = True
            if published_time > prev_best:
                latest_post = feed
                prev_best = published_time
        if latest_post is None:
            self.logger.warning("Feed %s had no entries to process", hook["name"])
            return None
        if bad_time is True:
            self.logger.debug(
                "Feed %s doesn't supply a published time, using updated time instead",
                hook["name"],
            )
        # Hash the url of the latest post and use that to determine if it's
        # been posted. Yes, SHA3-512 is totally unnecessary for this purpose,
        # but I love SHA3. (The timestamp is deliberately NOT part of the
        # hash, so edits that only bump the date don't repost.)
        self.logger.debug("About to hash %s ...", latest_post["link"])
        try:
            new_hash = hashlib.sha3_512(
                bytes(latest_post["link"], "utf-8")
            ).hexdigest()
        except TypeError:
            # Bug fix: this used to log hook["link"], a key hooks don't have.
            self.logger.error("URL %s isn't hashing correctly", latest_post["link"])
            return None
        if new_hash in self._get_hash_history(hook):
            return None
        # Generate the webhook
        self.logger.info(
            "Publishing webhook for %s. Last check was %d, self.now is %d",
            hook["name"],
            last_check,
            self.now,
        )
        webhook = {
            "embeds": [
                {
                    "title": str(latest_post["title"]),
                    "url": str(latest_post["link"]),
                    "color": 2123412,
                    "footer": {
                        "text": "DiscoRSS",
                        "icon_url": "https://frzn.dev/~amr/images/discorss.png",
                    },
                    "author": {
                        "name": str(hook["name"]),
                        "url": str(hook["siteurl"]),
                    },
                    "fields": [
                        {
                            "name": "Excerpt from post:",
                            "value": self.get_description(latest_post),
                        }
                    ],
                    # "timestamp": str(self.now),
                }
            ],
            "attachments": [],
        }
        custom_header = {
            # Consistency fix: was hard-coded "0.2" while fetches already
            # advertised APP_VERSION.
            "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, {})".format(
                self.APP_VERSION
            ),
            "content-type": "application/json",
        }
        webhook_string = json.dumps(webhook)
        self.logger.debug("About to run POST for %s", hook["name"])
        if not self.DRY_RUN:
            response = await self._post_webhook(hook, webhook_string, custom_header)
        else:
            self.logger.info(
                "Dry run, not actually posting to webhook, faking return code 200"
            )
            response = SimpleNamespace(status_code=200)
        if response.status_code not in self.success_codes:
            self.logger.error(
                "Error %d while trying to post %s", response.status_code, hook["name"]
            )
            return None
        self.logger.debug("Got %d when posting %s", response.status_code, hook["name"])
        return new_hash

    async def _process_feeds(self, last_check):
        """Process all configured feeds concurrently and record new hashes."""
        tasks = [
            asyncio.create_task(
                asyncio.wait_for(
                    self._process_feed(hook, last_check),
                    # Fix: the GET and the POST each get FEED_TIMEOUT_SECONDS,
                    # so the overall budget must exceed a single request's
                    # timeout or slow-but-healthy feeds get cancelled.
                    timeout=self.FEED_TIMEOUT_SECONDS * 2 + 5,
                )
            )
            for hook in self.app_config["feeds"]
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            hook = self.app_config["feeds"][i]
            if isinstance(result, asyncio.TimeoutError):
                self.logger.critical(
                    "Timed out processing feed %s after %d seconds",
                    hook["name"],
                    self.FEED_TIMEOUT_SECONDS,
                )
                continue
            if isinstance(result, requests.RequestException):
                self.logger.critical(
                    "Network error while processing feed %s: %s",
                    hook["name"],
                    result,
                )
                continue
            if isinstance(result, Exception):
                self.logger.error(
                    "Unhandled error while processing feed %s: %s",
                    hook["name"],
                    result,
                )
                continue
            if result is None:
                continue
            if "lasthash" not in hook:
                self.logger.debug(
                    "Feed %s has no existing hash, likely a new feed!", hook["name"]
                )
            hash_history = self._get_hash_history(hook)
            hash_history.append(result)
            # Keep only the most recent HASH_HISTORY_LIMIT hashes.
            if len(hash_history) > self.HASH_HISTORY_LIMIT:
                hash_history = hash_history[-self.HASH_HISTORY_LIMIT :]
            self.app_config["feeds"][i]["lasthash"] = hash_history

    # This function gets and formats the brief excerpt that goes in the embed.
    # Different feeds put summaries in different fields, so we pick the best
    # one and limit it to (at most) `length` characters.
    def get_description(self, feed, length=250, min_length=150, addons=None):
        """Return an HTML-stripped excerpt of a feed entry.

        Prefers ``summary_detail.value`` and falls back to ``description``.
        The cut point is walked back (down to ``min_length``) to land on a
        space so words aren't chopped mid-way. ``addons`` is appended verbatim.
        """
        try:
            raw = str(feed["summary_detail"]["value"])
        except KeyError:
            raw = str(feed["description"])
        # De-duplicated: the strip/trim logic used to be copied into both the
        # try and except branches.
        temporary_string = self.html_filter.sub("", raw)
        while length > min_length:
            if temporary_string[length - 1 : length] == " ":
                break
            length -= 1
        desc = temporary_string[:length]
        if addons is not None:
            desc = desc + str(addons)
        return desc

    # Some of this could go in __init__
    def setup(self):
        """One-time initialization: timezone, logging, config loading.

        Exits the process when no config file exists (nothing to do) or when
        the config path can't be created.
        """
        os.environ["TZ"] = "America/Toronto"
        time.tzset()
        self.now = time.mktime(time.localtime())
        self.logger = logging.getLogger(__name__)
        # Fix: the log directory must exist BEFORE logging.basicConfig(),
        # which opens the log file and raises if its directory is missing.
        if not Path(self.log_dir).exists():
            # File handler isn't configured yet, so this warning reaches
            # stderr via logging's last-resort handler.
            self.logger.warning(
                "No log file path exists. Yark! We'll try and make %s...", self.log_dir
            )
            try:
                Path(self.log_dir).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                self.logger.critical(
                    "The path {} already exists and is not a directory!".format(
                        self.log_dir
                    )
                )
        logging.basicConfig(
            filename=self.log_file_path,
            encoding="utf-8",
            level=logging.WARNING,
            datefmt="%m/%d/%Y %H:%M:%S",
            format="%(asctime)s [%(threadName)s] -> %(levelname)s: %(message)s",
        )
        if not Path(self.config_file_path).exists():
            self.logger.warning(
                "No config file at {}! Snarf. We'll try and make {}...".format(
                    self.config_file_path, self.config_dir
                )
            )
            try:
                Path(self.config_dir).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                # Bug fix: this was `self.warning.critical(...)`, an
                # AttributeError at the exact moment an error needed logging.
                self.logger.critical(
                    "The config dir {} already exists and is not a directory! "
                    "Please fix manually. Quitting!".format(self.config_dir)
                )
                sys.exit(255)
            # Fix: returning here with an empty app_config used to crash
            # process() later with KeyError('feeds'); bail out cleanly.
            self.logger.critical(
                "No config file to load at {}. Quitting!".format(self.config_file_path)
            )
            sys.exit(1)
        # Loading the config file
        with open(self.config_file_path, "r") as config_file:
            self.app_config = json.load(config_file)
        return

    def process(self):
        """Entry point: run setup, process every feed, persist state."""
        self.setup()  # Handle the config and log paths
        self.logger.info("Starting DiscoRSS version {}...".format(self.APP_VERSION))
        try:
            last_check = self.app_config["lastupdate"]
        except KeyError:
            # First run, no lastupdate: check up to 6 hours ago.
            last_check = self.now - 21600
        asyncio.run(self._process_feeds(last_check))
        # Dump updated config (new hashes + timestamp) back to the json file.
        self.logger.debug("Dumping config back to %s", str(self.config_file_path))
        self.app_config["lastupdate"] = self.now
        with open(self.config_file_path, "w") as config_file:
            json.dump(self.app_config, config_file, indent=4)
        return


# end of Discorss class


def main():
    """Parse CLI arguments and run one DiscoRSS pass."""
    parser = argparse.ArgumentParser(
        description="DiscoRSS: publish feed updates to Discord webhooks."
    )
    parser.add_argument(
        "-d",
        "--dry-run",
        action="store_true",
        help="Parse feeds and update state without posting to Discord.",
    )
    parser.add_argument(
        "-c",
        "--config-file",
        default=None,
        help="Alternate config file path. Defaults to the existing config location.",
    )
    parser.add_argument(
        "-l",
        "--log-file",
        default=None,
        help="Alternate log file path. Defaults to the existing log location.",
    )
    args = parser.parse_args()
    app = Discorss(args)
    app.process()


if __name__ == "__main__":
    main()