fix: reconnect token state (#31)

This commit is contained in:
Daniel Rochetti 2023-11-29 08:55:40 -08:00 committed by GitHub
parent 145159a12f
commit c020d97acd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 6 deletions

View File

@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
"version": "0.6.0",
"version": "0.6.1",
"license": "MIT",
"repository": {
"type": "git",

View File

@ -120,6 +120,7 @@ const WebSocketErrorCodes = {
const connectionManager = (() => {
const connections = new Map<string, WebSocket>();
const tokens = new Map<string, string>();
const isAuthInProgress = new Map<string, true>();
return {
token(app: string) {
@ -150,10 +151,23 @@ const connectionManager = (() => {
remove(connectionKey: string) {
connections.delete(connectionKey);
},
isAuthInProgress(app: string) {
return isAuthInProgress.has(app);
},
setAuthInProgress(app: string, inProgress: boolean) {
if (inProgress) {
isAuthInProgress.set(app, true);
} else {
isAuthInProgress.delete(app);
}
},
};
})();
async function getConnection(app: string, key: string): Promise<WebSocket> {
if (connectionManager.isAuthInProgress(app)) {
throw new Error('Authentication in progress');
}
const url = buildRealtimeUrl(app);
if (connectionManager.has(key)) {
@ -161,7 +175,9 @@ async function getConnection(app: string, key: string): Promise<WebSocket> {
}
let token = connectionManager.token(app);
if (!token) {
connectionManager.setAuthInProgress(app, true);
token = await connectionManager.refreshToken(app);
connectionManager.setAuthInProgress(app, false);
}
const ws = new WebSocket(`${url}?fal_jwt_token=${token}`);
connectionManager.set(key, ws);
@ -203,7 +219,7 @@ export const realtimeImpl: RealtimeClient = {
return NoOpConnection;
}
const enqueueMessages: Input[] = [];
let pendingMessage: Input | undefined = undefined;
let reconnecting = false;
let ws: WebSocket | null = null;
@ -217,7 +233,7 @@ export const realtimeImpl: RealtimeClient = {
})
);
} else {
enqueueMessages.push(input);
pendingMessage = input;
if (!reconnecting) {
reconnecting = true;
reconnect();
@ -231,14 +247,17 @@ export const realtimeImpl: RealtimeClient = {
if (ws && ws.readyState === WebSocket.OPEN) {
return;
}
if (connectionManager.isAuthInProgress(app)) {
return;
}
getConnection(app, connectionKey)
.then((connection) => {
ws = connection;
ws.onopen = () => {
reconnecting = false;
if (enqueueMessages.length > 0) {
enqueueMessages.forEach((input) => send(input));
enqueueMessages.length = 0;
if (pendingMessage) {
send(pendingMessage);
pendingMessage = undefined;
}
};
ws.onclose = (event) => {