Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Token based authentication integration with core extension #3063

Merged
merged 30 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
91871b6
Support for StreamingCredentials
ggivo Dec 2, 2024
e9d4d63
Tests & publish ReauthEvent
ggivo Dec 3, 2024
820ffff
Clean up & Format & Add ReauthenticateEvent test
ggivo Dec 4, 2024
5858286
Conditionally enable connection reauthentication based on client setting
ggivo Dec 6, 2024
779edca
Client setting for enabling reauthentication
ggivo Dec 8, 2024
21bf696
formating
ggivo Dec 8, 2024
c8ca829
Merge branch 'main' into streaming-auth
ggivo Dec 8, 2024
f3aef04
resolve conflict with main
ggivo Dec 8, 2024
631d420
format
ggivo Dec 8, 2024
6f46022
dispath using connection handler
ggivo Dec 10, 2024
b32f84c
Support multi with re-auth
ggivo Dec 11, 2024
7eaaf6b
Fix EndpointId missing in events
ggivo Dec 12, 2024
086ccf3
format
ggivo Dec 12, 2024
61158f2
Add unit tests for setCredenatials
ggivo Dec 13, 2024
9a0e513
Skip preProcessing of auth command to avoid replacing the credential …
ggivo Dec 13, 2024
6ec2846
clean up - remove dead code
ggivo Dec 13, 2024
110eb1a
Moved almost all code inside the new handler
tishun Dec 17, 2024
8e9ab48
fix inTransaction lock with dispatch command batch
ggivo Dec 17, 2024
746dd82
Remove StreamingCredentialsProvider interface.
ggivo Dec 18, 2024
31341f1
Add authentication handler to ClusterPubSub connections
ggivo Dec 18, 2024
6891281
Merge branch 'main' into streaming-auth
ggivo Dec 18, 2024
77c5058
Token based auth integration with core extension
ggivo Dec 2, 2024
3c6dbc9
rebase to address "oid" core-autx lib change
ggivo Dec 13, 2024
00de1ce
Add EntraId integration tests
ggivo Dec 16, 2024
0e0f502
StreamingCredentialsProvider replaced with RedisCredentialsProvider.s…
ggivo Dec 18, 2024
2acdb6d
pub/sub test basic functionality with entraid auth
ggivo Dec 18, 2024
8c760f1
Update src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvi…
ggivo Dec 19, 2024
a2a8e10
Addressing review comments from @tishun
ggivo Dec 19, 2024
668b850
Bump redis-authx-core & redis-authx-entraid from 0.1.0-SNAPSHOT to 0.…
ggivo Dec 20, 2024
369341c
add java doc for
ggivo Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,23 @@
</dependencyManagement>

<dependencies>

<dependency>
<groupId>redis.clients.authentication</groupId>
<artifactId>redis-authx-core</artifactId>
<version>0.1.1-beta1</version>
</dependency>
<dependency>
<groupId>redis.clients.authentication</groupId>
<artifactId>redis-authx-entraid</artifactId>
<version>0.1.1-beta1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.cdimascio</groupId>
<artifactId>dotenv-java</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<!-- Start of core dependencies -->

<dependency>
Expand Down
137 changes: 137 additions & 0 deletions src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2024, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*/
package io.lettuce.authx;
ggivo marked this conversation as resolved.
Show resolved Hide resolved

import io.lettuce.core.RedisCredentials;
import io.lettuce.core.RedisCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import redis.clients.authentication.core.Token;
import redis.clients.authentication.core.TokenAuthConfig;
import redis.clients.authentication.core.TokenListener;
import redis.clients.authentication.core.TokenManager;

/**
* A {@link RedisCredentialsProvider} implementation that supports token-based authentication for Redis.
* <p>
* This provider uses a {@link TokenManager} to manage and renew tokens, ensuring that the Redis client can authenticate with
* Redis using a dynamically updated token. This is particularly useful in scenarios where Redis access is controlled via
* token-based authentication, such as when Redis is integrated with an identity provider like EntraID.
* </p>
* <p>
* The provider supports streaming of credentials and automatically emits new credentials whenever a token is renewed. It must
* be used with {@link io.lettuce.core.ClientOptions.ReauthenticateBehavior#ON_NEW_CREDENTIALS} to automatically re-authenticate
* connections whenever new tokens are emitted by the provider.
* </p>
* <p>
* The lifecycle of this provider is externally managed. It should be closed when there are no longer any connections using it,
* to stop the token management process and release resources.
* </p>
*
* @since 6.6
*/
public class TokenBasedRedisCredentialsProvider implements RedisCredentialsProvider, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(TokenBasedRedisCredentialsProvider.class);

private final TokenManager tokenManager;

private final Sinks.Many<RedisCredentials> credentialsSink = Sinks.many().replay().latest();

private TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}

private void init() {

TokenListener listener = new TokenListener() {

@Override
public void onTokenRenewed(Token token) {
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
}

@Override
public void onError(Exception exception) {
log.error("Token renew failed!", exception);
}

};

try {
tokenManager.start(listener, false);
} catch (Exception e) {
credentialsSink.tryEmitError(e);
tokenManager.stop();
throw new RuntimeException("Failed to start TokenManager", e);
}
}

/**
* Resolve the latest available credentials as a Mono.
* <p>
* This method returns a Mono that emits the most recent set of Redis credentials. The Mono will complete once the
* credentials are emitted. If no credentials are available at the time of subscription, the Mono will wait until
* credentials are available.
*
* @return a Mono that emits the latest Redis credentials
*/
@Override
public Mono<RedisCredentials> resolveCredentials() {

return credentialsSink.asFlux().next();
}

/**
* Expose the Flux for all credential updates.
* <p>
* This method returns a Flux that emits all updates to the Redis credentials. Subscribers will receive the latest
* credentials whenever they are updated. The Flux will continue to emit updates until the provider is shut down.
*
* @return a Flux that emits all updates to the Redis credentials
*/
@Override
public Flux<RedisCredentials> credentials() {
ggivo marked this conversation as resolved.
Show resolved Hide resolved

return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials
}

@Override
public boolean supportsStreaming() {
return true;
}

/**
* Stop the credentials provider and clean up resources.
* <p>
* This method stops the TokenManager and completes the credentials sink, ensuring that all resources are properly released.
* It should be called when the credentials provider is no longer needed.
*/
@Override
public void close() {
credentialsSink.tryEmitComplete();
tokenManager.stop();
}

public static TokenBasedRedisCredentialsProvider create(TokenAuthConfig tokenAuthConfig) {
return create(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));
}

public static TokenBasedRedisCredentialsProvider create(TokenManager tokenManager) {
TokenBasedRedisCredentialsProvider credentialManager = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialManager.init();
return credentialManager;
}

}
83 changes: 78 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final ReauthenticateBehavior DEFAULT_REAUTHENTICATE_BEHAVIOUR = ReauthenticateBehavior.DEFAULT;

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand Down Expand Up @@ -95,6 +97,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final ReauthenticateBehavior reauthenticateBehavior;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand Down Expand Up @@ -124,6 +128,7 @@ protected ClientOptions(Builder builder) {
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.reauthenticateBehavior = builder.reauthenticateBehavior;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -143,6 +148,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.reauthenticateBehavior = original.getReauthenticateBehaviour();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -220,6 +226,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private ReauthenticateBehavior reauthenticateBehavior = DEFAULT_REAUTHENTICATE_BEHAVIOUR;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
Expand Down Expand Up @@ -301,6 +309,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Configure the {@link ReauthenticateBehavior} of the Lettuce driver. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @param reauthenticateBehavior the {@link ReauthenticateBehavior} to use. Must not be {@code null}.
* @return {@code this}
*/
public Builder reauthenticateBehavior(ReauthenticateBehavior reauthenticateBehavior) {

LettuceAssert.notNull(reauthenticateBehavior, "ReuthenticatBehavior must not be null");
this.reauthenticateBehavior = reauthenticateBehavior;
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -505,11 +527,12 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand Down Expand Up @@ -573,6 +596,16 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

/**
* Behavior for re-authentication when the {@link RedisCredentialsProvider} emits new credentials. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @return the currently set {@link ReauthenticateBehavior}.
*/
public ReauthenticateBehavior getReauthenticateBehaviour() {
return reauthenticateBehavior;
}

/**
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
*
Expand Down Expand Up @@ -704,6 +737,46 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Defines the re-authentication behavior of the Redis client.
* <p/>
* Certain implementations of the {@link RedisCredentialsProvider} could emit new credentials at runtime. This setting
* controls how the driver reacts to these newly emitted credentials.
*/
public enum ReauthenticateBehavior {

/**
* This is the default behavior. The client will fetch current credentials from the underlying
* {@link RedisCredentialsProvider} only when the driver needs to, e.g. when the connection is first established or when
* it is re-established after a disconnect.
* <p/>
* <p>
* No re-authentication is performed when new credentials are emitted by a {@link RedisCredentialsProvider} that
* supports streaming. The client does not subscribe to or react to any updates in the credential stream provided by
* {@link RedisCredentialsProvider#credentials()}.
* </p>
*/
DEFAULT,

/**
* Automatically triggers re-authentication whenever new credentials are emitted by a {@link RedisCredentialsProvider}
* that supports streaming, as indicated by {@link RedisCredentialsProvider#supportsStreaming()}.
*
* <p>
* When this behavior is enabled, the client subscribes to the credential stream provided by
* {@link RedisCredentialsProvider#credentials()} and issues an {@code AUTH} command to the Redis server each time new
* credentials are received. This behavior supports dynamic credential scenarios, such as token-based authentication, or
* credential rotation where credentials are refreshed periodically to maintain access.
* </p>
*
* <p>
* Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted commands, as the
* client performs re-authentication independently of user command flow.
* </p>
*/
ON_NEW_CREDENTIALS
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
Expand Down
Loading
Loading