Skip to content

Commit

Permalink
frontend: Add websocket multiplexer
Browse files Browse the repository at this point in the history
This adds a single websocket connection from frontend to backend and
sends lists of messages to the backend. With the help of messages
backend creates multiple websocket connection to k8s API and returns
back the data.

This solves the issue for limiting of websocket connection in frontend
in case of multi cluster setup.

Signed-off-by: Kautilya Tripathi <[email protected]>
  • Loading branch information
knrt10 committed Jan 6, 2025
1 parent fe2966a commit 85dd1ce
Show file tree
Hide file tree
Showing 5 changed files with 430 additions and 219 deletions.
3 changes: 3 additions & 0 deletions backend/cmd/headlamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ func (c *HeadlampConfig) addClusterSetupRoute(r *mux.Router) {

// Rename a cluster
r.HandleFunc("/cluster/{name}", c.renameCluster).Methods("PUT")

// Websocket connections
r.HandleFunc("/wsMultiplexer", c.multiplexer.HandleClientWebSocket)
}

/*
Expand Down
3 changes: 2 additions & 1 deletion backend/cmd/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Message struct {
UserID string `json:"userId"`
// Data contains the message payload.
Data []byte `json:"data,omitempty"`
// Type is the type of the message.
Type string `json:"type"`
}

// Multiplexer manages multiple WebSocket connections.
Expand Down Expand Up @@ -315,7 +317,6 @@ func (m *Multiplexer) reconnect(conn *Connection) (*Connection, error) {
return newConn, nil
}

// HandleClientWebSocket handles incoming WebSocket connections from clients.
// HandleClientWebSocket handles incoming WebSocket connections from clients.
func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Request) {
clientConn, err := m.upgrader.Upgrade(w, r, nil)
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/lib/k8s/api/v2/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { getCluster } from '../../../cluster';
import { ApiError, QueryParameters } from '../../apiProxy';
import { KubeObject, KubeObjectInterface } from '../../KubeObject';
import { clusterFetch } from './fetch';
import { KubeListUpdateEvent } from './KubeList';
import { KubeObjectEndpoint } from './KubeObjectEndpoint';
import { makeUrl } from './makeUrl';
import { useWebSocket } from './webSocket';
Expand Down Expand Up @@ -133,7 +132,7 @@ export function useKubeObject<K extends KubeObject>({

const data: Instance | null = query.error ? null : query.data ?? null;

useWebSocket<KubeListUpdateEvent<Instance>>({
useWebSocket<Instance>({
url: () =>
makeUrl([KubeObjectEndpoint.toUrl(endpoint!)], {
...cleanedUpQueryParams,
Expand Down
102 changes: 71 additions & 31 deletions frontend/src/lib/k8s/api/v2/useKubeObjectList.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { useEffect, useMemo, useRef, useState } from 'react';
import { KubeObject, KubeObjectClass } from '../../KubeObject';
import { ApiError } from '../v1/clusterRequests';
import { QueryParameters } from '../v1/queryParameters';
import { clusterFetch } from './fetch';
import { QueryListResponse, useEndpoints } from './hooks';
import { KubeList, KubeListUpdateEvent } from './KubeList';
import { KubeList } from './KubeList';
import { KubeObjectEndpoint } from './KubeObjectEndpoint';
import { makeUrl } from './makeUrl';
import { useWebSockets } from './webSocket';
import { BASE_WS_URL, WebSocketManager } from './webSocket';

/**
* Object representing a List of Kube object
Expand Down Expand Up @@ -117,43 +117,83 @@ export function useWatchKubeObjectLists<K extends KubeObject>({
lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>;
}) {
const client = useQueryClient();
const latestResourceVersions = useRef<Record<string, string>>({});

// Create URLs for all lists
const connections = useMemo(() => {
if (!endpoint) return [];

return lists.map(({ cluster, namespace, resourceVersion }) => {
const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], {
...queryParams,
watch: 1,
resourceVersion,
});
return lists.map(list => {
const key = `${list.cluster}:${list.namespace || ''}`;
// Only update resourceVersion if it's newer
if (
!latestResourceVersions.current[key] ||
parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key])
) {
latestResourceVersions.current[key] = list.resourceVersion;
}

return {
cluster,
url,
onMessage(update: KubeListUpdateEvent<K>) {
const key = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;
client.setQueryData(key, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;

const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
return { ...oldResponse, list: newList };
});
},
url: makeUrl([KubeObjectEndpoint.toUrl(endpoint, list.namespace)], {
...queryParams,
watch: 1,
resourceVersion: latestResourceVersions.current[key],
}),
cluster: list.cluster,
namespace: list.namespace,
};
});
}, [lists, kubeObjectClass, endpoint]);
}, [endpoint, lists, queryParams]);

useWebSockets<KubeListUpdateEvent<K>>({
enabled: !!endpoint,
connections,
});
useEffect(() => {
if (!endpoint || connections.length === 0) return;

const cleanups: (() => void)[] = [];

connections.forEach(({ url, cluster, namespace }) => {
const parsedUrl = new URL(url, BASE_WS_URL);
const key = `${cluster}:${namespace || ''}`;

WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => {
if (!update || typeof update !== 'object') return;

// Update latest resourceVersion
if (update.object?.metadata?.resourceVersion) {
latestResourceVersions.current[key] = update.object.metadata.resourceVersion;
}

const queryKey = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;

client.setQueryData(queryKey, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;
const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
if (newList === oldResponse.list) return oldResponse;
return { ...oldResponse, list: newList };
});
}).then(
cleanup => cleanups.push(cleanup),
error => {
// Track retry count in the URL's searchParams
const retryCount = parseInt(parsedUrl.searchParams.get('retryCount') || '0');
if (retryCount < 3) {
// Only log and allow retry if under threshold
console.error('WebSocket subscription failed:', error);
parsedUrl.searchParams.set('retryCount', (retryCount + 1).toString());
}
}
);
});

return () => {
cleanups.forEach(cleanup => cleanup());
};
}, [connections, endpoint, client, kubeObjectClass, queryParams]);
}

/**
Expand Down
Loading

0 comments on commit 85dd1ce

Please sign in to comment.