discorss/discorss.py

199 lines
6.8 KiB
Python
Executable file

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
# webhooks. Intended to run using systemd timers.
import requests
import feedparser
import hashlib
import logging
from pathlib import Path
import json
import time
import os
import sys
import re
# Locate the configuration following the XDG Base Directory convention:
# $XDG_CONFIG_HOME is used when it is set and non-empty, otherwise ~/.config.
home_dir = Path.home()
_config_base = os.environ.get("XDG_CONFIG_HOME") or str(home_dir) + "/.config"
# config_dir always names the application's own directory, so the directory
# created on first run is the one that actually holds discorss.conf.
config_dir = _config_base + "/discorss"
config_file_path = config_dir + "/discorss.conf"
# The log file lives at log_dir + log_file_path.
log_dir = r"/var/log/discorss"
log_file_path = r"/app.log"
# Yes, I know you "can't parse HTML with regex", but
# just watch me.
# Matches an opening or closing tag: "<", an optional "/", a tag name that
# starts with a letter, then anything up to the closing ">". Requiring the
# leading letter keeps plain text such as "1 < 2 > 0" intact, while allowing
# digits and punctuation inside the tag (e.g. <h1>, <a href="...post-1?x=2">).
html_filter = re.compile(r"</?[A-Za-z][^<>]*>")
# HTTP status codes (stored as strings) that count as a successful delivery.
success_codes = ["200", "201", "202", "203", "204", "205", "206"]
# Parsed contents of the JSON config file; filled in at start-up.
app_config = {}
# This function gets and formats the brief excerpt that goes in the embed
# Different feeds put summaries in different fields, so we pick the best
# one and limit it to 250 characters.
# TODO: make the character limit smarter, as to split at a natural point
def get_description(feed):
    """Return a plain-text excerpt of a feed entry, at most 250 characters.

    The text is taken from ``feed["summary_detail"]["value"]`` when present,
    otherwise from ``feed["description"]``. Anything matched by the
    module-level ``html_filter`` pattern is removed before truncating.

    Args:
        feed: A mapping describing one feed entry.

    Returns:
        The stripped text cut to 250 characters, or an empty string when the
        entry carries neither field.
    """
    try:
        raw_text = feed["summary_detail"]["value"]
    except (KeyError, TypeError):
        # TypeError covers a summary_detail that is present but None.
        try:
            raw_text = feed["description"]
        except KeyError:
            # No usable summary at all: return nothing rather than abort the
            # whole run over one entry.
            return ""
    stripped = html_filter.sub("", str(raw_text))
    # Slicing already copes with strings shorter than the limit.
    return stripped[:250]
def setupPaths():
    """Prepare the log and config locations, start logging, load the config.

    Creates ``log_dir`` when it does not exist. When there is no file at
    ``config_file_path`` the config directory is created, so the user has
    somewhere to put one, and the program exits because there are no feeds to
    process. Otherwise logging is configured (module-level ``logger``) and the
    JSON config is loaded into the module-level ``app_config``.

    Raises:
        SystemExit: with code 255 when the config file is missing or cannot
            be read or parsed.
    """
    global app_config
    global logger
    # Check for log and config files/paths, create empty directories if needed
    log_path = Path(log_dir)
    if not log_path.exists():
        print("No log file path exists. Yark! We'll try and make {}...".format(log_dir))
        try:
            log_path.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            print("The path {} already exists and is not a directory!".format(log_dir))
        except OSError as err:
            # e.g. no permission to create a directory under /var/log
            print("Could not create {}: {}".format(log_dir, err))
    if not Path(config_file_path).exists():
        print(
            "No config file at {}! Snarf. We'll try and make {}...".format(
                config_file_path, config_dir
            )
        )
        try:
            Path(config_dir).mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            print(
                "The config dir {} already exists and is not a directory! Please fix manually.".format(
                    config_dir
                )
            )
        # Without a config file there is nothing to do; stop here instead of
        # returning with no logger and an empty app_config.
        sys.exit(255)
    # Set up logging first so that config problems can be reported through it
    logger = logging.getLogger(__name__)
    log_options = {
        "level": logging.INFO,
        "datefmt": "%m/%d/%Y %H:%M:%S",
        "format": "%(asctime)s: %(levelname)s: %(message)s",
    }
    try:
        logging.basicConfig(
            filename=str(log_dir + log_file_path),
            encoding="utf-8",
            **log_options,
        )
    except OSError as err:
        # The log file cannot be opened; fall back to the default stream
        # handler (stderr) rather than refusing to run.
        print("Could not open log file {}: {}".format(log_dir + log_file_path, err))
        logging.basicConfig(**log_options)
    # Loading the config file
    try:
        with open(config_file_path, "r", encoding="utf-8") as config_file:
            app_config = json.load(config_file)
    except (OSError, json.JSONDecodeError):
        logger.exception("Could not load config file %s", config_file_path)
        sys.exit(255)
    return
def _entry_timestamp(entry, offset):
    """Return ``(timestamp, used_updated)`` for a feed entry, or None if undated.

    The published time (plus ``offset`` seconds) is preferred; the updated
    time is used, without the offset, only when no published time exists.
    """
    published = entry.get("published_parsed")
    if published is not None:
        return time.mktime(published) + offset, False
    updated = entry.get("updated_parsed")
    if updated is not None:
        return time.mktime(updated), True
    return None


def _build_webhook(hook, post):
    """Build the Discord webhook payload (one embed) announcing ``post``."""
    return {
        "embeds": [
            {
                "title": str(post["title"]),
                "url": str(post["link"]),
                "color": 216128,
                # Discord's embed footer object carries its label in "text".
                "footer": {
                    "text": "DiscoRSS",
                    # "url": "https://git.frzn.dev/amr/discorss",
                },
                "author": {
                    "name": str(hook["name"]),
                    "url": str(hook["siteurl"]),
                },
                "fields": [
                    {
                        "name": "Excerpt from post:",
                        "value": get_description(post),
                    }
                ],
            }
        ],
        "attachments": [],
    }


def main():
    """Check every configured feed and post its newest entry to its webhook.

    For each feed in ``app_config["feeds"]`` the entry with the latest
    timestamp is selected. Its title is hashed and compared with the feed's
    stored ``lasthash``; a different (or missing) hash means the entry has not
    been announced yet, so a webhook is sent. The hash is stored only after the
    webhook was delivered successfully, so failed posts are retried on the
    next run. Finally ``lastupdate`` is refreshed and the config is written
    back to ``config_file_path``.
    """
    os.environ["TZ"] = "America/Toronto"
    time.tzset()
    now = time.mktime(time.localtime())
    setupPaths()  # Handle the config and log paths
    # first run, no lastupdate, check up to 6 hours ago
    last_check = app_config.get("lastupdate", now - 21600)
    # success_codes may hold strings while requests reports an int status,
    # so normalise once before comparing.
    accepted_codes = {int(code) for code in success_codes}
    custom_header = {
        "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc2)",
        "content-type": "application/json",
    }
    for hook in app_config["feeds"]:
        # Get the feed
        logger.info("Parsing feed %s...", hook["name"])
        feeds = feedparser.parse(hook["url"])
        # A missing offset means "no correction", not "no published time".
        offset = hook.get("offset", 0)
        latest_post = None
        prev_best = 0
        bad_time = False
        for feed in feeds["entries"]:
            stamp = _entry_timestamp(feed, offset)
            if stamp is None:
                continue  # entry carries no usable date at all
            published_time, used_updated = stamp
            if latest_post is None or published_time > prev_best:
                latest_post = feed
                prev_best = published_time
                bad_time = used_updated
        if latest_post is None:
            logger.warning(
                "Feed %s returned no dated entries, skipping", hook["name"]
            )
            continue
        if bad_time:
            logger.warning(
                "Feed %s doesn't supply a published time, using updated time instead",
                hook["name"],
            )
        # Hash the title of the latest post and use that to determine if it's been posted
        new_hash = hashlib.sha3_512(bytes(latest_post["title"], "utf-8")).hexdigest()
        if "lasthash" not in hook:
            logger.info(
                "Feed %s has no existing hash, likely a new feed!", hook["name"]
            )
        elif hook["lasthash"] == new_hash:
            continue  # already announced
        # Generate the webhook
        logger.info(
            "Publishing webhook for %s. Last check was %d, now is %d",
            hook["name"],
            last_check,
            now,
        )
        webhook_string = json.dumps(_build_webhook(hook, latest_post))
        try:
            r = requests.post(
                hook["webhook"],
                data=webhook_string,
                headers=custom_header,
                timeout=30,  # never hang a timer-driven run on a dead socket
            )
        except requests.RequestException as err:
            logger.error(
                "Request failed while trying to post %s: %s", hook["webhook"], err
            )
            continue
        if r.status_code not in accepted_codes:
            logger.error(
                "Error %d while trying to post %s", r.status_code, hook["webhook"]
            )
            continue
        # hook is the same dict object stored in app_config["feeds"], so this
        # is what gets written back below.
        hook["lasthash"] = new_hash
    # End of feed loop
    app_config["lastupdate"] = now
    with open(config_file_path, "w", encoding="utf-8") as config_file:
        json.dump(app_config, config_file, indent=4)
    return
# Run the feed check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()