#!/usr/bin/env python # -*- coding: UTF-8 -*- # SPDX-License-Identifier: MPL-2.0 # SPDX-FileCopyrightText: © 2025 A.M. Rowsell # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to # webhooks. Intended to run using systemd timers. import requests import feedparser import hashlib import logging from pathlib import Path import json import time import os import sys import re config_dir = os.environ.get("XDG_CONFIG_HOME") home_dir = Path.home() if config_dir is None: config_file_path = str(home_dir) + "/.config/discorss/discorss.conf" config_dir = str(home_dir) + "/.config/discorss" else: config_file_path = config_dir + r"/discorss/discorss.conf" log_dir = r"/var/log/discorss" log_file_path = r"/app.log" # Yes, I know you "can't parse HTML with regex", but # just watch me. html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>") success_codes = [200, 201, 202, 203, 204, 205, 206] app_config = {} # IDEA: Consider making this into a class-based program # This would solve a couple issues around global variables and generally # make things a bit neater # This function gets and formats the brief excerpt that goes in the embed # Different feeds put summaries in different fields, so we pick the best # one and limit it to 250 characters. # TODO: make the character limit smarter, as to split at a natural point def get_description(feed, length=250): try: temporary_string = str(feed["summary_detail"]["value"]) temporary_string = html_filter.sub("", temporary_string) while length > 150: if temporary_string[length - 1 : length] == " ": break else: length -= 1 desc = ( temporary_string[:length] if len(temporary_string) > length else temporary_string ) except KeyError: temporary_string = str(feed["description"]) temporary_string = html_filter.sub("", temporary_string) while length > 150: if temporary_string[length - 1 : length] == " ": break else: length -= 1 desc = ( temporary_string[:length] if len(temporary_string) > length else temporary_string ) return desc def setupPaths(): global app_config global logger # Check for log and config files/paths, create empty directories if needed # TODO: make this cleaner if not Path(log_dir).exists(): print("No log file path exists. Yark! We'll try and make {}...".format(log_dir)) try: Path(log_dir).mkdir(parents=True, exist_ok=True) except FileExistsError: print("The path {} already exists and is not a directory!".format(log_dir)) if not Path(config_file_path).exists(): print( "No config file at {}! Snarf. We'll try and make {}...".format( config_file_path, config_dir ) ) try: Path(config_dir).mkdir(parents=True, exist_ok=True) except FileExistsError: print( "The config dir {} already exists and is not a directory! Please fix manually.".format( config_dir ) ) sys.exit(255) return # Loading the config file with open(config_file_path, "r") as config_file: app_config = json.load(config_file) # Set up logging logger = logging.getLogger(__name__) logging.basicConfig( filename=str(log_dir + log_file_path), encoding="utf-8", level=logging.INFO, datefmt="%m/%d/%Y %H:%M:%S", format="%(asctime)s: %(levelname)s: %(message)s", ) return def main(): os.environ["TZ"] = "America/Toronto" time.tzset() now = time.mktime(time.localtime()) setupPaths() # Handle the config and log paths try: last_check = app_config["lastupdate"] except KeyError: last_check = now - 21600 # first run, no lastupdate, check up to 6 hours ago for i, hook in enumerate(app_config["feeds"]): # Feed loop start logger.debug("Parsing feed %s...", hook["name"]) feeds = feedparser.parse(hook["url"]) latest_post = [] prev_best = 0 for feed in feeds["entries"]: try: bad_time = False published_time = time.mktime(feed["published_parsed"]) published_time = published_time + hook["offset"] except KeyError: published_time = time.mktime(feed["updated_parsed"]) bad_time = True if published_time > prev_best: latest_post = feed prev_best = published_time else: continue if bad_time is True: logger.warning( "Feed %s doesn't supply a published time, using updated time instead", hook["name"], ) # Hash the title and time of the latest post and use that to determine if it's been posted new_hash = hashlib.sha3_512( bytes(latest_post["title"] + str(published_time), "utf-8") ).hexdigest() try: if hook["lasthash"] != new_hash: app_config["feeds"][i]["lasthash"] = new_hash else: continue except KeyError: app_config["feeds"][i]["lasthash"] = new_hash logger.info( "Feed %s has no existing hash, likely a new feed!", hook["name"] ) # Generate the webhook logger.info( "Publishing webhook for %s. Last check was %d, now is %d", hook["name"], last_check, now, ) webhook = { "embeds": [ { "title": str(latest_post["title"]), "url": str(latest_post["link"]), "color": 216128, "footer": { "name": "DiscoRSS", # "url": "https://git.frzn.dev/amr/discorss", }, "author": { "name": str(hook["name"]), "url": str(hook["siteurl"]), }, "fields": [ { "name": "Excerpt from post:", "value": get_description(latest_post), } ], } ], "attachments": [], } custom_header = { "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc3)", "content-type": "application/json", } webhook_string = json.dumps(webhook) r = requests.post(hook["webhook"], data=webhook_string, headers=custom_header) if r.status_code not in success_codes: logger.error( "Error %d while trying to post %s", r.status_code, hook["name"] ) else: logger.debug("Got %d when posting %s", r.status_code, hook["name"]) # End of feed loop # Dump updated config back to json file app_config["lastupdate"] = now with open(config_file_path, "w") as config_file: json.dump(app_config, config_file, indent=4) return if __name__ == "__main__": main()