diff --git a/Dockerfile b/Dockerfile
index 1f2425b4..2871ed3a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -80,6 +80,7 @@
 ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
 ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
 ENV DB_NAME=sourcebot
 ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
+ENV REDIS_URL="redis://localhost:6379"
 ENV SRC_TENANT_ENFORCEMENT_MODE=strict
 ARG SOURCEBOT_VERSION=unknown
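The new `REDIS_URL` variable follows the same override pattern as `DATABASE_URL` above. A minimal sketch of how such a value is typically consumed (illustrative snippet only; the actual wiring is in the `environment.ts` and `main.ts` hunks further down):

```typescript
// Minimal sketch, assuming the standard ioredis API: a single REDIS_URL
// connection string selects the Redis instance, defaulting to the one
// bundled in the container (same value as the ENV line above).
import { Redis } from 'ioredis';

const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';

// maxRetriesPerRequest must be null for BullMQ workers, which keep
// long-lived blocking commands open on the connection.
const redis = new Redis(REDIS_URL, { maxRetriesPerRequest: null });
```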
diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts
index f4c4ad33..0990c6c6 100644
--- a/packages/backend/src/connectionManager.ts
+++ b/packages/backend/src/connectionManager.ts
@@ -1,4 +1,4 @@
-import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
+import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, Repo } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
 import { Settings, WithRequired } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
@@ -6,7 +6,6 @@ import { createLogger } from "./logger.js";
 import os from 'os';
 import { Redis } from 'ioredis';
 import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
-import { CONFIG_REPO_UPSERT_TIMEOUT_MS } from "./environment.js";
 
 interface IConnectionManager {
     scheduleConnectionSync: (connection: Connection) => Promise<void>;
 }
@@ -23,8 +22,8 @@ type JobPayload = {
 };
 
 export class ConnectionManager implements IConnectionManager {
-    private queue = new Queue(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue;
     private logger = createLogger('ConnectionManager');
 
     constructor(
@@ -32,6 +31,9 @@ export class ConnectionManager implements IConnectionManager {
         private settings: Settings,
         redis: Redis,
     ) {
+        this.queue = new Queue(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
@@ -113,6 +115,7 @@ export class ConnectionManager implements IConnectionManager {
         // appear in the repoData array above, and so the RepoToConnection record won't be re-created.
         // Repos that have no RepoToConnection records are considered orphaned and can be deleted.
         await this.db.$transaction(async (tx) => {
+            const deleteStart = performance.now();
             await tx.connection.update({
                 where: {
                     id: job.data.connectionId,
@@ -123,21 +126,124 @@ export class ConnectionManager implements IConnectionManager {
                     }
                 }
             });
+            const deleteDuration = performance.now() - deleteStart;
+            this.logger.info(`Deleted all RepoToConnection records for connection ${job.data.connectionId} in ${deleteDuration}ms`);
 
-            await Promise.all(repoData.map((repo) => {
-                return tx.repo.upsert({
-                    where: {
-                        external_id_external_codeHostUrl: {
-                            external_id: repo.external_id,
-                            external_codeHostUrl: repo.external_codeHostUrl,
-                        },
+            const existingRepos: Repo[] = await tx.repo.findMany({
+                where: {
+                    external_id: {
+                        in: repoData.map(repo => repo.external_id),
                     },
-                    create: repo,
-                    update: repo as Prisma.RepoUpdateInput,
+                    external_codeHostUrl: {
+                        in: repoData.map(repo => repo.external_codeHostUrl),
+                    },
+                },
+            });
+            const existingRepoKeys = existingRepos.map(repo => `${repo.external_id}-${repo.external_codeHostUrl}`);
+
+            const existingRepoData = repoData.filter(repo => existingRepoKeys.includes(`${repo.external_id}-${repo.external_codeHostUrl}`));
+            const [toCreate, toUpdate] = repoData.reduce<[Prisma.RepoCreateManyInput[], Prisma.RepoUpdateManyMutationInput[]]>(([toCreate, toUpdate], repo) => {
+                const existingRepo = existingRepoData.find((r: RepoData) => r.external_id === repo.external_id && r.external_codeHostUrl === repo.external_codeHostUrl);
+                if (existingRepo) {
+                    // @note: make sure to reflect any changes here in the raw sql update below
+                    const updateRepo: Prisma.RepoUpdateManyMutationInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toUpdate.push(updateRepo);
+                } else {
+                    const createRepo: Prisma.RepoCreateManyInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        orgId: job.data.orgId,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toCreate.push(createRepo);
+                }
+                return [toCreate, toUpdate];
+            }, [[], []]);
+
+            if (toCreate.length > 0) {
+                const createStart = performance.now();
+                const createdRepos = await tx.repo.createManyAndReturn({
+                    data: toCreate,
                 });
-            }));
-        }, { timeout: parseInt(CONFIG_REPO_UPSERT_TIMEOUT_MS) });
+                await tx.repoToConnection.createMany({
+                    data: createdRepos.map(repo => ({
+                        repoId: repo.id,
+                        connectionId: job.data.connectionId,
+                    })),
+                });
+
+                const createDuration = performance.now() - createStart;
+                this.logger.info(`Created ${toCreate.length} repos in ${createDuration}ms`);
+            }
+
+            if (toUpdate.length > 0) {
+                const updateStart = performance.now();
+
+                // Build values string for update query
+                const updateValues = toUpdate.map(repo => `(
+                    '${repo.name}',
+                    '${repo.cloneUrl}',
+                    ${repo.imageUrl ? `'${repo.imageUrl}'` : 'NULL'},
+                    ${repo.isFork},
+                    ${repo.isArchived},
+                    '${JSON.stringify(repo.metadata)}'::jsonb,
+                    '${repo.external_id}',
+                    '${repo.external_codeHostType}',
+                    '${repo.external_codeHostUrl}'
+                )`).join(',');
+
+                // Update repos and get their IDs in one query
+                const updateSql = `
+                    WITH updated AS (
+                        UPDATE "Repo" r
+                        SET
+                            name = v.name,
+                            "cloneUrl" = v.clone_url,
+                            "imageUrl" = v.image_url,
+                            "isFork" = v.is_fork,
+                            "isArchived" = v.is_archived,
+                            metadata = v.metadata,
+                            "updatedAt" = NOW()
+                        FROM (
+                            VALUES ${updateValues}
+                        ) AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_type, external_code_host_url)
+                        WHERE r.external_id = v.external_id
+                        AND r."external_codeHostUrl" = v.external_code_host_url
+                        RETURNING r.id
+                    )
+                    SELECT id FROM updated
+                `;
+                const updatedRepoIds = await tx.$queryRawUnsafe<{id: number}[]>(updateSql);
+
+                // Insert repo-connection mappings
+                const createConnectionSql = `
+                    INSERT INTO "RepoToConnection" ("repoId", "connectionId", "addedAt")
+                    SELECT id, ${job.data.connectionId}, NOW()
+                    FROM unnest(ARRAY[${updatedRepoIds.map(r => r.id).join(',')}]) AS id
+                `;
+                await tx.$executeRawUnsafe(createConnectionSql);
+
+                const updateDuration = performance.now() - updateStart;
+                this.logger.info(`Updated ${toUpdate.length} repos in ${updateDuration}ms`);
+            }
+        });
     }
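A note on the `findMany` lookup above: it filters `external_id` and `external_codeHostUrl` with two independent `in` lists, so it can return rows that pair an id from one repo with a host URL from another; the subsequent key check on `${external_id}-${external_codeHostUrl}` narrows the result back to exact matches. A minimal sketch of that partition step, using a `Set` for O(1) membership tests instead of the `Array.includes` scan (an illustrative alternative, not the diff's code):

```typescript
// Illustrative sketch (assumed field shapes): the same create/update split,
// keyed on `${external_id}-${external_codeHostUrl}`, but backed by a Set so
// each membership test is O(1) rather than a linear scan per repo.
interface RepoKeyFields {
    external_id: string;
    external_codeHostUrl: string;
}

const keyOf = (r: RepoKeyFields): string => `${r.external_id}-${r.external_codeHostUrl}`;

function partitionRepos<T extends RepoKeyFields>(existing: RepoKeyFields[], incoming: T[]): { toCreate: T[], toUpdate: T[] } {
    const existingKeys = new Set(existing.map(keyOf));
    return {
        toCreate: incoming.filter(r => !existingKeys.has(keyOf(r))),
        toUpdate: incoming.filter(r => existingKeys.has(keyOf(r))),
    };
}
```

Since `createManyAndReturn` and the raw `UPDATE ... FROM (VALUES ...)` each hit the database once per batch rather than once per repo, this is presumably why the `CONFIG_REPO_UPSERT_TIMEOUT_MS` transaction timeout is dropped in `environment.ts` below.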
diff --git a/packages/backend/src/environment.ts b/packages/backend/src/environment.ts
index c7d8f8bb..e674c027 100644
--- a/packages/backend/src/environment.ts
+++ b/packages/backend/src/environment.ts
@@ -35,6 +35,5 @@ export const FALLBACK_GITHUB_TOKEN = getEnv(process.env.FALLBACK_GITHUB_TOKEN);
 export const FALLBACK_GITLAB_TOKEN = getEnv(process.env.FALLBACK_GITLAB_TOKEN);
 export const FALLBACK_GITEA_TOKEN = getEnv(process.env.FALLBACK_GITEA_TOKEN);
 
-export const CONFIG_REPO_UPSERT_TIMEOUT_MS = getEnv(process.env.CONFIG_REPO_UPSERT_TIMEOUT_MS, '15000')!;
-
-export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
\ No newline at end of file
+export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
+export const REDIS_URL = getEnv(process.env.REDIS_URL, 'redis://localhost:6379')!;
diff --git a/packages/backend/src/main.ts b/packages/backend/src/main.ts
index 7d2bd601..a8a05bc3 100644
--- a/packages/backend/src/main.ts
+++ b/packages/backend/src/main.ts
@@ -5,14 +5,12 @@ import { DEFAULT_SETTINGS } from './constants.js';
 import { Redis } from 'ioredis';
 import { ConnectionManager } from './connectionManager.js';
 import { RepoManager } from './repoManager.js';
-import { INDEX_CONCURRENCY_MULTIPLE } from './environment.js';
+import { INDEX_CONCURRENCY_MULTIPLE, REDIS_URL } from './environment.js';
 
 const logger = createLogger('main');
 
 export const main = async (db: PrismaClient, context: AppContext) => {
-    const redis = new Redis({
-        host: 'localhost',
-        port: 6379,
+    const redis = new Redis(REDIS_URL, {
         maxRetriesPerRequest: null
     });
 
     redis.ping().then(() => {
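Both managers now receive this single ioredis client and hand it to their BullMQ `Queue` as well as their `Worker`, rather than letting each `Queue` dial its own default localhost connection. A minimal sketch of the pattern (illustrative names, not the diff's code):

```typescript
// Illustrative sketch: one shared ioredis client is passed to both the
// BullMQ Queue (producer side) and Worker (consumer side), so all Redis
// traffic goes through the connection configured via REDIS_URL.
import { Redis } from 'ioredis';
import { Queue, Worker } from 'bullmq';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
    maxRetriesPerRequest: null, // required by BullMQ workers
});

const queue = new Queue('example-queue', { connection: redis });
const worker = new Worker('example-queue', async (job) => {
    // ... process the job ...
}, { connection: redis });
```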
diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts
index 0e3b2c27..34d438df 100644
--- a/packages/backend/src/repoManager.ts
+++ b/packages/backend/src/repoManager.ts
@@ -13,7 +13,7 @@ import os from 'os';
 
 interface IRepoManager {
     blockingPollLoop: () => void;
-    scheduleRepoIndexing: (repo: RepoWithConnections) => Promise<void>;
+    scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise<void>;
     dispose: () => void;
 }
@@ -25,8 +25,8 @@ type JobPayload = {
 }
 
 export class RepoManager implements IRepoManager {
-    private queue = new Queue(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue;
     private logger = createLogger('RepoManager');
 
     constructor(
@@ -35,6 +35,9 @@ export class RepoManager implements IRepoManager {
         redis: Redis,
         private ctx: AppContext,
     ) {
+        this.queue = new Queue(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
             connection: redis,
@@ -46,26 +49,48 @@ public async blockingPollLoop() {
         while(true) {
-            this.fetchAndScheduleRepoIndexing();
-            this.garbageCollectRepo();
+            await this.fetchAndScheduleRepoIndexing();
+            await this.garbageCollectRepo();
             await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
         }
     }
 
-    public async scheduleRepoIndexing(repo: RepoWithConnections) {
+    public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
         await this.db.$transaction(async (tx) => {
-            await tx.repo.update({
-                where: { id: repo.id },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            await tx.repo.updateMany({
+                where: { id: { in: repos.map(repo => repo.id) } },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
             });
+
+            const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
+                if (!acc[repo.orgId]) {
+                    acc[repo.orgId] = [];
+                }
+                acc[repo.orgId].push(repo);
+                return acc;
+            }, {});
+
+            for (const orgId in reposByOrg) {
+                const orgRepos = reposByOrg[orgId];
+                // Set priority based on number of repos (more repos = lower priority)
+                // This helps prevent large orgs from overwhelming the queue
+                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
+
+                await this.queue.addBulk(orgRepos.map(repo => ({
+                    name: 'repoIndexJob',
+                    data: { repo },
+                    opts: {
+                        priority: priority
+                    }
+                })));
+
+                this.logger.info(`Added ${orgRepos.length} jobs to queue for org ${orgId} with priority ${priority}`);
+            }
+
-            await this.queue.add('repoIndexJob', {
-                repo
-            });
-            this.logger.info(`Added job to queue for repo ${repo.id}`);
         }).catch((err: unknown) => {
-            this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+            this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
         });
     }
@@ -91,9 +116,9 @@ export class RepoManager implements IRepoManager {
             }
         });
 
-        for (const repo of repos) {
-            await this.scheduleRepoIndexing(repo);
-        }
+        if (repos.length > 0) {
+            await this.scheduleRepoIndexingBulk(repos);
+        }
     }
 
     private async garbageCollectRepo() {
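For a sense of the numbers in the priority formula above: BullMQ serves lower priority values first, and its documented valid range is 1 to 2,097,152, which is what the `Math.min` clamp targets. A worked sketch matching the formula in `scheduleRepoIndexingBulk`:

```typescript
// Worked example of the org-level priority formula. Lower numbers are
// dequeued first, so larger orgs queue behind smaller ones.
const priorityFor = (repoCount: number): number =>
    Math.min(Math.ceil(repoCount / 10), 2_097_152);

priorityFor(5);           // 1  -> small org, dequeued first
priorityFor(250);         // 25 -> larger org, dequeued later
priorityFor(100_000_000); // 2_097_152 -> clamped at BullMQ's maximum
```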