class refactor: Squashed commit of the following:

commit 597f383724
Author: A.M. Rowsell <amrowsell@frozenelectronics.ca>
Date:   Sun Apr 20 16:04:00 2025 -0400

    fix: typo in log setup, error when replacing w/self

commit 810cbd6f3d
Author: A.M. Rowsell <amrowsell@frozenelectronics.ca>
Date:   Sun Apr 20 13:51:41 2025 -0400

    refactor: transformed into class-based app
This commit is contained in:
A.M. Rowsell 2025-04-20 16:06:16 -04:00
commit 087a6339c8
Signed by: amr
GPG key ID: 0B6E2D8375CF79A9

View file

@ -22,204 +22,220 @@ import sys
import argparse
import re
config_dir = os.environ.get("XDG_CONFIG_HOME")
home_dir = Path.home()
if config_dir is None:
config_file_path = str(home_dir) + "/.config/discorss/discorss.conf"
config_dir = str(home_dir) + "/.config/discorss"
else:
config_file_path = config_dir + r"/discorss/discorss.conf"
log_dir = r"/var/log/discorss"
log_file_path = r"/app.log"
# Yes, I know you "can't parse HTML with regex", but
# just watch me.
html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>")
success_codes = [200, 201, 202, 203, 204, 205, 206]
app_config = {}
# IDEA: Consider making this into a class-based program
# This would solve a couple issues around global variables and generally
# make things a bit neater
class Discorss:
def __init__(self):
self.config_dir = os.environ.get("XDG_CONFIG_HOME")
home_dir = Path.home()
if self.config_dir is None:
self.config_file_path = str(home_dir) + "/.config/discorss/discorss.conf"
self.config_dir = str(home_dir) + "/.config/discorss"
else:
self.config_file_path = self.config_dir + r"/discorss/discorss.conf"
self.log_dir = r"/var/log/discorss"
self.log_file_path = r"/app.log"
# Yes, I know you "can't parse HTML with regex", but
# just watch me.
self.html_filter = re.compile(r"\<\/?([A-Za-z0-9 \:\.\-\/\"\=])*\>")
self.success_codes = [200, 201, 202, 203, 204, 205, 206]
self.app_config = {}
# This function gets and formats the brief excerpt that goes in the embed
# Different feeds put summaries in different fields, so we pick the best
# one and limit it to 250 characters.
def get_description(feed, length=250, min_length=150, addons=None):
try:
temporary_string = str(feed["summary_detail"]["value"])
temporary_string = html_filter.sub("", temporary_string)
while length > min_length:
if temporary_string[length - 1 : length] == " ":
break
else:
length -= 1
except KeyError:
temporary_string = str(feed["description"])
temporary_string = html_filter.sub("", temporary_string)
while length > min_length:
if temporary_string[length - 1 : length] == " ":
break
else:
length -= 1
desc = temporary_string[:length]
if addons is not None:
desc = desc + str(addons)
return desc
def setupPaths():
global app_config
global logger
# Check for log and config files/paths, create empty directories if needed
# TODO: make this cleaner
if not Path(log_dir).exists():
print("No log file path exists. Yark! We'll try and make {}...".format(log_dir))
# This function gets and formats the brief excerpt that goes in the embed
# Different feeds put summaries in different fields, so we pick the best
# one and limit it to 250 characters.
def get_description(self, feed, length=250, min_length=150, addons=None):
try:
Path(log_dir).mkdir(parents=True, exist_ok=True)
except FileExistsError:
print("The path {} already exists and is not a directory!".format(log_dir))
if not Path(config_file_path).exists():
print(
"No config file at {}! Snarf. We'll try and make {}...".format(
config_file_path, config_dir
)
)
try:
Path(config_dir).mkdir(parents=True, exist_ok=True)
except FileExistsError:
temporary_string = str(feed["summary_detail"]["value"])
temporary_string = self.html_filter.sub("", temporary_string)
while length > min_length:
if temporary_string[length - 1 : length] == " ":
break
else:
length -= 1
except KeyError:
temporary_string = str(feed["description"])
temporary_string = self.html_filter.sub("", temporary_string)
while length > min_length:
if temporary_string[length - 1 : length] == " ":
break
else:
length -= 1
desc = temporary_string[:length]
if addons is not None:
desc = desc + str(addons)
return desc
def setupPaths(self):
# Check for log and config files/paths, create empty directories if needed
# TODO: make this cleaner
if not Path(self.log_dir).exists():
print(
"The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format(
config_dir
"No log file path exists. Yark! We'll try and make {}...".format(
self.log_dir
)
)
sys.exit(255)
try:
Path(self.log_dir).mkdir(parents=True, exist_ok=True)
except FileExistsError:
print(
"The path {} already exists and is not a directory!".format(
self.log_dir
)
)
if not Path(self.config_file_path).exists():
print(
"No config file at {}! Snarf. We'll try and make {}...".format(
self.config_file_path, self.config_dir
)
)
try:
Path(self.config_dir).mkdir(parents=True, exist_ok=True)
except FileExistsError:
print(
"The config dir {} already exists and is not a directory! Please fix manually. Quitting!".format(
self.config_dir
)
)
sys.exit(255)
return
# Loading the config file
with open(self.config_file_path, "r") as config_file:
self.app_config = json.load(config_file)
# Set up logging
self.logger = logging.getLogger(__name__)
logging.basicConfig(
filename=str(self.log_dir + self.log_file_path),
encoding="utf-8",
level=logging.DEBUG,
datefmt="%m/%d/%Y %H:%M:%S",
format="%(asctime)s: %(levelname)s: %(message)s",
)
return
# Loading the config file
with open(config_file_path, "r") as config_file:
app_config = json.load(config_file)
# Set up logging
logger = logging.getLogger(__name__)
logging.basicConfig(
filename=str(log_dir + log_file_path),
encoding="utf-8",
level=logging.DEBUG,
datefmt="%m/%d/%Y %H:%M:%S",
format="%(asctime)s: %(levelname)s: %(message)s",
)
return
def process(self):
os.environ["TZ"] = "America/Toronto"
time.tzset()
now = time.mktime(time.localtime())
self.setupPaths() # Handle the config and log paths
try:
last_check = self.app_config["lastupdate"]
except KeyError:
last_check = (
now - 21600
) # first run, no lastupdate, check up to 6 hours ago
for i, hook in enumerate(self.app_config["feeds"]): # Feed loop start
self.logger.debug("Parsing feed %s...", hook["name"])
self.feeds = feedparser.parse(hook["url"])
self.latest_post = []
prev_best = 0
self.logger.debug(
"About to sort through entries for feed %s ...", hook["name"]
)
for feed in self.feeds["entries"]:
try:
bad_time = False
published_time = time.mktime(feed["published_parsed"])
published_time = published_time + hook["offset"]
except KeyError:
published_time = time.mktime(feed["updated_parsed"])
bad_time = True
if published_time > prev_best:
latest_post = feed
prev_best = published_time
else:
continue
if bad_time is True:
self.logger.debug(
"Feed %s doesn't supply a published time, using updated time instead",
hook["name"],
)
# Hash the title and time of the latest post and use that to determine if it's been posted
# Yes, SHA3-512 is totally unnecessary for this purpose, but I love SHA3
self.logger.debug("About to hash %s ...", latest_post["title"])
try:
new_hash = hashlib.sha3_512(
bytes(latest_post["title"] + str(published_time), "utf-8")
).hexdigest()
except TypeError:
self.logger.error("Title of %s isn't hashing correctly", hook["name"])
continue
try:
if hook["lasthash"] != new_hash:
self.app_config["feeds"][i]["lasthash"] = new_hash
else:
continue
except KeyError:
self.app_config["feeds"][i]["lasthash"] = new_hash
self.logger.info(
"Feed %s has no existing hash, likely a new feed!", hook["name"]
)
# Generate the webhook
self.logger.info(
"Publishing webhook for %s. Last check was %d, now is %d",
hook["name"],
last_check,
now,
)
webhook = {
"embeds": [
{
"title": str(latest_post["title"]),
"url": str(latest_post["link"]),
"color": 2123412,
"footer": {
"text": "DiscoRSS",
"icon_url": "https://frzn.dev/~amr/images/discorss.png",
},
"author": {
"name": str(hook["name"]),
"url": str(hook["siteurl"]),
},
"fields": [
{
"name": "Excerpt from post:",
"value": self.get_description(latest_post),
}
],
# "timestamp": str(now),
}
],
"attachments": [],
}
custom_header = {
"user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc3)",
"content-type": "application/json",
}
webhook_string = json.dumps(webhook)
self.logger.debug("About to run POST for %s", hook["name"])
r = requests.post(
hook["webhook"], data=webhook_string, headers=custom_header
)
if r.status_code not in self.success_codes:
self.logger.error(
"Error %d while trying to post %s", r.status_code, hook["name"]
)
else:
self.logger.debug("Got %d when posting %s", r.status_code, hook["name"])
# End of feed loop
# Dump updated config back to json file
self.logger.debug("Dumping config back to %s", str(self.config_file_path))
self.app_config["lastupdate"] = now
with open(self.config_file_path, "w") as config_file:
json.dump(self.app_config, config_file, indent=4)
return
# end of Discorss class
def main():
os.environ["TZ"] = "America/Toronto"
time.tzset()
now = time.mktime(time.localtime())
setupPaths() # Handle the config and log paths
try:
last_check = app_config["lastupdate"]
except KeyError:
last_check = now - 21600 # first run, no lastupdate, check up to 6 hours ago
for i, hook in enumerate(app_config["feeds"]): # Feed loop start
logger.debug("Parsing feed %s...", hook["name"])
feeds = feedparser.parse(hook["url"])
latest_post = []
prev_best = 0
logger.debug("About to sort through entries for feed %s ...", hook["name"])
for feed in feeds["entries"]:
try:
bad_time = False
published_time = time.mktime(feed["published_parsed"])
published_time = published_time + hook["offset"]
except KeyError:
published_time = time.mktime(feed["updated_parsed"])
bad_time = True
if published_time > prev_best:
latest_post = feed
prev_best = published_time
else:
continue
if bad_time is True:
logger.debug(
"Feed %s doesn't supply a published time, using updated time instead",
hook["name"],
)
# Hash the title and time of the latest post and use that to determine if it's been posted
# Yes, SHA3-512 is totally unnecessary for this purpose, but I love SHA3
logger.debug("About to hash %s ...", latest_post["title"])
try:
new_hash = hashlib.sha3_512(
bytes(latest_post["title"] + str(published_time), "utf-8")
).hexdigest()
except TypeError:
logger.error("Title of %s isn't hashing correctly", hook["name"])
continue
try:
if hook["lasthash"] != new_hash:
app_config["feeds"][i]["lasthash"] = new_hash
else:
continue
except KeyError:
app_config["feeds"][i]["lasthash"] = new_hash
logger.info(
"Feed %s has no existing hash, likely a new feed!", hook["name"]
)
# Generate the webhook
logger.info(
"Publishing webhook for %s. Last check was %d, now is %d",
hook["name"],
last_check,
now,
)
webhook = {
"embeds": [
{
"title": str(latest_post["title"]),
"url": str(latest_post["link"]),
"color": 2123412,
"footer": {
"text": "DiscoRSS",
"icon_url": "https://frzn.dev/~amr/images/discorss.png",
},
"author": {
"name": str(hook["name"]),
"url": str(hook["siteurl"]),
},
"fields": [
{
"name": "Excerpt from post:",
"value": get_description(latest_post),
}
],
# "timestamp": str(now),
}
],
"attachments": [],
}
custom_header = {
"user-agent": "DiscoRSS (https://git.frzn.dev/amr/discorss, 0.2rc3)",
"content-type": "application/json",
}
webhook_string = json.dumps(webhook)
logger.debug("About to run POST for %s", hook["name"])
r = requests.post(hook["webhook"], data=webhook_string, headers=custom_header)
if r.status_code not in success_codes:
logger.error(
"Error %d while trying to post %s", r.status_code, hook["name"]
)
else:
logger.debug("Got %d when posting %s", r.status_code, hook["name"])
# End of feed loop
# Dump updated config back to json file
logger.debug("Dumping config back to %s", str(config_file_path))
app_config["lastupdate"] = now
with open(config_file_path, "w") as config_file:
json.dump(app_config, config_file, indent=4)
return
app = Discorss()
app.process()
if __name__ == "__main__":