# discorss/discorss.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
# webhooks. Intended to run using systemd timers.
import requests
import feedparser
import hashlib
import logging
import asyncio
from pathlib import Path
import json
import time
import os
import sys
import argparse
import re
class Discorss:
    """A simple RSS-to-Discord bridge.

    Reads a JSON config listing feeds and webhook URLs, fetches every feed
    concurrently, and posts an embed for the newest entry of each feed to
    its Discord webhook. Intended to be run periodically (e.g. from a
    systemd timer); state (per-feed hash of the last posted entry and the
    timestamp of the last run) is written back into the config file.
    """

    # Per-request timeout (seconds) for feed fetches and webhook posts; also
    # caps the total runtime of each per-feed task in _process_feeds().
    FEED_TIMEOUT_SECONDS = 15

    def __init__(self):
        # Resolve the config location per the XDG spec, defaulting to
        # ~/.config/discorss when $XDG_CONFIG_HOME is unset.
        xdg_config = os.environ.get("XDG_CONFIG_HOME")
        if xdg_config is None:
            self.config_dir = str(Path.home()) + "/.config/discorss"
        else:
            # Bug fix: include the "discorss" subdirectory so that setup()
            # creates the directory the config file actually lives in
            # (previously only $XDG_CONFIG_HOME itself would be created).
            self.config_dir = xdg_config + "/discorss"
        self.config_file_path = self.config_dir + "/discorss.conf"
        self.log_dir = r"/var/log/discorss"
        self.log_file_path = r"/app.log"
        # Yes, I know you "can't parse HTML with regex", but
        # just watch me. Good enough to strip simple tags from excerpts.
        self.html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>")
        self.success_codes = [200, 201, 202, 203, 204, 205, 206]
        self.app_config = {}

    async def _fetch_feed(self, hook):
        """Download and parse one RSS feed.

        Returns the feedparser result dict. Raises requests exceptions on
        network or HTTP errors (handled by _process_feeds via gather).
        """
        response = await asyncio.to_thread(
            requests.get,
            hook["url"],
            headers={"user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2)"},
            timeout=self.FEED_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        # feedparser is synchronous; keep it off the event loop as well.
        return await asyncio.to_thread(feedparser.parse, response.content)

    async def _post_webhook(self, hook, webhook_string, custom_header):
        """POST the JSON-encoded embed payload to the feed's Discord webhook."""
        return await asyncio.to_thread(
            requests.post,
            hook["webhook"],
            data=webhook_string,
            headers=custom_header,
            timeout=self.FEED_TIMEOUT_SECONDS,
        )

    async def _process_feed(self, hook, last_check):
        """Fetch one feed and post its newest entry if it was not posted yet.

        Returns the SHA3-512 hash of the newly posted entry (to be stored as
        the feed's "lasthash"), or None when nothing was posted.
        """
        self.logger.debug("Parsing feed %s...", hook["name"])
        feeds = await self._fetch_feed(hook)
        latest_post = None
        prev_best = 0
        bad_time = False
        self.logger.debug("About to sort through entries for feed %s ...", hook["name"])
        for feed in feeds["entries"]:
            try:
                published_time = time.mktime(feed["published_parsed"])
                # Bug fix: a hook without an "offset" key used to raise
                # KeyError here and discard the valid published time; now
                # the offset simply defaults to 0.
                published_time += hook.get("offset", 0)
            except (KeyError, TypeError, ValueError):
                # feedparser may omit published_parsed or set it to None
                # (TypeError from mktime); fall back to the updated time.
                try:
                    published_time = time.mktime(feed["updated_parsed"])
                except (KeyError, TypeError, ValueError):
                    # No usable timestamp on this entry at all; skip it.
                    continue
                bad_time = True
            if published_time > prev_best:
                latest_post = feed
                prev_best = published_time
        if latest_post is None:
            self.logger.warning("Feed %s had no entries to process", hook["name"])
            return None
        if bad_time:
            self.logger.debug(
                "Feed %s doesn't supply a published time, using updated time instead",
                hook["name"],
            )
        latest_post_time = prev_best
        # Hash the title and time of the latest post and use that to determine
        # if it's been posted. Yes, SHA3-512 is totally unnecessary for this
        # purpose, but I love SHA3.
        self.logger.debug("About to hash %s ...", latest_post["title"])
        try:
            new_hash = hashlib.sha3_512(
                bytes(latest_post["title"] + str(latest_post_time), "utf-8")
            ).hexdigest()
        except TypeError:
            self.logger.error("Title of %s isn't hashing correctly", hook["name"])
            return None
        if hook.get("lasthash") == new_hash:
            # Newest entry is unchanged since the last run; nothing to do.
            return None
        # Generate the webhook
        self.logger.info(
            "Publishing webhook for %s. Last check was %d, self.now is %d",
            hook["name"],
            last_check,
            self.now,
        )
        webhook = {
            "embeds": [
                {
                    "title": str(latest_post["title"]),
                    "url": str(latest_post["link"]),
                    "color": 2123412,
                    "footer": {
                        "text": "DiscoRSS",
                        "icon_url": "https://frzn.dev/~amr/images/discorss.png",
                    },
                    "author": {
                        "name": str(hook["name"]),
                        "url": str(hook["siteurl"]),
                    },
                    "fields": [
                        {
                            "name": "Excerpt from post:",
                            "value": self.get_description(latest_post),
                        }
                    ],
                    # "timestamp": str(self.now),
                }
            ],
            "attachments": [],
        }
        custom_header = {
            "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2)",
            "content-type": "application/json",
        }
        webhook_string = json.dumps(webhook)
        self.logger.debug("About to run POST for %s", hook["name"])
        response = await self._post_webhook(hook, webhook_string, custom_header)
        if response.status_code not in self.success_codes:
            self.logger.error(
                "Error %d while trying to post %s", response.status_code, hook["name"]
            )
            return None
        self.logger.debug("Got %d when posting %s", response.status_code, hook["name"])
        return new_hash

    async def _process_feeds(self, last_check):
        """Run every configured feed concurrently and record new hashes.

        Per-feed failures (timeouts, network errors, anything else) are
        logged and do not abort the other feeds.
        """
        tasks = [
            asyncio.create_task(
                asyncio.wait_for(
                    self._process_feed(hook, last_check),
                    timeout=self.FEED_TIMEOUT_SECONDS,
                )
            )
            for hook in self.app_config["feeds"]
        ]
        # return_exceptions=True: one bad feed must not cancel the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            hook = self.app_config["feeds"][i]
            if isinstance(result, asyncio.TimeoutError):
                self.logger.error(
                    "Timed out processing feed %s after %d seconds",
                    hook["name"],
                    self.FEED_TIMEOUT_SECONDS,
                )
                continue
            if isinstance(result, requests.RequestException):
                self.logger.error(
                    "Network error while processing feed %s: %s",
                    hook["name"],
                    result,
                )
                continue
            if isinstance(result, Exception):
                self.logger.error(
                    "Unhandled error while processing feed %s: %s",
                    hook["name"],
                    result,
                )
                continue
            if result is None:
                continue
            if "lasthash" not in hook:
                self.logger.info(
                    "Feed %s has no existing hash, likely a new feed!", hook["name"]
                )
            self.app_config["feeds"][i]["lasthash"] = result

    # This function gets and formats the brief excerpt that goes in the embed
    # Different feeds put summaries in different fields, so we pick the best
    # one and limit it to 250 characters.
    def get_description(self, feed, length=250, min_length=150, addons=None):
        """Build a short plain-text excerpt from a feed entry.

        Prefers summary_detail and falls back to description. HTML tags are
        stripped, and the cut point is walked back from `length` to the
        nearest space (never below `min_length`). `addons` is appended
        verbatim when given.
        """
        try:
            text = str(feed["summary_detail"]["value"])
        except KeyError:
            text = str(feed["description"])
        text = self.html_filter.sub("", text)
        # Walk the cut point back until it lands just after a space, but
        # never shorter than min_length (previously duplicated per branch).
        while length > min_length and text[length - 1 : length] != " ":
            length -= 1
        desc = text[:length]
        if addons is not None:
            desc = desc + str(addons)
        return desc

    # Some of this could go in __init__
    def setup(self):
        """Prepare timezone, log directory, logging, and load the config.

        Exits the process when the config file is missing, after creating
        the config directory, so the user can populate it before the next
        run. (Previously this returned with an empty config, which crashed
        later in process() on the missing "feeds" key.)
        """
        os.environ["TZ"] = "America/Toronto"
        time.tzset()
        self.now = time.mktime(time.localtime())
        # Check for log and config files/paths, create empty directories if needed
        # TODO: change output to log file, as warning/error
        if not Path(self.log_dir).exists():
            print(
                "No log file path exists. Yark! We'll try and make {}...".format(
                    self.log_dir
                )
            )
            try:
                Path(self.log_dir).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                print(
                    "The path {} already exists and is not a directory!".format(
                        self.log_dir
                    )
                )
        if not Path(self.config_file_path).exists():
            print(
                "No config file at {}! Snarf. We'll try and make {}...".format(
                    self.config_file_path, self.config_dir
                )
            )
            try:
                Path(self.config_dir).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                print(
                    "The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format(
                        self.config_dir
                    )
                )
                sys.exit(255)
            # Bug fix: exit instead of falling through with an empty config.
            sys.exit(1)
        # Loading the config file
        with open(self.config_file_path, "r") as config_file:
            self.app_config = json.load(config_file)
        # Set up logging
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(
            filename=str(self.log_dir + self.log_file_path),
            encoding="utf-8",
            level=logging.ERROR,
            datefmt="%m/%d/%Y %H:%M:%S",
            format="%(asctime)s: %(levelname)s: %(message)s",
        )

    def process(self):
        """Run one full pass: load config, process all feeds, save config."""
        self.setup()  # Handle the config and log paths
        # First run has no lastupdate; look back up to 6 hours.
        last_check = self.app_config.get("lastupdate", self.now - 21600)
        asyncio.run(self._process_feeds(last_check))
        # Dump updated config (hashes + run timestamp) back to the json file
        self.logger.debug("Dumping config back to %s", str(self.config_file_path))
        self.app_config["lastupdate"] = self.now
        with open(self.config_file_path, "w") as config_file:
            json.dump(self.app_config, config_file, indent=4)
# end of Discorss class
def main():
    """Entry point: build the app and run a single feed-processing pass."""
    Discorss().process()


if __name__ == "__main__":
    main()