This is needed in case a feed is not in reverse chronological order, as most feeds are. This still needs testing.
178 lines
6.2 KiB
Python
Executable file
178 lines
6.2 KiB
Python
Executable file
#!/usr/bin/env python
|
|
# -*- coding: UTF-8 -*-
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>
|
|
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
|
|
# webhooks. Intended to run using systemd timers.
|
|
|
|
import requests
|
|
import feedparser
|
|
import hashlib
|
|
from pathlib import Path
|
|
import json
|
|
import time
|
|
import os
|
|
import sys
|
|
import re
|
|
|
|
# Resolve the configuration directory following the XDG Base Directory
# convention: use $XDG_CONFIG_HOME when it is set and non-empty, otherwise
# fall back to ~/.config. In both cases the application's own files live in
# a "discorss" sub-directory, and config_dir always names that sub-directory
# so that creating it prepares the right place for the config file.
home_dir = Path.home()
_config_base = os.environ.get("XDG_CONFIG_HOME") or str(home_dir) + "/.config"
config_dir = _config_base + "/discorss"
config_file_path = config_dir + "/discorss.conf"
log_dir = r"/var/log/discorss"
# NOTE(review): this is an absolute path at the filesystem root rather than a
# file inside log_dir -- confirm whether log_dir + "/app.log" was intended.
log_file_path = r"/app.log"
# Yes, I know you "can't parse HTML with regex", but
# just watch me.
html_filter = re.compile(r"\<\/?([A-Za-z \:\.\/\"\=])*\>")
# HTTP status codes treated as a successful webhook delivery. These are ints
# because requests exposes Response.status_code as an int; string entries
# would never match it.
success_codes = [200, 201, 202, 203, 204, 205, 206]
# Parsed contents of the config file; populated by setupPaths().
app_config = {}
|
|
|
|
|
|
# This function gets and formats the brief excerpt that goes in the embed.
# Different feeds put summaries in different fields, so we pick the best
# one and limit it to 250 characters.
# TODO: make the character limit smarter, as to split at a natural point
def get_description(feed):
    """Return a tag-stripped excerpt of at most 250 characters.

    ``feed`` may be either a parsed feed (anything exposing an ``entries``
    sequence, in which case the first entry is used) or a single entry
    mapping. Accepting a bare entry lets the caller choose which post to
    describe instead of always getting the first one in the feed.

    The text is taken from ``summary_detail.value`` when present, otherwise
    from ``description``. Raises ``KeyError`` if neither field is available
    and ``IndexError`` if a parsed feed has no entries.
    """
    entries = getattr(feed, "entries", None)
    entry = entries[0] if entries is not None else feed
    try:
        raw_text = str(entry["summary_detail"]["value"])
    except KeyError:
        raw_text = str(entry["description"])
    # Slicing already copes with strings shorter than the limit.
    return html_filter.sub("", raw_text)[:250]
|
|
|
|
|
|
def setupPaths():
    """Prepare the log/config directories and load the configuration.

    Creates ``log_dir`` when ``log_file_path`` does not exist. Problems
    creating the log directory are reported but are not fatal.

    If the config file is missing, ``config_dir`` is created so the user has
    somewhere to put one, and the program exits with status 255 because
    there are no feeds to process. Otherwise the JSON config is parsed into
    the module-level ``app_config``.
    """
    global app_config
    # Check for log and config files/paths, create empty directories if needed
    # TODO: make this cleaner
    if not Path(log_file_path).exists():
        print(
            "No log file path exists. Yark! We'll try and make {}...".format(log_dir)
        )
        try:
            Path(log_dir).mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            print("The path {} already exists and is not a directory!".format(log_dir))
        except PermissionError:
            # Typical when running unprivileged; keep going without a log dir.
            print("No permission to create {}; continuing without it.".format(log_dir))
    if not Path(config_file_path).exists():
        print(
            "No config file at {}! Snarf. We'll try and make {}...".format(
                config_file_path, config_dir
            )
        )
        try:
            Path(config_dir).mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            print(
                "The config dir {} already exists and is not a directory! Please fix manually.".format(
                    config_dir
                )
            )
        # Without a config file there is nothing to do.
        sys.exit(255)
    # Loading the config file
    with open(config_file_path, "r") as config_file:
        app_config = json.load(config_file)
|
|
|
|
|
|
def main():
    """Check every configured feed and post its newest entry to its webhook.

    For each item in ``app_config["feeds"]`` the feed is fetched, the entry
    with the greatest publication time is selected (feeds are not assumed to
    be in reverse chronological order), and a SHA3-512 hash of its title is
    compared with the stored ``lasthash`` to decide whether it is new. A new
    entry is posted as a Discord embed only if it was published after the
    previous run. Finally ``lastupdate`` and the hashes are written back to
    the config file.

    Exits with status 254 if an entry has no usable parsed publication date.
    """
    os.environ["TZ"] = "America/Toronto"
    time.tzset()
    now = time.mktime(time.localtime())
    setupPaths()  # Handle the config and log paths
    # First run has no "lastupdate": look back up to 6 hours instead.
    last_check = app_config.get("lastupdate", now - 21600)
    # requests reports status codes as ints; normalise the configured list so
    # the comparison works whether it holds ints or numeric strings.
    accepted_codes = {int(code) for code in success_codes}
    custom_header = {
        "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc2)",
        "content-type": "application/json",
    }
    for hook in app_config["feeds"]:
        # Get the feed
        print("Parsing feed {}...".format(hook["name"]))
        feeds = feedparser.parse(hook["url"])
        # The per-feed time offset is optional; a missing one must not be
        # mistaken for a missing publication date below.
        offset = hook.get("offset", 0)
        latest_post = None
        latest_time = 0
        # Iterate the entries themselves; iterating the parse result directly
        # would only yield its top-level keys.
        for entry in feeds.entries:
            try:
                published_time = time.mktime(entry["published_parsed"]) + offset
            except (KeyError, TypeError):
                print(entry.get("published"))
                sys.exit(254)
            if published_time > latest_time:
                latest_post = entry
                latest_time = published_time
        if latest_post is None:
            print("No entries found in feed {}, skipping.".format(hook["name"]))
            continue

        # Hash the title of the latest post and use that to determine if it's been posted
        new_hash = hashlib.sha3_512(latest_post["title"].encode("utf-8")).hexdigest()
        if hook.get("lasthash") == new_hash:
            continue
        # hook is the same dict object stored in app_config["feeds"], so this
        # update is what gets saved to disk at the end.
        hook["lasthash"] = new_hash
        # get_description reads the first item of an ``entries`` sequence, so
        # hand it a one-entry wrapper around the post that was selected.
        excerpt = get_description(feedparser.FeedParserDict(entries=[latest_post]))
        # Generate the webhook
        webhook = {
            "embeds": [
                {
                    "title": str(latest_post["title"]),
                    "url": str(latest_post["link"]),
                    "color": 216128,
                    "footer": {
                        # Discord's embed footer object carries its label
                        # in "text".
                        "text": "DiscoRSS",
                    },
                    "author": {
                        "name": str(hook["name"]),
                        "url": str(hook["siteurl"]),
                    },
                    "fields": [
                        {
                            "name": "Excerpt from post:",
                            "value": excerpt,
                        }
                    ],
                }
            ],
            "attachments": [],
        }
        webhook_string = json.dumps(webhook)

        # Compare the selected post's own timestamp, not whichever entry the
        # loop happened to visit last.
        if latest_time > last_check:
            r = requests.post(
                hook["webhook"], data=webhook_string, headers=custom_header
            )
            if r.status_code not in accepted_codes:
                print(
                    "Error {} while trying to post {}".format(
                        r.status_code, hook["webhook"]
                    )
                )
    app_config["lastupdate"] = now
    with open(config_file_path, "w") as config_file:
        json.dump(app_config, config_file, indent=4)


if __name__ == "__main__":
    main()
|