From 3290c69ad3efc692311febd0c1efa15a0a90816c Mon Sep 17 00:00:00 2001
From: Gero Posmyk-Leinemann <gero@gitpod.io>
Date: Tue, 21 Sep 2021 10:02:54 +0000
Subject: [PATCH 1/2] [protocol] connection: buffer messages until reconnect

This avoids Error("Connection got disposed") errors which gobble up to the frontend and block users.
---
 components/dashboard/src/Login.tsx            |   2 +-
 .../src/messaging/browser/connection.ts       | 170 +++++++++++++++++-
 2 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/components/dashboard/src/Login.tsx b/components/dashboard/src/Login.tsx
index e67bca5f2024de..71df482bdecef1 100644
--- a/components/dashboard/src/Login.tsx
+++ b/components/dashboard/src/Login.tsx
@@ -53,7 +53,7 @@ export function Login() {
     }, [])
 
     const authorizeSuccessful = async (payload?: string) => {
-        updateUser();
+        updateUser().catch(console.error);
         // Check for a valid returnTo in payload
         const safeReturnTo = getSafeURLRedirect(payload);
         if (safeReturnTo) {
diff --git a/components/gitpod-protocol/src/messaging/browser/connection.ts b/components/gitpod-protocol/src/messaging/browser/connection.ts
index 9adc452a7795da..3b495b6123b2a2 100644
--- a/components/gitpod-protocol/src/messaging/browser/connection.ts
+++ b/components/gitpod-protocol/src/messaging/browser/connection.ts
@@ -5,10 +5,13 @@
  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
  */
 
-import { listen as doListen, Logger, ConsoleLogger } from "vscode-ws-jsonrpc";
+import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc";
+import { MessageConnection, createMessageConnection } from "vscode-jsonrpc";
+import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter";
+import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader";
 import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory";
 import { ConnectionHandler } from "../handler";
-import ReconnectingWebSocket from 'reconnecting-websocket';
+import ReconnectingWebSocket, { Event } from 'reconnecting-websocket';
 
 export interface WebSocketOptions {
     onerror?: (event: Event) => void;
@@ -64,11 +67,11 @@ export class WebSocketConnectionProvider {
                 logger.error(JSON.stringify(error));
             });
         }
-        doListen({
-            webSocket,
-            onConnection: connection => handler.onConnection(connection),
-            logger
-        });
+        doListen(
+            webSocket as any as ReconnectingWebSocket,
+            connection => handler.onConnection(connection),
+            logger,
+        );
         return webSocket;
     }
 
@@ -91,3 +94,156 @@ export class WebSocketConnectionProvider {
     }
 
 }
+
+// The following was extracted from vscode-ws-jsonrpc to make these changes:
+//  - switch from WebSocket to ReconnectingWebSocket
+//  - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection
+//  - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immidiately
+//  - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose'
+function doListen(resocket: ReconnectingWebSocket, onConnection: (connection: MessageConnection) => void, logger: Logger) {
+    let alreadyOpened = false;
+    resocket.onopen = () => {
+        if (alreadyOpened) {
+            return;
+        }
+        alreadyOpened = true;
+
+        const connection = createWebSocketConnection(resocket, logger);
+        onConnection(connection);
+    };
+}
+
+function createWebSocketConnection(resocket: ReconnectingWebSocket, logger: Logger) {
+    const socket = toSocket(resocket as any);
+    const messageReader = new NonClosingWebSocketMessageReader(socket);
+    const messageWriter = new BufferingWebSocketMessageWriter(resocket, logger);
+    const connection = createMessageConnection(messageReader, messageWriter, logger);
+    connection.onClose(() => connection.dispose());
+    return connection;
+}
+
+/**
+ * This takes vscode-ws-jsonrpc/lib/socket/writer/WebSocketMessageWriter and adds a buffer
+ */
+class BufferingWebSocketMessageWriter extends AbstractMessageWriter {
+    protected readonly socket: ReconnectingWebSocket;
+    protected readonly logger: Logger;
+    protected errorCount: number = 0;
+
+    protected buffer: any[] = [];
+
+    constructor(socket: ReconnectingWebSocket, logger: Logger) {
+        super();
+        this.socket = socket;
+        this.logger = logger;
+
+        socket.addEventListener("open", (event: Event) => this.flushBuffer());
+    }
+
+    write(msg: any) {
+        if (this.socket.readyState !== ReconnectingWebSocket.OPEN) {
+            this.bufferMsg(msg);
+            return;
+        }
+
+        try {
+            const content = JSON.stringify(msg);
+            this.socket.send(content);
+        } catch (e) {
+            this.errorCount++;
+            this.fireError(e, msg, this.errorCount);
+
+            this.bufferMsg(msg);
+        }
+    }
+
+    protected flushBuffer() {
+        if (this.buffer.length === 0) {
+            return
+        }
+
+        const buffer = [...this.buffer];
+        this.buffer = [];
+        for (const msg of buffer) {
+            this.write(msg);
+        }
+        this.logger.info(`flushed buffer (${this.buffer.length})`)
+    }
+
+    protected bufferMsg(msg: any) {
+        this.buffer.push(msg);
+        this.logger.info(`buffered message (${this.buffer.length})`);
+    }
+}
+
+
+/**
+ * This takes vscode-ws-jsonrpc/lib/socket/reader/WebSocketMessageReader and removes the "onClose -> fireClose" connection
+ */
+class NonClosingWebSocketMessageReader extends AbstractMessageReader {
+    protected readonly socket: IWebSocket;
+    protected readonly events: any[] = [];
+    protected state: 'initial' | 'listening' | 'closed' = 'initial';
+    protected callback: (message: any) => void = () => {};
+
+    constructor(socket: IWebSocket) {
+        super();
+        this.socket = socket;
+        this.socket.onMessage(message => this.readMessage(message));
+        this.socket.onError(error => this.fireError(error));
+        this.socket.onClose((code, reason) => {
+            if (code !== 1000) {
+                const error = {
+                    name: '' + code,
+                    message: `Error during socket reconnect: code = ${code}, reason = ${reason}`
+                };
+                this.fireError(error);
+            }
+            // this.fireClose();        // <-- reason for this class to be copied over
+        });
+    }
+    listen(callback: (message: any) => void) {
+        if (this.state === 'initial') {
+            this.state = 'listening';
+            this.callback = callback;
+            while (this.events.length !== 0) {
+                const event = this.events.pop();
+                if (event.message) {
+                    this.readMessage(event.message);
+                }
+                else if (event.error) {
+                    this.fireError(event.error);
+                }
+                else {
+                    this.fireClose();
+                }
+            }
+        }
+    }
+    readMessage(message: any) {
+        if (this.state === 'initial') {
+            this.events.splice(0, 0, { message });
+        }
+        else if (this.state === 'listening') {
+            const data = JSON.parse(message);
+            this.callback(data);
+        }
+    }
+    fireError(error: any) {
+        if (this.state === 'initial') {
+            this.events.splice(0, 0, { error });
+        }
+        else if (this.state === 'listening') {
+            super.fireError(error);
+        }
+    }
+    fireClose() {
+        if (this.state === 'initial') {
+            this.events.splice(0, 0, {});
+        }
+        else if (this.state === 'listening') {
+            super.fireClose();
+        }
+        this.state = 'closed';
+    }
+}

From 2b441d990887b618ef6b360f2080bc2d029ff8ae Mon Sep 17 00:00:00 2001
From: Gero Posmyk-Leinemann <gero@gitpod.io>
Date: Mon, 4 Oct 2021 08:06:36 +0000
Subject: [PATCH 2/2] [protocol] Trigger onClose/onOpen whenever the underlying
 websocket OPEN/CLOSEs.

---
 .../src/messaging/browser/connection.ts       | 28 +++++++++++++------
 .../gitpod-protocol/src/messaging/handler.ts  | 12 ++++++++
 .../src/messaging/proxy-factory.ts            | 14 +++++++---
 3 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/components/gitpod-protocol/src/messaging/browser/connection.ts b/components/gitpod-protocol/src/messaging/browser/connection.ts
index 3b495b6123b2a2..bd2ef6f781c8bf 100644
--- a/components/gitpod-protocol/src/messaging/browser/connection.ts
+++ b/components/gitpod-protocol/src/messaging/browser/connection.ts
@@ -6,11 +6,11 @@
  */
 
 import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc";
-import { MessageConnection, createMessageConnection } from "vscode-jsonrpc";
+import { createMessageConnection } from "vscode-jsonrpc";
 import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter";
 import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader";
 import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory";
-import { ConnectionHandler } from "../handler";
+import { ConnectionEventHandler, ConnectionHandler } from "../handler";
 import ReconnectingWebSocket, { Event } from 'reconnecting-websocket';
 
 export interface WebSocketOptions {
@@ -32,7 +32,10 @@ export class WebSocketConnectionProvider {
         const startListening = (path: string) => {
             const socket = this.listen({
                 path,
-                onConnection: c => factory.listen(c)
+                onConnection: c => factory.listen(c),
+            }, {
+                onTransportDidClose: () => factory.fireConnectionClosed(),
+                onTransportDidOpen: () => factory.fireConnectionOpened(),
             },
                 options
             );
@@ -52,7 +55,7 @@ export class WebSocketConnectionProvider {
     /**
      * Install a connection handler for the given path.
      */
-    listen(handler: ConnectionHandler, options?: WebSocketOptions): WebSocket {
+    listen(handler: ConnectionHandler, eventHandler: ConnectionEventHandler, options?: WebSocketOptions): WebSocket {
         const url = handler.path;
         const webSocket = this.createWebSocket(url);
 
@@ -69,7 +72,8 @@ export class WebSocketConnectionProvider {
         }
         doListen(
             webSocket as any as ReconnectingWebSocket,
-            connection => handler.onConnection(connection),
+            handler,
+            eventHandler,
             logger,
         );
         return webSocket;
@@ -100,16 +104,22 @@ export class WebSocketConnectionProvider {
 //  - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection
 //  - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immidiately
 //  - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose'
-function doListen(resocket: ReconnectingWebSocket, onConnection: (connection: MessageConnection) => void, logger: Logger) {
+function doListen(resocket: ReconnectingWebSocket, handler: ConnectionHandler, eventHandler: ConnectionEventHandler, logger: Logger) {
+    resocket.addEventListener("close", () => eventHandler.onTransportDidClose());
+
     let alreadyOpened = false;
     resocket.onopen = () => {
+        // trigerr "open" every time we re-open the underlying websocket
+        eventHandler.onTransportDidOpen();
+
+        // make sure we're only ever creating one MessageConnection, irregardless of how many times we have to re-open the underlying (reconnecting) websocket
         if (alreadyOpened) {
             return;
         }
         alreadyOpened = true;
 
         const connection = createWebSocketConnection(resocket, logger);
-        onConnection(connection);
+        handler.onConnection(connection);
     };
 }
 
@@ -167,12 +177,12 @@ class BufferingWebSocketMessageWriter extends AbstractMessageWriter {
         for (const msg of buffer) {
             this.write(msg);
         }
-        this.logger.info(`flushed buffer (${this.buffer.length})`)
+        //this.logger.info(`flushed buffer (${this.buffer.length})`)
     }
 
     protected bufferMsg(msg: any) {
         this.buffer.push(msg);
-        this.logger.info(`buffered message (${this.buffer.length})`);
+        //this.logger.info(`buffered message (${this.buffer.length})`);
     }
 }
 
diff --git a/components/gitpod-protocol/src/messaging/handler.ts b/components/gitpod-protocol/src/messaging/handler.ts
index 3d5b99a4a5434c..e17b91de26e940 100644
--- a/components/gitpod-protocol/src/messaging/handler.ts
+++ b/components/gitpod-protocol/src/messaging/handler.ts
@@ -13,3 +13,15 @@ export interface ConnectionHandler {
     readonly path: string;
     onConnection(connection: MessageConnection, session?: object): void;
 }
+
+export interface ConnectionEventHandler {
+    /**
+     * Called when the transport underpinning the connection got closed
+     */
+     onTransportDidClose(): void;
+
+     /**
+      * Called when the transport underpinning the connection is (re-)opened
+      */
+     onTransportDidOpen(): void;
+}
diff --git a/components/gitpod-protocol/src/messaging/proxy-factory.ts b/components/gitpod-protocol/src/messaging/proxy-factory.ts
index c50666f6f65c39..aae18d12ccb3cd 100644
--- a/components/gitpod-protocol/src/messaging/proxy-factory.ts
+++ b/components/gitpod-protocol/src/messaging/proxy-factory.ts
@@ -105,13 +105,19 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
             this.connectionPromiseResolve = resolve
         );
         this.connectionPromise.then(connection => {
-            connection.onClose(() =>
-                this.onDidCloseConnectionEmitter.fire(undefined)
-            );
-            this.onDidOpenConnectionEmitter.fire(undefined);
+            connection.onClose(() => this.fireConnectionClosed());
+            this.fireConnectionOpened();
         });
     }
 
+    fireConnectionClosed() {
+        this.onDidCloseConnectionEmitter.fire(undefined)
+    }
+
+    fireConnectionOpened() {
+        this.onDidOpenConnectionEmitter.fire(undefined);
+    }
+
     /**
      * Connect a MessageConnection to the factory.
      *