Skip to content

Latest commit

 

History

History

ccloud-integration

Deploy Control Center, Confluent Rest Proxy, ksqlDB, Connect, and Connectors using managed Confluent Cloud Kafka

You can use Confluent for Kubernetes to deploy and manage Control Center, Confluent Rest Proxy, ksqlDB, Connect, and Connectors against a Confluent Cloud Kafka and Schema Registry.

Before continuing with the scenario, ensure that you have set up the prerequisites.

Set the current tutorial directory

Set the tutorial directory for this tutorial under the directory you downloaded the tutorial files:

export TUTORIAL_HOME=<Tutorial directory>/ccloud-integration

Deploy Confluent for Kubernetes

  1. Set up the Helm Chart:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. Install Confluent For Kubernetes using Helm:

    helm upgrade --install operator confluentinc/confluent-for-kubernetes
    
  3. Check that the Confluent For Kubernetes pod comes up and is running:

    kubectl get pods
    

Deploy configuration secrets

Provide a Root Certificate Authority

Confluent For Kubernetes provides auto-generated certificates for Confluent Platform components to use for inter-component TLS. You'll need to generate and provide a root Certificate Authority (CA).

  1. Generate a CA pair to use in this tutorial:

    openssl genrsa -out $TUTORIAL_HOME/ca-key.pem 2048
    
    openssl req -new -key $TUTORIAL_HOME/ca-key.pem -x509 \
      -days 1000 \
      -out $TUTORIAL_HOME/ca.pem \
      -subj "/C=US/ST=CA/L=MountainView/O=Confluent/OU=Operator/CN=TestCA"
    
  2. Create a Kubernetes secret for inter-component TLS:

    kubectl create secret tls ca-pair-sslcerts \
      --cert=$TUTORIAL_HOME/ca.pem \
      --key=$TUTORIAL_HOME/ca-key.pem
    

Provide authentication credentials

Confluent Cloud provides you an API key for both Kafka and Schema Registry. Configure Confluent For Kubernetes to use the API key when setting up Connect and ksqlDB to connect.

Create a Kubernetes secret object for Confluent Cloud Kafka access. This secret object contains file based properties. These files are in the format that each respective Confluent component requires for authentication credentials.

kubectl create secret generic cloud-plain \
--from-file=plain.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt
kubectl create secret generic cloud-sr-access \
--from-file=basic.txt=$TUTORIAL_HOME/creds-schemaRegistry-user.txt
kubectl create secret generic control-center-user \
--from-file=basic.txt=$TUTORIAL_HOME/creds-control-center-users.txt

Create the kafka.properties file in $TUTORIAL_HOME. Add the above endpoint and the credentials as follows:

bootstrap.servers=<cloudKafka_url>:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username="<api-key>"   password="<api-secret>";
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN

Create a configuration secret for client applications to use

kubectl create secret generic kafka-client-config-secure \
--from-file=$TUTORIAL_HOME/kafka.properties \
-n confluent

Provide custom TLS secret

Due to the following known issues, ksqldb can not use auto-generated certificates for Confluent Cloud and Control Center can not use auto-generated certificates for MDS or Confluent Cloud Schema Registry:

  1. ksqldb can not use auto-generated certificates for ccloud
  2. control center cannot use auto-generated certificates for mds or ccloud sr

As a workaround, a custom TLS secret with Let's encrypt root CA needs to be used in the Control Center or ksqldb custom resource file.

  1. Download the Let's Encrypt root CA in PEM format

  2. Add the Let's Encrypt root CA to the certificate authority ca.pem file. List the certificates by simply concatenating them, one below the other, for example:

    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
    
  3. Create a custom-secret with ca.pem, server.pem and server-key.pem, and use this custom-secret in the Control Center/ksqldb custom resource file as required.

    kubectl create secret generic custom-secret \
    --from-file=fullchain.pem=<path_to_server.pem> \
    --from-file=cacerts.pem=<path_to_ca.pem> \
    --from-file=privkey.pem=<path_to_server-key.pem> \
    --namespace <namespace>
    

Deploy Confluent Platform

Edit the confluent-platform.yaml deployment YAML, and add your respective Confluent Cloud URLs in the following places:

  • <cloudSR_url>
  • <cloudKafka_url>

Note: ksqlDB REST cannot use autoGeneratedCerts: true, as it needs to connect to Confluent Cloud Kafka's TLS enabled endpoints. To encrypt ksqlDB REST traffic, you'll need to use custom TLS certificates you provide.

  1. Deploy Confluent Platform with the above configuration:

    kubectl apply -f $TUTORIAL_HOME/confluent-platform.yaml
    
  2. Check that all Confluent Platform resources are deployed:

    kubectl get pods
    

Validate

Validate in Control Center

Use Control Center to monitor the Confluent Platform, and see the created topic and data.

  1. Set up port forwarding to Control Center web UI from local machine:

    kubectl port-forward controlcenter-0 9021:9021
    
  2. Browse to Control Center and log in as the admin user with the Developer1 password:

    https://localhost:9021
    

Validate Confluent Rest Proxy

Use Confluent Rest Proxy to produce and consume from Confluent Cloud.

  1. Open a shell to the connect pod and create a topic, name it CFK-D3dbf1I7mx.

    kubectl -n confluent exec -it connect-0 -- bash
    
  2. Create new topic:

    kafka-topics --bootstrap-server <cloudKafka_url>:9092 \
    --command-config /mnt/secrets/kafka-client-config-secure/kafka.properties \
    --create --partitions 3 \
    --replication-factor 3 \
    --topic CFK-D3dbf1I7mx
    
  3. Post to new topic:

    for i in $(seq 100 $END); do curl -X POST \
    http://kafkarestproxy.confluent.svc.cluster.local:8082/topics/CFK-D3dbf1I7mx \
    -H 'Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json' \
    -H 'Content-Type: application/vnd.kafka.json.v2+json' \
    -d '{
    "records": [
    {
    "key": "somekey",
    "value": {"foo": "bar"}
    },
    {
    "value": [ "foo", "bar" ],
    "partition": 1
    },
    {
    "value": 53.5
    }
    ]
    }';done
    
  4. Create new group:

    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance1", "format": "json", "auto.offset.reset": "earliest"}' http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1
    
  5. Subscribe:

    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["CFK-D3dbf1I7mx"]}' http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1/instances/my_consumer_instance1/subscription
    
  6. Wait few seconds and then consume, you might need to run the same command twice.

    curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1/instances/my_consumer_instance1/records
    

Tear down

kubectl delete -f $TUTORIAL_HOME/confluent-platform.yaml
kubectl delete secrets cloud-plain cloud-sr-access control-center-user kafka-client-config-secure
kubectl delete secret ca-pair-sslcerts
helm delete operator