diff --git a/.travis.yml b/.travis.yml index d303e439..fc66ce6a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ jobs: - hydra - hydra-jwt - mockoauth + - kerberos apt: packages: - maven @@ -26,6 +27,7 @@ jobs: - hydra - hydra-jwt - mockoauth + - kerberos apt: packages: - maven @@ -43,6 +45,7 @@ addons: - hydra - hydra-jwt - mockoauth + - kerberos env: global: - PULL_REQUEST=${TRAVIS_PULL_REQUEST} diff --git a/.travis/build.sh b/.travis/build.sh index 834e59df..d9f53c1c 100755 --- a/.travis/build.sh +++ b/.travis/build.sh @@ -2,7 +2,7 @@ set -e clearDockerEnv() { - docker rm -f kafka zookeeper keycloak keycloak-import hydra hydra-import hydra-jwt hydra-jwt-import || true + docker rm -f kafka zookeeper keycloak keycloak-import hydra hydra-import hydra-jwt hydra-jwt-import kerberos || true DOCKER_TEST_NETWORKS=$(docker network ls | grep test | awk '{print $1}') [ "$DOCKER_TEST_NETWORKS" != "" ] && docker network rm $DOCKER_TEST_NETWORKS } diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/OAuthKafkaPrincipalBuilder.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/OAuthKafkaPrincipalBuilder.java index f8abb82b..8f9de53f 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/OAuthKafkaPrincipalBuilder.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/OAuthKafkaPrincipalBuilder.java @@ -18,10 +18,12 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer; import org.apache.kafka.common.security.plain.internals.PlainSaslServer; +import javax.security.auth.kerberos.KerberosPrincipal; import 
javax.security.sasl.SaslServer; import java.io.IOException; import java.lang.reflect.Field; @@ -51,7 +53,8 @@ */ public class OAuthKafkaPrincipalBuilder extends DefaultKafkaPrincipalBuilder implements Configurable { - private static final SetAccessibleAction SET_PRINCIPAL_MAPPER = SetAccessibleAction.newInstance(); + private static final SetAccessibleAction SET_PRINCIPAL_MAPPER = SetAccessibleAction.newInstance("sslPrincipalMapper"); + private static final SetAccessibleAction SET_KERBEROS_SHORT_NAMER = SetAccessibleAction.newInstance("kerberosShortNamer"); private static final int OAUTH_DATA_TAG = 575; @@ -74,9 +77,9 @@ void invoke(DefaultKafkaPrincipalBuilder target, Object value) throws IllegalAcc field.set(target, value); } - static SetAccessibleAction newInstance() { + static SetAccessibleAction newInstance(String fieldName) { try { - return new SetAccessibleAction(DefaultKafkaPrincipalBuilder.class.getDeclaredField("sslPrincipalMapper")); + return new SetAccessibleAction(DefaultKafkaPrincipalBuilder.class.getDeclaredField(fieldName)); } catch (NoSuchFieldException e) { throw new IllegalStateException("Failed to install OAuthKafkaPrincipalBuilder. 
This Kafka version does not seem to be supported", e); } @@ -91,12 +94,21 @@ public OAuthKafkaPrincipalBuilder() { super(null, null); } + + @Override public void configure(Map configs) { Object sslPrincipalMappingRules = configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG); Object sslPrincipalMapper; + + @SuppressWarnings("unchecked") + List principalToLocalRules = (List) configs.get(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG); + String defaultRealm; + Object kerberosShortNamer; + + try { Class clazz = Class.forName("org.apache.kafka.common.security.ssl.SslPrincipalMapper"); try { @@ -120,6 +132,18 @@ public void configure(Map configs) { SET_PRINCIPAL_MAPPER.invoke(this, sslPrincipalMapper); + try { + defaultRealm = new KerberosPrincipal("tmp", 1).getRealm(); + } catch (Exception ex) { + defaultRealm = ""; + } + + if (principalToLocalRules != null) { + kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); + SET_KERBEROS_SHORT_NAMER.invoke(this, kerberosShortNamer); + } + + } catch (RuntimeException | ClassNotFoundException | NoSuchMethodException diff --git a/testsuite/README.md b/testsuite/README.md index f82c08c3..8f913c31 100644 --- a/testsuite/README.md +++ b/testsuite/README.md @@ -20,6 +20,7 @@ Then, you have to add some entries to your `/etc/hosts` file: 127.0.0.1 hydra-jwt 127.0.0.1 kafka 127.0.0.1 mockoauth + 127.0.0.1 kerberos That's needed for host resolution, because Kafka brokers and Kafka clients connecting to Keycloak / Hydra have to use the same hostname to ensure compatibility of generated access tokens. 
diff --git a/testsuite/docker/kerberos/Dockerfile b/testsuite/docker/kerberos/Dockerfile new file mode 100644 index 00000000..5b1a2c29 --- /dev/null +++ b/testsuite/docker/kerberos/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:22.04 + +RUN DEBIAN_FRONTEND=noninteractive apt-get update -y && apt-get install -y krb5-kdc krb5-admin-server + +EXPOSE 88 749 + +ADD ./config.sh /config.sh + +ENTRYPOINT ["/config.sh"] + diff --git a/testsuite/docker/kerberos/config.sh b/testsuite/docker/kerberos/config.sh new file mode 100755 index 00000000..45df02f8 --- /dev/null +++ b/testsuite/docker/kerberos/config.sh @@ -0,0 +1,97 @@ +#!/bin/bash + +[[ "TRACE" ]] && set -x + +: ${REALM:=KERBEROS} +: ${DOMAIN_REALM:=kerberos} +: ${KERB_MASTER_KEY:=masterkey} +: ${KERB_ADMIN_USER:=admin} +: ${KERB_ADMIN_PASS:=admin} +: ${KAFKA_USER:=kafka} +: ${KAFKA_HOST:=kafka} +: ${KAFKA_CLIENT_USER:=client} + +fix_nameserver() { + cat>/etc/resolv.conf</etc/krb5.conf< /etc/krb5kdc/kadm5.acl +} + +create_kafka_user() { + kadmin.local -q "addprinc -randkey $KAFKA_HOST/$KAFKA_USER@$REALM" + kadmin.local -q "ktadd -k /keytabs/kafka_broker.keytab $KAFKA_HOST/$KAFKA_USER@$REALM" + kadmin.local -q "addprinc -randkey $KAFKA_HOST/$KAFKA_CLIENT_USER@$REALM" + kadmin.local -q "ktadd -k /keytabs/kafka_client.keytab $KAFKA_HOST/$KAFKA_CLIENT_USER@$REALM" + chmod 666 /keytabs/kafka_broker.keytab + chmod 666 /keytabs/kafka_client.keytab +} + + + +if [ ! 
-f /kerberos_initialized ]; then + mkdir -p /var/log/kerberos + create_config + create_db + create_admin_user + create_kafka_user + start_kdc + + touch /kerberos_initialized +else + start_kdc +fi + +tail -F /var/log/kerberos/krb5kdc.log diff --git a/testsuite/docker/kerberos/kafka_server_jaas.conf b/testsuite/docker/kerberos/kafka_server_jaas.conf new file mode 100644 index 00000000..5644b473 --- /dev/null +++ b/testsuite/docker/kerberos/kafka_server_jaas.conf @@ -0,0 +1,7 @@ +KafkaServer { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/opt/kafka/keytabs/kafka_broker.keytab" + principal="kafka/kafka@KERBEROS"; +}; diff --git a/testsuite/docker/kerberos/krb5.conf b/testsuite/docker/kerberos/krb5.conf new file mode 100644 index 00000000..c1014b33 --- /dev/null +++ b/testsuite/docker/kerberos/krb5.conf @@ -0,0 +1,21 @@ +[logging] + default = FILE:/var/log/kerberos/krb5libs.log + kdc = FILE:/var/log/kerberos/krb5kdc.log + admin_server = FILE:/var/log/kerberos/kadmind.log +[libdefaults] + default_realm = KERBEROS + dns_lookup_realm = false + dns_lookup_kdc = false + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = true + rdns = false + ignore_acceptor_hostname = true +[realms] + KERBEROS = { + kdc = kerberos + admin_server = kerberos + } +[domain_realm] + .kerberos = KERBEROS + kerberos = KERBEROS diff --git a/testsuite/keycloak-authz-zk-tests/src/test/java/io/strimzi/testsuite/oauth/authz/kraft/KeycloakZKAuthorizationTests.java b/testsuite/keycloak-authz-zk-tests/src/test/java/io/strimzi/testsuite/oauth/authz/kraft/KeycloakZKAuthorizationTests.java index 4ea0534f..a5733e09 100644 --- a/testsuite/keycloak-authz-zk-tests/src/test/java/io/strimzi/testsuite/oauth/authz/kraft/KeycloakZKAuthorizationTests.java +++ b/testsuite/keycloak-authz-zk-tests/src/test/java/io/strimzi/testsuite/oauth/authz/kraft/KeycloakZKAuthorizationTests.java @@ -116,6 +116,7 @@ public void doTest() throws Exception { } catch (Throwable 
e) { log.error("Keycloak ZK Authorization Test failed: ", e); + e.printStackTrace(); throw e; } } diff --git a/testsuite/mockoauth-tests/docker-compose-kerberos.yml b/testsuite/mockoauth-tests/docker-compose-kerberos.yml new file mode 100644 index 00000000..e6cf359e --- /dev/null +++ b/testsuite/mockoauth-tests/docker-compose-kerberos.yml @@ -0,0 +1,156 @@ +version: '3' + +services: + mockoauth: + image: testsuite/mock-oauth-server + ports: + - "8090:8090" + - "8091:8091" + - "5005:5005" + volumes: + - ${PWD}/../docker/certificates:/application/config + + environment: + #- JAVA_DEBUG=y + #- DEBUG_SUSPEND_FLAG=y + #- JAVA_DEBUG_PORT=0.0.0.0:5005 + + - KEYSTORE_ONE_PATH=/application/config/mockoauth.server.keystore.p12 + - KEYSTORE_ONE_PASSWORD=changeit + - KEYSTORE_TWO_PATH=/application/config/mockoauth.server.keystore_2.p12 + - KEYSTORE_TWO_PASSWORD=changeit + - KEYSTORE_EXPIRED_PATH=/application/config/mockoauth.server.keystore_expired.p12 + - KEYSTORE_EXPIRED_PASSWORD=changeit + + kafka: + image: ${KAFKA_DOCKER_IMAGE} + ports: + - "9091:9091" + - "9092:9092" + - "9093:9093" + - "9094:9094" + - "9095:9095" + - "9096:9096" + - "9097:9097" + - "9098:9098" + - "9099:9099" + - "9404:9404" + + # Debug port + - "5006:5006" + volumes: + - ${PWD}/../docker/target/kafka/libs:/opt/kafka/libs/strimzi + - ${PWD}/../docker/kafka/config:/opt/kafka/config/strimzi + - ${PWD}/../docker/target/kafka/certs:/opt/kafka/config/strimzi/certs + - ${PWD}/../docker/kafka/scripts:/opt/kafka/strimzi + - ${PWD}/../docker/kerberos/krb5.conf:/etc/krb5.conf + - ${PWD}/../docker/kerberos/kafka_server_jaas.conf:/opt/kafka/kafka_server_jaas.conf + - ${PWD}/../docker/kerberos/keys:/opt/kafka/keytabs + command: + - /bin/bash + - -c + #- sleep 10000 + - cd /opt/kafka/strimzi && ./start_no_wait.sh + environment: + #- KAFKA_DEBUG=y + #- DEBUG_SUSPEND_FLAG=y + #- JAVA_DEBUG_PORT=0.0.0.0:5006 + + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - 
KAFKA_LISTENERS=INTERBROKER://kafka:9091,JWT://kafka:9092,INTROSPECT://kafka:9093,JWTPLAIN://kafka:9094,PLAIN://kafka:9095,INTROSPECTTIMEOUT://kafka:9096,FAILINGINTROSPECT://kafka:9097,FAILINGJWT://kafka:9098,KERBEROS://kafka:9099 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERBROKER:PLAINTEXT,JWT:SASL_PLAINTEXT,INTROSPECT:SASL_PLAINTEXT,JWTPLAIN:SASL_PLAINTEXT,PLAIN:SASL_PLAINTEXT,INTROSPECTTIMEOUT:SASL_PLAINTEXT,FAILINGINTROSPECT:SASL_PLAINTEXT,FAILINGJWT:SASL_PLAINTEXT,KERBEROS:SASL_PLAINTEXT + - KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERBROKER + + # Common settings for all the listeners + # username extraction from JWT token claim + - KAFKA_PRINCIPAL_BUILDER_CLASS=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + + # Configuration of individual listeners + - KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"INTROSPECT\" oauth.introspection.endpoint.uri=\"https://mockoauth:8090/introspect\" oauth.client.id=\"unused\" oauth.client.secret=\"unused-secret\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + #- KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler + + - KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"JWT\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.check.access.token.type=\"false\" unsecuredLoginStringClaim_sub=\"admin\" ; + - 
KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_JWTPLAIN_SASL_ENABLED_MECHANISMS=OAUTHBEARER,PLAIN + - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.token.endpoint.uri=\"https://mockoauth:8090/token\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_PLAIN_SASL_ENABLED_MECHANISMS=PLAIN + - KAFKA_LISTENER_NAME_PLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-password\" user_admin=\"admin-password\" ; + + # The 'oauth.connect.timeout.seconds' should be overridden by env var OAUTH_CONNECT_TIMEOUT_SECONDS, so it should be 10 seconds + - KAFKA_LISTENER_NAME_INTROSPECTTIMEOUT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"INTROSPECTTIMEOUT\" oauth.connect.timeout.seconds=\"5\" oauth.introspection.endpoint.uri=\"https://mockoauth:8090/introspect\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" 
oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_INTROSPECTTIMEOUT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_FAILINGINTROSPECT_SASL_ENABLED_MECHANISMS=OAUTHBEARER,PLAIN + - KAFKA_LISTENER_NAME_FAILINGINTROSPECT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"FAILINGINTROSPECT\" oauth.introspection.endpoint.uri=\"https://mockoauth:8090/failing_introspect\" oauth.userinfo.endpoint.uri=\"https://mockoauth:8090/failing_userinfo\" oauth.username.claim=\"uid\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.http.retries=\"1\" oauth.http.retry.pause.millis=\"3000\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FAILINGINTROSPECT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_FAILINGINTROSPECT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FAILINGINTROSPECT\" oauth.token.endpoint.uri=\"https://mockoauth:8090/failing_token\" oauth.introspection.endpoint.uri=\"https://mockoauth:8090/failing_introspect\" oauth.userinfo.endpoint.uri=\"https://mockoauth:8090/failing_userinfo\" oauth.username.claim=\"uid\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.http.retries=\"1\" oauth.http.retry.pause.millis=\"3000\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FAILINGINTROSPECT_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_FAILINGJWT_SASL_ENABLED_MECHANISMS=OAUTHBEARER,PLAIN + - 
KAFKA_LISTENER_NAME_FAILINGJWT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"FAILINGJWT\" oauth.fail.fast=\"false\" oauth.check.access.token.type=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FAILINGJWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler + + - KAFKA_LISTENER_NAME_FAILINGJWT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FAILINGJWT\" oauth.fail.fast=\"false\" oauth.check.access.token.type=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.token.endpoint.uri=\"https://mockoauth:8090/failing_token\" oauth.http.retries=\"1\" oauth.http.retry.pause.millis=\"3000\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FAILINGJWT_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler + + + # Truststore config for connecting to secured authorization server + - OAUTH_SSL_TRUSTSTORE_LOCATION=/opt/kafka/config/strimzi/certs/ca-truststore.p12 + - OAUTH_SSL_TRUSTSTORE_PASSWORD=changeit + - OAUTH_SSL_TRUSTSTORE_TYPE=pkcs12 + - OAUTH_CONNECT_TIMEOUT_SECONDS=10 + - OAUTH_READ_TIMEOUT_SECONDS=10 + + + # OAuth metrics configuration + + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # The following value will turn into 'strimzi.oauth.metric.reporters=...' 
in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + - KAFKA_LISTENER_NAME_KERBEROS_SASL_KERBEROS_SERVICE_NAME=kafka + - KAFKA_LISTENER_NAME_KERBEROS_SASL_ENABLED_MECHANISMS=GSSAPI + - KAFKA_OPTS=-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/kafka_server_jaas.conf + + zookeeper: + image: ${KAFKA_DOCKER_IMAGE} + ports: + - "2181:2181" + volumes: + - ${PWD}/../docker/zookeeper/scripts:/opt/kafka/strimzi + - ${PWD}/../docker/kafka/kerberos/keys:/keytabs + command: + - /bin/bash + - -c + - cd /opt/kafka/strimzi && ./start.sh + environment: + - LOG_DIR=/tmp/logs + kerberos: + build: ${PWD}/../docker/kerberos + hostname: 'kerberos' + environment: + - REALM=KERBEROS + - DOMAIN_REALM=kerberos + - KERB_MASTER_KEY=masterkey + - KERB_ADMIN_USER=admin + - KERB_ADMIN_PASS=admin + volumes: + - ${PWD}/../docker/kerberos/keys:/keytabs + ports: + - "749:749" + - "88:88/udp" diff --git a/testsuite/mockoauth-tests/docker-compose.yml b/testsuite/mockoauth-tests/docker-compose.yml index 9af6e973..cdec1dbe 100644 --- a/testsuite/mockoauth-tests/docker-compose.yml +++ b/testsuite/mockoauth-tests/docker-compose.yml @@ -33,6 +33,7 @@ services: - "9096:9096" - "9097:9097" - "9098:9098" + - "9099:9099" - "9404:9404" # Debug port @@ -45,6 +46,7 @@ services: command: - /bin/bash - -c + #- sleep 10000 - cd /opt/kafka/strimzi && ./start_no_wait.sh environment: #- KAFKA_DEBUG=y diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java index 5d4791f5..ac2a0c2c 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java @@ 
-4,6 +4,7 @@ */ package io.strimzi.testsuite.oauth; +import io.strimzi.kafka.oauth.common.Config; import io.strimzi.testsuite.oauth.common.TestContainersLogCollector; import io.strimzi.testsuite.oauth.common.TestContainersWatcher; import io.strimzi.testsuite.oauth.mockoauth.AuthorizationEndpointsTest; @@ -16,6 +17,7 @@ import io.strimzi.testsuite.oauth.mockoauth.KeycloakAuthorizerTest; import io.strimzi.testsuite.oauth.mockoauth.PasswordAuthTest; import io.strimzi.testsuite.oauth.mockoauth.RetriesTests; +import io.strimzi.testsuite.oauth.mockoauth.KerberosListenerTest; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -37,14 +39,28 @@ */ public class MockOAuthTests { + static boolean includeKerberosTests = new Config(System.getProperties()).getValueAsBoolean("oauth.testsuite.test.kerberos", true); + @ClassRule public static TestContainersWatcher environment = - new TestContainersWatcher(new File("docker-compose.yml")) - .withServices("mockoauth", "kafka", "zookeeper") - .waitingFor("mockoauth", Wait.forLogMessage(".*Succeeded in deploying verticle.*", 1) - .withStartupTimeout(Duration.ofSeconds(180))) - .waitingFor("kafka", Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1) - .withStartupTimeout(Duration.ofSeconds(180))); + initWatcher(); + + private static TestContainersWatcher initWatcher() { + TestContainersWatcher watcher = new TestContainersWatcher(new File(includeKerberosTests ? 
"docker-compose-kerberos.yml" : "docker-compose.yml")); + if (includeKerberosTests) { + watcher.withServices("mockoauth", "kerberos", "kafka", "zookeeper") + .waitingFor("kerberos", Wait.forLogMessage(".*commencing operation.*", 1) + .withStartupTimeout(Duration.ofSeconds(45))); + } else { + watcher.withServices("mockoauth", "kafka", "zookeeper"); + } + watcher.waitingFor("mockoauth", Wait.forLogMessage(".*Succeeded in deploying verticle.*", 1) + .withStartupTimeout(Duration.ofSeconds(180))) + .waitingFor("kafka", Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1) + .withStartupTimeout(Duration.ofSeconds(180))); + + return watcher; + } @Rule public TestRule logCollector = new TestContainersLogCollector(environment); @@ -95,6 +111,11 @@ public void runTests() throws Exception { logStart("ClientAssertionAuthTest :: Client Assertion Tests"); new ClientAssertionAuthTest().doTest(); + if (includeKerberosTests) { + logStart("KerberosTests :: Test authentication with Kerberos"); + new KerberosListenerTest().doTests(); + } + } catch (Throwable e) { log.error("Exception has occurred: ", e); throw e; diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KerberosListenerTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KerberosListenerTest.java new file mode 100644 index 00000000..69cff3c9 --- /dev/null +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KerberosListenerTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017-2023, Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.testsuite.oauth.mockoauth; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Assert; + +import java.io.File; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +public class KerberosListenerTest { + + private static final String TOPIC_NAME = "Kerberos-Test-Topic"; + private static final long CONSUMER_TIMEOUT = 10000L; + private static final int MESSAGE_COUNT = 100; + + public void doTests() throws Exception { + + File keyTab = new File("../docker/kerberos/keys/kafka_client.keytab"); + Assert.assertTrue(keyTab.exists()); + Assert.assertTrue(keyTab.canRead()); + + Properties props = new Properties(); + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put("sasl.kerberos.service.name", "kafka"); + props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab='../docker/kerberos/keys/kafka_client.keytab' principal='kafka/client@KERBEROS';"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9099"); + + Admin admin = Admin.create(props); + CreateTopicsResult result = admin.createTopics(Collections.singleton(new NewTopic(TOPIC_NAME, (short) 1, (short) 1))); + try { + result.all().get(); + } 
catch (Exception e) { + Assert.fail("Failed to create topic on Kerberos listener because of " + e.getMessage()); + e.printStackTrace(); + } + + Properties producerProps = (Properties) props.clone(); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + KafkaProducer producer = new KafkaProducer<>(producerProps); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + try { + producer.send(new ProducerRecord<>(TOPIC_NAME, String.format("message_%d", i))).get(); + } catch (ExecutionException e) { + Assert.fail("Failed to produce to Kerberos listener because of " + e.getCause()); + e.printStackTrace(); + } + } + + Properties consumerProps = (Properties) props.clone(); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, String.format("kerberos_listener_test_%d", System.currentTimeMillis())); + + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + + TopicPartition tp = new TopicPartition(TOPIC_NAME, 0); + consumer.assign(Collections.singleton(tp)); + consumer.seekToBeginning(consumer.assignment()); + long startTime = System.currentTimeMillis(); + + int receiveCount = 0; + + while (System.currentTimeMillis() - startTime < CONSUMER_TIMEOUT) { + ConsumerRecords results = consumer.poll(Duration.ofMillis(300)); + for (ConsumerRecord record : results) { + if (record.value().startsWith("message_")) receiveCount++; + } + } + Assert.assertEquals("Kerberos listener consumer should consume all messages", MESSAGE_COUNT, receiveCount); + + } + +} diff --git a/testsuite/pom.xml b/testsuite/pom.xml index 7025372f..9e624392 100644 --- a/testsuite/pom.xml +++ b/testsuite/pom.xml @@ -292,7 +292,9 @@ ${kafka.docker.image} +
-Djava.security.krb5.conf=${project.basedir}/../docker/kerberos/krb5.conf +