discorss/discorss.py

149 lines
5.2 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
2025-01-30 16:28:11 -05:00
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
# webhooks. Intended to run using systemd timers.
import requests
import feedparser
import hashlib
from pathlib import Path
import json
import time
import os
import re
# Locate the configuration following the XDG Base Directory convention:
# use $XDG_CONFIG_HOME when it is set and non-empty, otherwise ~/.config.
# The tilde is expanded here because neither open() nor pathlib expands it
# implicitly. config_dir always names the discorss directory itself, so that
# config_file_path lives directly inside it.
_config_home = os.environ.get("XDG_CONFIG_HOME")
if not _config_home:
    _config_home = os.path.expanduser("~/.config")
config_dir = os.path.join(_config_home, "discorss")
config_file_path = os.path.join(config_dir, "discorss.conf")

# Directory and file name used for logging.
log_file_path = r"/var/log/discorss"
# log_file_path = r"./log"
log_file_name = r"/app.log"

# Yes, I know you "can't parse HTML with regex", but
# just watch me.
# Matches an opening, closing or self-closing tag: "<", an optional "/", a
# letter, then everything up to the next ">". Requiring a letter after the
# bracket keeps plain text such as "1 < 2 and 3 > 2" intact, while tags with
# digits or URL attributes (<h1>, <br/>, <a href="https://...">) are matched.
html_filter = re.compile(r"</?[A-Za-z][^>]*>")

# HTTP status codes treated as a successful webhook delivery. These are
# integers because requests exposes Response.status_code as an int.
success_codes = [200, 201, 202, 203, 204, 205, 206]
# This function gets and formats the brief excerpt that goes in the embed
# Different feeds put summaries in different fields, so we pick the best
# one and limit it to 150 characters.
# TODO: make the character limit smarter, so that it splits at a natural point
def get_description(feed, max_length=150):
    """Return a short plain-text excerpt of the newest entry in *feed*.

    The text is taken from the entry's ``summary_detail`` value when present
    and from its ``description`` otherwise. HTML tags are removed with the
    module-level ``html_filter`` pattern and the result is cut to at most
    *max_length* characters.

    Args:
        feed: A parsed feed object exposing an ``entries`` sequence whose
            items support ``entry[key]`` lookups.
        max_length: Maximum number of characters to return (default 150).

    Returns:
        The excerpt as a string, or ``""`` when the feed has no entries or
        the newest entry carries neither field.
    """
    if not feed.entries:
        return ""
    entry = feed.entries[0]
    try:
        raw_text = entry["summary_detail"]["value"]
    except KeyError:
        try:
            raw_text = entry["description"]
        except KeyError:
            # Neither field exists: report "no excerpt" instead of raising
            # from inside the exception handler.
            return ""
    plain_text = html_filter.sub("", str(raw_text))
    # Slicing already copes with strings shorter than the limit.
    return plain_text[:max_length]
def _build_webhook(entry, hook, description):
    """Build the Discord webhook payload (one embed) for a single feed entry."""
    embed = {
        "title": str(entry["title"]),
        "url": str(entry["link"]),
        "color": 216128,
        # Discord's embed footer object carries its label in "text".
        "footer": {
            "text": "DiscoRSS",
            # "url": "https://git.frzn.dev/amr/discorss",
        },
        "author": {
            "name": str(hook["name"]),
            "url": str(hook["siteurl"]),
        },
    }
    # An embed field with an empty value is rejected, so only attach the
    # excerpt when there is one.
    if description:
        embed["fields"] = [
            {
                "name": "Excerpt from post:",
                "value": description,
            }
        ]
    return {"embeds": [embed], "attachments": []}


def main():
    """Check every configured feed and post its newest entry to Discord.

    Reads the JSON configuration at ``config_file_path``, and for each item
    in its ``"feeds"`` list fetches the feed, hashes the newest entry's title
    and compares it with the stored ``"lasthash"``. A changed entry that was
    published after the stored ``"lastupdate"`` time is sent to the feed's
    webhook. The updated hashes and the new ``"lastupdate"`` timestamp are
    written back to the configuration file at the end of the run.

    When no configuration file exists, its parent directory is created and
    the function returns without doing anything else.
    """
    os.environ["TZ"] = "America/Toronto"
    time.tzset()

    # Check for log and config files/paths, create empty directories if needed
    try:
        Path(log_file_path).mkdir(parents=True, exist_ok=True)
    except FileExistsError:
        print("The logfile path {} already exists and is not a directory!".format(log_file_path))
    except PermissionError:
        # Nothing below writes to the log directory, so carry on without it.
        print("No permission to create the logfile path {}.".format(log_file_path))

    # Expand a leading "~" explicitly; neither pathlib nor open() does so.
    config_path = Path(os.path.expanduser(config_file_path))
    if not config_path.exists():
        print("No config file at {}! Snarf.\n{} was created for you.".format(config_path, config_path.parent))
        # Create the directory that should hold the config file, not a
        # directory named like the file itself.
        config_path.parent.mkdir(parents=True, exist_ok=True)
        return

    with open(config_path, "r") as config_file:
        app_config = json.load(config_file)

    now = time.mktime(time.localtime())
    # first run, no lastupdate, check up to 6 hours ago
    last_check = app_config.get("lastupdate", now - 21600)

    # Response.status_code is an int; normalise the configured codes so the
    # membership test works whether they are stored as strings or integers.
    accepted_codes = {int(code) for code in success_codes}
    custom_header = {
        "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc1)",
        "content-type": "application/json",
    }
    request_timeout = 30  # seconds; keeps a stalled webhook from hanging the run

    for hook in app_config["feeds"]:
        print("Parsing feed {}...".format(hook["name"]))
        # Get the feed
        feed = feedparser.parse(hook["url"])
        if not feed.entries:
            print("Feed {} has no entries, skipping.".format(hook["name"]))
            continue
        latest = feed.entries[0]
        published = latest.get("published_parsed")
        if published is None:
            print("Feed {} has no usable publication date, skipping.".format(hook["name"]))
            continue
        # NOTE(review): time.mktime() interprets the struct as local time;
        # the per-feed "offset" presumably compensates for the feed's own
        # time zone -- confirm before changing this conversion.
        published_time = time.mktime(published) + hook.get("offset", 0)

        # Hash the title of the latest post and use that to determine if it's been posted
        new_hash = hashlib.sha3_512(bytes(latest["title"], "utf-8")).hexdigest()
        if hook.get("lasthash") == new_hash:
            continue
        # hook is the same dict object stored in app_config["feeds"], so this
        # update is saved with the configuration below.
        hook["lasthash"] = new_hash

        if published_time <= last_check:
            continue

        # Generate the webhook
        webhook_string = json.dumps(_build_webhook(latest, hook, get_description(feed)))
        try:
            response = requests.post(
                hook["webhook"],
                data=webhook_string,
                headers=custom_header,
                timeout=request_timeout,
            )
        except requests.RequestException as err:
            # One unreachable webhook must not stop the remaining feeds or
            # prevent the configuration from being saved.
            print("Error {} while trying to post {}".format(err, hook["webhook"]))
            continue
        if response.status_code not in accepted_codes:
            print(
                "Error {} while trying to post {}".format(
                    response.status_code, hook["webhook"]
                )
            )

    app_config["lastupdate"] = now
    with open(config_path, "w") as config_file:
        json.dump(app_config, config_file, indent=4)


# Run the reader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()