feat(client): realtime client (#29)

* feat(client): realtime client

* chore: alpha release

* fix: remove os requirement

* fix: check if process is defined

* fix: ws connection key

* fix: outgoing request throttling logic

* chore: 0.6.0.alpha.4 release

* chore: update realtime demo

* chore: update preloaded scene

* feat: auth wip

* fix: compilation issue

* feat: basic auth impl missing error handling

* chore: remove console.log prepare 0.6.0

* fix: remove unsused code
This commit is contained in:
Daniel Rochetti 2023-11-27 09:43:37 -08:00 committed by GitHub
parent c8ff2af189
commit 145159a12f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 626 additions and 20 deletions

View File

@ -1,2 +0,0 @@
.page {
}

View File

@ -0,0 +1,63 @@
'use client';
/* eslint-disable @next/next/no-img-element */
import * as fal from '@fal-ai/serverless-client';
import { DrawingCanvas } from '../../components/drawing';
import { useState } from 'react';
fal.config({
proxyUrl: '/api/fal/proxy',
});
const PROMPT = 'a moon in a starry night sky';
export default function RealtimePage() {
const [image, setImage] = useState<string | null>(null);
const { send } = fal.realtime.connect('110602490-shared-lcm-test', {
connectionKey: 'realtime-demo',
onResult(result) {
if (result.images && result.images[0]) {
setImage(result.images[0].url);
}
},
});
return (
<div className="min-h-screen bg-neutral-900 text-neutral-50">
<main className="container flex flex-col items-center justify-center w-full flex-1 py-10 space-y-8">
<h1 className="text-4xl font-mono mb-8 text-neutral-50">
fal<code className="font-light text-pink-600">realtime</code>
</h1>
<div className="prose text-neutral-400">
<blockquote className="italic text-xl">{PROMPT}</blockquote>
</div>
<div className="flex flex-col md:flex-row space-x-4">
<div className="flex-1">
<DrawingCanvas
onCanvasChange={({ imageData }) => {
send({
prompt: PROMPT,
image_url: imageData,
sync_mode: true,
seed: 6252023,
});
}}
/>
</div>
<div className="flex-1">
<div className="w-[512px] h-[512px]">
{image && (
<img
src={image}
alt={`${PROMPT} generated by fal.ai`}
className="object-contain w-full h-full"
/>
)}
</div>
</div>
</div>
</main>
</div>
);
}

View File

@ -113,7 +113,7 @@ export default function WhisperDemo() {
const result = await fal.subscribe('110602490-whisper', {
input: {
file_name: 'recording.wav',
url: audioFile,
audio_url: audioFile,
},
pollInterval: 1000,
logs: true,
@ -128,7 +128,6 @@ export default function WhisperDemo() {
},
});
setResult(result);
console.log(result);
} catch (error: any) {
setError(error);
} finally {

View File

@ -0,0 +1,116 @@
import { type Excalidraw } from '@excalidraw/excalidraw';
import { ExcalidrawElement } from '@excalidraw/excalidraw/types/element/types';
import {
AppState,
ExcalidrawImperativeAPI,
} from '@excalidraw/excalidraw/types/types';
import { useCallback, useEffect, useState } from 'react';
import initialDrawing from './drawingState.json';
export type CanvasChangeEvent = {
elements: readonly ExcalidrawElement[];
appState: AppState;
imageData: string;
};
export type DrawingCanvasProps = {
onCanvasChange: (event: CanvasChangeEvent) => void;
};
async function blobToBase64(blob: Blob): Promise<string> {
const reader = new FileReader();
reader.readAsDataURL(blob);
return new Promise<string>((resolve) => {
reader.onloadend = () => {
resolve(reader.result?.toString() || '');
};
});
}
export function DrawingCanvas({ onCanvasChange }: DrawingCanvasProps) {
const [ExcalidrawComponent, setExcalidrawComponent] = useState<
typeof Excalidraw | null
>(null);
const [excalidrawAPI, setExcalidrawAPI] =
useState<ExcalidrawImperativeAPI | null>(null);
const [sceneData, setSceneData] = useState<any>(null);
useEffect(() => {
import('@excalidraw/excalidraw').then((comp) =>
setExcalidrawComponent(comp.Excalidraw)
);
const onResize = () => {
if (excalidrawAPI) {
excalidrawAPI.refresh();
}
};
window.addEventListener('resize', onResize);
return () => {
window.removeEventListener('resize', onResize);
};
}, []);
const handleCanvasChanges = useCallback(
async (elements: readonly ExcalidrawElement[], appState: AppState) => {
if (!excalidrawAPI || !elements || !elements.length) {
return;
}
const { exportToBlob, convertToExcalidrawElements, serializeAsJSON } =
await import('@excalidraw/excalidraw');
const [boundingBoxElement] = convertToExcalidrawElements([
{
type: 'rectangle',
x: 0,
y: 0,
width: 512,
height: 512,
fillStyle: 'solid',
backgroundColor: 'cyan',
},
]);
const newSceneData = serializeAsJSON(
elements,
appState,
excalidrawAPI.getFiles(),
'local'
);
if (newSceneData !== sceneData) {
setSceneData(newSceneData);
const blob = await exportToBlob({
elements: [boundingBoxElement, ...elements],
appState: {
...appState,
frameRendering: {
...(appState.frameRendering || {}),
clip: false,
},
},
files: excalidrawAPI.getFiles(),
mimeType: 'image/webp',
quality: 0.5,
exportPadding: 0,
getDimensions: () => {
return { width: 512, height: 512 };
},
});
const imageData = await blobToBase64(blob);
onCanvasChange({ elements, appState, imageData });
}
},
[excalidrawAPI, onCanvasChange, sceneData]
);
return (
<div style={{ height: '560px', width: '560px' }}>
{ExcalidrawComponent && (
<ExcalidrawComponent
excalidrawAPI={(api) => setExcalidrawAPI(api)}
initialData={{ elements: initialDrawing as ExcalidrawElement[] }}
onChange={handleCanvasChanges}
/>
)}
</div>
);
}

View File

@ -0,0 +1,58 @@
[
{
"type": "rectangle",
"version": 240,
"versionNonce": 21728473,
"isDeleted": false,
"id": "EnLu91BTRnzWtj7m-l4Id",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"angle": 0,
"x": -3.3853912353515625,
"y": -2.3741912841796875,
"strokeColor": "#1971c2",
"backgroundColor": "#343a40",
"width": 568.016487121582,
"height": 582.1398010253906,
"seed": 295965933,
"groupIds": [],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1700904828477,
"link": null,
"locked": false
},
{
"type": "ellipse",
"version": 3545,
"versionNonce": 647409943,
"isDeleted": false,
"id": "F6oN3k42RqfCqlzJLGXXS",
"fillStyle": "solid",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"angle": 0,
"x": 345.65307998657227,
"y": 81.02682495117188,
"strokeColor": "#f08c00",
"backgroundColor": "#ffec99",
"width": 124.31249999999997,
"height": 113.591796875,
"seed": 23374002,
"groupIds": [],
"frameId": null,
"roundness": {
"type": 2
},
"boundElements": [],
"updated": 1700904844024,
"link": null,
"locked": false
}
]

View File

@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
"version": "0.5.4",
"version": "0.6.0",
"license": "MIT",
"repository": {
"type": "git",

View File

@ -1,4 +1,8 @@
import type { RequestMiddleware } from './middleware';
import {
withProxy,
type RequestMiddleware,
withMiddleware,
} from './middleware';
import type { ResponseHandler } from './response';
import { defaultResponseHandler } from './response';
@ -7,6 +11,7 @@ export type CredentialsResolver = () => string | undefined;
export type Config = {
credentials?: undefined | string | CredentialsResolver;
host?: string;
proxyUrl?: string;
requestMiddleware?: RequestMiddleware;
responseHandler?: ResponseHandler<any>;
};
@ -21,7 +26,7 @@ export type RequiredConfig = Required<Config>;
*/
function hasEnvVariables(): boolean {
return (
process &&
typeof process !== 'undefined' &&
process.env &&
(typeof process.env.FAL_KEY !== 'undefined' ||
(typeof process.env.FAL_KEY_ID !== 'undefined' &&
@ -49,7 +54,7 @@ export const credentialsFromEnv: CredentialsResolver = () => {
*/
function getDefaultHost(): string {
const host = 'gateway.alpha.fal.ai';
if (process && process.env) {
if (typeof process !== 'undefined' && process.env) {
return process.env.FAL_HOST || host;
}
return host;
@ -71,6 +76,15 @@ let configuration: RequiredConfig;
*/
export function config(config: Config) {
configuration = { ...DEFAULT_CONFIG, ...config } as RequiredConfig;
if (config.proxyUrl) {
configuration = {
...configuration,
requestMiddleware: withMiddleware(
configuration.requestMiddleware,
withProxy({ targetUrl: config.proxyUrl })
),
};
}
}
/**
@ -85,3 +99,11 @@ export function getConfig(): RequiredConfig {
}
return configuration;
}
/**
* @returns the URL of the fal serverless rest api endpoint.
*/
export function getRestApiUrl(): string {
const { host } = getConfig();
return host.replace('gateway', 'rest');
}

View File

@ -1,10 +1,11 @@
export { config, getConfig } from './config';
export { storageImpl as storage } from './storage';
export { queue, run, subscribe } from './function';
export { withMiddleware, withProxy } from './middleware';
export type { RequestMiddleware } from './middleware';
export { realtimeImpl as realtime } from './realtime';
export { ApiError, ValidationError } from './response';
export type { ResponseHandler } from './response';
export { storageImpl as storage } from './storage';
export type {
QueueStatus,
ValidationErrorInfo,

295
libs/client/src/realtime.ts Normal file
View File

@ -0,0 +1,295 @@
import { getConfig, getRestApiUrl } from './config';
import { dispatchRequest } from './request';
import { ApiError } from './response';
import { isBrowser } from './runtime';
import { isReact, throttle } from './utils';
/**
* A connection object that allows you to `send` request payloads to a
* realtime endpoint.
*/
export interface RealtimeConnection<Input> {
send(input: Input): void;
close(): void;
}
type ResultWithRequestId = {
request_id: string;
};
/**
* Options for connecting to the realtime endpoint.
*/
export interface RealtimeConnectionHandler<Output> {
/**
* The connection key. This is used to reuse the same connection
* across multiple calls to `connect`. This is particularly useful in
* contexts where the connection is established as part of a component
* lifecycle (e.g. React) and the component is re-rendered multiple times.
*/
connectionKey?: string;
/**
* If `true`, the connection will only be established on the client side.
* This is useful for frameworks that reuse code for both server-side
* rendering and client-side rendering (e.g. Next.js).
*
* This is set to `true` by default when running on React in the server.
* Otherwise, it is set to `false`.
*
* Note that more SSR frameworks might be automatically detected
* in the future. In the meantime, you can set this to `true` when needed.
*/
clientOnly?: boolean;
/**
* The throtle duration in milliseconds. This is used to throtle the
* calls to the `send` function. Realtime apps usually react to user
* input, which can be very frequesnt (e.g. fast typing or mouse/drag movements).
*
* The default value is `64` milliseconds.
*/
throttleInterval?: number;
/**
* Callback function that is called when a result is received.
* @param result - The result of the request.
*/
onResult(result: Output & ResultWithRequestId): void;
/**
* Callback function that is called when an error occurs.
* @param error - The error that occurred.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onError?(error: ApiError<any>): void;
}
export interface RealtimeClient {
/**
* Connect to the realtime endpoint. The default implementation uses
* WebSockets to connect to fal function endpoints that support WSS.
*
* @param app the app alias or identifier.
* @param handler the connection handler.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
connect<Input = any, Output = any>(
app: string,
handler: RealtimeConnectionHandler<Output>
): RealtimeConnection<Input>;
}
function buildRealtimeUrl(app: string): string {
const { host } = getConfig();
return `wss://${app}.${host}/ws`;
}
const TOKEN_EXPIRATION_SECONDS = 120;
/**
* Get a token to connect to the realtime endpoint.
*/
async function getToken(app: string): Promise<string> {
const [_, ...appAlias] = app.split('-');
const token: string | object = await dispatchRequest<any, string>(
'POST',
`https://${getRestApiUrl()}/tokens/`,
{
allowed_apps: [appAlias.join('-')],
token_expiration: 120,
}
);
// keep this in case the response was wrapped (old versions of the proxy do that)
// should be safe to remove in the future
if (typeof token !== 'string' && token['detail']) {
return token['detail'];
}
return token;
}
/**
* See https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1
*/
const WebSocketErrorCodes = {
NORMAL_CLOSURE: 1000,
GOING_AWAY: 1001,
};
const connectionManager = (() => {
const connections = new Map<string, WebSocket>();
const tokens = new Map<string, string>();
return {
token(app: string) {
return tokens.get(app);
},
expireToken(app: string) {
tokens.delete(app);
},
async refreshToken(app: string) {
const token = await getToken(app);
tokens.set(app, token);
// Very simple token expiration mechanism.
// We should make it more robust in the future.
setTimeout(() => {
tokens.delete(app);
}, TOKEN_EXPIRATION_SECONDS * 0.9 * 1000);
return token;
},
has(connectionKey: string): boolean {
return connections.has(connectionKey);
},
get(connectionKey: string): WebSocket | undefined {
return connections.get(connectionKey);
},
set(connectionKey: string, ws: WebSocket) {
connections.set(connectionKey, ws);
},
remove(connectionKey: string) {
connections.delete(connectionKey);
},
};
})();
async function getConnection(app: string, key: string): Promise<WebSocket> {
const url = buildRealtimeUrl(app);
if (connectionManager.has(key)) {
return connectionManager.get(key) as WebSocket;
}
let token = connectionManager.token(app);
if (!token) {
token = await connectionManager.refreshToken(app);
}
const ws = new WebSocket(`${url}?fal_jwt_token=${token}`);
connectionManager.set(key, ws);
return ws;
}
const noop = () => {
/* No-op */
};
/**
* A no-op connection that does not send any message.
* Useful on the frameworks that reuse code for both ssr and csr (e.g. Next)
* so the call when doing ssr has no side-effects.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const NoOpConnection: RealtimeConnection<any> = {
send: noop,
close: noop,
};
/**
* The default implementation of the realtime client.
*/
export const realtimeImpl: RealtimeClient = {
connect<Input, Output>(
app: string,
handler: RealtimeConnectionHandler<Output>
): RealtimeConnection<Input> {
const {
// if running on React in the server, set clientOnly to true by default
clientOnly = isReact() && !isBrowser(),
connectionKey = crypto.randomUUID(),
throttleInterval = 64,
onError = noop,
onResult,
} = handler;
if (clientOnly && typeof window === 'undefined') {
return NoOpConnection;
}
const enqueueMessages: Input[] = [];
let reconnecting = false;
let ws: WebSocket | null = null;
const _send = (input: Input) => {
const requestId = crypto.randomUUID();
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({
request_id: requestId,
...input,
})
);
} else {
enqueueMessages.push(input);
if (!reconnecting) {
reconnecting = true;
reconnect();
}
}
};
const send =
throttleInterval > 0 ? throttle(_send, throttleInterval) : _send;
const reconnect = () => {
if (ws && ws.readyState === WebSocket.OPEN) {
return;
}
getConnection(app, connectionKey)
.then((connection) => {
ws = connection;
ws.onopen = () => {
reconnecting = false;
if (enqueueMessages.length > 0) {
enqueueMessages.forEach((input) => send(input));
enqueueMessages.length = 0;
}
};
ws.onclose = (event) => {
connectionManager.remove(connectionKey);
if (event.code !== WebSocketErrorCodes.NORMAL_CLOSURE) {
onError(
new ApiError({
message: `Error closing the connection: ${event.reason}`,
status: event.code,
})
);
}
ws = null;
};
ws.onerror = (event) => {
// TODO handle errors once server specify them
// if error 401, refresh token and retry
// if error 403, refresh token and retry
connectionManager.expireToken(app);
connectionManager.remove(connectionKey);
ws = null;
// if any of those are failed again, call onError
onError(new ApiError({ message: 'Unknown error', status: 500 }));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// Drop messages that are not related to the actual result.
// In the future, we might want to handle other types of messages.
// TODO: specify the fal ws protocol format
if (data.status !== 'error' && data.type !== 'x-fal-message') {
onResult(data);
}
};
})
.catch((error) => {
onError(
new ApiError({ message: 'Error opening connection', status: 500 })
);
});
};
return {
send,
close() {
if (ws && ws.readyState === WebSocket.CLOSED) {
ws.close(
WebSocketErrorCodes.GOING_AWAY,
'Client manually closed the connection.'
);
}
},
};
},
};

View File

@ -56,7 +56,7 @@ export async function defaultResponseHandler<Output>(
response: Response
): Promise<Output> {
const { status, statusText } = response;
const contentType = response.headers.get('Content-Type') ?? "";
const contentType = response.headers.get('Content-Type') ?? '';
if (!response.ok) {
if (contentType.includes('application/json')) {
const body = await response.json();

View File

@ -13,9 +13,6 @@ export function getUserAgent(): string {
return memoizedUserAgent;
}
const packageInfo = require('../package.json');
const os = require('os');
memoizedUserAgent = `${packageInfo.name}/${
packageInfo.version
} ${os.platform()}-${os.arch()} ${process.release.name}-${process.version}`;
memoizedUserAgent = `${packageInfo.name}/${packageInfo.version}`;
return memoizedUserAgent;
}

View File

@ -1,4 +1,4 @@
import { getConfig } from './config';
import { getConfig, getRestApiUrl } from './config';
import { dispatchRequest } from './request';
/**
@ -50,11 +50,6 @@ type InitiateUploadData = {
content_type: string | null;
};
function getRestApiUrl(): string {
const { host } = getConfig();
return host.replace('gateway', 'rest');
}
/**
* Get the file extension from the content type. This is used to generate
* a file name if the file name is not provided.

View File

@ -15,3 +15,52 @@ export function isValidUrl(url: string) {
return false;
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function throttle<T extends (...args: any[]) => any>(
func: T,
limit: number
): (...funcArgs: Parameters<T>) => ReturnType<T> | void {
let lastFunc: NodeJS.Timeout | null;
let lastRan: number;
return (...args: Parameters<T>): ReturnType<T> | void => {
if (!lastRan) {
func(...args);
lastRan = Date.now();
} else {
if (lastFunc) {
clearTimeout(lastFunc);
}
lastFunc = setTimeout(() => {
if (Date.now() - lastRan >= limit) {
func(...args);
lastRan = Date.now();
}
}, limit - (Date.now() - lastRan));
}
};
}
let isRunningInReact: boolean | undefined;
/**
* Not really the most optimal way to detect if we're running in React,
* but the idea here is that we can support multiple rendering engines
* (starting with React), with all their peculiarities, without having
* to add a dependency or creating custom integrations (e.g. custom hooks).
*
* Yes, a bit of magic to make things works out-of-the-box.
* @returns `true` if running in React, `false` otherwise.
*/
export function isReact() {
if (isRunningInReact === undefined) {
const stack = new Error().stack;
isRunningInReact =
!!stack &&
(stack.includes('node_modules/react-dom/') ||
stack.includes('node_modules/next/'));
}
return isRunningInReact;
}

View File

@ -3,6 +3,7 @@
"compilerOptions": {
"module": "commonjs",
"outDir": "../../dist/out-tsc",
"inlineSources": true,
"declaration": true,
"allowJs": true,
"checkJs": false,

11
package-lock.json generated
View File

@ -34,6 +34,7 @@
"devDependencies": {
"@commitlint/cli": "^17.0.0",
"@commitlint/config-conventional": "^17.0.0",
"@excalidraw/excalidraw": "^0.17.0",
"@nrwl/express": "16.10.0",
"@nx/cypress": "16.10.0",
"@nx/eslint-plugin": "16.10.0",
@ -3073,6 +3074,16 @@
"node": "^12.22.0 || ^14.17.0 || >=16.0.0"
}
},
"node_modules/@excalidraw/excalidraw": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@excalidraw/excalidraw/-/excalidraw-0.17.0.tgz",
"integrity": "sha512-NzP22v5xMqxYW27ZtTHhiGFe7kE8NeBk45aoeM/mDSkXiOXPDH+PcvwzHRN/Ei+Vj/0sTPHxejn8bZyRWKGjXg==",
"dev": true,
"peerDependencies": {
"react": "^17.0.2 || ^18.2.0",
"react-dom": "^17.0.2 || ^18.2.0"
}
},
"node_modules/@humanwhocodes/config-array": {
"version": "0.11.13",
"resolved": "https://registry.npmjs.org/@humanwhocodes/config-array/-/config-array-0.11.13.tgz",

View File

@ -50,6 +50,7 @@
"devDependencies": {
"@commitlint/cli": "^17.0.0",
"@commitlint/config-conventional": "^17.0.0",
"@excalidraw/excalidraw": "^0.17.0",
"@nrwl/express": "16.10.0",
"@nx/cypress": "16.10.0",
"@nx/eslint-plugin": "16.10.0",