db performance improvements and job resilience #200

Merged 5 commits on Feb 19, 2025
Changes from all commits
1 change: 1 addition & 0 deletions Dockerfile
@@ -80,6 +80,7 @@ ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
 ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
 ENV DB_NAME=sourcebot
 ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
+ENV REDIS_URL="redis://localhost:6379"
 ENV SRC_TENANT_ENFORCEMENT_MODE=strict

 ARG SOURCEBOT_VERSION=unknown
134 changes: 120 additions & 14 deletions packages/backend/src/connectionManager.ts
@@ -1,12 +1,11 @@
-import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
+import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, Repo } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
 import { Settings, WithRequired } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "./logger.js";
 import os from 'os';
 import { Redis } from 'ioredis';
 import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
-import { CONFIG_REPO_UPSERT_TIMEOUT_MS } from "./environment.js";

 interface IConnectionManager {
     scheduleConnectionSync: (connection: Connection) => Promise<void>;
@@ -23,15 +22,18 @@ type JobPayload = {
 };

 export class ConnectionManager implements IConnectionManager {
-    private queue = new Queue<JobPayload>(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue<JobPayload>;
     private logger = createLogger('ConnectionManager');

     constructor(
         private db: PrismaClient,
         private settings: Settings,
         redis: Redis,
     ) {
+        this.queue = new Queue<JobPayload>(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
@@ -113,6 +115,7 @@ export class ConnectionManager implements IConnectionManager {
         // appear in the repoData array above, and so the RepoToConnection record won't be re-created.
         // Repos that have no RepoToConnection records are considered orphaned and can be deleted.
         await this.db.$transaction(async (tx) => {
+            const deleteStart = performance.now();
             await tx.connection.update({
                 where: {
                     id: job.data.connectionId,
@@ -123,21 +126,124 @@
                     }
                 }
             });
+            const deleteDuration = performance.now() - deleteStart;
+            this.logger.info(`Deleted all RepoToConnection records for connection ${job.data.connectionId} in ${deleteDuration}ms`);

-            await Promise.all(repoData.map((repo) => {
-                return tx.repo.upsert({
-                    where: {
-                        external_id_external_codeHostUrl: {
-                            external_id: repo.external_id,
-                            external_codeHostUrl: repo.external_codeHostUrl,
-                        },
-                    },
-                    create: repo,
-                    update: repo as Prisma.RepoUpdateInput,
-                });
-            }));
-
-        }, { timeout: parseInt(CONFIG_REPO_UPSERT_TIMEOUT_MS) });
+            const existingRepos: Repo[] = await tx.repo.findMany({
+                where: {
+                    external_id: {
+                        in: repoData.map(repo => repo.external_id),
+                    },
+                    external_codeHostUrl: {
+                        in: repoData.map(repo => repo.external_codeHostUrl),
+                    },
+                },
+            });
+            const existingRepoKeys = existingRepos.map(repo => `${repo.external_id}-${repo.external_codeHostUrl}`);
+
+            const existingRepoData = repoData.filter(repo => existingRepoKeys.includes(`${repo.external_id}-${repo.external_codeHostUrl}`));
+            const [toCreate, toUpdate] = repoData.reduce<[Prisma.RepoCreateManyInput[], Prisma.RepoUpdateManyMutationInput[]]>(([toCreate, toUpdate], repo) => {
+                const existingRepo = existingRepoData.find((r: RepoData) => r.external_id === repo.external_id && r.external_codeHostUrl === repo.external_codeHostUrl);
+                if (existingRepo) {
+                    // @note: make sure to reflect any changes here in the raw sql update below
+                    const updateRepo: Prisma.RepoUpdateManyMutationInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toUpdate.push(updateRepo);
+                } else {
+                    const createRepo: Prisma.RepoCreateManyInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        orgId: job.data.orgId,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toCreate.push(createRepo);
+                }
+                return [toCreate, toUpdate];
+            }, [[], []]);
+
+            if (toCreate.length > 0) {
+                const createStart = performance.now();
+                const createdRepos = await tx.repo.createManyAndReturn({
+                    data: toCreate,
+                });
+
+                await tx.repoToConnection.createMany({
+                    data: createdRepos.map(repo => ({
+                        repoId: repo.id,
+                        connectionId: job.data.connectionId,
+                    })),
+                });
+
+                const createDuration = performance.now() - createStart;
+                this.logger.info(`Created ${toCreate.length} repos in ${createDuration}ms`);
+            }
+
+            if (toUpdate.length > 0) {
+                const updateStart = performance.now();
+
+                // Build values string for update query
+                const updateValues = toUpdate.map(repo => `(
+                    '${repo.name}',
+                    '${repo.cloneUrl}',
+                    ${repo.imageUrl ? `'${repo.imageUrl}'` : 'NULL'},
+                    ${repo.isFork},
+                    ${repo.isArchived},
+                    '${JSON.stringify(repo.metadata)}'::jsonb,
+                    '${repo.external_id}',
+                    '${repo.external_codeHostType}',
+                    '${repo.external_codeHostUrl}'
+                )`).join(',');
+
+                // Update repos and get their IDs in one query
+                const updateSql = `
+                    WITH updated AS (
+                        UPDATE "Repo" r
+                        SET
+                            name = v.name,
+                            "cloneUrl" = v.clone_url,
+                            "imageUrl" = v.image_url,
+                            "isFork" = v.is_fork,
+                            "isArchived" = v.is_archived,
+                            metadata = v.metadata,
+                            "updatedAt" = NOW()
+                        FROM (
+                            VALUES ${updateValues}
+                        ) AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_type, external_code_host_url)
+                        WHERE r.external_id = v.external_id
+                        AND r."external_codeHostUrl" = v.external_code_host_url
+                        RETURNING r.id
+                    )
+                    SELECT id FROM updated
+                `;
+                const updatedRepoIds = await tx.$queryRawUnsafe<{id: number}[]>(updateSql);
+
+                // Insert repo-connection mappings
+                const createConnectionSql = `
+                    INSERT INTO "RepoToConnection" ("repoId", "connectionId", "addedAt")
+                    SELECT id, ${job.data.connectionId}, NOW()
+                    FROM unnest(ARRAY[${updatedRepoIds.map(r => r.id).join(',')}]) AS id
+                `;
+                await tx.$executeRawUnsafe(createConnectionSql);
+
+                const updateDuration = performance.now() - updateStart;
+                this.logger.info(`Updated ${toUpdate.length} repos in ${updateDuration}ms`);
+            }
+        });
     }

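At the core of the connectionManager change, N per-repo tx.repo.upsert round trips are replaced by one findMany read, a partition into creates and updates, and two set-based writes (createManyAndReturn plus a single raw UPDATE ... FROM (VALUES ...)). A minimal standalone sketch of the partition step, using simplified hypothetical types in place of Prisma's generated inputs:

type RepoKey = { external_id: string; external_codeHostUrl: string };
type RepoInput = RepoKey & { name: string };

// Split incoming repos into creates vs. updates by composite-key membership,
// mirroring the reduce in the diff above.
function partitionRepos(incoming: RepoInput[], existing: RepoKey[]): [RepoInput[], RepoInput[]] {
    const existingKeys = new Set(existing.map(r => `${r.external_id}-${r.external_codeHostUrl}`));
    return incoming.reduce<[RepoInput[], RepoInput[]]>(([toCreate, toUpdate], repo) => {
        const key = `${repo.external_id}-${repo.external_codeHostUrl}`;
        (existingKeys.has(key) ? toUpdate : toCreate).push(repo);
        return [toCreate, toUpdate];
    }, [[], []]);
}

One design note: the diff does an Array.includes scan plus a find per repo, which is quadratic in the number of repos; a Set of composite keys, as sketched here, keeps the partition linear.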
5 changes: 2 additions & 3 deletions packages/backend/src/environment.ts
@@ -35,6 +35,5 @@ export const FALLBACK_GITHUB_TOKEN = getEnv(process.env.FALLBACK_GITHUB_TOKEN);
 export const FALLBACK_GITLAB_TOKEN = getEnv(process.env.FALLBACK_GITLAB_TOKEN);
 export const FALLBACK_GITEA_TOKEN = getEnv(process.env.FALLBACK_GITEA_TOKEN);

-export const CONFIG_REPO_UPSERT_TIMEOUT_MS = getEnv(process.env.CONFIG_REPO_UPSERT_TIMEOUT_MS, '15000')!;
-
-export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
+export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
+export const REDIS_URL = getEnv(process.env.REDIS_URL, 'redis://localhost:6379')!;
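getEnv itself isn't in this diff, but its call sites imply it returns the variable's value when set and otherwise the supplied default; a plausible sketch under that assumption:

// Hypothetical reconstruction of the getEnv helper, inferred from usage.
const getEnv = (value: string | undefined, defaultValue?: string): string | undefined => {
    return value ?? defaultValue;
};

Since the return type stays string | undefined even when a default is supplied, call sites like REDIS_URL above assert non-null with a trailing !.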
6 changes: 2 additions & 4 deletions packages/backend/src/main.ts
@@ -5,14 +5,12 @@ import { DEFAULT_SETTINGS } from './constants.js';
 import { Redis } from 'ioredis';
 import { ConnectionManager } from './connectionManager.js';
 import { RepoManager } from './repoManager.js';
-import { INDEX_CONCURRENCY_MULTIPLE } from './environment.js';
+import { INDEX_CONCURRENCY_MULTIPLE, REDIS_URL } from './environment.js';

 const logger = createLogger('main');

 export const main = async (db: PrismaClient, context: AppContext) => {
-    const redis = new Redis({
-        host: 'localhost',
-        port: 6379,
+    const redis = new Redis(REDIS_URL, {
         maxRetriesPerRequest: null
     });
     redis.ping().then(() => {
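With the hardcoded host and port gone, the backend reads REDIS_URL from the environment (with the same localhost default the Dockerfile sets). ioredis accepts a connection-string first argument plus an options object; a short standalone sketch of the same construction:

import { Redis } from 'ioredis';

// Falls back to the default from environment.ts when REDIS_URL is unset.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
    // BullMQ requires maxRetriesPerRequest: null on connections its workers
    // use, so blocking commands wait for Redis instead of failing fast.
    maxRetriesPerRequest: null,
});

redis.ping().then(() => console.log('connected to redis'));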
57 changes: 41 additions & 16 deletions packages/backend/src/repoManager.ts
@@ -13,7 +13,7 @@ import os from 'os';

 interface IRepoManager {
     blockingPollLoop: () => void;
-    scheduleRepoIndexing: (repo: RepoWithConnections) => Promise<void>;
+    scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise<void>;
     dispose: () => void;
 }

@@ -25,8 +25,8 @@ type JobPayload = {
 }

 export class RepoManager implements IRepoManager {
-    private queue = new Queue<JobPayload>(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue<JobPayload>;
     private logger = createLogger('RepoManager');

     constructor(
@@ -35,6 +35,9 @@ export class RepoManager implements IRepoManager {
         redis: Redis,
         private ctx: AppContext,
     ) {
+        this.queue = new Queue<JobPayload>(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
             connection: redis,
@@ -46,26 +49,48 @@

     public async blockingPollLoop() {
         while(true) {
-            this.fetchAndScheduleRepoIndexing();
-            this.garbageCollectRepo();
+            await this.fetchAndScheduleRepoIndexing();
+            await this.garbageCollectRepo();

             await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
         }
     }

-    public async scheduleRepoIndexing(repo: RepoWithConnections) {
+    public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
         await this.db.$transaction(async (tx) => {
-            await tx.repo.update({
-                where: { id: repo.id },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            await tx.repo.updateMany({
+                where: { id: { in: repos.map(repo => repo.id) } },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
             });

-            await this.queue.add('repoIndexJob', {
-                repo
-            });
-            this.logger.info(`Added job to queue for repo ${repo.id}`);
+            const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
+                if (!acc[repo.orgId]) {
+                    acc[repo.orgId] = [];
+                }
+                acc[repo.orgId].push(repo);
+                return acc;
+            }, {});
+
+            for (const orgId in reposByOrg) {
+                const orgRepos = reposByOrg[orgId];
+                // Set priority based on number of repos (more repos = lower priority)
+                // This helps prevent large orgs from overwhelming the queue
+                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
+
+                await this.queue.addBulk(orgRepos.map(repo => ({
+                    name: 'repoIndexJob',
+                    data: { repo },
+                    opts: {
+                        priority: priority
+                    }
+                })));
+
+                this.logger.info(`Added ${orgRepos.length} jobs to queue for org ${orgId} with priority ${priority}`);
+            }
         }).catch((err: unknown) => {
-            this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+            this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
         });
     }

@@ -91,9 +116,9 @@
             }
         });

-        for (const repo of repos) {
-            await this.scheduleRepoIndexing(repo);
-        }
+        if (repos.length > 0) {
+            await this.scheduleRepoIndexingBulk(repos);
+        }
     }

     private async garbageCollectRepo() {
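On the priority arithmetic: BullMQ priorities are numeric, with 1 the highest and a documented maximum of 2^21 = 2,097,152, which is the cap in the Math.min above. Larger orgs get numerically larger (hence lower) priorities, so a big org's burst of jobs drains after smaller orgs' jobs. A worked example of the same formula:

// priority = ceil(orgRepoCount / 10), clamped to BullMQ's 2^21 maximum.
const priorityFor = (orgRepoCount: number): number =>
    Math.min(Math.ceil(orgRepoCount / 10), 2_097_152);

priorityFor(5);            // 1 -> a 5-repo org is drained first
priorityFor(250);          // 25 -> a 250-repo org queues behind it
priorityFor(100_000_000);  // 2097152 -> clamped at the maximum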