Also switching logging back to ERROR from DEBUG. The solution to the lockups for now is to just use systemd timer timeouts.
242 lines
9.1 KiB
Python
Executable file
242 lines
9.1 KiB
Python
Executable file
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
# webhooks. Intended to run using systemd timers.

import argparse
import hashlib
import json
import logging
import os
import re
import socket
import sys
import time
from pathlib import Path

import feedparser
import requests


class Discorss:
    """Post the newest entry of each configured RSS feed to a Discord webhook.

    A single call to process() does one pass over every feed listed in the
    JSON config file and then writes the updated config back. The script is
    meant to be re-run periodically rather than to stay resident.
    """

    # Seconds to wait on a server before giving up on a network operation.
    # Without a limit a stalled connection blocks the whole run.
    NETWORK_TIMEOUT = 30

    def __init__(self):
        """Work out the config/log locations and compile the HTML filter.

        Nothing is read from or written to disk here; that happens in setup().
        """
        # An unset *or empty* XDG_CONFIG_HOME falls back to ~/.config.
        xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
        if xdg_config_home:
            config_root = Path(xdg_config_home)
        else:
            config_root = Path.home() / ".config"
        # config_dir is always the directory that holds discorss.conf, so
        # setup() creates the right directory in both cases.
        self.config_dir = str(config_root / "discorss")
        self.config_file_path = str(config_root / "discorss" / "discorss.conf")
        self.log_dir = r"/var/log/discorss"
        self.log_file_path = r"/app.log"
        # Yes, I know you "can't parse HTML with regex", but
        # just watch me.
        self.html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>")
        self.success_codes = [200, 201, 202, 203, 204, 205, 206]
        self.app_config = {}
        # Created here so it is usable even when setup() returns early.
        self.logger = logging.getLogger(__name__)

    def get_description(self, feed, length=250, min_length=150, addons=None):
        """Return a short, tag-free excerpt of a feed entry for the embed.

        Different feeds put summaries in different fields, so
        feed["summary_detail"]["value"] is preferred and feed["description"]
        is the fallback. Text longer than ``length`` characters is cut at
        ``length``, or at the nearest earlier space as long as that keeps more
        than ``min_length`` characters. Text that already fits is returned
        whole.

        Args:
            feed: mapping describing one feed entry.
            length: maximum number of characters to keep.
            min_length: the cut point is never moved below this while
                searching backwards for a space.
            addons: optional value appended (via str()) after the excerpt.

        Returns:
            The excerpt string; empty (plus ``addons``) when the entry has
            neither summary field.
        """
        try:
            raw_text = feed["summary_detail"]["value"]
        except KeyError:
            try:
                raw_text = feed["description"]
            except KeyError:
                raw_text = ""
        text = self.html_filter.sub("", str(raw_text))

        if len(text) > length:
            cut = length
            # Walk back to a space so the excerpt does not end mid-word.
            while cut > min_length and text[cut - 1 : cut] != " ":
                cut -= 1
            text = text[:cut]

        if addons is not None:
            text = text + str(addons)
        return text

    def setup(self):
        """Prepare timezone, directories, config and logging for one run.

        Sets TZ to America/Toronto, records the current time in ``self.now``,
        creates the log and config directories when they are missing, loads
        the JSON config into ``self.app_config`` and points logging at the log
        file. When no config file exists the method returns early with
        ``self.app_config`` left empty.

        Exits the process with status 255 when the config directory path
        exists but is not a directory.
        """
        os.environ["TZ"] = "America/Toronto"
        time.tzset()
        self.now = time.mktime(time.localtime())
        # feedparser.parse() is called without a timeout of its own, so set a
        # process-wide socket default as a backstop against stalled servers.
        socket.setdefaulttimeout(self.NETWORK_TIMEOUT)

        # Check for log and config files/paths, create empty directories if needed
        log_dir = Path(self.log_dir)
        if not log_dir.exists():
            print(
                "No log file path exists. Yark! We'll try and make {}...".format(
                    self.log_dir
                )
            )
            try:
                log_dir.mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                print(
                    "The path {} already exists and is not a directory!".format(
                        self.log_dir
                    )
                )

        if not Path(self.config_file_path).exists():
            print(
                "No config file at {}! Snarf. We'll try and make {}...".format(
                    self.config_file_path, self.config_dir
                )
            )
            try:
                Path(self.config_dir).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                print(
                    "The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format(
                        self.config_dir
                    )
                )
                sys.exit(255)
            # Nothing to load; process() notices the empty config and stops.
            return

        # Loading the config file
        with open(self.config_file_path, "r", encoding="utf-8") as config_file:
            self.app_config = json.load(config_file)

        # Set up logging
        logging.basicConfig(
            filename=str(self.log_dir + self.log_file_path),
            encoding="utf-8",
            level=logging.ERROR,
            datefmt="%m/%d/%Y %H:%M:%S",
            format="%(asctime)s: %(levelname)s: %(message)s",
        )

    def _entry_timestamp(self, entry, offset):
        """Return ``(timestamp, used_updated_time)`` for one feed entry.

        The published time plus ``offset`` is preferred; the updated time
        (without offset) is the fallback. Returns None when neither field
        yields a usable time.
        """
        bad_time_errors = (KeyError, TypeError, ValueError, OverflowError)
        try:
            published = time.mktime(entry["published_parsed"])
        except bad_time_errors:
            published = None
        if published is not None:
            return published + offset, False
        try:
            return time.mktime(entry["updated_parsed"]), True
        except bad_time_errors:
            return None

    def _newest_entry(self, entries, offset):
        """Return ``(entry, timestamp, used_updated_time)`` for the newest entry.

        Returns ``(None, 0, False)`` when no entry has a usable, positive
        timestamp (for example when the feed could not be fetched and the
        entry list is empty).
        """
        newest, newest_time, newest_used_updated = None, 0, False
        for entry in entries:
            stamped = self._entry_timestamp(entry, offset)
            if stamped is None:
                continue
            stamp, used_updated = stamped
            if stamp > newest_time:
                newest, newest_time, newest_used_updated = entry, stamp, used_updated
        return newest, newest_time, newest_used_updated

    def process(self):
        """Run one pass: fetch every feed and post entries not seen before.

        For each feed the newest entry is selected, its title and timestamp
        are hashed, and the entry is posted to the feed's webhook when the
        hash differs from the stored ``lasthash``. The stored hash is updated
        only after a successful POST, so a failed post is retried on the next
        run. Finally the config (hashes and ``lastupdate``) is written back.
        """
        self.setup()  # Handle the config and log paths
        if "feeds" not in self.app_config:
            self.logger.error(
                "No feeds found in %s, nothing to do", self.config_file_path
            )
            return

        # first run, no lastupdate, check up to 6 hours ago
        last_check = self.app_config.get("lastupdate", self.now - 21600)
        custom_header = {
            "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2)",
            "content-type": "application/json",
        }

        for i, hook in enumerate(self.app_config["feeds"]):  # Feed loop start
            self.logger.debug("Parsing feed %s...", hook["name"])
            try:
                self.feeds = feedparser.parse(hook["url"])
            except OSError:
                # One unreachable feed must not stop the remaining feeds.
                self.logger.exception("Could not fetch feed %s", hook["name"])
                continue

            self.logger.debug(
                "About to sort through entries for feed %s ...", hook["name"]
            )
            latest_post, latest_time, bad_time = self._newest_entry(
                self.feeds["entries"], hook.get("offset", 0)
            )
            self.latest_post = latest_post
            if latest_post is None:
                # Never fall through with a previous feed's entry.
                self.logger.error("Feed %s returned no usable entries", hook["name"])
                continue
            if bad_time is True:
                self.logger.debug(
                    "Feed %s doesn't supply a published time, using updated time instead",
                    hook["name"],
                )

            try:
                title = latest_post["title"]
                link = latest_post["link"]
            except KeyError as missing:
                self.logger.error(
                    "Latest entry of %s has no %s, skipping", hook["name"], missing
                )
                continue

            # Hash the title and time of the latest post and use that to determine if it's been posted
            # Yes, SHA3-512 is totally unnecessary for this purpose, but I love SHA3
            # NOTE: the time hashed is the selected entry's own timestamp. Hashes
            # stored by a version that hashed a different entry's time will
            # mismatch once, re-posting that feed's newest entry a single time.
            self.logger.debug("About to hash %s ...", title)
            try:
                new_hash = hashlib.sha3_512(
                    bytes(title + str(latest_time), "utf-8")
                ).hexdigest()
            except TypeError:
                self.logger.error("Title of %s isn't hashing correctly", hook["name"])
                continue

            if "lasthash" not in hook:
                self.logger.info(
                    "Feed %s has no existing hash, likely a new feed!", hook["name"]
                )
            elif hook["lasthash"] == new_hash:
                continue

            # Generate the webhook
            self.logger.info(
                "Publishing webhook for %s. Last check was %d, self.now is %d",
                hook["name"],
                last_check,
                self.now,
            )
            excerpt = self.get_description(latest_post)
            # Leave the field out entirely rather than send an empty value,
            # which the webhook endpoint is expected to reject.
            fields = []
            if excerpt.strip():
                fields.append({"name": "Excerpt from post:", "value": excerpt})
            webhook = {
                "embeds": [
                    {
                        "title": str(title),
                        "url": str(link),
                        "color": 2123412,
                        "footer": {
                            "text": "DiscoRSS",
                            "icon_url": "https://frzn.dev/~amr/images/discorss.png",
                        },
                        "author": {
                            "name": str(hook["name"]),
                            "url": str(hook["siteurl"]),
                        },
                        "fields": fields,
                    }
                ],
                "attachments": [],
            }
            webhook_string = json.dumps(webhook)

            self.logger.debug("About to run POST for %s", hook["name"])
            try:
                r = requests.post(
                    hook["webhook"],
                    data=webhook_string,
                    headers=custom_header,
                    timeout=self.NETWORK_TIMEOUT,
                )
            except requests.RequestException:
                self.logger.exception(
                    "Network error while trying to post %s", hook["name"]
                )
                continue
            if r.status_code not in self.success_codes:
                self.logger.error(
                    "Error %d while trying to post %s", r.status_code, hook["name"]
                )
                continue
            self.logger.debug("Got %d when posting %s", r.status_code, hook["name"])
            # Remember the post only once it has actually been delivered.
            self.app_config["feeds"][i]["lasthash"] = new_hash
        # End of feed loop

        # Dump updated config back to json file
        self.logger.debug("Dumping config back to %s", str(self.config_file_path))
        self.app_config["lastupdate"] = self.now
        with open(self.config_file_path, "w", encoding="utf-8") as config_file:
            json.dump(self.app_config, config_file, indent=4)


# end of Discorss class


def main():
    """Run a single DiscoRSS pass: build the application object and process all feeds."""
    Discorss().process()


# Only run a pass when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()