diff --git a/package.json b/package.json index 786cf2da..cf4e8ed5 100644 --- a/package.json +++ b/package.json @@ -6,16 +6,16 @@ "scripts": { "build": "yarn workspaces run build", "test": "yarn workspaces run test", - "dev": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis", - "dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis", + "dev": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=single npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis", + "dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=multi npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis", "dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc", "dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc", "dev:backend": "yarn workspace @sourcebot/backend dev:watch", "dev:web": "yarn workspace @sourcebot/web dev", "dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis" - }, "devDependencies": { + "cross-env": "^7.0.3", "npm-run-all": "^4.1.5" } } diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index 3b416cbb..2434380c 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -7,7 +7,7 @@ import { SourcebotConfigurationSchema } from "./schemas/v2.js"; import { AppContext } from "./types.js"; import { getTokenFromConfig, isRemotePath, marshalBool } from "./utils.js"; -export const syncConfig = async (configPath: string, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => { +export const fetchConfigFromPath = async (configPath: string, signal: AbortSignal) => { const configContent = await (async () => { if (isRemotePath(configPath)) { const response = await fetch(configPath, { @@ -25,9 +25,11 @@ export const syncConfig = async (configPath: string, db: PrismaClient, signal: A } })(); - // @todo: we should validate the configuration file's structure here. const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema; + return config; +} +export const syncConfig = async (config: SourcebotConfigurationSchema, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => { for (const repoConfig of config.repos ?? []) { switch (repoConfig.type) { case 'github': { diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index 11f82341..0c983120 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -9,4 +9,5 @@ export const DEFAULT_SETTINGS: Settings = { reindexIntervalMs: 1000 * 60, resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds indexConcurrencyMultiple: 3, + configSyncConcurrencyMultiple: 3, } \ No newline at end of file diff --git a/packages/backend/src/environment.ts b/packages/backend/src/environment.ts index a9fee07d..508838f6 100644 --- a/packages/backend/src/environment.ts +++ b/packages/backend/src/environment.ts @@ -1,6 +1,10 @@ import dotenv from 'dotenv'; -export const getEnv = (env: string | undefined, defaultValue?: string) => { +export const getEnv = (env: string | undefined, defaultValue?: string, required?: boolean) => { + if (required && !env && !defaultValue) { + throw new Error(`Missing required environment variable`); + } + return env ?? defaultValue; } @@ -15,6 +19,8 @@ dotenv.config({ path: './.env', }); + +export const SOURCEBOT_TENANT_MODE = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true); export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!; export const SOURCEBOT_TELEMETRY_DISABLED = getEnvBoolean(process.env.SOURCEBOT_TELEMETRY_DISABLED, false)!; export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'unknown')!; diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index ce6fff51..9cab5e65 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -6,6 +6,7 @@ import { isRemotePath } from "./utils.js"; import { AppContext } from "./types.js"; import { main } from "./main.js" import { PrismaClient } from "@sourcebot/db"; +import { SOURCEBOT_TENANT_MODE } from "./environment.js"; const parser = new ArgumentParser({ @@ -19,7 +20,7 @@ type Arguments = { parser.add_argument("--configPath", { help: "Path to config file", - required: true, + required: SOURCEBOT_TENANT_MODE === "single", }); parser.add_argument("--cacheDir", { @@ -28,8 +29,8 @@ parser.add_argument("--cacheDir", { }); const args = parser.parse_args() as Arguments; -if (!isRemotePath(args.configPath) && !existsSync(args.configPath)) { - console.error(`Config file ${args.configPath} does not exist`); +if (SOURCEBOT_TENANT_MODE === "single" && !isRemotePath(args.configPath) && !existsSync(args.configPath)) { + console.error(`Config file ${args.configPath} does not exist, and is required in single tenant mode`); process.exit(1); } diff --git a/packages/backend/src/main.ts b/packages/backend/src/main.ts index 3b4e8c9f..0c66ba46 100644 --- a/packages/backend/src/main.ts +++ b/packages/backend/src/main.ts @@ -1,6 +1,6 @@ -import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db'; +import { ConfigSyncStatus, PrismaClient, Repo, Config, RepoIndexingStatus, Prisma } from '@sourcebot/db'; import { existsSync, watch } from 'fs'; -import { syncConfig } from "./config.js"; +import { fetchConfigFromPath, syncConfig } from "./config.js"; import { cloneRepository, fetchRepository } from "./git.js"; import { createLogger } from "./logger.js"; import { captureEvent } from "./posthog.js"; @@ -11,6 +11,8 @@ import { DEFAULT_SETTINGS } from './constants.js'; import { Queue, Worker, Job } from 'bullmq'; import { Redis } from 'ioredis'; import * as os from 'os'; +import { SOURCEBOT_TENANT_MODE } from './environment.js'; +import { SourcebotConfigurationSchema } from './schemas/v2.js'; const logger = createLogger('main'); @@ -56,6 +58,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => { } } +async function addConfigsToQueue(db: PrismaClient, queue: Queue, configs: Config[]) { + for (const config of configs) { + await db.$transaction(async (tx) => { + await tx.config.update({ + where: { id: config.id }, + data: { syncStatus: ConfigSyncStatus.IN_SYNC_QUEUE }, + }); + + // Add the job to the queue + await queue.add('configSyncJob', config); + logger.info(`Added job to queue for config ${config.id}`); + }).catch((err: unknown) => { + logger.error(`Failed to add job to queue for config ${config.id}: ${err}`); + }); + } +} + async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) { for (const repo of repos) { await db.$transaction(async (tx) => { @@ -67,7 +86,7 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) { // Add the job to the queue await queue.add('indexJob', repo); logger.info(`Added job to queue for repo ${repo.id}`); - }).catch((err) => { + }).catch((err: unknown) => { logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`); }); } @@ -76,47 +95,80 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) { export const main = async (db: PrismaClient, context: AppContext) => { let abortController = new AbortController(); let isSyncing = false; - const _syncConfig = async () => { - if (isSyncing) { - abortController.abort(); - abortController = new AbortController(); - } + const _syncConfig = async (dbConfig?: Config | undefined) => { - logger.info(`Syncing configuration file ${context.configPath} ...`); - isSyncing = true; + // Fetch config object and update syncing status + let config: SourcebotConfigurationSchema; + switch (SOURCEBOT_TENANT_MODE) { + case 'single': + logger.info(`Syncing configuration file ${context.configPath} ...`); + if (isSyncing) { + abortController.abort(); + abortController = new AbortController(); + } + config = await fetchConfigFromPath(context.configPath, abortController.signal); + isSyncing = true; + break; + case 'multi': + if(!dbConfig) { + throw new Error('config object is required in multi tenant mode'); + } + config = dbConfig.data as SourcebotConfigurationSchema + db.config.update({ + where: { + id: dbConfig.id, + }, + data: { + syncStatus: ConfigSyncStatus.SYNCING, + } + }) + break; + default: + throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`); + } + + // Attempt to sync the config, handle failure cases try { - const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context)) - logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`); + const { durationMs } = await measure(() => syncConfig(config, db, abortController.signal, context)) + logger.info(`Synced configuration in ${durationMs / 1000}s`); isSyncing = false; } catch (err: any) { - if (err.name === "AbortError") { - // @note: If we're aborting, we don't want to set isSyncing to false - // since it implies another sync is in progress. - } else { - isSyncing = false; - logger.error(`Failed to sync configuration file ${context.configPath} with error:`); - console.log(err); + switch(SOURCEBOT_TENANT_MODE) { + case 'single': + if (err.name === "AbortError") { + // @note: If we're aborting, we don't want to set isSyncing to false + // since it implies another sync is in progress. + } else { + isSyncing = false; + logger.error(`Failed to sync configuration file with error:`); + console.log(err); + } + break; + case 'multi': + if (dbConfig) { + await db.config.update({ + where: { + id: dbConfig.id, + }, + data: { + syncStatus: ConfigSyncStatus.FAILED, + } + }) + logger.error(`Failed to sync configuration ${dbConfig.id} with error: ${err}`); + } else { + logger.error(`DB config undefined. Failed to sync configuration with error: ${err}`); + } + break; + default: + throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`); } } } - // Re-sync on file changes if the config file is local - if (!isRemotePath(context.configPath)) { - watch(context.configPath, () => { - logger.info(`Config file ${context.configPath} changed. Re-syncing...`); - _syncConfig(); - }); - } - - // Re-sync at a fixed interval - setInterval(() => { - _syncConfig(); - }, DEFAULT_SETTINGS.resyncIntervalMs); - - // Sync immediately on startup - await _syncConfig(); - + ///////////////////////////// + // Init Redis + ///////////////////////////// const redis = new Redis({ host: 'localhost', port: 6379, @@ -124,18 +176,85 @@ export const main = async (db: PrismaClient, context: AppContext) => { }); redis.ping().then(() => { logger.info('Connected to redis'); - }).catch((err) => { + }).catch((err: unknown) => { logger.error('Failed to connect to redis'); console.error(err); process.exit(1); }); + ///////////////////////////// + // Setup config sync watchers + ///////////////////////////// + switch (SOURCEBOT_TENANT_MODE) { + case 'single': + // Re-sync on file changes if the config file is local + if (!isRemotePath(context.configPath)) { + watch(context.configPath, () => { + logger.info(`Config file ${context.configPath} changed. Re-syncing...`); + _syncConfig(); + }); + } + + // Re-sync at a fixed interval + setInterval(() => { + _syncConfig(); + }, DEFAULT_SETTINGS.resyncIntervalMs); + + // Sync immediately on startup + await _syncConfig(); + break; + case 'multi': + // Setup config sync queue and workers + const configSyncQueue = new Queue('configSyncQueue'); + const numCores = os.cpus().length; + const numWorkers = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple; + logger.info(`Detected ${numCores} cores. Setting config sync max concurrency to ${numWorkers}`); + const configSyncWorker = new Worker('configSyncQueue', async (job: Job) => { + const config = job.data as Config; + await _syncConfig(config); + }, { connection: redis, concurrency: numWorkers }); + configSyncWorker.on('completed', async (job: Job) => { + logger.info(`Config sync job ${job.id} completed`); + + const config = job.data as Config; + await db.config.update({ + where: { + id: config.id, + }, + data: { + syncStatus: ConfigSyncStatus.SYNCED, + } + }) + }); + configSyncWorker.on('failed', (job: Job | undefined, err: unknown) => { + logger.info(`Config sync job failed with error: ${err}`); + }); + + setInterval(async () => { + const configs = await db.config.findMany({ + where: { + syncStatus: ConfigSyncStatus.SYNC_NEEDED, + } + }); + + logger.info(`Found ${configs.length} configs to sync...`); + addConfigsToQueue(db, configSyncQueue, configs); + }, 1000); + break; + default: + throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`); + } + + + ///////////////////////// + // Setup repo indexing + ///////////////////////// const indexQueue = new Queue('indexQueue'); const numCores = os.cpus().length; const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple; - logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`); - const worker = new Worker('indexQueue', async (job) => { + logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`); + const worker = new Worker('indexQueue', async (job: Job) => { const repo = job.data as Repo; let indexDuration_s: number | undefined; @@ -166,10 +285,10 @@ export const main = async (db: PrismaClient, context: AppContext) => { }); }, { connection: redis, concurrency: numWorkers }); - worker.on('completed', (job) => { + worker.on('completed', (job: Job) => { logger.info(`Job ${job.id} completed`); }); - worker.on('failed', async (job: Job | undefined, err) => { + worker.on('failed', async (job: Job | undefined, err: unknown) => { logger.info(`Job failed with error: ${err}`); if (job) { await db.repo.update({ @@ -183,6 +302,7 @@ export const main = async (db: PrismaClient, context: AppContext) => { } }); + // Repo indexing loop while (true) { const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs); const repos = await db.repo.findMany({ diff --git a/packages/backend/src/types.ts b/packages/backend/src/types.ts index 05bd9639..a6e80b81 100644 --- a/packages/backend/src/types.ts +++ b/packages/backend/src/types.ts @@ -78,6 +78,10 @@ export type Settings = { * The multiple of the number of CPUs to use for indexing. */ indexConcurrencyMultiple: number; + /** + * The multiple of the number of CPUs to use for syncing the configuration. + */ + configSyncConcurrencyMultiple: number; } // @see : https://stackoverflow.com/a/61132308 diff --git a/packages/db/prisma/migrations/20250117183646_add_config_table/migration.sql b/packages/db/prisma/migrations/20250117183646_add_config_table/migration.sql new file mode 100644 index 00000000..e6fc16ae --- /dev/null +++ b/packages/db/prisma/migrations/20250117183646_add_config_table/migration.sql @@ -0,0 +1,11 @@ +-- CreateTable +CREATE TABLE "Config" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "data" JSONB NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL, + "syncedAt" DATETIME, + "syncStatus" TEXT NOT NULL DEFAULT 'SYNC_NEEDED', + "orgId" INTEGER NOT NULL, + CONSTRAINT "Config_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 5872c999..ea785ee3 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -18,6 +18,14 @@ enum RepoIndexingStatus { FAILED } +enum ConfigSyncStatus { + SYNC_NEEDED + IN_SYNC_QUEUE + SYNCING + SYNCED + FAILED +} + model Repo { id Int @id @default(autoincrement()) name String @@ -42,12 +50,27 @@ model Repo { @@unique([external_id, external_codeHostUrl]) } +model Config { + id Int @id @default(autoincrement()) + data Json + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + syncedAt DateTime? + + syncStatus ConfigSyncStatus @default(SYNC_NEEDED) + + // The organization that owns this config + org Org @relation(fields: [orgId], references: [id], onDelete: Cascade) + orgId Int +} + model Org { id Int @id @default(autoincrement()) name String createdAt DateTime @default(now()) updatedAt DateTime @updatedAt members UserToOrg[] + configs Config[] } model UserToOrg { diff --git a/yarn.lock b/yarn.lock index 96eb94f4..8314f3d2 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2938,6 +2938,13 @@ cron-parser@^4.9.0: dependencies: luxon "^3.2.1" +cross-env@^7.0.3: + version "7.0.3" + resolved "https://registry.yarnpkg.com/cross-env/-/cross-env-7.0.3.tgz#865264b29677dc015ba8418918965dd232fc54cf" + integrity sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw== + dependencies: + cross-spawn "^7.0.1" + cross-fetch@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-4.0.0.tgz#f037aef1580bb3a1a35164ea2a848ba81b445983" @@ -2956,7 +2963,7 @@ cross-spawn@^6.0.5: shebang-command "^1.2.0" which "^1.2.9" -cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3: +cross-spawn@^7.0.0, cross-spawn@^7.0.1, cross-spawn@^7.0.2, cross-spawn@^7.0.3: version "7.0.6" resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f" integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA== @@ -5850,8 +5857,16 @@ string-argv@^0.3.1: resolved "https://registry.yarnpkg.com/string-argv/-/string-argv-0.3.2.tgz#2b6d0ef24b656274d957d54e0a4bbf6153dc02b6" integrity sha512-aqD2Q0144Z+/RqG52NeHEkZauTAUWJO8c6yTftGJKO3Tja5tUgIfmIl6kExvhtxSDP7fXB6DvzkfMpCd/F3G+Q== -"string-width-cjs@npm:string-width@^4.2.0", string-width@^4.1.0: - name string-width-cjs +"string-width-cjs@npm:string-width@^4.2.0": + version "4.2.3" + resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" + integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== + dependencies: + emoji-regex "^8.0.0" + is-fullwidth-code-point "^3.0.0" + strip-ansi "^6.0.1" + +string-width@^4.1.0: version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -5948,7 +5963,14 @@ string_decoder@^1.1.1, string_decoder@^1.3.0: dependencies: safe-buffer "~5.2.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1": + version "6.0.1" + resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" + integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== + dependencies: + ansi-regex "^5.0.1" + +strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==