Skip to content

Commit 45bcc84

Browse files
committed
refactor some conat server code so that sticky routing and normal routing take a different code path, so can make sticky be async
1 parent d317ca0 commit 45bcc84

File tree

1 file changed

+59
-53
lines changed

1 file changed

+59
-53
lines changed

src/packages/conat/core/server.ts

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -744,12 +744,17 @@ export class ConatServer extends EventEmitter {
744744
}
745745
// send to exactly one in each queue group
746746
for (const queue in g) {
747-
const target = this.loadBalance({
748-
pattern,
749-
subject,
750-
queue,
751-
targets: g[queue],
752-
});
747+
let target;
748+
const targets = g[queue];
749+
if (queue == STICKY_QUEUE_GROUP) {
750+
target = await this.stickyLoadBalance({
751+
pattern,
752+
subject,
753+
targets,
754+
});
755+
} else {
756+
target = this.loadBalance(targets);
757+
}
753758
if (target !== undefined) {
754759
this.io.to(target).emit(pattern, { subject, data });
755760
if (!isSilentPattern(pattern)) {
@@ -773,12 +778,17 @@ export class ConatServer extends EventEmitter {
773778
for (const pattern in clusterInterest) {
774779
const g = clusterInterest[pattern];
775780
for (const queue in g) {
776-
const t = this.clusterLoadBalance({
777-
pattern,
778-
subject,
779-
queue,
780-
targets: g[queue],
781-
});
781+
let t;
782+
const targets = g[queue];
783+
if (queue == STICKY_QUEUE_GROUP) {
784+
t = await this.stickyClusterLoadBalance({
785+
pattern,
786+
subject,
787+
targets,
788+
});
789+
} else {
790+
t = this.clusterLoadBalance(targets);
791+
}
782792
if (t !== undefined) {
783793
const { id, target } = t;
784794
if (id == this.id) {
@@ -824,78 +834,74 @@ export class ConatServer extends EventEmitter {
824834
link?.client.conn.emit("publish", data1);
825835
}
826836

827-
//
828-
// TODO: Supercluster routing. NOT IMPLEMENTED YET
829-
//
830-
// // if no matches in local cluster, try the supercluster (if there is one)
831-
// if (count == 0) {
832-
// // nothing in this cluster, so try other clusters
833-
// for (const clusterName in this.clusterLinks) {
834-
// if (clusterName == this.clusterName) continue;
835-
// const links = this.clusterLinks[clusterName];
836-
// for (const id in links) {
837-
// const link = links[id];
838-
// const count2 = link.publish({ subject, data, queueGroups });
839-
// if (count2 > 0) {
840-
// count += count2;
841-
// // once we publish to any other cluster, we are done.
842-
// break;
843-
// }
844-
// }
845-
// }
846-
// }
847-
848837
return count;
849838
};
850839

851840
///////////////////////////////////////
852841
// WHO GETS PUBLISHED MESSAGE:
853842
///////////////////////////////////////
854-
private loadBalance = ({
843+
private loadBalance = (targets: Set<string>): string | undefined => {
844+
if (targets.size == 0) {
845+
return undefined;
846+
}
847+
return randomChoice(targets);
848+
};
849+
850+
clusterLoadBalance = (targets0: {
851+
[id: string]: Set<string>;
852+
}): { id: string; target: string } | undefined => {
853+
const targets = new Set<string>();
854+
for (const id in targets0) {
855+
for (const target of targets0[id]) {
856+
targets.add(JSON.stringify({ id, target }));
857+
}
858+
}
859+
const x = this.loadBalance(targets);
860+
if (!x) {
861+
return undefined;
862+
}
863+
return JSON.parse(x);
864+
};
865+
866+
// sticky versions
867+
868+
private stickyLoadBalance = async ({
855869
pattern,
856870
subject,
857-
queue,
858871
targets,
859872
}: {
860873
pattern: string;
861874
subject: string;
862-
queue: string;
863875
targets: Set<string>;
864-
}): string | undefined => {
876+
}): Promise<string | undefined> => {
865877
if (targets.size == 0) {
866878
return undefined;
867879
}
868-
if (queue == STICKY_QUEUE_GROUP) {
869-
return stickyChoice({
870-
pattern,
871-
subject,
872-
targets,
873-
updateSticky: this.updateSticky,
874-
getStickyTarget: this.getStickyTarget,
875-
});
876-
} else {
877-
return randomChoice(targets);
878-
}
880+
return stickyChoice({
881+
pattern,
882+
subject,
883+
targets,
884+
updateSticky: this.updateSticky,
885+
getStickyTarget: this.getStickyTarget,
886+
});
879887
};
880888

881-
clusterLoadBalance = ({
889+
stickyClusterLoadBalance = async ({
882890
pattern,
883891
subject,
884-
queue,
885892
targets: targets0,
886893
}: {
887894
pattern: string;
888895
subject: string;
889-
queue: string;
890896
targets: { [id: string]: Set<string> };
891-
}): { id: string; target: string } | undefined => {
897+
}): Promise<{ id: string; target: string } | undefined> => {
892898
const targets = new Set<string>();
893899
for (const id in targets0) {
894900
for (const target of targets0[id]) {
895901
targets.add(JSON.stringify({ id, target }));
896902
}
897903
}
898-
const x = this.loadBalance({ pattern, subject, queue, targets });
904+
const x = await this.stickyLoadBalance({ pattern, subject, targets });
899905
if (!x) {
900906
return undefined;
901907
}

0 commit comments

Comments
 (0)