Skip to content

Commit 3c149cf

Browse files
committed
Update notifications on updates to subscription
1 parent 2baf387 commit 3c149cf

File tree

6 files changed

+130
-7
lines changed

6 files changed

+130
-7
lines changed

components/dashboard/src/AppNotifications.tsx

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,22 @@ export function AppNotifications() {
1919
setNotifications(localState);
2020
return;
2121
}
22-
(async () => {
23-
const serverState = await getGitpodService().server.getNotifications();
24-
setNotifications(serverState);
25-
if (serverState.length > 0) {
26-
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 60 /* seconds */);
27-
}
28-
})();
22+
reloadNotifications().catch(console.error);
23+
24+
getGitpodService().registerClient({
25+
onNotificationUpdated: () => reloadNotifications().catch(console.error),
26+
});
2927
}, []);
3028

29+
const reloadNotifications = async () => {
30+
const serverState = await getGitpodService().server.getNotifications();
31+
setNotifications(serverState);
32+
removeLocalStorageObject(KEY_APP_NOTIFICATIONS);
33+
if (serverState.length > 0) {
34+
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 300 /* seconds */);
35+
}
36+
};
37+
3138
const topNotification = notifications[0];
3239
if (topNotification === undefined) {
3340
return null;

components/gitpod-messagebus/src/messagebus.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export interface MessageBusHelper {
6161
* @param topic the topic to parse
6262
*/
6363
getWsInformationFromTopic(topic: string): WorkspaceTopic | undefined;
64+
65+
getSubscriptionUpdateTopic(attributionId?: string): string;
6466
}
6567

6668
export const WorkspaceTopic = Symbol("WorkspaceTopic");
@@ -89,6 +91,10 @@ export class MessageBusHelperImpl implements MessageBusHelper {
8991
await ch.assertExchange(this.workspaceExchange, "topic", { durable: true });
9092
}
9193

94+
getSubscriptionUpdateTopic(attributionId: string | undefined): string {
95+
return `subscription.${attributionId || "*"}.update`;
96+
}
97+
9298
/**
9399
* Computes the topic name of for listening to a workspace.
94100
*

components/gitpod-protocol/src/gitpod-service.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ export interface GitpodClient {
7070

7171
onPrebuildUpdate(update: PrebuildWithStatus): void;
7272

73+
onNotificationUpdated(): void;
74+
7375
onCreditAlert(creditAlert: CreditAlert): void;
7476

7577
//#region propagating reconnection to iframe
@@ -570,6 +572,18 @@ export class GitpodCompositeClient<Client extends GitpodClient> implements Gitpo
570572
}
571573
}
572574
}
575+
576+
onNotificationUpdated(): void {
577+
for (const client of this.clients) {
578+
if (client.onNotificationUpdated) {
579+
try {
580+
client.onNotificationUpdated();
581+
} catch (error) {
582+
console.error(error);
583+
}
584+
}
585+
}
586+
}
573587
}
574588

575589
export type GitpodService = GitpodServiceImpl<GitpodClient, GitpodServer>;

components/server/ee/src/workspace/gitpod-server-impl.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/expe
118118
import { BillingService } from "../billing/billing-service";
119119
import Stripe from "stripe";
120120
import { UsageServiceDefinition } from "@gitpod/usage-api/lib/usage/v1/usage.pb";
121+
import { MessageBusIntegration } from "../../../src/workspace/messagebus-integration";
121122

122123
@injectable()
123124
export class GitpodServerEEImpl extends GitpodServerImpl {
@@ -165,6 +166,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
165166
@inject(BillingModes) protected readonly billingModes: BillingModes;
166167
@inject(BillingService) protected readonly billingService: BillingService;
167168

169+
@inject(MessageBusIntegration) protected readonly messageBus: MessageBusIntegration;
170+
168171
initialize(
169172
client: GitpodClient | undefined,
170173
user: User | undefined,
@@ -177,6 +180,7 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
177180

178181
this.listenToCreditAlerts();
179182
this.listenForPrebuildUpdates().catch((err) => log.error("error registering for prebuild updates", err));
183+
this.listenForSubscriptionUpdates().catch((err) => log.error("error registering for prebuild updates", err));
180184
}
181185

182186
protected async listenForPrebuildUpdates() {
@@ -204,6 +208,32 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
204208
// TODO(at) we need to keep the list of accessible project up to date
205209
}
206210

211+
protected async listenForSubscriptionUpdates() {
212+
if (!this.user) {
213+
return;
214+
}
215+
const teamIds = (await this.teamDB.findTeamsByUser(this.user.id)).map(({ id }) =>
216+
AttributionId.render({ kind: "team", teamId: id }),
217+
);
218+
for (const attributionId of [AttributionId.render({ kind: "user", userId: this.user.id }), ...teamIds]) {
219+
this.disposables.push(
220+
this.localMessageBroker.listenForSubscriptionUpdates(
221+
attributionId,
222+
(ctx: TraceContext, attributionId: AttributionId) =>
223+
TraceContext.withSpan(
224+
"forwardSubscriptionUpdateToClient",
225+
(ctx) => {
226+
traceClientMetadata(ctx, this.clientMetadata);
227+
TraceContext.setJsonRPCMetadata(ctx, "onSubscriptionUpdate");
228+
this.client?.onNotificationUpdated();
229+
},
230+
ctx,
231+
),
232+
),
233+
);
234+
}
235+
}
236+
207237
protected async getAccessibleProjects() {
208238
if (!this.user) {
209239
return [];
@@ -2161,6 +2191,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
21612191
billingStrategy: CostCenter_BillingStrategy.BILLING_STRATEGY_STRIPE,
21622192
},
21632193
});
2194+
2195+
this.messageBus.notifyOnSubscriptionUpdate(ctx, attrId).catch();
21642196
} catch (error) {
21652197
log.error(`Failed to subscribe '${attributionId}' to Stripe`, error);
21662198
throw new ResponseError(
@@ -2262,6 +2294,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
22622294
response?.costCenter?.billingStrategy || CostCenter_BillingStrategy.BILLING_STRATEGY_OTHER,
22632295
},
22642296
});
2297+
2298+
this.messageBus.notifyOnSubscriptionUpdate(ctx, attributionId).catch();
22652299
}
22662300

22672301
async getUsageLimitForTeam(ctx: TraceContext, teamId: string): Promise<number | undefined> {

components/server/src/messaging/local-message-broker.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
WorkspaceInstance,
1313
} from "@gitpod/gitpod-protocol";
1414
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
15+
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";
1516
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
1617
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
1718
import { inject, injectable } from "inversify";
@@ -20,6 +21,9 @@ import { MessageBusIntegration } from "../workspace/messagebus-integration";
2021
export interface PrebuildUpdateListener {
2122
(ctx: TraceContext, evt: PrebuildWithStatus): void;
2223
}
24+
export interface SubscriptionUpdateListener {
25+
(ctx: TraceContext, attributionId: AttributionId): void;
26+
}
2327
export interface CreditAlertListener {
2428
(ctx: TraceContext, alert: CreditAlert): void;
2529
}
@@ -38,6 +42,8 @@ export interface LocalMessageBroker {
3842

3943
listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;
4044

45+
listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable;
46+
4147
listenToCreditAlerts(userId: string, listener: CreditAlertListener): Disposable;
4248

4349
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;
@@ -69,6 +75,7 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
6975
protected creditAlertsListeners: Map<string, CreditAlertListener[]> = new Map();
7076
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
7177
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
78+
protected subscriptionUpdateListeners: Map<string, SubscriptionUpdateListener[]> = new Map();
7279

7380
protected readonly disposables = new DisposableCollection();
7481

@@ -151,6 +158,24 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
151158
},
152159
),
153160
);
161+
this.disposables.push(
162+
this.messageBusIntegration.listenToSubscriptionUpdates(
163+
(ctx: TraceContext, attributionId: AttributionId) => {
164+
TraceContext.setOWI(ctx, {});
165+
166+
const listeners = this.subscriptionUpdateListeners.get(AttributionId.render(attributionId)) || [];
167+
168+
for (const l of listeners) {
169+
try {
170+
l(ctx, attributionId);
171+
} catch (err) {
172+
TraceContext.setError(ctx, err);
173+
log.error("listenToSubscriptionUpdates", err, { attributionId });
174+
}
175+
}
176+
},
177+
),
178+
);
154179
}
155180

156181
async stop() {
@@ -165,6 +190,10 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
165190
return this.doRegister(userId, listener, this.creditAlertsListeners);
166191
}
167192

193+
listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable {
194+
return this.doRegister(attributionId, listener, this.subscriptionUpdateListeners);
195+
}
196+
168197
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
169198
// we're being cheap here in re-using a map where it just needs to be a plain array.
170199
return this.doRegister(

components/server/src/workspace/messagebus-integration.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import * as opentracing from "opentracing";
2222
import { CancellationTokenSource } from "vscode-ws-jsonrpc";
2323
import { increaseMessagebusTopicReads } from "../prometheus-metrics";
2424
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
25+
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";
2526

2627
interface WorkspaceInstanceUpdateCallback {
2728
(ctx: TraceContext, instance: WorkspaceInstance, ownerId: string | undefined): void;
@@ -72,6 +73,16 @@ export class CreditAlertListener extends AbstractTopicListener<CreditAlert> {
7273
}
7374
}
7475

76+
export class SubscriptionUpdateListener extends AbstractTopicListener<AttributionId> {
77+
constructor(protected messageBusHelper: MessageBusHelper, listener: TopicListener<AttributionId>) {
78+
super(messageBusHelper.workspaceExchange, listener);
79+
}
80+
81+
topic() {
82+
return this.messageBusHelper.getSubscriptionUpdateTopic();
83+
}
84+
}
85+
7586
export class PrebuildUpdatableQueueListener implements MessagebusListener {
7687
protected channel: Channel | undefined;
7788
protected consumerTag: string | undefined;
@@ -208,6 +219,28 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
208219
return Disposable.create(() => cancellationTokenSource.cancel());
209220
}
210221

222+
async notifyOnSubscriptionUpdate(ctx: TraceContext, attributionId: AttributionId) {
223+
if (!this.channel) {
224+
throw new Error("Not connected to message bus");
225+
}
226+
const topic = this.messageBusHelper.getSubscriptionUpdateTopic(AttributionId.render(attributionId));
227+
const msg = Buffer.from(JSON.stringify(attributionId));
228+
await this.messageBusHelper.assertWorkspaceExchange(this.channel);
229+
230+
await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, msg, {
231+
trace: ctx,
232+
});
233+
}
234+
235+
listenToSubscriptionUpdates(callback: (ctx: TraceContext, attributionId: AttributionId) => void): Disposable {
236+
const listener = new SubscriptionUpdateListener(this.messageBusHelper, callback);
237+
const cancellationTokenSource = new CancellationTokenSource();
238+
this.listen(listener, cancellationTokenSource.token).catch((err) => {
239+
/** ignore */
240+
});
241+
return Disposable.create(() => cancellationTokenSource.cancel());
242+
}
243+
211244
async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) {
212245
if (!this.channel) {
213246
throw new Error("Not connected to message bus");

0 commit comments

Comments
 (0)