Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate waitForConnectionToBeUsable() to react to pushed connection event state changes #882

Merged
merged 26 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4da348a
Working, but there are tiny gaps when there are no observers. Need to…
jlrobins Jan 8, 2025
017c285
Transition waitForConnectionToBeUsable() to be based on received even…
jlrobins Jan 9, 2025
3d541b2
Reveryt changes
jlrobins Jan 9, 2025
bc84f64
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 9, 2025
4a4325e
Purge prior cached connection state before updating a connection spec…
jlrobins Jan 9, 2025
ea91055
Fix tests
jlrobins Jan 9, 2025
17e91c0
Fix remainder tests.
jlrobins Jan 10, 2025
819dd84
Trim unneeded fat
jlrobins Jan 10, 2025
e458832
Tests over connectionStatusUtils::isConnectionStable.
jlrobins Jan 10, 2025
468dc2a
Revert stray comment.
jlrobins Jan 10, 2025
03ab948
Revert stray comment.
jlrobins Jan 10, 2025
f6cf88b
Better comments, organization.
jlrobins Jan 10, 2025
4edf39a
filter -> predicate
jlrobins Jan 10, 2025
7fec984
Changeloggery.
jlrobins Jan 10, 2025
8d20006
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 10, 2025
ce56a60
Respell away from old name in comments / test descriptions.
jlrobins Jan 10, 2025
4474164
Merge remote-tracking branch 'origin/main' into wensocket_connection_…
jlrobins Jan 10, 2025
36939ca
Address PR comments.
jlrobins Jan 10, 2025
843fff8
Address PR comments.
jlrobins Jan 10, 2025
464ed0e
Get the connection loading spinny back.
jlrobins Jan 10, 2025
3398546
Fix pasto
jlrobins Jan 10, 2025
5064a25
Remove blank line
jlrobins Jan 10, 2025
78b61fe
Fix name
jlrobins Jan 10, 2025
647c6f5
Fix corner cases of direct connection loading states.
jlrobins Jan 10, 2025
5e191d7
Fix direct connection loading state.
jlrobins Jan 10, 2025
aa60036
Cache
jlrobins Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .versions/ide-sidecar.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.134.1
v0.139.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need fix for ccloud connection java.time.Instant serialization-over-websocket.

3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this extension will be documented in this file.

## Unreleased

### Changed
- React to websocket events pushed from sidecar instead of polling for some connection change monitoring.

## 0.23.3

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/authn/ccloudProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe("ConfluentCloudAuthProvider", () => {

// assume the connection is immediately usable for most tests
sandbox
.stub(connections, "waitForConnectionToBeUsable")
.stub(connections, "waitForConnectionToBeStable")
.resolves(TEST_AUTHENTICATED_CCLOUD_CONNECTION);

// don't handle the progress notification, openExternal, etc in this test suite
Expand Down
4 changes: 2 additions & 2 deletions src/authn/ccloudProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
createCCloudConnection,
deleteCCloudConnection,
getCCloudConnection,
waitForConnectionToBeUsable,
waitForConnectionToBeStable,
} from "../sidecar/connections";
import { getStorageManager } from "../storage";
import { SecretStorageKeys } from "../storage/constants";
Expand Down Expand Up @@ -152,7 +152,7 @@ export class ConfluentCloudAuthProvider implements vscode.AuthenticationProvider
throw e;
}

const authenticatedConnection = await waitForConnectionToBeUsable(CCLOUD_CONNECTION_ID);
const authenticatedConnection = await waitForConnectionToBeStable(CCLOUD_CONNECTION_ID);
if (!authenticatedConnection) {
throw new Error("CCloud connection failed to become usable after authentication.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/directConnectManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe("DirectConnectionManager behavior", () => {
.stub(connections, "tryToUpdateConnection")
.resolves({} as any);
// assume the connection is immediately usable for most tests
sandbox.stub(connections, "waitForConnectionToBeUsable").resolves(TEST_DIRECT_CONNECTION);
sandbox.stub(connections, "waitForConnectionToBeStable").resolves(TEST_DIRECT_CONNECTION);
});

afterEach(() => {
Expand Down
7 changes: 4 additions & 3 deletions src/directConnectManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
tryToCreateConnection,
tryToDeleteConnection,
tryToUpdateConnection,
waitForConnectionToBeUsable,
waitForConnectionToBeStable,
} from "./sidecar/connections";
import { SecretStorageKeys } from "./storage/constants";
import { DirectResourceLoader } from "./storage/directResourceLoader";
Expand Down Expand Up @@ -218,6 +218,7 @@ export class DirectConnectionManager {
): Promise<{ connection: Connection | null; errorMessage: string | null }> {
let connection: Connection | null = null;
let errorMessage: string | null = null;

try {
connection = update
? await tryToUpdateConnection(spec)
Expand All @@ -230,7 +231,7 @@ export class DirectConnectionManager {
title: `Waiting for "${connection.spec.name}" to be usable...`,
},
async () => {
await waitForConnectionToBeUsable(connectionId);
await waitForConnectionToBeStable(connectionId);
},
);
}
Expand Down Expand Up @@ -298,7 +299,7 @@ export class DirectConnectionManager {
// wait for all new connections to be created before checking their status
await Promise.all(newConnectionPromises);
// kick off background checks to ensure the new connections are usable
connectionIdsToCheck.map((id) => waitForConnectionToBeUsable(id));
connectionIdsToCheck.map((id) => waitForConnectionToBeStable(id));
logger.debug(
`created and checked ${connectionIdsToCheck.length} new connection(s), firing event`,
);
Expand Down
3 changes: 1 addition & 2 deletions src/emitters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,4 @@ export const currentKafkaClusterChanged = new vscode.EventEmitter<KafkaCluster |
*/
export const currentSchemaRegistryChanged = new vscode.EventEmitter<SchemaRegistry | null>();

export const connectionLoading = new vscode.EventEmitter<ConnectionId>();
export const connectionUsable = new vscode.EventEmitter<ConnectionId>();
export const connectionStable = new vscode.EventEmitter<ConnectionId>();
6 changes: 5 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import { createConfigChangeListener } from "./preferences/listener";
import { updatePreferences } from "./preferences/updates";
import { registerProjectGenerationCommand } from "./scaffold";
import { getSidecarManager, sidecarOutputChannel } from "./sidecar";
import { getCCloudAuthSession } from "./sidecar/connections";
import { ConnectionStateWatcher, getCCloudAuthSession } from "./sidecar/connections";
import { WebsocketManager } from "./sidecar/websocketManager";
import { getStorageManager, StorageManager } from "./storage";
import { SecretStorageKeys } from "./storage/constants";
Expand Down Expand Up @@ -224,6 +224,10 @@ async function _activateExtension(
// reset the Docker credentials secret so `src/docker/configs.ts` can pull it fresh
getStorageManager().deleteSecret(SecretStorageKeys.DOCKER_CREDS_SECRET_KEY);

// Watch for sidecar pushing connection state changes over websocket.
// (side effect of causing the watcher to be created)
ConnectionStateWatcher.getInstance();

const directConnectionManager = DirectConnectionManager.getInstance();
context.subscriptions.push(...directConnectionManager.disposables);

Expand Down
3 changes: 3 additions & 0 deletions src/models/environment.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ describe("EnvironmentTreeItem", () => {
schemaRegistry: undefined,
});

// Override isLoading to false due to no clusters
env.isLoading = false;

const treeItem = new EnvironmentTreeItem(env);

assert.deepStrictEqual(
Expand Down
10 changes: 9 additions & 1 deletion src/models/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ export abstract class Environment implements IResourceBase {
kafkaClusters!: KafkaCluster[];
schemaRegistry?: SchemaRegistry | undefined;

// updated by the ResourceViewProvider from connectionLoading/connectionUsable events
// updated by the ResourceViewProvider from connectionUsable events
// (DirectEnvironment instances are constructed with isLoading = true)
isLoading: boolean = false;

get hasClusters(): boolean {
Expand Down Expand Up @@ -113,6 +114,12 @@ export class DirectEnvironment extends Environment {
this.kafkaClusters = props.kafkaClusters;
this.schemaRegistry = props.schemaRegistry;
if (props.formConnectionType) this.formConnectionType = props.formConnectionType;

// newly born direct connections are loading unless we already have children.
// This will eventually mutate
// to false when the connection is stable and emitters.connectionStable fires through
// a real Rube Goldberg machine of events.
this.isLoading = !this.hasClusters;
}

get iconName(): IconNames {
Expand Down Expand Up @@ -174,6 +181,7 @@ export class EnvironmentTreeItem extends TreeItem {
// user-facing properties
this.description = isDirect(this.resource) ? "" : this.resource.id;
this.iconPath = new ThemeIcon(this.resource.iconName);

if (this.resource.isLoading) {
this.iconPath = new ThemeIcon(IconNames.LOADING);
} else if (isDirect(resource) && !resource.hasClusters) {
Expand Down
73 changes: 73 additions & 0 deletions src/sidecar/connectionStatusUtils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// tests over isConnectionStable

import * as assert from "assert";

import {
TEST_CCLOUD_CONNECTION,
TEST_DIRECT_CONNECTION,
TEST_LOCAL_CONNECTION,
} from "../../tests/unit/testResources/connection";

import { ConnectedState, Status } from "../clients/sidecar/models";
import { isConnectionStable } from "./connectionStatusUtils";

describe("isConnectionStable", () => {
const testAuthStatus = { authentication: { status: Status.NoToken } };

it("ccloud connection tests", () => {
type CCloudConnectionStateAndResult = [ConnectedState, boolean];

// ccloud connection is stable if not in Attempting state
const testCases: CCloudConnectionStateAndResult[] = [
[ConnectedState.None, false],
[ConnectedState.Attempting, true],
[ConnectedState.Success, true],
[ConnectedState.Expired, true],
[ConnectedState.Failed, true],
];

for (const [connectedState, expectedResult] of testCases) {
const testConnection = {
...TEST_CCLOUD_CONNECTION,
status: {
ccloud: { state: connectedState },
...testAuthStatus,
},
};
assert.strictEqual(isConnectionStable(testConnection), expectedResult);
}
});

it("direct connection tests", () => {
type LocalConnectionStatesAndResult = [ConnectedState, ConnectedState, boolean];

// direct connection is stable if neither kafka nor schema registry are attempting
const testCases: LocalConnectionStatesAndResult[] = [
[ConnectedState.None, ConnectedState.None, true],
[ConnectedState.None, ConnectedState.Success, true],
[ConnectedState.Success, ConnectedState.None, true],
[ConnectedState.Attempting, ConnectedState.None, false],
[ConnectedState.None, ConnectedState.Attempting, false],
[ConnectedState.Attempting, ConnectedState.Attempting, false],
[ConnectedState.Success, ConnectedState.Success, true],
[ConnectedState.Expired, ConnectedState.Expired, true],
[ConnectedState.Failed, ConnectedState.Failed, true],
];

for (const [kafkaState, schemaRegistryState, expectedResult] of testCases) {
const testConnection = {
...TEST_DIRECT_CONNECTION,
status: {
kafka_cluster: { state: kafkaState },
schema_registry: { state: schemaRegistryState },
...testAuthStatus,
},
};
assert.strictEqual(isConnectionStable(testConnection), expectedResult);
}
});

it("should raise when asked about a local connection (not implemented yet)", () => {
assert.throws(() => isConnectionStable(TEST_LOCAL_CONNECTION));
});
});
58 changes: 58 additions & 0 deletions src/sidecar/connectionStatusUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// helpers for connection status testing, factored out for test spying
jlrobins marked this conversation as resolved.
Show resolved Hide resolved

import { Connection, ConnectionType } from "../clients/sidecar/models";
import { Logger } from "../logging";

const logger = new Logger("connectionStatusUtils");

export function isConnectionStable(connection: Connection): boolean {
const type = connection.spec.type!;

switch (type) {
case ConnectionType.Ccloud:
return isCCloudConnectionStable(connection);
case ConnectionType.Direct:
return isDirectConnectionStable(connection);
default:
logger.warn(`isConnectionStable: unhandled connection type ${type}`);
throw new Error(`Unhandled connection type ${type}`);
}
}

function isCCloudConnectionStable(connection: Connection): boolean {
const ccloudStatus = connection.status.ccloud!;
const ccloudState = ccloudStatus.state;

const ccloudFailed = ccloudStatus.errors?.sign_in?.message;
if (ccloudFailed) {
logger.error(`isCCloudConnectionStable(): error: ${ccloudFailed}`);
}

const rv = ccloudState !== "NONE";
logger.debug(`isCCloudConnectionStable(): returning ${rv} based on state ${ccloudState}`);

return rv;
}

function isDirectConnectionStable(connection: Connection): boolean {
const status = connection.status;

for (const [entity, maybeError] of [
["kafka", status.kafka_cluster?.errors?.sign_in?.message],
["schema registry", status.schema_registry?.errors?.sign_in?.message],
] as [string, string | undefined][]) {
if (maybeError) {
logger.error(`isDirectConnectionStable(): ${entity} error: ${maybeError}`);
}
}

const kafkaState = status.kafka_cluster?.state;
const schemaRegistryState = status.schema_registry?.state;

const rv = kafkaState !== "ATTEMPTING" && schemaRegistryState !== "ATTEMPTING";
logger.debug(
`isDirectConnectionStable(): returning ${rv} for connection ${connection.id} based on kafkaState ${kafkaState} and schemaRegistryState ${schemaRegistryState}`,
);

return rv;
}
Loading