Deploy Control Center, Confluent REST Proxy, ksqlDB, Connect, and Connectors using managed Confluent Cloud Kafka
You can use Confluent for Kubernetes to deploy and manage Control Center, Confluent REST Proxy, ksqlDB, Connect, and Connectors against Confluent Cloud Kafka and Schema Registry.
Before continuing with the scenario, ensure that you have set up the prerequisites.
Set the tutorial directory for this tutorial to the directory where you downloaded the tutorial files:
export TUTORIAL_HOME=<Tutorial directory>/ccloud-integration
Set up the Helm Chart:
helm repo add confluentinc https://packages.confluent.io/helm
Install Confluent For Kubernetes using Helm:
helm upgrade --install operator confluentinc/confluent-for-kubernetes
Check that the Confluent For Kubernetes pod comes up and is running:
kubectl get pods
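If the pod is not yet Running, you can also check the status of the Helm release (named operator in the install command above):
helm status operator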
Confluent For Kubernetes provides auto-generated certificates for Confluent Platform components to use for inter-component TLS. You'll need to generate and provide a root Certificate Authority (CA).
Generate a CA pair to use in this tutorial:
openssl genrsa -out $TUTORIAL_HOME/ca-key.pem 2048
openssl req -new -key $TUTORIAL_HOME/ca-key.pem -x509 \
  -days 1000 \
  -out $TUTORIAL_HOME/ca.pem \
  -subj "/C=US/ST=CA/L=MountainView/O=Confluent/OU=Operator/CN=TestCA"
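Optionally, sanity-check the generated CA certificate's subject and validity window before creating the secret:
openssl x509 -in $TUTORIAL_HOME/ca.pem -noout -subject -dates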
Create a Kubernetes secret for inter-component TLS:
kubectl create secret tls ca-pair-sslcerts \
  --cert=$TUTORIAL_HOME/ca.pem \
  --key=$TUTORIAL_HOME/ca-key.pem
Confluent Cloud provides you with API keys for both Kafka and Schema Registry. Configure Confluent For Kubernetes to use these API keys when setting up Connect and ksqlDB to connect to Confluent Cloud.
Create Kubernetes secret objects for Confluent Cloud access. Each secret contains file-based properties, in the format that the respective Confluent component requires for authentication credentials.
kubectl create secret generic cloud-plain \
  --from-file=plain.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt
kubectl create secret generic cloud-sr-access \
  --from-file=basic.txt=$TUTORIAL_HOME/creds-schemaRegistry-user.txt
kubectl create secret generic control-center-user \
  --from-file=basic.txt=$TUTORIAL_HOME/creds-control-center-users.txt
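For reference, the credential files above look roughly like the following. The values are placeholders, and the exact file syntax ships with the tutorial files, so treat this as a sketch:
# creds-client-kafka-sasl-user.txt (mounted as plain.txt): SASL/PLAIN credentials
username=<api-key>
password=<api-secret>
# creds-schemaRegistry-user.txt (mounted as basic.txt): Schema Registry basic auth
username=<sr-api-key>
password=<sr-api-secret>
# creds-control-center-users.txt (mounted as basic.txt): Control Center users
admin: Developer1,Administrators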
Create the kafka.properties file in $TUTORIAL_HOME. Add the Confluent Cloud Kafka endpoint and the credentials as follows:
bootstrap.servers=<cloudKafka_url>:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
Create a configuration secret for client applications to use:
kubectl create secret generic kafka-client-config-secure \
  --from-file=$TUTORIAL_HOME/kafka.properties \
  -n confluent
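To confirm the secret holds the kafka.properties key, inspect it:
kubectl describe secret kafka-client-config-secure -n confluent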
Due to the following known issues, a workaround is required:
- ksqlDB cannot use auto-generated certificates to connect to Confluent Cloud
- Control Center cannot use auto-generated certificates for MDS or Confluent Cloud Schema Registry
As a workaround, a custom TLS secret that includes the Let's Encrypt root CA must be used in the Control Center and ksqlDB custom resource files.
Download the Let's Encrypt root CA in PEM format.
Add the Let's Encrypt root CA to the certificate authority ca.pem file. List the certificates by concatenating them, one below the other, for example:
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
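For example, one way to script the download and concatenation, assuming Let's Encrypt's published ISRG Root X1 certificate (URL current as of this writing):
# Download the Let's Encrypt root CA in PEM format
curl -sLo $TUTORIAL_HOME/letsencrypt-root.pem https://letsencrypt.org/certs/isrgrootx1.pem
# Append it below the existing certificate in ca.pem
cat $TUTORIAL_HOME/letsencrypt-root.pem >> $TUTORIAL_HOME/ca.pem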
Create a custom-secret with ca.pem, server.pem, and server-key.pem, and use this custom-secret in the Control Center and ksqlDB custom resource files as required.
kubectl create secret generic custom-secret \
  --from-file=fullchain.pem=<path_to_server.pem> \
  --from-file=cacerts.pem=<path_to_ca.pem> \
  --from-file=privkey.pem=<path_to_server-key.pem> \
  --namespace <namespace>
Edit the confluent-platform.yaml deployment YAML, and add your respective Confluent Cloud URLs in the following places:
<cloudSR_url>
<cloudKafka_url>
Note: ksqlDB REST cannot use autoGeneratedCerts: true, as it needs to connect to Confluent Cloud Kafka's TLS-enabled endpoints. To encrypt ksqlDB REST traffic, you'll need to provide custom TLS certificates.
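As a minimal sketch, the tls stanza of the ksqlDB custom resource in confluent-platform.yaml would reference the custom-secret created earlier instead of auto-generated certificates:
spec:
  tls:
    secretRef: custom-secret   # instead of autoGeneratedCerts: true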
Deploy Confluent Platform with the above configuration:
kubectl apply -f $TUTORIAL_HOME/confluent-platform.yaml
Check that all Confluent Platform resources are deployed:
kubectl get pods
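If you prefer to block until all pods are ready rather than polling manually, kubectl wait can do it (assuming the components run in the confluent namespace):
kubectl wait --for=condition=Ready pod --all -n confluent --timeout=600s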
Use Control Center to monitor the Confluent Platform, and see the created topic and data.
Set up port forwarding to the Control Center web UI from your local machine:
kubectl port-forward controlcenter-0 9021:9021
Browse to Control Center at https://localhost:9021 and log in as the admin user with the Developer1 password.
Use Confluent REST Proxy to produce to and consume from Confluent Cloud.
Open a shell into the Connect pod, from which you will create a topic named CFK-D3dbf1I7mx:
kubectl -n confluent exec -it connect-0 -- bash
Create the new topic:
kafka-topics --bootstrap-server <cloudKafka_url>:9092 \
  --command-config /mnt/secrets/kafka-client-config-secure/kafka.properties \
  --create --partitions 3 \
  --replication-factor 3 \
  --topic CFK-D3dbf1I7mx
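Still inside the pod shell, you can verify the topic was created with the expected partition count:
kafka-topics --bootstrap-server <cloudKafka_url>:9092 \
  --command-config /mnt/secrets/kafka-client-config-secure/kafka.properties \
  --describe --topic CFK-D3dbf1I7mx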
Post messages to the new topic:
for i in $(seq 1 100); do curl -X POST \
  http://kafkarestproxy.confluent.svc.cluster.local:8082/topics/CFK-D3dbf1I7mx \
  -H 'Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json' \
  -H 'Content-Type: application/vnd.kafka.json.v2+json' \
  -d '{ "records": [ { "key": "somekey", "value": {"foo": "bar"} }, { "value": [ "foo", "bar" ], "partition": 1 }, { "value": 53.5 } ] }'; done
Create a new consumer group with a consumer instance:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance1", "format": "json", "auto.offset.reset": "earliest"}' \
  http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1
Subscribe the consumer to the topic:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["CFK-D3dbf1I7mx"]}' \
  http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1/instances/my_consumer_instance1/subscription
Wait a few seconds and then consume the messages. You might need to run the same command twice before records are returned:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1/instances/my_consumer_instance1/records
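When you're finished consuming, delete the consumer instance so REST Proxy releases its resources:
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
  http://kafkarestproxy.confluent.svc.cluster.local:8082/consumers/my_json_consumer1/instances/my_consumer_instance1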
When you're done with the tutorial, tear down the Confluent Platform deployment, the secrets, and the operator:
kubectl delete -f $TUTORIAL_HOME/confluent-platform.yaml
kubectl delete secrets cloud-plain cloud-sr-access control-center-user kafka-client-config-secure
kubectl delete secret ca-pair-sslcerts
helm delete operator