troutbot/src/polling.ts
NotAShelf 3eb5ccf61c
treewide: implement authorized users for polling; cleanup
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I0c72309281e8c67e4ee4333c4c3bc1fe6a6a6964
2026-02-01 17:57:38 +03:00

342 lines
10 KiB
TypeScript

import type { Config, WebhookEvent, RepoConfig } from './types.js';
import {
listRecentComments,
fetchPR,
fetchIssue,
hasExistingComment,
postComment,
updateComment,
formatComment,
createReaction,
type RecentComment,
} from './github.js';
import { createEngine } from './engine/index.js';
import { getLogger } from './logger.js';
import { recordEvent } from './events.js';
import { readFileSync, writeFileSync, existsSync } from 'fs';
/** Bookkeeping record for a comment that has already been handled. */
interface ProcessedComment {
  /** Numeric id of the comment. */
  id: number;
  /** Epoch milliseconds at which the comment was recorded as handled. */
  timestamp: number;
}

/** Polling state in the shape that is serialized to the state file. */
interface PollingState {
  /** ISO timestamp string per repository, keyed by "owner/repo". */
  lastProcessedAt: Record<string, string>;
}

/** Maximum number of entries retained in the processed-comment cache. */
const MAX_PROCESSED_CACHE = 1000;

/** In-memory cache of handled comments, keyed by "owner/repo#commentId". */
const processedComments = new Map<string, ProcessedComment>();

/** Current polling state; starts out with no recorded repositories. */
let pollingState: PollingState = { lastProcessedAt: {} };
/**
 * Loads persisted polling state from `stateFile` into the module-level
 * `pollingState`.
 *
 * If the file does not exist the current state is left untouched. If the file
 * cannot be read or parsed, or does not contain a `lastProcessedAt` object,
 * the state is reset to empty and a warning is logged. Individual entries
 * whose value is not a parseable timestamp string are dropped, so a partly
 * corrupted file does not discard the valid entries.
 *
 * @param stateFile Path of the JSON state file.
 */
function loadPollingState(stateFile: string): void {
  if (!existsSync(stateFile)) {
    return;
  }
  try {
    const parsed: unknown = JSON.parse(readFileSync(stateFile, 'utf-8'));
    const rawTimestamps =
      typeof parsed === 'object' && parsed !== null
        ? (parsed as { lastProcessedAt?: unknown }).lastProcessedAt
        : undefined;

    // Arrays are `typeof 'object'` too, so they must be rejected explicitly.
    if (
      typeof rawTimestamps !== 'object' ||
      rawTimestamps === null ||
      Array.isArray(rawTimestamps)
    ) {
      getLogger().warn('Invalid polling state format, resetting to empty state');
      pollingState = { lastProcessedAt: {} };
      return;
    }

    // Rebuild the state from validated entries only instead of trusting the
    // parsed JSON wholesale.
    const lastProcessedAt: Record<string, string> = {};
    for (const [repoKey, value] of Object.entries(rawTimestamps)) {
      if (typeof value === 'string' && !Number.isNaN(Date.parse(value))) {
        lastProcessedAt[repoKey] = value;
      } else {
        getLogger().warn(`Ignoring invalid polling timestamp for ${repoKey}`);
      }
    }
    pollingState = { lastProcessedAt };
  } catch (err) {
    // Best effort: an unreadable or malformed file must not stop polling.
    getLogger().warn('Failed to load polling state, resetting to empty state', err);
    pollingState = { lastProcessedAt: {} };
  }
}
/**
 * Writes the current polling state to `stateFile` as pretty-printed JSON.
 * Failures are logged as a warning and never thrown to the caller.
 *
 * @param stateFile Path of the JSON state file to (over)write.
 */
function savePollingState(stateFile: string): void {
  try {
    const serialized = JSON.stringify(pollingState, null, 2);
    writeFileSync(stateFile, serialized);
  } catch (error) {
    getLogger().warn('Failed to save polling state', error);
  }
}
/**
 * Builds the "owner/repo" key under which per-repository state is stored.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @returns The two parts joined by a slash.
 */
function getRepoKey(owner: string, repo: string): string {
  const separator = '/';
  return owner + separator + repo;
}
/**
 * Looks up the stored "last processed" timestamp for a repository.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @returns The stored timestamp as a Date, or `undefined` when nothing is
 *   stored for the repository or the stored value is not a parseable date.
 */
function getLastProcessedAt(owner: string, repo: string): Date | undefined {
  const stored = pollingState.lastProcessedAt[getRepoKey(owner, repo)];
  if (!stored) {
    return undefined;
  }
  const parsed = new Date(stored);
  // An Invalid Date is still a truthy object, and calling toISOString() on it
  // throws a RangeError, so report unparseable values as "nothing stored".
  return Number.isNaN(parsed.getTime()) ? undefined : parsed;
}
/**
 * Records `date` (as an ISO string) as the last processed timestamp for a
 * repository in the in-memory polling state. Does not write to disk.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param date Timestamp to store.
 */
function setLastProcessedAt(owner: string, repo: string, date: Date): void {
  const isoTimestamp = date.toISOString();
  pollingState.lastProcessedAt[getRepoKey(owner, repo)] = isoTimestamp;
}
/**
 * Builds the "owner/repo#commentId" key used by the processed-comment cache.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param commentId Numeric id of the comment.
 * @returns The cache key string.
 */
function getCacheKey(owner: string, repo: string, commentId: number): string {
  const repoPart = owner + '/' + repo;
  return repoPart + '#' + commentId;
}
/**
 * Reports whether a comment is present in the processed-comment cache.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param commentId Numeric id of the comment.
 * @returns `true` when the comment has been recorded as processed.
 */
function isProcessed(owner: string, repo: string, commentId: number): boolean {
  const cacheKey = getCacheKey(owner, repo, commentId);
  return processedComments.has(cacheKey);
}
/**
 * Records a comment as processed and trims the cache back down to
 * MAX_PROCESSED_CACHE entries, discarding the entries with the oldest
 * timestamps first.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param commentId Numeric id of the comment.
 */
function markProcessed(owner: string, repo: string, commentId: number): void {
  processedComments.set(getCacheKey(owner, repo, commentId), {
    id: commentId,
    timestamp: Date.now(),
  });

  const overflow = processedComments.size - MAX_PROCESSED_CACHE;
  if (overflow <= 0) {
    return;
  }

  // Oldest entries first, so the leading `overflow` items are the ones to evict.
  const oldestFirst = [...processedComments].sort(
    ([, left], [, right]) => left.timestamp - right.timestamp
  );
  for (const [staleKey] of oldestFirst.slice(0, overflow)) {
    processedComments.delete(staleKey);
  }
}
/**
 * Matches the `@troutbot` handle only when it stands on its own: it must not
 * be preceded by a word character or hyphen (which excludes email addresses
 * such as `dev@troutbot.io`) and must not be followed by a character that
 * would make it a different handle (`@troutbot2`, `@troutbot-staging`).
 * Matching is case-insensitive.
 */
const TROUTBOT_MENTION_PATTERN = /(?<![A-Za-z0-9_-])@troutbot(?![A-Za-z0-9-])/i;

/**
 * Reports whether a comment body mentions the bot.
 *
 * @param body Raw comment text.
 * @returns `true` when `body` contains a standalone `@troutbot` mention.
 */
function containsMention(body: string): boolean {
  return TROUTBOT_MENTION_PATTERN.test(body);
}
/**
 * Runs the analysis engine for an event and publishes the outcome as a
 * comment on the corresponding issue or pull request.
 *
 * When a comment carrying the configured marker already exists and updates
 * are not allowed, nothing is posted and a "skipped" result is returned.
 * Otherwise the existing comment is updated (when updates are allowed and its
 * id is known) or a new comment is posted. The result is passed to
 * `recordEvent` in both cases.
 *
 * @param event The issue / pull request to analyze.
 * @param config Bot configuration; `engine` and `response` are read here.
 * @returns `{ skipped, reason }` when skipped, otherwise
 *   `{ processed, impact, confidence }`.
 */
async function analyzeAndComment(
  event: WebhookEvent,
  config: Config
): Promise<Record<string, unknown>> {
  const logger = getLogger();
  const engine = createEngine(config.engine);

  const analysis = await engine.analyze(event);
  logger.info(
    `Analyzed ${event.owner}/${event.repo}#${event.number}: impact=${analysis.impact}, confidence=${analysis.confidence.toFixed(2)}`
  );

  const { commentMarker, allowUpdates } = config.response;
  const existing = await hasExistingComment(event.owner, event.repo, event.number, commentMarker);

  // Guard clause: an earlier comment exists and must not be touched.
  if (existing.exists && !allowUpdates) {
    logger.info(`Already commented on ${event.owner}/${event.repo}#${event.number}, skipping`);
    const skipped = { skipped: true, reason: 'Already commented' };
    recordEvent(event, skipped, analysis);
    return skipped;
  }

  const body = formatComment(
    config.response,
    event.type,
    analysis.impact,
    analysis.confidence,
    analysis.reasoning
  );

  // Only update in place when an earlier comment exists, updates are allowed
  // and its id is known; every other case posts a new comment.
  const updatableCommentId = existing.exists && allowUpdates ? existing.commentId : undefined;
  if (updatableCommentId) {
    logger.info(`Updating existing comment on ${event.owner}/${event.repo}#${event.number}`);
    await updateComment(event.owner, event.repo, updatableCommentId, body);
  } else {
    await postComment(event.owner, event.repo, event.number, body);
  }

  const outcome = { processed: true, impact: analysis.impact, confidence: analysis.confidence };
  recordEvent(event, outcome, analysis);
  return outcome;
}
/**
 * Decides whether `username` may trigger on-demand analysis.
 *
 * @param username Login of the comment author.
 * @param authorizedUsers Optional allow-list; a missing or empty list means
 *   everyone is allowed.
 * @returns `true` when there is no allow-list or `username` is on it
 *   (compared case-insensitively).
 */
function isAuthorized(username: string, authorizedUsers?: string[]): boolean {
  const hasAllowList = authorizedUsers !== undefined && authorizedUsers !== null && authorizedUsers.length !== 0;
  if (!hasAllowList) {
    return true; // No restrictions
  }

  const wanted = username.toLowerCase();
  for (const candidate of authorizedUsers) {
    if (candidate.toLowerCase() === wanted) {
      return true;
    }
  }
  return false;
}
/**
 * Decides whether a repository is allowed to be handled by polling.
 *
 * Owner and repository names are compared case-insensitively, so differing
 * capitalisation between configuration entries does not cause a repository to
 * be rejected.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param pollingRepos Optional polling-specific allow-list; a missing or empty
 *   list means no restriction.
 * @returns `true` when there is no allow-list or the repository is on it.
 */
function isRepoAuthorized(owner: string, repo: string, pollingRepos?: RepoConfig[]): boolean {
  if (!pollingRepos || pollingRepos.length === 0) {
    return true; // No restrictions, use global repos
  }
  const wantedOwner = owner.toLowerCase();
  const wantedRepo = repo.toLowerCase();
  return pollingRepos.some(
    (r) => r.owner.toLowerCase() === wantedOwner && r.repo.toLowerCase() === wantedRepo
  );
}
/**
 * Handles a single recent comment: if it mentions the bot, has not been
 * processed yet, and both the repository and the author are authorized, runs
 * an on-demand analysis for the issue or pull request it belongs to.
 *
 * Outcomes:
 * - No mention, or already processed: returns without doing anything.
 * - Repository not authorized for polling: reacts with thumbs-down (best
 *   effort) and marks the comment processed.
 * - Author not authorized: marks the comment processed.
 * - Otherwise: builds an `on_demand` event from the pull request (or, if it is
 *   not one, the issue), analyzes it and marks the comment processed.
 *
 * Failures while fetching or analyzing are logged and leave the comment
 * unmarked, so it is picked up again by a later poll.
 *
 * @param comment The comment to inspect.
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param config Bot configuration; `polling.repositories` and
 *   `polling.authorizedUsers` are read here.
 */
async function processComment(
  comment: RecentComment,
  owner: string,
  repo: string,
  config: Config
): Promise<void> {
  const logger = getLogger();

  if (!containsMention(comment.body)) {
    return;
  }

  if (isProcessed(owner, repo, comment.id)) {
    logger.debug(`Comment ${owner}/${repo}#${comment.id} already processed, skipping`);
    return;
  }

  // Check if repo is authorized for polling
  const pollingRepos = config.polling?.repositories;
  if (!isRepoAuthorized(owner, repo, pollingRepos)) {
    logger.info(
      `Unauthorized repo ${owner}/${repo} for polling, ignoring mention from ${comment.author}`
    );
    // The reaction is only feedback for the commenter. If it fails, the
    // comment must still be marked processed and the error must not escape,
    // otherwise the caller's poll loop is aborted and this comment is retried
    // on every cycle.
    try {
      await createReaction(owner, repo, comment.id, 'thumbs_down');
    } catch (err) {
      logger.warn(`Failed to add reaction to comment ${owner}/${repo}#${comment.id}`, err);
    }
    markProcessed(owner, repo, comment.id);
    return;
  }

  // Check if user is authorized
  const authorizedUsers = config.polling?.authorizedUsers;
  if (!isAuthorized(comment.author, authorizedUsers)) {
    logger.info(
      `Unauthorized user ${comment.author} attempted on-demand analysis in ${owner}/${repo}#${comment.issueNumber}`
    );
    markProcessed(owner, repo, comment.id);
    return;
  }

  logger.info(`Found @troutbot mention in ${owner}/${repo}#${comment.issueNumber}`);

  try {
    // First, try to fetch as a PR to check if it's a pull request
    const prData = await fetchPR(owner, repo, comment.issueNumber);

    let event: WebhookEvent;
    if (prData) {
      // It's a pull request
      event = {
        action: 'on_demand',
        type: 'pull_request',
        number: comment.issueNumber,
        title: prData.title,
        body: prData.body,
        owner,
        repo,
        author: prData.author,
        labels: prData.labels,
        branch: prData.branch,
        sha: prData.sha,
      };
    } else {
      // It's an issue
      const issueData = await fetchIssue(owner, repo, comment.issueNumber);
      if (!issueData) {
        // Not marked processed on purpose: a later poll will try again.
        logger.warn(`Could not fetch issue ${owner}/${repo}#${comment.issueNumber}`);
        return;
      }
      event = {
        action: 'on_demand',
        type: 'issue',
        number: comment.issueNumber,
        title: issueData.title,
        body: issueData.body,
        owner,
        repo,
        author: issueData.author,
        labels: issueData.labels,
      };
    }

    await analyzeAndComment(event, config);
    // Only mark after a successful analysis so failures are retried.
    markProcessed(owner, repo, comment.id);
    logger.info(
      `Successfully processed on-demand analysis for ${owner}/${repo}#${comment.issueNumber}`
    );
  } catch (err) {
    logger.error(`Failed to process mention in ${owner}/${repo}#${comment.issueNumber}`, err);
  }
}
/**
 * Polls one repository: fetches the comments created since `since`, processes
 * them one at a time in the order returned, then records the newest comment
 * date seen (or `since` when none is newer) as the repository's last processed
 * timestamp and, if a state file is given, persists the polling state.
 *
 * Any error is logged and swallowed so that one failing repository does not
 * stop the caller.
 *
 * @param owner Repository owner.
 * @param repo Repository name.
 * @param config Bot configuration, forwarded to comment processing.
 * @param since Lower bound for the comments to fetch.
 * @param stateFile Optional path of the state file to write after polling.
 */
async function pollRepository(
  owner: string,
  repo: string,
  config: Config,
  since: Date,
  stateFile?: string
): Promise<void> {
  const logger = getLogger();

  try {
    const comments = await listRecentComments(owner, repo, since);
    logger.debug(`Fetched ${comments.length} recent comments from ${owner}/${repo}`);

    let newestSeen = since;
    for (const comment of comments) {
      await processComment(comment, owner, repo, config);

      const createdAt = new Date(comment.createdAt);
      newestSeen = createdAt.getTime() > newestSeen.getTime() ? createdAt : newestSeen;
    }

    // Remember how far we got, and persist it when a state file is in use.
    setLastProcessedAt(owner, repo, newestSeen);
    if (stateFile) {
      savePollingState(stateFile);
    }
  } catch (error) {
    logger.error(`Failed to poll ${owner}/${repo}`, error);
  }
}
/**
 * Starts comment polling for all configured repositories.
 *
 * Does nothing (apart from logging) when polling is disabled or no
 * repositories are configured. Otherwise it:
 * 1. loads persisted state when `polling.backfill` is enabled,
 * 2. performs an initial poll per repository, starting from the persisted
 *    timestamp if there is one and from `now - lookback` otherwise,
 * 3. schedules a recurring poll every `polling.intervalMinutes` minutes that
 *    always looks back `polling.lookbackMinutes` minutes.
 *
 * The returned promise resolves once the initial poll has finished and the
 * timer has been installed; the timer itself keeps running.
 *
 * @param config Bot configuration; `polling` and `repositories` are read here.
 */
export async function startPolling(config: Config): Promise<void> {
  const logger = getLogger();
  const pollingConfig = config.polling;

  if (!pollingConfig || !pollingConfig.enabled) {
    logger.info('Polling is disabled');
    return;
  }

  if (config.repositories.length === 0) {
    logger.warn('Polling enabled but no repositories configured');
    return;
  }

  const intervalMs = pollingConfig.intervalMinutes * 60 * 1000;
  const lookbackMs = pollingConfig.lookbackMinutes * 60 * 1000;

  logger.info(`Starting polling for ${config.repositories.length} repositories`);
  logger.info(
    `Poll interval: ${pollingConfig.intervalMinutes} minutes, lookback: ${pollingConfig.lookbackMinutes} minutes`
  );

  // Load persisted state if backfill is enabled
  const stateFile = pollingConfig.backfill
    ? pollingConfig.stateFile || '.troutbot-polling-state.json'
    : undefined;
  if (stateFile) {
    loadPollingState(stateFile);
    logger.info(`Polling state file: ${stateFile}`);
  }

  // Do an initial poll - use persisted timestamp if available, otherwise use lookback
  for (const repo of config.repositories) {
    const lastProcessed = getLastProcessedAt(repo.owner, repo.repo);
    const initialSince = lastProcessed || new Date(Date.now() - lookbackMs);
    if (lastProcessed) {
      logger.info(
        `Resuming polling for ${repo.owner}/${repo.repo} from ${lastProcessed.toISOString()}`
      );
    }
    await pollRepository(repo.owner, repo.repo, config, initialSince, stateFile);
  }

  // Set up recurring polling. setInterval does not wait for the async callback
  // to finish, so a slow cycle could overlap with the next tick and analyze the
  // same comment twice before it is marked processed. The flag below makes a
  // tick a no-op while the previous cycle is still running.
  let cycleInProgress = false;
  setInterval(async () => {
    if (cycleInProgress) {
      logger.warn('Previous polling cycle still running, skipping this tick');
      return;
    }
    cycleInProgress = true;
    try {
      const since = new Date(Date.now() - lookbackMs);
      for (const repo of config.repositories) {
        await pollRepository(repo.owner, repo.repo, config, since, stateFile);
      }
    } catch (err) {
      // Nothing awaits this callback's promise, so an uncaught error here
      // would surface as an unhandled rejection.
      logger.error('Polling cycle failed', err);
    } finally {
      cycleInProgress = false;
    }
  }, intervalMs);
}