From 3a158f99a68046665509f3ca21b6e4dd2e9882b5 Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 11:59:41 -0800 Subject: [PATCH 1/7] add gc queue logic --- packages/backend/src/constants.ts | 1 + packages/backend/src/promClient.ts | 60 +++- packages/backend/src/repoManager.ts | 322 ++++++++++++------ packages/backend/src/types.ts | 4 + .../migration.sql | 11 + packages/db/prisma/schema.prisma | 3 + 6 files changed, 280 insertions(+), 121 deletions(-) create mode 100644 packages/db/prisma/migrations/20250225194100_add_gc_repo_status/migration.sql diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index 2334c5df..ab31fbb8 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -11,4 +11,5 @@ export const DEFAULT_SETTINGS: Settings = { reindexRepoPollingIntervalMs: 1000, indexConcurrencyMultiple: 3, configSyncConcurrencyMultiple: 3, + gcConcurrencyMultiple: 3, } \ No newline at end of file diff --git a/packages/backend/src/promClient.ts b/packages/backend/src/promClient.ts index 27cf98c4..7dbcdbc0 100644 --- a/packages/backend/src/promClient.ts +++ b/packages/backend/src/promClient.ts @@ -6,9 +6,16 @@ export class PromClient { private app: express.Application; public activeRepoIndexingJobs: Gauge; public repoIndexingDuration: Histogram; - public repoIndexingErrors: Counter; - public repoIndexingFails: Counter; - public repoIndexingSuccesses: Counter; + public repoIndexingErrorTotal: Counter; + public repoIndexingFailTotal: Counter; + public repoIndexingSuccessTotal: Counter; + + public activeRepoGarbageCollectionJobs: Gauge; + public repoGarbageCollectionDuration: Histogram; + public repoGarbageCollectionErrorTotal: Counter; + public repoGarbageCollectionFailTotal: Counter; + public repoGarbageCollectionSuccessTotal: Counter; + public readonly PORT = 3060; constructor() { @@ -28,26 +35,61 @@ export class PromClient { }); this.registry.registerMetric(this.repoIndexingDuration); - 
this.repoIndexingErrors = new Counter({ + this.repoIndexingErrorTotal = new Counter({ name: 'repo_indexing_errors', help: 'The number of repo indexing errors', labelNames: ['repo'], }); - this.registry.registerMetric(this.repoIndexingErrors); + this.registry.registerMetric(this.repoIndexingErrorTotal); - this.repoIndexingFails = new Counter({ + this.repoIndexingFailTotal = new Counter({ name: 'repo_indexing_fails', help: 'The number of repo indexing fails', labelNames: ['repo'], }); - this.registry.registerMetric(this.repoIndexingFails); + this.registry.registerMetric(this.repoIndexingFailTotal); - this.repoIndexingSuccesses = new Counter({ + this.repoIndexingSuccessTotal = new Counter({ name: 'repo_indexing_successes', help: 'The number of repo indexing successes', labelNames: ['repo'], }); - this.registry.registerMetric(this.repoIndexingSuccesses); + this.registry.registerMetric(this.repoIndexingSuccessTotal); + + this.activeRepoGarbageCollectionJobs = new Gauge({ + name: 'active_repo_garbage_collection_jobs', + help: 'The number of repo garbage collection jobs in progress', + labelNames: ['repo'], + }); + this.registry.registerMetric(this.activeRepoGarbageCollectionJobs); + + this.repoGarbageCollectionDuration = new Histogram({ + name: 'repo_garbage_collection_duration', + help: 'The duration of repo garbage collection jobs', + labelNames: ['repo'], + }); + this.registry.registerMetric(this.repoGarbageCollectionDuration); + + this.repoGarbageCollectionErrorTotal = new Counter({ + name: 'repo_garbage_collection_errors', + help: 'The number of repo garbage collection errors', + labelNames: ['repo'], + }); + this.registry.registerMetric(this.repoGarbageCollectionErrorTotal); + + this.repoGarbageCollectionFailTotal = new Counter({ + name: 'repo_garbage_collection_fails', + help: 'The number of repo garbage collection fails', + labelNames: ['repo'], + }); + this.registry.registerMetric(this.repoGarbageCollectionFailTotal); + + this.repoGarbageCollectionSuccessTotal = 
new Counter({ + name: 'repo_garbage_collection_successes', + help: 'The number of repo garbage collection successes', + labelNames: ['repo'], + }); + this.registry.registerMetric(this.repoGarbageCollectionSuccessTotal); client.collectDefaultMetrics({ register: this.registry, diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index f49eb460..4b9783c9 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -13,20 +13,26 @@ import { PromClient } from './promClient.js'; interface IRepoManager { blockingPollLoop: () => void; - scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise; dispose: () => void; } -const QUEUE_NAME = 'repoIndexingQueue'; +const REPO_INDEXING_QUEUE = 'repoIndexingQueue'; +const REPO_GC_QUEUE = 'repoGarbageCollectionQueue'; type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] }; -type JobPayload = { +type RepoIndexingPayload = { repo: RepoWithConnections, } +type RepoGarbageCollectionPayload = { + repo: Repo, +} + export class RepoManager implements IRepoManager { - private worker: Worker; - private queue: Queue; + private indexWorker: Worker; + private indexQueue: Queue; + private gcWorker: Worker; + private gcQueue: Queue; private logger = createLogger('RepoManager'); constructor( @@ -36,28 +42,45 @@ export class RepoManager implements IRepoManager { private promClient: PromClient, private ctx: AppContext, ) { - this.queue = new Queue(QUEUE_NAME, { + const numCores = os.cpus().length; + + // Repo indexing + this.indexQueue = new Queue(REPO_INDEXING_QUEUE, { connection: redis, }); - const numCores = os.cpus().length; - this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), { + this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), { connection: redis, concurrency: numCores * this.settings.indexConcurrencyMultiple, }); - this.worker.on('completed', 
this.onIndexJobCompleted.bind(this)); - this.worker.on('failed', this.onIndexJobFailed.bind(this)); + this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this)); + this.indexWorker.on('failed', this.onIndexJobFailed.bind(this)); + + // Garbage collection + this.gcQueue = new Queue(REPO_GC_QUEUE, { + connection: redis, + }); + this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), { + connection: redis, + concurrency: numCores * this.settings.gcConcurrencyMultiple, + }); + this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this)); + this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this)); } public async blockingPollLoop() { while(true) { await this.fetchAndScheduleRepoIndexing(); - await this.garbageCollectRepo(); + await this.fetchAndScheduleRepoGarbageCollection(); await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs)); } } - public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) { + /////////////////////////// + // Repo indexing + /////////////////////////// + + private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) { await this.db.$transaction(async (tx) => { await tx.repo.updateMany({ where: { id: { in: repos.map(repo => repo.id) } }, @@ -75,10 +98,10 @@ export class RepoManager implements IRepoManager { for (const orgId in reposByOrg) { const orgRepos = reposByOrg[orgId]; // Set priority based on number of repos (more repos = lower priority) - // This helps prevent large orgs from overwhelming the queue + // This helps prevent large orgs from overwhelming the indexQueue const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152); - await this.queue.addBulk(orgRepos.map(repo => ({ + await this.indexQueue.addBulk(orgRepos.map(repo => ({ name: 'repoIndexJob', data: { repo }, opts: { @@ -86,26 +109,29 @@ export class RepoManager implements IRepoManager { } }))); - this.logger.info(`Added ${orgRepos.length} jobs to 
queue for org ${orgId} with priority ${priority}`); + this.logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`); } }).catch((err: unknown) => { - this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`); + this.logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`); }); } + private async fetchAndScheduleRepoIndexing() { const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs); const repos = await this.db.repo.findMany({ where: { repoIndexingStatus: { - notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.INDEXING, RepoIndexingStatus.FAILED] + in: [ + RepoIndexingStatus.NEW, + RepoIndexingStatus.INDEXED + ] }, OR: [ { indexedAt: null }, { indexedAt: { lt: thresholdDate } }, - { repoIndexingStatus: RepoIndexingStatus.NEW } ] }, include: { @@ -116,76 +142,13 @@ export class RepoManager implements IRepoManager { } } }); - + if (repos.length > 0) { await this.scheduleRepoIndexingBulk(repos); } } - private async garbageCollectRepo() { - const reposWithNoConnections = await this.db.repo.findMany({ - where: { - repoIndexingStatus: { notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.INDEXING] }, // we let the job finish for now so we don't need to worry about cancelling - connections: { - none: {} - } - } - }); - - const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); - const inactiveOrgs = await this.db.org.findMany({ - where: { - stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE, - stripeLastUpdatedAt: { - lt: sevenDaysAgo - } - } - }); - - const inactiveOrgIds = inactiveOrgs.map(org => org.id); - - const inactiveOrgRepos = await this.db.repo.findMany({ - where: { - orgId: { - in: inactiveOrgIds - } - } - }); - - if (inactiveOrgIds.length > 0 && inactiveOrgRepos.length > 0) { - console.log(`Garbage collecting ${inactiveOrgs.length} inactive orgs: 
${inactiveOrgIds.join(', ')}`); - } - - const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos]; - for (const repo of reposToDelete) { - this.logger.info(`Garbage collecting repo: ${repo.id}`); - - // delete cloned repo - const repoPath = getRepoPath(repo, this.ctx); - if(existsSync(repoPath)) { - this.logger.info(`Deleting repo directory ${repoPath}`); - rmSync(repoPath, { recursive: true, force: true }); - } - - // delete shards - const shardPrefix = getShardPrefix(repo.orgId, repo.id); - const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix)); - for (const file of files) { - const filePath = `${this.ctx.indexPath}/${file}`; - this.logger.info(`Deleting shard file ${filePath}`); - rmSync(filePath); - } - } - - await this.db.repo.deleteMany({ - where: { - id: { - in: reposToDelete.map(repo => repo.id) - } - } - }); - } - + // TODO: do this better? ex: try using the tokens from all the connections // We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to // fetch the token here using the connections from the repo. 
Multiple connections could be referencing this repo, and each @@ -197,7 +160,7 @@ export class RepoManager implements IRepoManager { this.logger.error(`Repo ${repo.id} has no connections`); return; } - + let token: string | undefined; for (const repoConnection of repoConnections) { @@ -205,7 +168,7 @@ export class RepoManager implements IRepoManager { if (connection.connectionType !== 'github' && connection.connectionType !== 'gitlab' && connection.connectionType !== 'gitea') { continue; } - + const config = connection.config as unknown as GithubConnectionConfig | GitlabConnectionConfig | GiteaConnectionConfig; if (config.token) { const tokenResult = await getTokenFromConfig(config.token, connection.orgId, db); @@ -215,31 +178,31 @@ export class RepoManager implements IRepoManager { } } } - + return token; } - + private async syncGitRepository(repo: RepoWithConnections) { let fetchDuration_s: number | undefined = undefined; let cloneDuration_s: number | undefined = undefined; - + const repoPath = getRepoPath(repo, this.ctx); const metadata = repo.metadata as Record; - + if (existsSync(repoPath)) { this.logger.info(`Fetching ${repo.id}...`); - + const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => { //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) })); fetchDuration_s = durationMs / 1000; - + process.stdout.write('\n'); this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`); - + } else { this.logger.info(`Cloning ${repo.id}...`); - + const token = await this.getTokenForRepo(repo, this.db); let cloneUrl = repo.cloneUrl; if (token) { @@ -247,29 +210,29 @@ export class RepoManager implements IRepoManager { url.username = token; cloneUrl = url.toString(); } - + const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => { //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) })); 
cloneDuration_s = durationMs / 1000; - + process.stdout.write('\n'); this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`); } - + this.logger.info(`Indexing ${repo.id}...`); const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx)); const indexDuration_s = durationMs / 1000; this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`); - + return { fetchDuration_s, cloneDuration_s, indexDuration_s, } } - - private async runIndexJob(job: Job) { + + private async runIndexJob(job: Job) { this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`); const repo = job.data.repo as RepoWithConnections; await this.db.repo.update({ @@ -281,11 +244,11 @@ export class RepoManager implements IRepoManager { } }); this.promClient.activeRepoIndexingJobs.inc(); - + let indexDuration_s: number | undefined; let fetchDuration_s: number | undefined; let cloneDuration_s: number | undefined; - + let stats; let attempts = 0; const maxAttempts = 3; @@ -296,27 +259,27 @@ export class RepoManager implements IRepoManager { break; } catch (error) { attempts++; - this.promClient.repoIndexingErrors.inc(); + this.promClient.repoIndexingErrorTotal.inc(); if (attempts === maxAttempts) { this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`); throw error; } - + const sleepDuration = 5000 * Math.pow(2, attempts - 1); this.logger.error(`Failed to sync repository ${repo.id}, attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... 
Error: ${error}`); await new Promise(resolve => setTimeout(resolve, sleepDuration)); } } - + indexDuration_s = stats!.indexDuration_s; fetchDuration_s = stats!.fetchDuration_s; cloneDuration_s = stats!.cloneDuration_s; } - private async onIndexJobCompleted(job: Job) { + private async onIndexJobCompleted(job: Job) { this.logger.info(`Repo index job ${job.id} completed`); this.promClient.activeRepoIndexingJobs.dec(); - this.promClient.repoIndexingSuccesses.inc(); + this.promClient.repoIndexingSuccessTotal.inc(); await this.db.repo.update({ where: { @@ -328,13 +291,13 @@ export class RepoManager implements IRepoManager { } }); } - - private async onIndexJobFailed(job: Job | undefined, err: unknown) { + + private async onIndexJobFailed(job: Job | undefined, err: unknown) { this.logger.info(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`); if (job) { this.promClient.activeRepoIndexingJobs.dec(); - this.promClient.repoIndexingFails.inc(); - + this.promClient.repoIndexingFailTotal.inc(); + await this.db.repo.update({ where: { id: job.data.repo.id, @@ -346,8 +309,143 @@ export class RepoManager implements IRepoManager { } } + /////////////////////////// + // Repo garbage collection + /////////////////////////// + + private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) { + await this.db.$transaction(async (tx) => { + await tx.repo.updateMany({ + where: { id: { in: repos.map(repo => repo.id) } }, + data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE } + }); + + await this.gcQueue.addBulk(repos.map(repo => ({ + name: 'repoGarbageCollectionJob', + data: { repo }, + }))); + + this.logger.info(`Added ${repos.length} jobs to gcQueue`); + }); + } + + private async fetchAndScheduleRepoGarbageCollection() { + //////////////////////////////////// + // Get repos with no connections + //////////////////////////////////// + const reposWithNoConnections = await this.db.repo.findMany({ + where: { + repoIndexingStatus: { in: [ + 
RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition) + RepoIndexingStatus.FAILED, + ] }, + connections: { + none: {} + } + }, + }); + if (reposWithNoConnections.length > 0) { + this.logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`); + } + + //////////////////////////////////// + // Get inactive org repos + //////////////////////////////////// + const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); + const inactiveOrgs = await this.db.org.findMany({ + where: { + stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE, + stripeLastUpdatedAt: { + lt: sevenDaysAgo + } + } + }); + + const inactiveOrgIds = inactiveOrgs.map(org => org.id); + + const inactiveOrgRepos = await this.db.repo.findMany({ + where: { + orgId: { + in: inactiveOrgIds + } + } + }); + + if (inactiveOrgIds.length > 0 && inactiveOrgRepos.length > 0) { + this.logger.info(`Garbage collecting ${inactiveOrgs.length} inactive orgs: ${inactiveOrgIds.join(', ')}`); + } + + const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos]; + if (reposToDelete.length > 0) { + await this.scheduleRepoGarbageCollectionBulk(reposToDelete); + } + } + + private async runGarbageCollectionJob(job: Job) { + this.logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.id}`); + this.promClient.activeRepoGarbageCollectionJobs.inc(); + + const repo = job.data.repo as Repo; + await this.db.repo.update({ + where: { + id: repo.id + }, + data: { + repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING + } + }); + + // delete cloned repo + const repoPath = getRepoPath(repo, this.ctx); + if(existsSync(repoPath)) { + this.logger.info(`Deleting repo directory ${repoPath}`); + rmSync(repoPath, { recursive: true, force: true }); + } + + // delete shards + const shardPrefix = 
getShardPrefix(repo.orgId, repo.id); + const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix)); + for (const file of files) { + const filePath = `${this.ctx.indexPath}/${file}`; + this.logger.info(`Deleting shard file ${filePath}`); + rmSync(filePath); + } + } + + private async onGarbageCollectionJobCompleted(job: Job) { + this.logger.info(`Garbage collection job ${job.id} completed`); + this.promClient.activeRepoGarbageCollectionJobs.dec(); + this.promClient.repoGarbageCollectionSuccessTotal.inc(); + + await this.db.repo.delete({ + where: { + id: job.data.repo.id + } + }); + } + + private async onGarbageCollectionJobFailed(job: Job | undefined, err: unknown) { + this.logger.info(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`); + + if (job) { + this.promClient.activeRepoGarbageCollectionJobs.dec(); + this.promClient.repoGarbageCollectionFailTotal.inc(); + + await this.db.repo.update({ + where: { + id: job.data.repo.id + }, + data: { + repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED + } + }); + } + } + public async dispose() { - this.worker.close(); - this.queue.close(); + this.indexWorker.close(); + this.indexQueue.close(); + this.gcQueue.close(); + this.gcWorker.close(); } } \ No newline at end of file diff --git a/packages/backend/src/types.ts b/packages/backend/src/types.ts index 0c96b10c..bbda1a73 100644 --- a/packages/backend/src/types.ts +++ b/packages/backend/src/types.ts @@ -86,6 +86,10 @@ export type Settings = { * The multiple of the number of CPUs to use for syncing the configuration. */ configSyncConcurrencyMultiple: number; + /** + * The multiple of the number of CPUs to use for garbage collection. 
+ */ + gcConcurrencyMultiple: number; } // @see : https://stackoverflow.com/a/61132308 diff --git a/packages/db/prisma/migrations/20250225194100_add_gc_repo_status/migration.sql b/packages/db/prisma/migrations/20250225194100_add_gc_repo_status/migration.sql new file mode 100644 index 00000000..53b8fa90 --- /dev/null +++ b/packages/db/prisma/migrations/20250225194100_add_gc_repo_status/migration.sql @@ -0,0 +1,11 @@ +-- AlterEnum +-- This migration adds more than one value to an enum. +-- With PostgreSQL versions 11 and earlier, this is not possible +-- in a single migration. This can be worked around by creating +-- multiple migrations, each migration adding only one value to +-- the enum. + + +ALTER TYPE "RepoIndexingStatus" ADD VALUE 'IN_GC_QUEUE'; +ALTER TYPE "RepoIndexingStatus" ADD VALUE 'GARBAGE_COLLECTING'; +ALTER TYPE "RepoIndexingStatus" ADD VALUE 'GARBAGE_COLLECTION_FAILED'; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 24d1ceb4..6c0d5fc7 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -16,6 +16,9 @@ enum RepoIndexingStatus { INDEXING INDEXED FAILED + IN_GC_QUEUE + GARBAGE_COLLECTING + GARBAGE_COLLECTION_FAILED } enum ConnectionSyncStatus { From abfd00228b6a4f28e7328b2c1a816d49bc80516e Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 12:39:07 -0800 Subject: [PATCH 2/7] fix missing switch cases for gc status --- .../connections/[id]/components/repoListItem.tsx | 10 ++++++++++ .../app/[domain]/connections/components/statusIcon.tsx | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/web/src/app/[domain]/connections/[id]/components/repoListItem.tsx b/packages/web/src/app/[domain]/connections/[id]/components/repoListItem.tsx index c75e37e5..88518ad3 100644 --- a/packages/web/src/app/[domain]/connections/[id]/components/repoListItem.tsx +++ b/packages/web/src/app/[domain]/connections/[id]/components/repoListItem.tsx @@ -35,6 +35,12 @@ export 
const RepoListItem = ({ return 'Indexed'; case RepoIndexingStatus.FAILED: return 'Index failed'; + case RepoIndexingStatus.IN_GC_QUEUE: + return 'In garbage collection queue...'; + case RepoIndexingStatus.GARBAGE_COLLECTING: + return 'Garbage collecting...'; + case RepoIndexingStatus.GARBAGE_COLLECTION_FAILED: + return 'Garbage collection failed'; } }, [status]); @@ -85,8 +91,12 @@ const convertIndexingStatus = (status: RepoIndexingStatus) => { case RepoIndexingStatus.IN_INDEX_QUEUE: case RepoIndexingStatus.INDEXING: return 'running'; + case RepoIndexingStatus.IN_GC_QUEUE: + case RepoIndexingStatus.GARBAGE_COLLECTING: + return "garbage-collecting" case RepoIndexingStatus.INDEXED: return 'succeeded'; + case RepoIndexingStatus.GARBAGE_COLLECTION_FAILED: case RepoIndexingStatus.FAILED: return 'failed'; } diff --git a/packages/web/src/app/[domain]/connections/components/statusIcon.tsx b/packages/web/src/app/[domain]/connections/components/statusIcon.tsx index edd4cc91..9ad68d75 100644 --- a/packages/web/src/app/[domain]/connections/components/statusIcon.tsx +++ b/packages/web/src/app/[domain]/connections/components/statusIcon.tsx @@ -3,7 +3,7 @@ import { CircleCheckIcon, CircleXIcon } from "lucide-react"; import { useMemo } from "react"; import { FiLoader } from "react-icons/fi"; -export type Status = 'waiting' | 'running' | 'succeeded' | 'failed'; +export type Status = 'waiting' | 'running' | 'succeeded' | 'failed' | 'garbage-collecting'; export const StatusIcon = ({ status, @@ -12,6 +12,7 @@ export const StatusIcon = ({ const Icon = useMemo(() => { switch (status) { case 'waiting': + case 'garbage-collecting': case 'running': return ; case 'succeeded': From 862c1dfff76581b5a187903cd2acce647959a669 Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 12:54:23 -0800 Subject: [PATCH 3/7] style org create form better with new staging domain --- .../app/onboard/components/orgCreateForm.tsx | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) 
diff --git a/packages/web/src/app/onboard/components/orgCreateForm.tsx b/packages/web/src/app/onboard/components/orgCreateForm.tsx index 33b36df2..69807d76 100644 --- a/packages/web/src/app/onboard/components/orgCreateForm.tsx +++ b/packages/web/src/app/onboard/components/orgCreateForm.tsx @@ -20,7 +20,7 @@ export function OrgCreateForm() { const { toast } = useToast(); const router = useRouter(); const captureEvent = useCaptureEvent(); - + const onboardingFormSchema = z.object({ name: z.string() .min(2, { message: "Organization name must be at least 3 characters long." }) @@ -73,9 +73,9 @@ export function OrgCreateForm() { } return ( - +
- + Organization Domain -
- staging.sourcebot.dev/ - +
+
staging.sourcebot.dev/
+
)} /> - From 35ba9f27f7d6ee3a80cd7714f6a2965b21e239b5 Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 13:24:58 -0800 Subject: [PATCH 4/7] change repo rm logic to be async --- packages/backend/src/constants.ts | 2 +- packages/backend/src/repoManager.ts | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index ab31fbb8..2a50eea4 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -11,5 +11,5 @@ export const DEFAULT_SETTINGS: Settings = { reindexRepoPollingIntervalMs: 1000, indexConcurrencyMultiple: 3, configSyncConcurrencyMultiple: 3, - gcConcurrencyMultiple: 3, + gcConcurrencyMultiple: 1, } \ No newline at end of file diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index 4b9783c9..206d422c 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -6,7 +6,7 @@ import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig } import { AppContext, Settings } from "./types.js"; import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js"; import { cloneRepository, fetchRepository } from "./git.js"; -import { existsSync, rmSync, readdirSync } from 'fs'; +import { existsSync, rmSync, readdirSync, rm } from 'fs'; import { indexGitRepository } from "./zoekt.js"; import os from 'os'; import { PromClient } from './promClient.js'; @@ -399,7 +399,12 @@ export class RepoManager implements IRepoManager { const repoPath = getRepoPath(repo, this.ctx); if(existsSync(repoPath)) { this.logger.info(`Deleting repo directory ${repoPath}`); - rmSync(repoPath, { recursive: true, force: true }); + await rm(repoPath, { recursive: true, force: true }, (err) => { + if (err) { + this.logger.error(`Failed to delete repo directory ${repoPath}: ${err}`); + throw err; + } + }); } // delete shards @@ -408,7 +413,12 @@ export class 
RepoManager implements IRepoManager { for (const file of files) { const filePath = `${this.ctx.indexPath}/${file}`; this.logger.info(`Deleting shard file ${filePath}`); - rmSync(filePath); + await rm(filePath, { force: true }, (err) => { + if (err) { + this.logger.error(`Failed to delete shard file ${filePath}: ${err}`); + throw err; + } + }); } } From c16fd8d735bcd679b913b2f4a9d591354c441156 Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 13:37:47 -0800 Subject: [PATCH 5/7] simplify repo for inactive org query --- packages/backend/src/repoManager.ts | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index 206d422c..97c492fa 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -352,27 +352,19 @@ export class RepoManager implements IRepoManager { // Get inactive org repos //////////////////////////////////// const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); - const inactiveOrgs = await this.db.org.findMany({ - where: { - stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE, - stripeLastUpdatedAt: { - lt: sevenDaysAgo - } - } - }); - - const inactiveOrgIds = inactiveOrgs.map(org => org.id); - const inactiveOrgRepos = await this.db.repo.findMany({ where: { - orgId: { - in: inactiveOrgIds + org: { + stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE, + stripeLastUpdatedAt: { + lt: sevenDaysAgo + } } } }); - if (inactiveOrgIds.length > 0 && inactiveOrgRepos.length > 0) { - this.logger.info(`Garbage collecting ${inactiveOrgs.length} inactive orgs: ${inactiveOrgIds.join(', ')}`); + if (inactiveOrgRepos.length > 0) { + this.logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`); } const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos]; From 3d46e6cb7b95eb4917707eb38e7ced49e2267651 
Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 15:02:43 -0800 Subject: [PATCH 6/7] add grace period for garbage collecting repos --- demo-site-config.json | 1 - packages/backend/src/constants.ts | 2 +- packages/backend/src/repoManager.ts | 101 +++++++++++++----------- packages/backend/src/types.ts | 8 +- packages/schemas/src/v2/index.schema.ts | 5 -- packages/schemas/src/v2/index.type.ts | 4 - schemas/v2/index.json | 5 -- 7 files changed, 62 insertions(+), 64 deletions(-) diff --git a/demo-site-config.json b/demo-site-config.json index e23fafc7..ebc3a3b3 100644 --- a/demo-site-config.json +++ b/demo-site-config.json @@ -1,7 +1,6 @@ { "$schema": "./schemas/v2/index.json", "settings": { - "autoDeleteStaleRepos": true, "reindexInterval": 86400000, // 24 hours "resyncInterval": 86400000 // 24 hours }, diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index 2a50eea4..16aa6bf2 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -5,11 +5,11 @@ import { Settings } from "./types.js"; */ export const DEFAULT_SETTINGS: Settings = { maxFileSize: 2 * 1024 * 1024, // 2MB in bytes - autoDeleteStaleRepos: true, reindexIntervalMs: 1000 * 60 * 60, // 1 hour resyncConnectionPollingIntervalMs: 1000, reindexRepoPollingIntervalMs: 1000, indexConcurrencyMultiple: 3, configSyncConcurrencyMultiple: 3, gcConcurrencyMultiple: 1, + gcGracePeriodMs: 10 * 1000, // 10 seconds } \ No newline at end of file diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index 97c492fa..6941532c 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -19,7 +19,7 @@ interface IRepoManager { const REPO_INDEXING_QUEUE = 'repoIndexingQueue'; const REPO_GC_QUEUE = 'repoGarbageCollectionQueue'; -type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] }; +type RepoWithConnections = Repo & { connections: 
(RepoToConnection & { connection: Connection })[] }; type RepoIndexingPayload = { repo: RepoWithConnections, } @@ -68,7 +68,7 @@ export class RepoManager implements IRepoManager { } public async blockingPollLoop() { - while(true) { + while (true) { await this.fetchAndScheduleRepoIndexing(); await this.fetchAndScheduleRepoGarbageCollection(); @@ -86,7 +86,7 @@ export class RepoManager implements IRepoManager { where: { id: { in: repos.map(repo => repo.id) } }, data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE } }); - + const reposByOrg = repos.reduce>((acc, repo) => { if (!acc[repo.orgId]) { acc[repo.orgId] = []; @@ -118,7 +118,7 @@ export class RepoManager implements IRepoManager { }); } - + private async fetchAndScheduleRepoIndexing() { const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs); const repos = await this.db.repo.findMany({ @@ -142,13 +142,13 @@ export class RepoManager implements IRepoManager { } } }); - + if (repos.length > 0) { await this.scheduleRepoIndexingBulk(repos); } } - + // TODO: do this better? ex: try using the tokens from all the connections // We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to // fetch the token here using the connections from the repo. 
Multiple connections could be referencing this repo, and each @@ -160,15 +160,15 @@ export class RepoManager implements IRepoManager { this.logger.error(`Repo ${repo.id} has no connections`); return; } - - + + let token: string | undefined; for (const repoConnection of repoConnections) { const connection = repoConnection.connection; if (connection.connectionType !== 'github' && connection.connectionType !== 'gitlab' && connection.connectionType !== 'gitea') { continue; } - + const config = connection.config as unknown as GithubConnectionConfig | GitlabConnectionConfig | GiteaConnectionConfig; if (config.token) { const tokenResult = await getTokenFromConfig(config.token, connection.orgId, db); @@ -178,31 +178,31 @@ export class RepoManager implements IRepoManager { } } } - + return token; } - + private async syncGitRepository(repo: RepoWithConnections) { let fetchDuration_s: number | undefined = undefined; let cloneDuration_s: number | undefined = undefined; - + const repoPath = getRepoPath(repo, this.ctx); const metadata = repo.metadata as Record; - + if (existsSync(repoPath)) { this.logger.info(`Fetching ${repo.id}...`); - + const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => { //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) })); fetchDuration_s = durationMs / 1000; - + process.stdout.write('\n'); this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`); - + } else { this.logger.info(`Cloning ${repo.id}...`); - + const token = await this.getTokenForRepo(repo, this.db); let cloneUrl = repo.cloneUrl; if (token) { @@ -210,28 +210,28 @@ export class RepoManager implements IRepoManager { url.username = token; cloneUrl = url.toString(); } - + const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => { //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) })); cloneDuration_s = 
durationMs / 1000; - + process.stdout.write('\n'); this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`); } - + this.logger.info(`Indexing ${repo.id}...`); const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx)); const indexDuration_s = durationMs / 1000; this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`); - + return { fetchDuration_s, cloneDuration_s, indexDuration_s, } } - + private async runIndexJob(job: Job) { this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`); const repo = job.data.repo as RepoWithConnections; @@ -244,15 +244,15 @@ export class RepoManager implements IRepoManager { } }); this.promClient.activeRepoIndexingJobs.inc(); - + let indexDuration_s: number | undefined; let fetchDuration_s: number | undefined; let cloneDuration_s: number | undefined; - + let stats; let attempts = 0; const maxAttempts = 3; - + while (attempts < maxAttempts) { try { stats = await this.syncGitRepository(repo); @@ -264,23 +264,23 @@ export class RepoManager implements IRepoManager { this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`); throw error; } - + const sleepDuration = 5000 * Math.pow(2, attempts - 1); this.logger.error(`Failed to sync repository ${repo.id}, attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... 
Error: ${error}`); await new Promise(resolve => setTimeout(resolve, sleepDuration)); } } - + indexDuration_s = stats!.indexDuration_s; fetchDuration_s = stats!.fetchDuration_s; cloneDuration_s = stats!.cloneDuration_s; } - + private async onIndexJobCompleted(job: Job) { this.logger.info(`Repo index job ${job.id} completed`); this.promClient.activeRepoIndexingJobs.dec(); this.promClient.repoIndexingSuccessTotal.inc(); - + await this.db.repo.update({ where: { id: job.data.repo.id, @@ -291,13 +291,13 @@ export class RepoManager implements IRepoManager { } }); } - + private async onIndexJobFailed(job: Job | undefined, err: unknown) { this.logger.info(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`); if (job) { this.promClient.activeRepoIndexingJobs.dec(); this.promClient.repoIndexingFailTotal.inc(); - + await this.db.repo.update({ where: { id: job.data.repo.id, @@ -328,20 +328,29 @@ export class RepoManager implements IRepoManager { this.logger.info(`Added ${repos.length} jobs to gcQueue`); }); } - + private async fetchAndScheduleRepoGarbageCollection() { //////////////////////////////////// // Get repos with no connections //////////////////////////////////// + + + const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs); const reposWithNoConnections = await this.db.repo.findMany({ where: { - repoIndexingStatus: { in: [ - RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition) - RepoIndexingStatus.FAILED, - ] }, + repoIndexingStatus: { + in: [ + RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition) + RepoIndexingStatus.FAILED, + ] + }, connections: { none: {} - } + }, + OR: [ + { indexedAt: null }, + { indexedAt: { lt: thresholdDate } } + ] }, }); if (reposWithNoConnections.length > 0) { @@ -359,14 +368,18 @@ export class RepoManager implements IRepoManager { 
stripeLastUpdatedAt: { lt: sevenDaysAgo } - } + }, + OR: [ + { indexedAt: null }, + { indexedAt: { lt: thresholdDate } } + ] } }); - + if (inactiveOrgRepos.length > 0) { this.logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`); } - + const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos]; if (reposToDelete.length > 0) { await this.scheduleRepoGarbageCollectionBulk(reposToDelete); @@ -389,7 +402,7 @@ export class RepoManager implements IRepoManager { // delete cloned repo const repoPath = getRepoPath(repo, this.ctx); - if(existsSync(repoPath)) { + if (existsSync(repoPath)) { this.logger.info(`Deleting repo directory ${repoPath}`); await rm(repoPath, { recursive: true, force: true }, (err) => { if (err) { @@ -413,12 +426,12 @@ export class RepoManager implements IRepoManager { }); } } - + private async onGarbageCollectionJobCompleted(job: Job) { this.logger.info(`Garbage collection job ${job.id} completed`); this.promClient.activeRepoGarbageCollectionJobs.dec(); this.promClient.repoGarbageCollectionSuccessTotal.inc(); - + await this.db.repo.delete({ where: { id: job.data.repo.id @@ -443,7 +456,7 @@ export class RepoManager implements IRepoManager { }); } } - + public async dispose() { this.indexWorker.close(); this.indexQueue.close(); diff --git a/packages/backend/src/types.ts b/packages/backend/src/types.ts index bbda1a73..4dbc9b6e 100644 --- a/packages/backend/src/types.ts +++ b/packages/backend/src/types.ts @@ -62,10 +62,6 @@ export type Settings = { * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be inexed. */ maxFileSize: number; - /** - * Automatically delete stale repositories from the index. Defaults to true. - */ - autoDeleteStaleRepos: boolean; /** * The interval (in milliseconds) at which the indexer should re-index all repositories. 
 */ @@ -90,6 +86,10 @@ * The multiple of the number of CPUs to use for garbage collection. */ gcConcurrencyMultiple: number; + /** + * The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded. + */ + gcGracePeriodMs: number; } // @see : https://stackoverflow.com/a/61132308 diff --git a/packages/schemas/src/v2/index.schema.ts b/packages/schemas/src/v2/index.schema.ts index 6b0b3453..88d220f5 100644 --- a/packages/schemas/src/v2/index.schema.ts +++ b/packages/schemas/src/v2/index.schema.ts @@ -595,11 +595,6 @@ const schema = { "default": 2097152, "minimum": 1 }, - "autoDeleteStaleRepos": { - "type": "boolean", - "description": "Automatically delete stale repositories from the index. Defaults to true.", - "default": true - }, "reindexInterval": { "type": "integer", "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).", diff --git a/packages/schemas/src/v2/index.type.ts b/packages/schemas/src/v2/index.type.ts index 7120c907..e5b5ddf6 100644 --- a/packages/schemas/src/v2/index.type.ts +++ b/packages/schemas/src/v2/index.type.ts @@ -40,10 +40,6 @@ export interface Settings { * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be inexed. Defaults to 2MB (2097152 bytes). */ maxFileSize?: number; - /** - * Automatically delete stale repositories from the index. Defaults to true. - */ - autoDeleteStaleRepos?: boolean; /** * The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).
*/ diff --git a/schemas/v2/index.json b/schemas/v2/index.json index ee70c0c2..704d47b6 100644 --- a/schemas/v2/index.json +++ b/schemas/v2/index.json @@ -570,11 +570,6 @@ "default": 2097152, "minimum": 1 }, - "autoDeleteStaleRepos": { - "type": "boolean", - "description": "Automatically delete stale repositories from the index. Defaults to true.", - "default": true - }, "reindexInterval": { "type": "integer", "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).", From b26645d02bc01b299e5b1d97f4def37a2bc26aa4 Mon Sep 17 00:00:00 2001 From: msukkari Date: Tue, 25 Feb 2025 15:48:53 -0800 Subject: [PATCH 7/7] make prom scrape interval 500ms --- grafana.alloy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grafana.alloy b/grafana.alloy index 2ce070fc..b8b6c852 100644 --- a/grafana.alloy +++ b/grafana.alloy @@ -9,7 +9,8 @@ prometheus.scrape "local_app" { ] metrics_path = "/metrics" - scrape_interval = "10s" + scrape_timeout = "500ms" + scrape_interval = "500ms" job_name = sys.env("GRAFANA_ENVIRONMENT")