Skip to content

Commit f723601

Browse files
committed
Update notifications on updates to subscription
1 parent 980e54e commit f723601

File tree

6 files changed

+132
-7
lines changed

6 files changed

+132
-7
lines changed

components/dashboard/src/AppNotifications.tsx

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,22 @@ export function AppNotifications() {
1919
setNotifications(localState);
2020
return;
2121
}
22-
(async () => {
23-
const serverState = await getGitpodService().server.getNotifications();
24-
setNotifications(serverState);
25-
if (serverState.length > 0) {
26-
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 60 /* seconds */);
27-
}
28-
})();
22+
reloadNotifications().catch(console.error);
23+
24+
getGitpodService().registerClient({
25+
onSubscriptionUpdate: () => reloadNotifications().catch(console.error),
26+
});
2927
}, []);
3028

29+
const reloadNotifications = async () => {
30+
const serverState = await getGitpodService().server.getNotifications();
31+
setNotifications(serverState);
32+
removeLocalStorageObject(KEY_APP_NOTIFICATIONS);
33+
if (serverState.length > 0) {
34+
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 300 /* seconds */);
35+
}
36+
};
37+
3138
const topNotification = notifications[0];
3239
if (topNotification === undefined) {
3340
return null;

components/gitpod-messagebus/src/messagebus.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export interface MessageBusHelper {
6161
* @param topic the topic to parse
6262
*/
6363
getWsInformationFromTopic(topic: string): WorkspaceTopic | undefined;
64+
65+
getSubscriptionUpdateTopic(attributionId?: string): string;
6466
}
6567

6668
export const WorkspaceTopic = Symbol("WorkspaceTopic");
@@ -89,6 +91,10 @@ export class MessageBusHelperImpl implements MessageBusHelper {
8991
await ch.assertExchange(this.workspaceExchange, "topic", { durable: true });
9092
}
9193

94+
getSubscriptionUpdateTopic(attributionId: string | undefined): string {
95+
return `subscription.${attributionId || "*"}.update`;
96+
}
97+
9298
/**
9399
* Computes the topic name of for listening to a workspace.
94100
*

components/gitpod-protocol/src/gitpod-service.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ export interface GitpodClient {
7070

7171
onPrebuildUpdate(update: PrebuildWithStatus): void;
7272

73+
onSubscriptionUpdate(attributionId: string): void;
74+
7375
onCreditAlert(creditAlert: CreditAlert): void;
7476

7577
//#region propagating reconnection to iframe
@@ -571,6 +573,18 @@ export class GitpodCompositeClient<Client extends GitpodClient> implements Gitpo
571573
}
572574
}
573575
}
576+
577+
onSubscriptionUpdate(attributionId: string): void {
578+
for (const client of this.clients) {
579+
if (client.onSubscriptionUpdate) {
580+
try {
581+
client.onSubscriptionUpdate(attributionId);
582+
} catch (error) {
583+
console.error(error);
584+
}
585+
}
586+
}
587+
}
574588
}
575589

576590
export type GitpodService = GitpodServiceImpl<GitpodClient, GitpodServer>;

components/server/ee/src/workspace/gitpod-server-impl.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ import { BillingModes } from "../billing/billing-mode";
115115
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
116116
import { BillingService } from "../billing/billing-service";
117117
import Stripe from "stripe";
118+
import { MessageBusIntegration } from "../../../src/workspace/messagebus-integration";
118119

119120
@injectable()
120121
export class GitpodServerEEImpl extends GitpodServerImpl {
@@ -162,6 +163,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
162163
@inject(BillingModes) protected readonly billingModes: BillingModes;
163164
@inject(BillingService) protected readonly billingService: BillingService;
164165

166+
@inject(MessageBusIntegration) protected readonly messageBus: MessageBusIntegration;
167+
165168
initialize(
166169
client: GitpodClient | undefined,
167170
user: User | undefined,
@@ -174,6 +177,7 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
174177

175178
this.listenToCreditAlerts();
176179
this.listenForPrebuildUpdates().catch((err) => log.error("error registering for prebuild updates", err));
180+
this.listenForSubscriptionUpdates().catch((err) => log.error("error registering for prebuild updates", err));
177181
}
178182

179183
protected async listenForPrebuildUpdates() {
@@ -201,6 +205,32 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
201205
// TODO(at) we need to keep the list of accessible project up to date
202206
}
203207

208+
protected async listenForSubscriptionUpdates() {
209+
if (!this.user) {
210+
return;
211+
}
212+
const teamIds = (await this.teamDB.findTeamsByUser(this.user.id)).map(({ id }) =>
213+
AttributionId.render({ kind: "team", teamId: id }),
214+
);
215+
for (const attributionId of [AttributionId.render({ kind: "user", userId: this.user.id }), ...teamIds]) {
216+
this.disposables.push(
217+
this.localMessageBroker.listenForSubscriptionUpdates(
218+
attributionId,
219+
(ctx: TraceContext, attributionId: AttributionId) =>
220+
TraceContext.withSpan(
221+
"forwardSubscriptionUpdateToClient",
222+
(ctx) => {
223+
traceClientMetadata(ctx, this.clientMetadata);
224+
TraceContext.setJsonRPCMetadata(ctx, "onSubscriptionUpdate");
225+
this.client?.onSubscriptionUpdate(AttributionId.render(attributionId));
226+
},
227+
ctx,
228+
),
229+
),
230+
);
231+
}
232+
}
233+
204234
protected async getAccessibleProjects() {
205235
if (!this.user) {
206236
return [];
@@ -2154,6 +2184,9 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
21542184
spendingLimit: this.defaultSpendingLimit,
21552185
billingStrategy: "stripe",
21562186
});
2187+
2188+
console.log("notifyOnSubscriptionUpdate");
2189+
this.messageBus.notifyOnSubscriptionUpdate(ctx, attributionId).catch();
21572190
} catch (error) {
21582191
log.error(`Failed to subscribe '${attributionId}' to Stripe`, error);
21592192
throw new ResponseError(
@@ -2241,6 +2274,9 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
22412274
spendingLimit: usageLimit,
22422275
billingStrategy: costCenter?.billingStrategy || "other",
22432276
});
2277+
2278+
console.log("notifyOnSubscriptionUpdate");
2279+
this.messageBus.notifyOnSubscriptionUpdate(ctx, attributionId).catch();
22442280
}
22452281

22462282
async getNotifications(ctx: TraceContext): Promise<string[]> {

components/server/src/messaging/local-message-broker.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
WorkspaceInstance,
1313
} from "@gitpod/gitpod-protocol";
1414
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
15+
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";
1516
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
1617
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
1718
import { inject, injectable } from "inversify";
@@ -20,6 +21,9 @@ import { MessageBusIntegration } from "../workspace/messagebus-integration";
2021
export interface PrebuildUpdateListener {
2122
(ctx: TraceContext, evt: PrebuildWithStatus): void;
2223
}
24+
export interface SubscriptionUpdateListener {
25+
(ctx: TraceContext, attributionId: AttributionId): void;
26+
}
2327
export interface CreditAlertListener {
2428
(ctx: TraceContext, alert: CreditAlert): void;
2529
}
@@ -38,6 +42,8 @@ export interface LocalMessageBroker {
3842

3943
listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;
4044

45+
listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable;
46+
4147
listenToCreditAlerts(userId: string, listener: CreditAlertListener): Disposable;
4248

4349
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;
@@ -69,6 +75,7 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
6975
protected creditAlertsListeners: Map<string, CreditAlertListener[]> = new Map();
7076
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
7177
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
78+
protected subscriptionUpdateListeners: Map<string, SubscriptionUpdateListener[]> = new Map();
7279

7380
protected readonly disposables = new DisposableCollection();
7481

@@ -151,6 +158,24 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
151158
},
152159
),
153160
);
161+
this.disposables.push(
162+
this.messageBusIntegration.listenToSubscriptionUpdates(
163+
(ctx: TraceContext, attributionId: AttributionId) => {
164+
TraceContext.setOWI(ctx, {});
165+
166+
const listeners = this.subscriptionUpdateListeners.get(AttributionId.render(attributionId)) || [];
167+
168+
for (const l of listeners) {
169+
try {
170+
l(ctx, attributionId);
171+
} catch (err) {
172+
TraceContext.setError(ctx, err);
173+
log.error("listenForWorkspaceInstanceUpdates", err, { attributionId });
174+
}
175+
}
176+
},
177+
),
178+
);
154179
}
155180

156181
async stop() {
@@ -165,6 +190,10 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
165190
return this.doRegister(userId, listener, this.creditAlertsListeners);
166191
}
167192

193+
listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable {
194+
return this.doRegister(attributionId, listener, this.subscriptionUpdateListeners);
195+
}
196+
168197
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
169198
// we're being cheap here in re-using a map where it just needs to be a plain array.
170199
return this.doRegister(

components/server/src/workspace/messagebus-integration.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import * as opentracing from "opentracing";
2222
import { CancellationTokenSource } from "vscode-ws-jsonrpc";
2323
import { increaseMessagebusTopicReads } from "../prometheus-metrics";
2424
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
25+
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";
2526

2627
interface WorkspaceInstanceUpdateCallback {
2728
(ctx: TraceContext, instance: WorkspaceInstance, ownerId: string | undefined): void;
@@ -72,6 +73,16 @@ export class CreditAlertListener extends AbstractTopicListener<CreditAlert> {
7273
}
7374
}
7475

76+
export class SubscriptionUpdateListener extends AbstractTopicListener<AttributionId> {
77+
constructor(protected messageBusHelper: MessageBusHelper, listener: TopicListener<AttributionId>) {
78+
super(messageBusHelper.workspaceExchange, listener);
79+
}
80+
81+
topic() {
82+
return this.messageBusHelper.getSubscriptionUpdateTopic();
83+
}
84+
}
85+
7586
export class PrebuildUpdatableQueueListener implements MessagebusListener {
7687
protected channel: Channel | undefined;
7788
protected consumerTag: string | undefined;
@@ -208,6 +219,28 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
208219
return Disposable.create(() => cancellationTokenSource.cancel());
209220
}
210221

222+
async notifyOnSubscriptionUpdate(ctx: TraceContext, attributionId: AttributionId) {
223+
if (!this.channel) {
224+
throw new Error("Not connected to message bus");
225+
}
226+
const topic = this.messageBusHelper.getSubscriptionUpdateTopic(AttributionId.render(attributionId));
227+
const msg = Buffer.from(JSON.stringify(attributionId));
228+
await this.messageBusHelper.assertWorkspaceExchange(this.channel);
229+
230+
await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, msg, {
231+
trace: ctx,
232+
});
233+
}
234+
235+
listenToSubscriptionUpdates(callback: (ctx: TraceContext, attributionId: AttributionId) => void): Disposable {
236+
const listener = new SubscriptionUpdateListener(this.messageBusHelper, callback);
237+
const cancellationTokenSource = new CancellationTokenSource();
238+
this.listen(listener, cancellationTokenSource.token).catch((err) => {
239+
/** ignore */
240+
});
241+
return Disposable.create(() => cancellationTokenSource.cancel());
242+
}
243+
211244
async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) {
212245
if (!this.channel) {
213246
throw new Error("Not connected to message bus");

0 commit comments

Comments
 (0)