diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts
index 456c5d15..4fcc4113 100644
--- a/packages/backend/src/connectionManager.ts
+++ b/packages/backend/src/connectionManager.ts
@@ -1,6 +1,6 @@
 import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
-import { AppContext, Settings, WithRequired } from "./types.js";
+import { Settings, WithRequired } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "./logger.js";
 import os from 'os';
@@ -10,6 +10,7 @@ import { getGitHubReposFromConfig } from "./github.js";

 interface IConnectionManager {
     scheduleConnectionSync: (connection: Connection) => Promise<void>;
+    registerPollingCallback: () => void;
     dispose: () => void;
 }

@@ -28,14 +29,13 @@ export class ConnectionManager implements IConnectionManager {

     constructor(
         private db: PrismaClient,
-        settings: Settings,
+        private settings: Settings,
         redis: Redis,
-        private context: AppContext,
     ) {
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
-            concurrency: numCores * settings.configSyncConcurrencyMultiple,
+            concurrency: numCores * this.settings.configSyncConcurrencyMultiple,
         });
         this.worker.on('completed', this.onSyncJobCompleted.bind(this));
         this.worker.on('failed', this.onSyncJobFailed.bind(this));
@@ -61,6 +61,19 @@ export class ConnectionManager implements IConnectionManager {
         });
     }

+    public async registerPollingCallback() {
+        setInterval(async () => {
+            const connections = await this.db.connection.findMany({
+                where: {
+                    syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
+                }
+            });
+            for (const connection of connections) {
+                await this.scheduleConnectionSync(connection);
+            }
+        }, this.settings.resyncConnectionPollingIntervalMs);
+    }
+
     private async runSyncJob(job: Job) {
         const { config, orgId } = job.data;
         // @note: We aren't actually doing anything with this atm.
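One caveat in the new polling callback: the `setInterval` handle is never retained, so `dispose()` has no way to stop the timer. A minimal sketch of how the handle could be kept — the `interval` field and this `dispose()` body are hypothetical, not part of this change:

    private interval?: NodeJS.Timeout;

    public async registerPollingCallback() {
        this.interval = setInterval(async () => {
            // ... same polling body as in the diff above ...
        }, this.settings.resyncConnectionPollingIntervalMs);
    }

    public dispose() {
        if (this.interval) {
            clearInterval(this.interval);
        }
    }
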
diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts
index 407ec462..bbc5e2eb 100644
--- a/packages/backend/src/constants.ts
+++ b/packages/backend/src/constants.ts
@@ -8,6 +8,7 @@ export const DEFAULT_SETTINGS: Settings = {
     autoDeleteStaleRepos: true,
     reindexIntervalMs: 1000 * 60,
     resyncConnectionPollingIntervalMs: 1000,
+    reindexRepoPollingIntervalMs: 1000,
     indexConcurrencyMultiple: 3,
     configSyncConcurrencyMultiple: 3,
 }
\ No newline at end of file
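For a sense of scale: both concurrency multiples are applied to the host's core count when the BullMQ workers are constructed (see connectionManager.ts above and repoManager.ts below). A quick illustration, assuming an 8-core machine:

    import os from 'os';
    import { DEFAULT_SETTINGS } from './constants.js';

    // On an 8-core machine: 8 * 3 = 24 concurrent repo index jobs.
    const indexConcurrency = os.cpus().length * DEFAULT_SETTINGS.indexConcurrencyMultiple;
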
diff --git a/packages/backend/src/main.ts b/packages/backend/src/main.ts
index d74e6d93..c6245178 100644
--- a/packages/backend/src/main.ts
+++ b/packages/backend/src/main.ts
@@ -1,120 +1,14 @@
-import { ConnectionSyncStatus, PrismaClient, Repo, RepoIndexingStatus, RepoToConnection, Connection } from '@sourcebot/db';
-import { existsSync } from 'fs';
-import { cloneRepository, fetchRepository } from "./git.js";
+import { PrismaClient } from '@sourcebot/db';
 import { createLogger } from "./logger.js";
-import { captureEvent } from "./posthog.js";
 import { AppContext } from "./types.js";
-import { getRepoPath, getTokenFromConfig, measure } from "./utils.js";
-import { indexGitRepository } from "./zoekt.js";
 import { DEFAULT_SETTINGS } from './constants.js';
-import { Queue, Worker, Job } from 'bullmq';
 import { Redis } from 'ioredis';
-import * as os from 'os';
 import { ConnectionManager } from './connectionManager.js';
-import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
+import { RepoManager } from './repoManager.js';

 const logger = createLogger('main');

-type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] };
-
-// TODO: do this better? ex: try using the tokens from all the connections
-// We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
-// fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
-// may have their own token. This method will just pick the first connection that has a token (if one exists) and uses that. This
-// may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referrencing.
-const getTokenForRepo = async (repo: RepoWithConnections, db: PrismaClient) => {
-    const repoConnections = repo.connections;
-    if (repoConnections.length === 0) {
-        logger.error(`Repo ${repo.id} has no connections`);
-        return;
-    }
-
-    let token: string | undefined;
-    for (const repoConnection of repoConnections) {
-        const connection = repoConnection.connection;
-        const config = connection.config as unknown as ConnectionConfig;
-        if (config.token) {
-            token = await getTokenFromConfig(config.token, connection.orgId, db);
-            if (token) {
-                break;
-            }
-        }
-    }
-
-    return token;
-}
-
-const syncGitRepository = async (repo: RepoWithConnections, ctx: AppContext, db: PrismaClient) => {
-    let fetchDuration_s: number | undefined = undefined;
-    let cloneDuration_s: number | undefined = undefined;
-
-    const repoPath = getRepoPath(repo, ctx);
-    const metadata = repo.metadata as Record<string, string>;
-
-    if (existsSync(repoPath)) {
-        logger.info(`Fetching ${repo.id}...`);
-
-        const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
-            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
-        }));
-        fetchDuration_s = durationMs / 1000;
-
-        process.stdout.write('\n');
-        logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
-
-    } else {
-        logger.info(`Cloning ${repo.id}...`);
-
-        const token = await getTokenForRepo(repo, db);
-        let cloneUrl = repo.cloneUrl;
-        if (token) {
-            const url = new URL(cloneUrl);
-            url.username = token;
-            cloneUrl = url.toString();
-        }
-
-        const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
-            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
-        }));
-        cloneDuration_s = durationMs / 1000;
-
-        process.stdout.write('\n');
-        logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
-    }
-
-    logger.info(`Indexing ${repo.id}...`);
-    const { durationMs } = await measure(() => indexGitRepository(repo, ctx));
-    const indexDuration_s = durationMs / 1000;
-    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);
-
-    return {
-        fetchDuration_s,
-        cloneDuration_s,
-        indexDuration_s,
-    }
-}
-
-async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
-    for (const repo of repos) {
-        await db.$transaction(async (tx) => {
-            await tx.repo.update({
-                where: { id: repo.id },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
-            });
-
-            // Add the job to the queue
-            await queue.add('indexJob', repo);
-            logger.info(`Added job to queue for repo ${repo.id}`);
-        }).catch((err: unknown) => {
-            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
-        });
-    }
-}
-
 export const main = async (db: PrismaClient, context: AppContext) => {
-    /////////////////////////////
-    // Init Redis
-    /////////////////////////////
     const redis = new Redis({
         host: 'localhost',
         port: 6379,
@@ -128,99 +22,9 @@ export const main = async (db: PrismaClient, context: AppContext) => {
         process.exit(1);
     });

-    const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis, context);
-    setInterval(async () => {
-        const connections = await db.connection.findMany({
-            where: {
-                syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
-            }
-        });
-        for (const connection of connections) {
-            await connectionManager.scheduleConnectionSync(connection);
-        }
-    }, DEFAULT_SETTINGS.resyncConnectionPollingIntervalMs);
-
-    /////////////////////////
-    // Setup repo indexing
-    /////////////////////////
-    const indexQueue = new Queue('indexQueue');
-
-    const numCores = os.cpus().length;
-    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
-    logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
-    const worker = new Worker('indexQueue', async (job: Job) => {
-        const repo = job.data as RepoWithConnections;
-
-        let indexDuration_s: number | undefined;
-        let fetchDuration_s: number | undefined;
-        let cloneDuration_s: number | undefined;
-
-        const stats = await syncGitRepository(repo, context, db);
-        indexDuration_s = stats.indexDuration_s;
-        fetchDuration_s = stats.fetchDuration_s;
-        cloneDuration_s = stats.cloneDuration_s;
-
-        captureEvent('repo_synced', {
-            vcs: 'git',
-            codeHost: repo.external_codeHostType,
-            indexDuration_s,
-            fetchDuration_s,
-            cloneDuration_s,
-        });
-
-        await db.repo.update({
-            where: {
-                id: repo.id,
-            },
-            data: {
-                indexedAt: new Date(),
-                repoIndexingStatus: RepoIndexingStatus.INDEXED,
-            }
-        });
-    }, { connection: redis, concurrency: numWorkers });
-
-    worker.on('completed', (job: Job) => {
-        logger.info(`Job ${job.id} completed`);
-    });
-    worker.on('failed', async (job: Job | undefined, err: unknown) => {
-        logger.info(`Job failed with error: ${err}`);
-        if (job) {
-            await db.repo.update({
-                where: {
-                    id: job.data.id,
-                },
-                data: {
-                    repoIndexingStatus: RepoIndexingStatus.FAILED,
-                }
-            })
-        }
-    });
-
-    // Repo indexing loop
-    while (true) {
-        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
-        const repos = await db.repo.findMany({
-            where: {
-                repoIndexingStatus: {
-                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
-                },
-                OR: [
-                    { indexedAt: null },
-                    { indexedAt: { lt: thresholdDate } },
-                    { repoIndexingStatus: RepoIndexingStatus.NEW }
-                ]
-            },
-            include: {
-                connections: {
-                    include: {
-                        connection: true
-                    }
-                }
-            }
-        });
-        addReposToQueue(db, indexQueue, repos);
-
+    const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis);
+    connectionManager.registerPollingCallback();
-        await new Promise(resolve => setTimeout(resolve, 1000));
-    }
+    const repoManager = new RepoManager(db, DEFAULT_SETTINGS, redis, context);
+    repoManager.blockingPollLoop();
 }
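`main()` now only wires up the two managers: `registerPollingCallback()` returns immediately after arming its timer, while `blockingPollLoop()` never resolves, which is what keeps the process alive. Since both managers expose `dispose()`, shutdown could be handled by the caller along these lines — a sketch only, signal handling is not part of this change:

    process.on('SIGTERM', () => {
        connectionManager.dispose();
        repoManager.dispose();
        process.exit(0);
    });
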
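+
+    // Note: queue.add() below runs inside the same transaction as the status
+    // update, so if enqueueing fails, the move to IN_INDEX_QUEUE is rolled back.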
+    public async scheduleRepoIndexing(repo: RepoWithConnections) {
+        await this.db.$transaction(async (tx) => {
+            await tx.repo.update({
+                where: { id: repo.id },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            });
+
+            await this.queue.add('repoIndexJob', {
+                repo
+            });
+            this.logger.info(`Added job to queue for repo ${repo.id}`);
+        }).catch((err: unknown) => {
+            this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+        });
+    }
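+
+    // A repo is due for (re)indexing when it has never been indexed, was last
+    // indexed more than reindexIntervalMs ago, or is marked NEW. Repos that are
+    // already queued or have failed are skipped.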
This + // may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referrencing. + private async getTokenForRepo(repo: RepoWithConnections, db: PrismaClient) { + const repoConnections = repo.connections; + if (repoConnections.length === 0) { + this.logger.error(`Repo ${repo.id} has no connections`); + return; + } + + let token: string | undefined; + for (const repoConnection of repoConnections) { + const connection = repoConnection.connection; + const config = connection.config as unknown as ConnectionConfig; + if (config.token) { + token = await getTokenFromConfig(config.token, connection.orgId, db); + if (token) { + break; + } + } + } + + return token; + } + + private async syncGitRepository(repo: RepoWithConnections) { + let fetchDuration_s: number | undefined = undefined; + let cloneDuration_s: number | undefined = undefined; + + const repoPath = getRepoPath(repo, this.ctx); + const metadata = repo.metadata as Record; + + if (existsSync(repoPath)) { + this.logger.info(`Fetching ${repo.id}...`); + + const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => { + this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) + })); + fetchDuration_s = durationMs / 1000; + + process.stdout.write('\n'); + this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`); + + } else { + this.logger.info(`Cloning ${repo.id}...`); + + const token = await this.getTokenForRepo(repo, this.db); + let cloneUrl = repo.cloneUrl; + if (token) { + const url = new URL(cloneUrl); + url.username = token; + cloneUrl = url.toString(); + } + + const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => { + this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`) + })); + cloneDuration_s = durationMs / 1000; + + process.stdout.write('\n'); + this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`); + } + + this.logger.info(`Indexing ${repo.id}...`); + const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx)); + const indexDuration_s = durationMs / 1000; + this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`); + + return { + fetchDuration_s, + cloneDuration_s, + indexDuration_s, + } + } + + private async runIndexJob(job: Job) { + const repo = job.data.repo as RepoWithConnections; + await this.db.repo.update({ + where: { + id: repo.id, + }, + data: { + repoIndexingStatus: RepoIndexingStatus.INDEXING, + } + }); + + let indexDuration_s: number | undefined; + let fetchDuration_s: number | undefined; + let cloneDuration_s: number | undefined; + + const stats = await this.syncGitRepository(repo); + indexDuration_s = stats.indexDuration_s; + fetchDuration_s = stats.fetchDuration_s; + cloneDuration_s = stats.cloneDuration_s; + + captureEvent('repo_synced', { + vcs: 'git', + codeHost: repo.external_codeHostType, + indexDuration_s, + fetchDuration_s, + cloneDuration_s, + }); + } + + private async onIndexJobCompleted(job: Job) { + this.logger.info(`Repo index job ${job.id} completed`); + + await this.db.repo.update({ + where: { + id: job.data.repo.id, + }, + data: { + indexedAt: new Date(), + repoIndexingStatus: RepoIndexingStatus.INDEXED, + } + }); + } + + private async onIndexJobFailed(job: Job | undefined, err: unknown) { + this.logger.info(`Repo index job failed with error: ${err}`); + if (job) { + await this.db.repo.update({ + where: { + id: job.data.repo.id, + }, + data: 
+    private async runIndexJob(job: Job<JobPayload>) {
+        const repo = job.data.repo as RepoWithConnections;
+        await this.db.repo.update({
+            where: {
+                id: repo.id,
+            },
+            data: {
+                repoIndexingStatus: RepoIndexingStatus.INDEXING,
+            }
+        });
+
+        let indexDuration_s: number | undefined;
+        let fetchDuration_s: number | undefined;
+        let cloneDuration_s: number | undefined;
+
+        const stats = await this.syncGitRepository(repo);
+        indexDuration_s = stats.indexDuration_s;
+        fetchDuration_s = stats.fetchDuration_s;
+        cloneDuration_s = stats.cloneDuration_s;
+
+        captureEvent('repo_synced', {
+            vcs: 'git',
+            codeHost: repo.external_codeHostType,
+            indexDuration_s,
+            fetchDuration_s,
+            cloneDuration_s,
+        });
+    }
+
+    private async onIndexJobCompleted(job: Job<JobPayload>) {
+        this.logger.info(`Repo index job ${job.id} completed`);
+
+        await this.db.repo.update({
+            where: {
+                id: job.data.repo.id,
+            },
+            data: {
+                indexedAt: new Date(),
+                repoIndexingStatus: RepoIndexingStatus.INDEXED,
+            }
+        });
+    }
+
+    private async onIndexJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
+        this.logger.info(`Repo index job failed with error: ${err}`);
+        if (job) {
+            await this.db.repo.update({
+                where: {
+                    id: job.data.repo.id,
+                },
+                data: {
+                    repoIndexingStatus: RepoIndexingStatus.FAILED,
+                }
+            })
+        }
+    }
+
+    public async dispose() {
+        this.worker.close();
+        this.queue.close();
+    }
+}
\ No newline at end of file
diff --git a/packages/backend/src/types.ts b/packages/backend/src/types.ts
index 71d8fff8..ce44b5dc 100644
--- a/packages/backend/src/types.ts
+++ b/packages/backend/src/types.ts
@@ -74,6 +74,10 @@ export type Settings = {
     /**
      * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced.
      */
     resyncConnectionPollingIntervalMs: number;
+    /**
+     * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed.
+     */
+    reindexRepoPollingIntervalMs: number;
     /**
      * The multiple of the number of CPUs to use for indexing.
      */
diff --git a/packages/backend/src/utils.ts b/packages/backend/src/utils.ts
index f85ea10a..b6ba1091 100644
--- a/packages/backend/src/utils.ts
+++ b/packages/backend/src/utils.ts
@@ -159,4 +159,8 @@ export const arraysEqualShallow = <T>(a?: readonly T[], b?: readonly T[]) => {

 export const getRepoPath = (repo: Repo, ctx: AppContext) => {
     return path.join(ctx.reposPath, repo.id.toString());
+}
+
+export const getShardPrefix = (orgId: number, repoId: number) => {
+    return `${orgId}_${repoId}`;
 }
\ No newline at end of file
diff --git a/packages/backend/src/zoekt.ts b/packages/backend/src/zoekt.ts
index 088ae03c..0c4727d0 100644
--- a/packages/backend/src/zoekt.ts
+++ b/packages/backend/src/zoekt.ts
@@ -3,6 +3,7 @@ import { AppContext, LocalRepository, Settings } from "./types.js";
 import { Repo } from "@sourcebot/db";
 import { getRepoPath } from "./utils.js";
 import { DEFAULT_SETTINGS } from "./constants.js";
+import { getShardPrefix } from "./utils.js";

 const ALWAYS_EXCLUDED_DIRS = ['.git', '.hg', '.svn'];

@@ -11,7 +12,7 @@ export const indexGitRepository = async (repo: Repo, ctx: AppContext) => {
     const revisions = [
         'HEAD'
     ];

-    const shardPrefix = `${repo.orgId}_${repo.id}`;
+    const shardPrefix = getShardPrefix(repo.orgId, repo.id);
     const repoPath = getRepoPath(repo, ctx);
     const command = `zoekt-git-index -allow_missing_branches -index ${ctx.indexPath} -file_limit ${DEFAULT_SETTINGS.maxFileSize} -branches ${revisions.join(',')} -tenant_id ${repo.orgId} -shard_prefix ${shardPrefix} ${repoPath}`;
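`getShardPrefix()` now centralizes the `${orgId}_${repoId}` naming shared by `indexGitRepository` (which passes it to `zoekt-git-index` via `-shard_prefix`) and `garbageCollectRepo` (which matches files against it when deleting shards). An illustration with hypothetical IDs — the exact shard file names depend on zoekt's sharding scheme:

    const prefix = getShardPrefix(1, 42); // "1_42"
    // garbageCollectRepo() removes any file in ctx.indexPath starting with
    // this prefix, e.g. a shard such as "1_42_v16.00000.zoekt".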