Skip to content

Commit 009c76b

Browse files
committed
perf: use in-memory cache to speed up job status polling
1 parent d8a5ba0 commit 009c76b

File tree

7 files changed

+116
-22
lines changed

7 files changed

+116
-22
lines changed

README.md

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ npm start
2525

2626
### Submission jobs
2727

28-
| Component | Action |
29-
| --------- | ------------------------------------------------------------------------------------------------------------------- |
30-
| `Worker` | Launch and maintain pool of X prebooted microVMs in the background |
31-
| `Worker` | Bind to RabbitMQ queue `jobs_q` on `jobs_ex` exchange and `jobs_rk` routing key |
32-
| `API` | Bind to RabbitMQ queue `jobs_status_q` on `jobs_status_ex` exchange and `jobs_status_rk` routing key |
33-
| `Front` | `POST /submissions` |
34-
| `Front` | poll `GET /submissions/:id` every 500 ms |
35-
| `API` | Send job to RabbitMQ on `jobs_ex` direct exchange with `jobs_rk` routing key. |
36-
| `Worker` | Receive job, get ready microVM from pool, send job to agent in microVM |
37-
| `Agent` | Compile/Run code, return result |
38-
| `Worker` | During two previous steips, send status of jobs on `jobs_status_ex` exchange with `jobs_status_rk` routing key |
39-
| `API` | Receive each new status on `jobs_status_q`, and insert them into DB, so that the polling can get the latest status. |
28+
| Component | Action |
29+
| --------- | --------------------------------------------------------------------------------------------------------------------------------------- |
30+
| `Worker` | Launch and maintain pool of X prebooted microVMs in the background |
31+
| `Worker` | Bind to RabbitMQ queue `jobs_q` on `jobs_ex` exchange and `jobs_rk` routing key |
32+
| `API` | Bind to RabbitMQ queue `jobs_status_q` on `jobs_status_ex` exchange and `jobs_status_rk` routing key |
33+
| `Front` | `POST /submissions` |
34+
| `Front` | Poll `GET /submissions/:id` every 500 ms |
35+
| `API` | Send job to RabbitMQ on `jobs_ex` direct exchange with `jobs_rk` routing key. |
36+
| `Worker` | Receive job, get ready microVM from pool, send job to agent in microVM |
37+
| `Agent` | Compile/Run code, return result |
38+
| `Worker` | During two previous steips, send status of jobs on `jobs_status_ex` exchange with `jobs_status_rk` routing key |
39+
| `API` | Receive each new status on `jobs_status_q`, and insert them into DB and in-memory cache, so that the polling can get the latest status. |
4040

4141
## Contribute
4242

package-lock.json

Lines changed: 44 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"amqp-connection-manager": "^3.2.2",
4747
"amqplib": "^0.8.0",
4848
"argon2": "^0.27.2",
49+
"cache-manager": "^3.4.3",
4950
"class-transformer": "^0.4.0",
5051
"class-validator": "^0.13.1",
5152
"dotenv": "^10.0.0",
@@ -69,6 +70,7 @@
6970
"@nestjs/schematics": "7.3.1",
7071
"@nestjs/testing": "7.6.17",
7172
"@tsconfig/node16": "^1.0.1",
73+
"@types/cache-manager": "^3.4.0",
7274
"@types/jest": "26.0.23",
7375
"@types/node": "15.6.1",
7476
"@types/passport-jwt": "^3.0.5",

src/submissions/submission.entity.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* eslint-disable @typescript-eslint/no-unused-vars */
22
import { ApiProperty } from '@nestjs/swagger';
33
import { User } from 'src/users/user.entity';
4+
import { jsonMember, jsonObject } from 'typedjson';
45
import {
56
BaseEntity,
67
Column,
@@ -11,6 +12,7 @@ import {
1112
UpdateDateColumn,
1213
} from 'typeorm';
1314

15+
@jsonObject
1416
@Entity('submissions')
1517
export class Submission extends BaseEntity {
1618
constructor(partial: Partial<Submission>) {
@@ -19,27 +21,34 @@ export class Submission extends BaseEntity {
1921
}
2022

2123
@ApiProperty()
24+
@jsonMember
2225
@PrimaryGeneratedColumn('uuid')
2326
id: string;
2427

2528
@ApiProperty()
29+
@jsonMember
2630
@Column()
2731
language: string;
2832

2933
@ApiProperty()
34+
@jsonMember
3035
@Column()
3136
code: string;
3237

38+
@jsonMember
3339
@Column()
3440
status: string;
3541

42+
@jsonMember
3643
@Column({ nullable: true })
3744
output: string;
3845

46+
@jsonMember
3947
@ApiProperty()
4048
@CreateDateColumn()
4149
createdAt: Date;
4250

51+
@jsonMember
4352
@ApiProperty()
4453
@UpdateDateColumn()
4554
updatedAt: Date;
@@ -49,5 +58,6 @@ export class Submission extends BaseEntity {
4958
eager: true,
5059
})
5160
@ApiProperty({ type: () => User })
61+
@jsonMember(() => User)
5262
user: User;
5363
}

src/submissions/submissions.controller.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ export class SubmissionsController {
4949
async findOne(
5050
@Param() findSubmissionDTO: FindSubmissionDTO,
5151
): Promise<Submission> {
52-
const submission: Submission | undefined =
53-
await this.submissionsService.findOne(findSubmissionDTO);
52+
// Since this endpoint is used for polling, the service will fetch from cache first and fallback to DB
53+
const submission = await this.submissionsService.findOne(findSubmissionDTO);
5454

5555
if (!submission) {
5656
throw NotFoundException;

src/submissions/submissions.module.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
2-
import { forwardRef, Module } from '@nestjs/common';
2+
import { CacheModule, forwardRef, Module } from '@nestjs/common';
33
import { TypeOrmModule } from '@nestjs/typeorm';
44
import { UsersModule } from 'src/users/users.module';
55
import { Submission } from './submission.entity';
@@ -10,6 +10,7 @@ import { SubmissionsService } from './submissions.service';
1010
imports: [
1111
TypeOrmModule.forFeature([Submission]),
1212
forwardRef(() => UsersModule),
13+
CacheModule.register(),
1314
RabbitMQModule.forRoot(RabbitMQModule, {
1415
exchanges: [
1516
{

src/submissions/submissions.service.ts

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
2-
import { Injectable } from '@nestjs/common';
2+
import { CACHE_MANAGER, Inject, Injectable } from '@nestjs/common';
33
import { InjectRepository } from '@nestjs/typeorm';
4+
import { Cache } from 'cache-manager';
45
import { TypedJSON } from 'typedjson';
56
import { Repository } from 'typeorm';
67
import { FindSubmissionDTO } from './dto/find-submission.dto';
@@ -13,6 +14,7 @@ export class SubmissionsService {
1314
constructor(
1415
@InjectRepository(Submission)
1516
private submissionsRepository: Repository<Submission>,
17+
@Inject(CACHE_MANAGER) private cacheManager: Cache,
1618
) {}
1719

1820
async create(insertSubmissionDTO: InsertSubmissionDTO): Promise<Submission> {
@@ -22,7 +24,11 @@ export class SubmissionsService {
2224
return submission.save();
2325
}
2426

25-
async setStatus(id: string, status: string, output?: string): Promise<void> {
27+
async setStatus(
28+
id: string,
29+
status: string,
30+
output?: string,
31+
): Promise<Submission | undefined> {
2632
const submission = await this.submissionsRepository.findOne({
2733
id,
2834
});
@@ -32,13 +38,28 @@ export class SubmissionsService {
3238
submission.output = output;
3339
}
3440
await submission.save();
41+
return submission;
3542
}
43+
return undefined;
3644
}
3745

3846
async findOne(
39-
submission: FindSubmissionDTO,
47+
queriedSubmission: FindSubmissionDTO,
4048
): Promise<Submission | undefined> {
41-
return this.submissionsRepository.findOne({ id: submission.id });
49+
// First, try to get from cache
50+
const cachedSubmission = await this.cacheManager.get(queriedSubmission.id);
51+
52+
if (cachedSubmission !== '') {
53+
const serializer = new TypedJSON(Submission);
54+
55+
const submission = serializer.parse(cachedSubmission);
56+
if (submission) {
57+
return submission;
58+
}
59+
}
60+
61+
// Fallback to DB
62+
return this.submissionsRepository.findOne({ id: queriedSubmission.id });
4263
}
4364

4465
@RabbitSubscribe({
@@ -50,11 +71,27 @@ export class SubmissionsService {
5071
// TODO: use logger instead
5172
console.log(`Received job status: ${JSON.stringify(msg)}`);
5273

53-
const serializer = new TypedJSON(JobStatusDTO);
74+
const jobSerializer = new TypedJSON(JobStatusDTO);
75+
const jobStatus = jobSerializer.parse(msg);
5476

55-
const jobStatus = serializer.parse(msg);
5677
if (jobStatus) {
57-
await this.setStatus(jobStatus.id, jobStatus.status, jobStatus.stdout);
78+
// Set in DB
79+
const submission = await this.setStatus(
80+
jobStatus.id,
81+
jobStatus.status,
82+
jobStatus.stdout,
83+
);
84+
85+
if (submission) {
86+
const submissionSerializer = new TypedJSON(Submission);
87+
88+
// Set in cache to speed up polling
89+
await this.cacheManager.set(
90+
jobStatus.id,
91+
submissionSerializer.stringify(submission),
92+
{ ttl: 600 },
93+
);
94+
}
5895
}
5996
}
6097
}

0 commit comments

Comments
 (0)