Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate waitForConnectionToBeUsable() to react to pushed connection event state changes #882

Merged
merged 26 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4da348a
Working, but there are tiny gaps when there are no observers. Need to…
jlrobins Jan 8, 2025
017c285
Transition waitForConnectionToBeUsable() to be based on received even…
jlrobins Jan 9, 2025
3d541b2
Reveryt changes
jlrobins Jan 9, 2025
bc84f64
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 9, 2025
4a4325e
Purge prior cached connection state before updating a connection spec…
jlrobins Jan 9, 2025
ea91055
Fix tests
jlrobins Jan 9, 2025
17e91c0
Fix remainder tests.
jlrobins Jan 10, 2025
819dd84
Trim unneeded fat
jlrobins Jan 10, 2025
e458832
Tests over connectionStatusUtils::isConnectionStable.
jlrobins Jan 10, 2025
468dc2a
Revert stray comment.
jlrobins Jan 10, 2025
03ab948
Revert stray comment.
jlrobins Jan 10, 2025
f6cf88b
Better comments, organization.
jlrobins Jan 10, 2025
4edf39a
filter -> predicate
jlrobins Jan 10, 2025
7fec984
Changeloggery.
jlrobins Jan 10, 2025
8d20006
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 10, 2025
ce56a60
Respell away from old name in comments / test descriptions.
jlrobins Jan 10, 2025
4474164
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 10, 2025
36939ca
Address PR comments.
jlrobins Jan 10, 2025
843fff8
Address PR comments.
jlrobins Jan 10, 2025
464ed0e
Get the connection loading spinny back.
jlrobins Jan 10, 2025
3398546
Fix pasto
jlrobins Jan 10, 2025
5064a25
Remove blank line
jlrobins Jan 10, 2025
78b61fe
Fix name
jlrobins Jan 10, 2025
647c6f5
Fix corner cases of direct connection loading states.
jlrobins Jan 10, 2025
5e191d7
Fix direct connection loading state.
jlrobins Jan 10, 2025
aa60036
Cache
jlrobins Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .versions/ide-sidecar.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.134.1
v0.139.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need fix for ccloud connection java.time.Instant serialization-over-websocket.

2 changes: 1 addition & 1 deletion src/authn/ccloudProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe("ConfluentCloudAuthProvider", () => {

// assume the connection is immediately usable for most tests
sandbox
.stub(connections, "waitForConnectionToBeUsable")
.stub(connections, "waitForConnectionToBeStable")
.resolves(TEST_AUTHENTICATED_CCLOUD_CONNECTION);

// don't handle the progress notification, openExternal, etc in this test suite
Expand Down
4 changes: 2 additions & 2 deletions src/authn/ccloudProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
createCCloudConnection,
deleteCCloudConnection,
getCCloudConnection,
waitForConnectionToBeUsable,
waitForConnectionToBeStable,
} from "../sidecar/connections";
import { getStorageManager } from "../storage";
import { SecretStorageKeys } from "../storage/constants";
Expand Down Expand Up @@ -152,7 +152,7 @@ export class ConfluentCloudAuthProvider implements vscode.AuthenticationProvider
throw e;
}

const authenticatedConnection = await waitForConnectionToBeUsable(CCLOUD_CONNECTION_ID);
const authenticatedConnection = await waitForConnectionToBeStable(CCLOUD_CONNECTION_ID);
if (!authenticatedConnection) {
throw new Error("CCloud connection failed to become usable after authentication.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/directConnectManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe("DirectConnectionManager behavior", () => {
.stub(connections, "tryToUpdateConnection")
.resolves({} as any);
// assume the connection is immediately usable for most tests
sandbox.stub(connections, "waitForConnectionToBeUsable").resolves(TEST_DIRECT_CONNECTION);
sandbox.stub(connections, "waitForConnectionToBeStable").resolves(TEST_DIRECT_CONNECTION);
});

afterEach(() => {
Expand Down
9 changes: 6 additions & 3 deletions src/directConnectManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
tryToCreateConnection,
tryToDeleteConnection,
tryToUpdateConnection,
waitForConnectionToBeUsable,
waitForConnectionToBeStable,
} from "./sidecar/connections";
import { SecretStorageKeys } from "./storage/constants";
import { DirectResourceLoader } from "./storage/directResourceLoader";
Expand Down Expand Up @@ -47,6 +47,8 @@ export class DirectConnectionManager {
* cleaned up on extension deactivation. */
disposables: Disposable[] = [];

// Map of connection ID to {current from-websocket-event state, event emitter when said state changes}

// singleton instance to prevent multiple listeners and single source of connection management
private static instance: DirectConnectionManager | null = null;
private constructor() {
Expand Down Expand Up @@ -218,6 +220,7 @@ export class DirectConnectionManager {
): Promise<{ connection: Connection | null; errorMessage: string | null }> {
let connection: Connection | null = null;
let errorMessage: string | null = null;

try {
connection = update
? await tryToUpdateConnection(spec)
Expand All @@ -230,7 +233,7 @@ export class DirectConnectionManager {
title: `Waiting for "${connection.spec.name}" to be usable...`,
},
async () => {
await waitForConnectionToBeUsable(connectionId);
await waitForConnectionToBeStable(connectionId);
},
);
}
Expand Down Expand Up @@ -298,7 +301,7 @@ export class DirectConnectionManager {
// wait for all new connections to be created before checking their status
await Promise.all(newConnectionPromises);
// kick off background checks to ensure the new connections are usable
connectionIdsToCheck.map((id) => waitForConnectionToBeUsable(id));
connectionIdsToCheck.map((id) => waitForConnectionToBeStable(id));
logger.debug(
`created and checked ${connectionIdsToCheck.length} new connection(s), firing event`,
);
Expand Down
6 changes: 5 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import { createConfigChangeListener } from "./preferences/listener";
import { updatePreferences } from "./preferences/updates";
import { registerProjectGenerationCommand } from "./scaffold";
import { getSidecarManager, sidecarOutputChannel } from "./sidecar";
import { getCCloudAuthSession } from "./sidecar/connections";
import { ConnectionStateWatcher, getCCloudAuthSession } from "./sidecar/connections";
import { WebsocketManager } from "./sidecar/websocketManager";
import { getStorageManager, StorageManager } from "./storage";
import { SecretStorageKeys } from "./storage/constants";
Expand Down Expand Up @@ -224,6 +224,10 @@ async function _activateExtension(
// reset the Docker credentials secret so `src/docker/configs.ts` can pull it fresh
getStorageManager().deleteSecret(SecretStorageKeys.DOCKER_CREDS_SECRET_KEY);

// Watch for sidecar pushing connection state changes over websocket.
// (side effect of causing the watcher to be created)
ConnectionStateWatcher.getInstance();

const directConnectionManager = DirectConnectionManager.getInstance();
context.subscriptions.push(...directConnectionManager.disposables);

Expand Down
13 changes: 12 additions & 1 deletion src/models/resource.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import { ConnectionType } from "../clients/sidecar";
import { IconNames } from "../constants";
import { CCLOUD_CONNECTION_ID, IconNames, LOCAL_CONNECTION_ID } from "../constants";

/** A uniquely-branded string-type for a connection ID. */
export type ConnectionId = string & { readonly brand: unique symbol };

/** Given a ConnectionId, return the corresponding ConnectionType */
export function connectionTypeFromId(id: ConnectionId): ConnectionType {
// CCloud and local are defined by constants. Direct are then arbitrary uuid strings.
return id === CCLOUD_CONNECTION_ID
? ConnectionType.Ccloud
: id === LOCAL_CONNECTION_ID
? ConnectionType.Local
: ConnectionType.Direct;
}

// TODO: use other branded resource ID types here

export interface IResourceBase {
Expand Down
125 changes: 78 additions & 47 deletions src/sidecar/connections.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@ import {
} from "../emitters";
import { ConnectionId } from "../models/resource";
import { getResourceManager } from "../storage/resourceManager";
import { ConnectionEventAction, Message, MessageType, newMessageHeaders } from "../ws/messageTypes";
import {
clearCurrentCCloudResources,
ConnectionStateWatcher,
getLocalConnection,
hasCCloudAuthSession,
tryToCreateConnection,
tryToDeleteConnection,
tryToUpdateConnection,
waitForConnectionToBeUsable,
waitForConnectionToBeStable,
} from "./connections";

import * as connectionStatusUtils from "./connectionStatusUtils";

describe("sidecar/connections.ts", () => {
let sandbox: sinon.SinonSandbox;
let stubConnectionsResourceApi: sinon.SinonStubbedInstance<ConnectionsResourceApi>;
let clock: sinon.SinonFakeTimers;
let connectionUsableFireStub: sinon.SinonStub;

beforeEach(() => {
sandbox = sinon.createSandbox();
Expand All @@ -47,8 +49,6 @@ describe("sidecar/connections.ts", () => {
stubSidecarHandle.getConnectionsResourceApi.returns(stubConnectionsResourceApi);
// stub the getSidecar function to return the stub sidecar handle
sandbox.stub(sidecar, "getSidecar").resolves(stubSidecarHandle);
// stub the event emitter
connectionUsableFireStub = sandbox.stub(connectionUsable, "fire");
});

afterEach(() => {
Expand Down Expand Up @@ -133,6 +133,40 @@ describe("sidecar/connections.ts", () => {
setContextValue(ContextValues.ccloudConnectionAvailable, true);
assert.strictEqual(hasCCloudAuthSession(), true);
});
});

describe("sidecar/connections.ts waitForConnectionToBeUsable() tests", () => {
jlrobins marked this conversation as resolved.
Show resolved Hide resolved
const connectionStateWatcher = ConnectionStateWatcher.getInstance();

let sandbox: sinon.SinonSandbox;
let clock: sinon.SinonFakeTimers;
let connectionUsableFireStub: sinon.SinonStub;

beforeEach(() => {
sandbox = sinon.createSandbox();

// stub the event emitter
connectionUsableFireStub = sandbox.stub(connectionUsable, "fire");
});

afterEach(() => {
sandbox.restore();
});

function announceConnectionState(connection: Connection): void {
// inject a event to updated the connection state
// as if sent from sidecar from websocket.
const websocketMessage: Message<MessageType.CONNECTION_EVENT> = {
headers: newMessageHeaders(MessageType.CONNECTION_EVENT),
body: {
action: ConnectionEventAction.UPDATED,
connection: connection,
},
};

// As if had been just sent from sidecar.
connectionStateWatcher.handleConnectionUpdateEvent(websocketMessage);
}

// dynamically set up tests for `waitForConnectionToBeUsable()` using different connections and states
type ConnectionStateMatches = [
Expand Down Expand Up @@ -182,9 +216,10 @@ describe("sidecar/connections.ts", () => {
...testAuthStatus,
},
};
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet.resolves(testConnection);

const connection = await waitForConnectionToBeUsable(testConnectionId);
announceConnectionState(testConnection);

const connection = await waitForConnectionToBeStable(testConnectionId);

assert.deepStrictEqual(connection, testConnection);
});
Expand All @@ -202,14 +237,13 @@ describe("sidecar/connections.ts", () => {
...testAuthStatus,
},
};
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet.resolves(testConnection);
announceConnectionState(testConnection);

// set a short timeout, even though we're using fake timers
const shortTimeoutMs = 10;
const connectionPromise: Promise<Connection | null> = waitForConnectionToBeUsable(
const connectionPromise: Promise<Connection | null> = waitForConnectionToBeStable(
testConnectionId,
shortTimeoutMs,
shortTimeoutMs / 2,
);
// "wait" for the timeout to occur
await clock.tickAsync(300);
Expand All @@ -220,7 +254,7 @@ describe("sidecar/connections.ts", () => {
assert.ok(connectionUsableFireStub.calledOnce);
});

it(`${baseConnection.spec.type}: waitForConnectionToBeUsable() should continue polling if the connection is not found initially`, async () => {
it(`${baseConnection.spec.type}: waitForConnectionToBeUsable() should wait for websocket event if the connection is not found initially`, async () => {
const testConnection = {
...baseConnection,
status: {
Expand All @@ -230,46 +264,43 @@ describe("sidecar/connections.ts", () => {
...testAuthStatus,
},
};
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet
.onFirstCall()
.rejects(new ResponseError(new Response(null, { status: 404 })));
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet.onSecondCall().resolves(testConnection);

const connection = await waitForConnectionToBeUsable(testConnectionId);
// wrap a spy around isConnectionStable so we can check when it's called
const isConnectionStableSpy = sandbox.spy(connectionStatusUtils, "isConnectionStable");

assert.deepStrictEqual(connection, testConnection);
});
// clear out prior connection state so that top of ConnectionStateWatcher.waitForConnectionUpdate will be a cache
// miss and it has to wait for an update.
connectionStateWatcher.purgeCachedConnectionState(testConnectionId);

it(`${baseConnection.spec.type}: waitForConnectionToBeUsable() should wait for a connection to be usable`, async () => {
const pendingConnection = {
...baseConnection,
status: {
kafka_cluster: { state: pendingState },
schema_registry: { state: pendingState },
ccloud: { state: pendingState },
...testAuthStatus,
},
};
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet
.onFirstCall()
.resolves(pendingConnection);

const usableConnection = {
...pendingConnection,
status: {
kafka_cluster: { state: usableKafkaClusterState },
schema_registry: { state: usableSchemaRegistryState },
ccloud: { state: usableCcloudState },
...testAuthStatus,
},
};
stubConnectionsResourceApi.gatewayV1ConnectionsIdGet
.onSecondCall()
.resolves(usableConnection);

const connection = await waitForConnectionToBeUsable(testConnectionId, 10, 5);
clock = sandbox.useFakeTimers(Date.now());

assert.deepStrictEqual(connection, usableConnection);
async function scriptEventFlow() {
// let time pass some ...
await clock.tickAsync(100);

// isConnectionStableSpy should not have been called yet.
assert.ok(isConnectionStableSpy.notCalled);

// simulate a websocket event that updates the connection state to this new stable state.
announceConnectionState(testConnection);

// And now isConnectionStable should have been called.
assert.ok(isConnectionStableSpy.calledOnce);
// and it should have returned true
assert.ok(isConnectionStableSpy.returned(true));
}

// await both the script and waitForConnectionToBeStable
const results = await Promise.all([
scriptEventFlow(),
waitForConnectionToBeStable(testConnectionId),
]);

// waitForConnectionToBeStable should have returned the connection
assert.deepStrictEqual(results[1], testConnection);
// and that isDirectConnectionStable was called (after the websocket event)
// log number of calls.
assert.ok(isConnectionStableSpy.calledOnce);
});
}
});
Loading