# discorss/discorss.py
# (Header captured from the repository web view: 128 lines, 4.5 KiB, Python.)
#!/usr/bin/env python
# 2025-01-30 16:28:11 -05:00
# -*- coding: UTF-8 -*-
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: © 2025 A.M. Rowsell <https://frzn.dev/~amr>
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# DiscoRSS: A simple RSS feed reader for Discord. Takes RSS feeds and then sends them to
# webhooks. Intended to run using systemd timers.
import hashlib
import html
import json
import os
import re
import time
from pathlib import Path

import feedparser
import requests
# Filesystem locations used by the script: the JSON configuration file,
# the log directory, and the log file name (kept with its leading slash).
# For a local checkout, "discorss.conf" and "./log" are the usual
# substitutes for the first two.
config_file_path = "/etc/discorss.conf"
log_file_path = "/var/log/discorss"
log_file_name = "/app.log"
# Yes, I know you "can't parse HTML with regex", but
# just watch me.
# Matches HTML comments and anything shaped like an opening or closing tag,
# whatever its attributes contain (URLs, digits, slashes, ...). Requiring a
# letter right after "<" keeps plain text such as "1 < 2" intact.
html_filter = re.compile(r"<!--.*?-->|</?[A-Za-z][^>]*>", re.DOTALL)


def get_description(feed, max_length=150):
    """Return a plain-text excerpt of the first entry in *feed*.

    The text is taken from the entry's ``summary_detail["value"]`` and, when
    that is not available, from its ``description``. HTML tags are removed,
    HTML entities are decoded, and the result is cut to *max_length*
    characters.

    Args:
        feed: Parsed feed object exposing an ``entries`` sequence of
            mapping-like entries.
        max_length: Maximum number of characters to return.

    Returns:
        The excerpt, or an empty string when the feed has no entries or the
        first entry carries no description.
    """
    entries = getattr(feed, "entries", None)
    if not entries:
        return ""
    entry = entries[0]
    try:
        raw = entry["summary_detail"]["value"]
    except (KeyError, TypeError):
        # No usable summary_detail: fall back to the plain description.
        raw = entry.get("description")
    if raw is None:
        return ""
    # Strip tags first, then decode entities, so escaped markup that was
    # meant to be shown literally (e.g. "&lt;b&gt;") is not removed.
    text = html.unescape(html_filter.sub("", str(raw)))
    # Slicing already copes with strings shorter than the limit.
    return text[:max_length]
def _build_webhook(hook, entry, description):
    """Build the Discord webhook payload (a dict) announcing *entry* for *hook*."""
    embed = {
        "title": str(entry["title"]),
        "url": str(entry["link"]),
        "color": 216128,
        # Discord's embed footer object carries its label under "text".
        "footer": {"text": "DiscoRSS"},
        "author": {"name": str(hook["name"]), "url": str(hook["siteurl"])},
    }
    # Only attach the excerpt field when there is something to show.
    if description:
        embed["fields"] = [{"name": "Excerpt from post:", "value": description}]
    return {"embeds": [embed], "attachments": []}


def main():
    """Check every configured feed once and post new entries to Discord.

    Reads the JSON config at ``config_file_path``, looks at the first entry
    of each feed listed under ``"feeds"``, and posts it to the feed's webhook
    when its title hash differs from the stored ``"lasthash"`` and its
    publication time (plus the feed's ``"offset"``) lies between the previous
    check and now. The updated hashes and ``"lastupdate"`` timestamp are then
    written back to the config file.

    A feed that cannot be read, has no entries, or whose webhook request
    fails is reported on stdout and skipped, so the remaining feeds are still
    processed and the config is still saved.
    """
    os.environ["TZ"] = "America/Toronto"
    time.tzset()
    try:
        Path(log_file_path).mkdir(parents=True, exist_ok=True)
    except FileExistsError:
        print("This path already exists and is not a directory!")
    # Load and read the config file
    if not Path(config_file_path).exists():
        print("No config file! Snarf. Directories were created for you.")
        return
    with open(config_file_path, "r") as config_file:
        app_config = json.load(config_file)
    now = time.mktime(time.localtime())
    # First run: no "lastupdate" stored yet, so look back six hours.
    last_check = app_config.get("lastupdate", now - 21600)
    custom_header = {
        "user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc1)",
        "content-type": "application/json",
    }
    for hook in app_config["feeds"]:
        # Get the feed
        feed = feedparser.parse(hook["url"])
        if not feed.entries:
            print("No entries found for feed {}, skipping".format(hook["name"]))
            continue
        entry = feed.entries[0]
        published = entry.get("published_parsed") or entry.get("updated_parsed")
        if published is None:
            print("No publication date for feed {}, skipping".format(hook["name"]))
            continue
        published_time = time.mktime(published) + hook.get("offset", 0)
        print("Parsing feed {}...".format(hook["name"]))
        # Hash the title of the latest post and use that to determine if it's been posted
        new_hash = hashlib.sha3_512(bytes(entry["title"], "utf-8")).hexdigest()
        if hook.get("lasthash") == new_hash:
            continue
        # `hook` is the same dict that lives in app_config["feeds"], so this
        # update is what gets written back to the config file.
        hook["lasthash"] = new_hash
        if not (last_check < published_time < now):
            continue
        # Generate the webhook
        webhook_string = json.dumps(
            _build_webhook(hook, entry, get_description(feed))
        )
        try:
            r = requests.post(
                hook["webhook"],
                data=webhook_string,
                headers=custom_header,
                timeout=30,
            )
        except requests.RequestException as err:
            print(
                "Error {} while trying to post {}".format(err, hook["webhook"])
            )
            continue
        # Any 2xx counts as success; status_code is an int, and a webhook
        # post is normally answered with 204 rather than 200.
        if not r.ok:
            print(
                "Error {} while trying to post {}".format(
                    r.status_code, hook["webhook"]
                )
            )
    app_config["lastupdate"] = now
    with open(config_file_path, "w") as config_file:
        json.dump(app_config, config_file, indent=4)
# Run one polling pass when executed as a script; importing the module
# does not trigger it.
if __name__ == "__main__":
    main()