From 087a6339c8adf4df3c23d298fe1cbddcbdf23eef Mon Sep 17 00:00:00 2001 From: "A.M. Rowsell" Date: Sun, 20 Apr 2025 16:06:16 -0400 Subject: [PATCH] class refactor: Squashed commit of the following: commit 597f3837244cce6f501cf16f96e2cac3a81c8ab2 Author: A.M. Rowsell Date: Sun Apr 20 16:04:00 2025 -0400 fix: typo in log setup, error when replacing w/self commit 810cbd6f3d32ca0c7f74ec90f17c0011661d4785 Author: A.M. Rowsell Date: Sun Apr 20 13:51:41 2025 -0400 refactor: transformed into class-based app --- discorss.py | 392 +++++++++++++++++++++++++++------------------------- 1 file changed, 204 insertions(+), 188 deletions(-) diff --git a/discorss.py b/discorss.py index 76f243d..d90c7fb 100755 --- a/discorss.py +++ b/discorss.py @@ -22,204 +22,220 @@ import sys import argparse import re -config_dir = os.environ.get("XDG_CONFIG_HOME") -home_dir = Path.home() -if config_dir is None: - config_file_path = str(home_dir) + "/.config/discorss/discorss.conf" - config_dir = str(home_dir) + "/.config/discorss" -else: - config_file_path = config_dir + r"/discorss/discorss.conf" -log_dir = r"/var/log/discorss" -log_file_path = r"/app.log" -# Yes, I know you "can't parse HTML with regex", but -# just watch me. -html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>") -success_codes = [200, 201, 202, 203, 204, 205, 206] -app_config = {} -# IDEA: Consider making this into a class-based program -# This would solve a couple issues around global variables and generally -# make things a bit neater +class Discorss: + def __init__(self): + self.config_dir = os.environ.get("XDG_CONFIG_HOME") + home_dir = Path.home() + if self.config_dir is None: + self.config_file_path = str(home_dir) + "/.config/discorss/discorss.conf" + self.config_dir = str(home_dir) + "/.config/discorss" + else: + self.config_file_path = self.config_dir + r"/discorss/discorss.conf" + self.log_dir = r"/var/log/discorss" + self.log_file_path = r"/app.log" + # Yes, I know you "can't parse HTML with regex", but + # just watch me. + self.html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>") + self.success_codes = [200, 201, 202, 203, 204, 205, 206] + self.app_config = {} - -# This function gets and formats the brief excerpt that goes in the embed -# Different feeds put summaries in different fields, so we pick the best -# one and limit it to 250 characters. -def get_description(feed, length=250, min_length=150, addons=None): - try: - temporary_string = str(feed["summary_detail"]["value"]) - temporary_string = html_filter.sub("", temporary_string) - while length > min_length: - if temporary_string[length - 1 : length] == " ": - break - else: - length -= 1 - except KeyError: - temporary_string = str(feed["description"]) - temporary_string = html_filter.sub("", temporary_string) - while length > min_length: - if temporary_string[length - 1 : length] == " ": - break - else: - length -= 1 - - desc = temporary_string[:length] - if addons is not None: - desc = desc + str(addons) - return desc - - -def setupPaths(): - global app_config - global logger - # Check for log and config files/paths, create empty directories if needed - # TODO: make this cleaner - if not Path(log_dir).exists(): - print("No log file path exists. Yark! We'll try and make {}...".format(log_dir)) + # This function gets and formats the brief excerpt that goes in the embed + # Different feeds put summaries in different fields, so we pick the best + # one and limit it to 250 characters. + def get_description(self, feed, length=250, min_length=150, addons=None): try: - Path(log_dir).mkdir(parents=True, exist_ok=True) - except FileExistsError: - print("The path {} already exists and is not a directory!".format(log_dir)) - if not Path(config_file_path).exists(): - print( - "No config file at {}! Snarf. We'll try and make {}...".format( - config_file_path, config_dir - ) - ) - try: - Path(config_dir).mkdir(parents=True, exist_ok=True) - except FileExistsError: + temporary_string = str(feed["summary_detail"]["value"]) + temporary_string = self.html_filter.sub("", temporary_string) + while length > min_length: + if temporary_string[length - 1 : length] == " ": + break + else: + length -= 1 + except KeyError: + temporary_string = str(feed["description"]) + temporary_string = self.html_filter.sub("", temporary_string) + while length > min_length: + if temporary_string[length - 1 : length] == " ": + break + else: + length -= 1 + + desc = temporary_string[:length] + if addons is not None: + desc = desc + str(addons) + return desc + + def setupPaths(self): + # Check for log and config files/paths, create empty directories if needed + # TODO: make this cleaner + if not Path(self.log_dir).exists(): print( - "The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format( - config_dir + "No log file path exists. Yark! We'll try and make {}...".format( + self.log_dir ) ) - sys.exit(255) + try: + Path(self.log_dir).mkdir(parents=True, exist_ok=True) + except FileExistsError: + print( + "The path {} already exists and is not a directory!".format( + self.log_dir + ) + ) + if not Path(self.config_file_path).exists(): + print( + "No config file at {}! Snarf. We'll try and make {}...".format( + self.config_file_path, self.config_dir + ) + ) + try: + Path(self.config_dir).mkdir(parents=True, exist_ok=True) + except FileExistsError: + print( + "The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format( + self.config_dir + ) + ) + sys.exit(255) + return + # Loading the config file + with open(self.config_file_path, "r") as config_file: + self.app_config = json.load(config_file) + # Set up logging + self.logger = logging.getLogger(__name__) + logging.basicConfig( + filename=str(self.log_dir + self.log_file_path), + encoding="utf-8", + level=logging.DEBUG, + datefmt="%m/%d/%Y %H:%M:%S", + format="%(asctime)s: %(levelname)s: %(message)s", + ) return - # Loading the config file - with open(config_file_path, "r") as config_file: - app_config = json.load(config_file) - # Set up logging - logger = logging.getLogger(__name__) - logging.basicConfig( - filename=str(log_dir + log_file_path), - encoding="utf-8", - level=logging.DEBUG, - datefmt="%m/%d/%Y %H:%M:%S", - format="%(asctime)s: %(levelname)s: %(message)s", - ) - return + + def process(self): + os.environ["TZ"] = "America/Toronto" + time.tzset() + now = time.mktime(time.localtime()) + self.setupPaths() # Handle the config and log paths + try: + last_check = self.app_config["lastupdate"] + except KeyError: + last_check = ( + now - 21600 + ) # first run, no lastupdate, check up to 6 hours ago + for i, hook in enumerate(self.app_config["feeds"]): # Feed loop start + self.logger.debug("Parsing feed %s...", hook["name"]) + self.feeds = feedparser.parse(hook["url"]) + self.latest_post = [] + prev_best = 0 + self.logger.debug( + "About to sort through entries for feed %s ...", hook["name"] + ) + for feed in self.feeds["entries"]: + try: + bad_time = False + published_time = time.mktime(feed["published_parsed"]) + published_time = published_time + hook["offset"] + except KeyError: + published_time = time.mktime(feed["updated_parsed"]) + bad_time = True + if published_time > prev_best: + latest_post = feed + prev_best = published_time + else: + continue + if bad_time is True: + self.logger.debug( + "Feed %s doesn't supply a published time, using updated time instead", + hook["name"], + ) + # Hash the title and time of the latest post and use that to determine if it's been posted + # Yes, SHA3-512 is totally unnecessary for this purpose, but I love SHA3 + self.logger.debug("About to hash %s ...", latest_post["title"]) + try: + new_hash = hashlib.sha3_512( + bytes(latest_post["title"] + str(published_time), "utf-8") + ).hexdigest() + except TypeError: + self.logger.error("Title of %s isn't hashing correctly", hook["name"]) + continue + try: + if hook["lasthash"] != new_hash: + self.app_config["feeds"][i]["lasthash"] = new_hash + else: + continue + except KeyError: + self.app_config["feeds"][i]["lasthash"] = new_hash + self.logger.info( + "Feed %s has no existing hash, likely a new feed!", hook["name"] + ) + # Generate the webhook + self.logger.info( + "Publishing webhook for %s. Last check was %d, now is %d", + hook["name"], + last_check, + now, + ) + webhook = { + "embeds": [ + { + "title": str(latest_post["title"]), + "url": str(latest_post["link"]), + "color": 2123412, + "footer": { + "text": "DiscoRSS", + "icon_url": "https://frzn.dev/~amr/images/discorss.png", + }, + "author": { + "name": str(hook["name"]), + "url": str(hook["siteurl"]), + }, + "fields": [ + { + "name": "Excerpt from post:", + "value": self.get_description(latest_post), + } + ], + # "timestamp": str(now), + } + ], + "attachments": [], + } + custom_header = { + "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc3)", + "content-type": "application/json", + } + webhook_string = json.dumps(webhook) + + self.logger.debug("About to run POST for %s", hook["name"]) + r = requests.post( + hook["webhook"], data=webhook_string, headers=custom_header + ) + if r.status_code not in self.success_codes: + self.logger.error( + "Error %d while trying to post %s", r.status_code, hook["name"] + ) + else: + self.logger.debug("Got %d when posting %s", r.status_code, hook["name"]) + + # End of feed loop + + # Dump updated config back to json file + self.logger.debug("Dumping config back to %s", str(self.config_file_path)) + self.app_config["lastupdate"] = now + with open(self.config_file_path, "w") as config_file: + json.dump(self.app_config, config_file, indent=4) + + return + + +# end of Discorss class def main(): - os.environ["TZ"] = "America/Toronto" - time.tzset() - now = time.mktime(time.localtime()) - setupPaths() # Handle the config and log paths - try: - last_check = app_config["lastupdate"] - except KeyError: - last_check = now - 21600 # first run, no lastupdate, check up to 6 hours ago - for i, hook in enumerate(app_config["feeds"]): # Feed loop start - logger.debug("Parsing feed %s...", hook["name"]) - feeds = feedparser.parse(hook["url"]) - latest_post = [] - prev_best = 0 - logger.debug("About to sort through entries for feed %s ...", hook["name"]) - for feed in feeds["entries"]: - try: - bad_time = False - published_time = time.mktime(feed["published_parsed"]) - published_time = published_time + hook["offset"] - except KeyError: - published_time = time.mktime(feed["updated_parsed"]) - bad_time = True - if published_time > prev_best: - latest_post = feed - prev_best = published_time - else: - continue - if bad_time is True: - logger.debug( - "Feed %s doesn't supply a published time, using updated time instead", - hook["name"], - ) - # Hash the title and time of the latest post and use that to determine if it's been posted - # Yes, SHA3-512 is totally unnecessary for this purpose, but I love SHA3 - logger.debug("About to hash %s ...", latest_post["title"]) - try: - new_hash = hashlib.sha3_512( - bytes(latest_post["title"] + str(published_time), "utf-8") - ).hexdigest() - except TypeError: - logger.error("Title of %s isn't hashing correctly", hook["name"]) - continue - try: - if hook["lasthash"] != new_hash: - app_config["feeds"][i]["lasthash"] = new_hash - else: - continue - except KeyError: - app_config["feeds"][i]["lasthash"] = new_hash - logger.info( - "Feed %s has no existing hash, likely a new feed!", hook["name"] - ) - # Generate the webhook - logger.info( - "Publishing webhook for %s. Last check was %d, now is %d", - hook["name"], - last_check, - now, - ) - webhook = { - "embeds": [ - { - "title": str(latest_post["title"]), - "url": str(latest_post["link"]), - "color": 2123412, - "footer": { - "text": "DiscoRSS", - "icon_url": "https://frzn.dev/~amr/images/discorss.png", - }, - "author": { - "name": str(hook["name"]), - "url": str(hook["siteurl"]), - }, - "fields": [ - { - "name": "Excerpt from post:", - "value": get_description(latest_post), - } - ], - # "timestamp": str(now), - } - ], - "attachments": [], - } - custom_header = { - "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc3)", - "content-type": "application/json", - } - webhook_string = json.dumps(webhook) - - logger.debug("About to run POST for %s", hook["name"]) - r = requests.post(hook["webhook"], data=webhook_string, headers=custom_header) - if r.status_code not in success_codes: - logger.error( - "Error %d while trying to post %s", r.status_code, hook["name"] - ) - else: - logger.debug("Got %d when posting %s", r.status_code, hook["name"]) - - # End of feed loop - - # Dump updated config back to json file - logger.debug("Dumping config back to %s", str(config_file_path)) - app_config["lastupdate"] = now - with open(config_file_path, "w") as config_file: - json.dump(app_config, config_file, indent=4) - - return + app = Discorss() + app.process() if __name__ == "__main__":