Last-mile data streaming. Stream real-time Kafka data to mobile and web apps, anywhere. Scale Kafka to millions of clients.
- Introduction
- Architecture
- QUICK START: Set up in 5 minutes
- Deployment
- Configuration
- Client Side Error Handling
- Customize the Kafka Connector Metadata Adapter Class
- Kafka Lightstreamer Sink Connector
- Docs
- Examples
Is your product struggling to deliver Kafka events to remote users? The Lightstreamer Kafka Connector is an intelligent proxy that bridges the gap, providing seamless, real-time data streaming to web and mobile applications with unmatched ease and reliability. It streams data in real time to your apps over WebSockets, eliminating the need for polling a REST proxy and surpassing the limitations of MQTT.
Kafka, while powerful, isn’t designed for direct internet access—particularly when it comes to the last mile, the critical network segment that extends beyond enterprise boundaries and edges (LAN or WAN) to reach end users. Last-mile integration is essential for delivering real-time Kafka data to mobile, web, and desktop applications, addressing challenges that go beyond Kafka’s typical scope, such as:
- Disruptions from corporate firewalls and client-side proxies blocking Kafka connections.
- Performance issues due to unpredictable internet bandwidth, including packet loss and disconnections.
- User interfaces struggling with large data volumes.
- The need for scalable solutions capable of supporting millions of concurrent users.
With Intelligent Streaming, Lightstreamer dynamically adjusts the data flow to match each user’s network conditions, ensuring all users stay in sync regardless of connection quality. By resampling and conflating data on the fly, it delivers real-time updates with adaptive throttling, effectively handling packet loss without buffering delays. It also manages disconnections and reconnections seamlessly, keeping your users connected and up-to-date.
The rich set of supplied client libraries makes it easy to consume real-time Kafka data across a variety of platforms and languages.
Connect millions of clients without compromising performance. Fan out real-time messages published on Kafka topics efficiently, preventing overload on the Kafka brokers. Check out the load tests performed on the Lightstreamer Kafka Connector vs. plain Kafka.
The Lightstreamer Kafka Connector provides a wide range of powerful features, including firewall and proxy traversal, server-side filtering, advanced topic mapping, record evaluation, Schema Registry support, push notifications, and maximum security. Explore more details.
The Lightstreamer Kafka Connector seamlessly integrates the Lightstreamer Broker with Confluent Cloud and Confluent Platform. While existing producers and consumers continue connecting directly to the Kafka broker, internet-based applications connect through the Lightstreamer Broker, which efficiently handles last-mile data delivery. Authentication and authorization for internet-based clients are managed via a custom Metadata Adapter, created using the Metadata Adapter API Extension and integrated into the Lightstreamer Broker.
Both the Kafka Connector and the Metadata Adapter run in-process with the Lightstreamer Broker, which can be deployed in the cloud or on-premises.
The Lightstreamer Kafka Connector can operate in two distinct modes: as a direct Kafka client or as a Kafka Connect connector.
In this mode, the Lightstreamer Kafka Connector uses the Kafka client API to communicate directly with the Kafka broker. This approach is typically lighter, faster, and more scalable, as it avoids the additional layer of Kafka Connect. All sections of this documentation refer to this mode, except for the section specifically dedicated to the Sink Connector.
In this mode, the Lightstreamer Kafka Connector integrates with the Kafka Connect framework, acting as a sink connector. While this introduces an additional messaging layer, there are scenarios where the standardized deployment provided by Kafka Connect is required. For more details on using the Lightstreamer Kafka Connector as a Kafka Connect sink connector, please refer to this section: Kafka Connect Lightstreamer Sink Connector.
To efficiently showcase the functionalities of the Lightstreamer Kafka Connector, we have prepared an accessible quickstart application located in the examples/vendors/confluent/quickstart-confluent/
directory. This streamlined application facilitates real-time streaming of data from a Kafka topic directly to a web interface. It leverages a modified version of the Stock List Demo, specifically adapted to demonstrate Kafka integration. This setup is designed for rapid comprehension, enabling you to swiftly grasp and observe the connector's performance in a real-world scenario.
The diagram above illustrates how, in this setup, a stream of simulated market events is channeled from Kafka to the web client via the Lightstreamer Kafka Connector.
To provide a complete stack, the app is based on Docker Compose. The Docker Compose file comprises the following services:
- broker: the Kafka broker, based on the Official Confluent Docker Image for Kafka (Community Version)
- kafka-connector: Lightstreamer Broker with the Kafka Connector, based on the Lightstreamer Kafka Connector Docker image example, which also includes a web client mounted on
/lightstreamer/pages/QuickStart
- producer: a native Kafka Producer, based on the provided
Dockerfile
file from the quickstart-producer
sample client
-
Make sure you have Docker, Docker Compose, and a JDK (Java Development Kit) v17 or newer installed on your local machine.
-
From the
examples/vendors/confluent/quickstart-confluent/
folder, run the following:
$ ./start.sh
...
⠏ Network quickstart_default Created
✔ Container broker Started
✔ Container producer Started
✔ Container kafka-connector Started
...
Services started. Now you can point your browser to http://localhost:8080/QuickStart to see real-time data.
...
-
Once all containers are ready, point your browser to http://localhost:8080/QuickStart.
-
After a few moments, the user interface starts displaying the real-time stock data.
-
To shut down Docker Compose and clean up all temporary resources:
$ ./stop.sh
This section will guide you through deploying the Kafka Connector quickly and easily with Confluent Cloud.
Deployment options:
-
Manual Deployment: Download and configure the Lightstreamer Broker and Kafka Connector from their respective archives.
-
Docker-based Deployment: Build and configure a Docker image that seamlessly integrates the Lightstreamer Broker and the Kafka Connector.
In both cases, you'll need a Confluent Cloud account.
Tip
Don't have a Confluent Cloud account yet? Start your free trial of Confluent Cloud today. New signups receive $400 to spend during their first 30 days.
- JDK (Java Development Kit) v17 or newer
- Lightstreamer Broker (also referred to as Lightstreamer Server) v7.4.2 or newer. Follow the installation instructions in the
LS_HOME/GETTING_STARTED.TXT
file included in the downloaded package.
Download the deployment archive lightstreamer-kafka-connector-<version>.zip
from the Releases page. Alternatively, check out this repository and execute the following command from the kafka-connector-project
folder:
$ ./gradlew adapterDistZip
which generates the archive file under the kafka-connector-project/kafka-connector/build/distributions
folder.
Then, unpack it into the adapters
folder of the Lightstreamer Server installation:
$ unzip lightstreamer-kafka-connector-<version>.zip -d LS_HOME/adapters
Finally, check that the Lightstreamer layout looks like the following:
LS_HOME/
...
├── adapters
│ ├── lightstreamer-kafka-connector-<version>
│ │ ├── LICENSE
│ │ ├── README.md
│ │ ├── adapters.xml
│ │ ├── javadoc
│ │ ├── lib
│ │ ├── log4j.properties
│ └── welcome_res
...
├── audit
├── bin
...
Before starting the Kafka Connector, you need to properly configure the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/adapters.xml
file. For convenience, the package comes with a predefined configuration (the same used in the Quick Start app), which can be customized in all its aspects as per your requirements. Of course, you may add as many different connection configurations as desired to fit your needs.
To quickly complete the installation and verify the successful integration with Kafka, edit the data_provider block QuickStartConfluentCloud
in the file as follows:
-
Update the
bootstrap.servers
parameter with the connection string of Kafka:
<param name="bootstrap.servers">kafka.connection.string</param>
-
Update the
authentication.username
and authentication.password
parameters with the API key and API secret linked to your Confluent Cloud account:
<param name="authentication.username">API.key</param>
<param name="authentication.password">API.secret</param>
-
Configure topic and record mapping.
To enable a generic Lightstreamer client to receive real-time updates, it needs to subscribe to one or more items. Therefore, the Kafka Connector provides suitable mechanisms to map Kafka topics to Lightstreamer items effectively.
The
QuickStartConfluentCloud
factory configuration comes with a straightforward mapping defined through the following settings:
- An item template:
<param name="item-template.stock">stock-#{index=KEY}</param>
which defines the general format of the names of the items a client must subscribe to in order to receive updates from the Kafka Connector. The extraction expression syntax used here - denoted within
#{...}
- permits the clients to specify filtering values to be compared against the actual contents of a Kafka record, evaluated through Extraction Keys used to extract each part of a record. In this case, the KEY
predefined constant extracts the key part of Kafka records.
- A topic mapping:
<param name="map.stocks.to">item-template.stock</param>
which maps the topic
stocks
to the provided item template.
This configuration instructs the Kafka Connector to analyze every single event published to the topic
stocks
and check if it matches against any item subscribed by the client as:
- stock-[index=1]: an item with the index parameter bound to a record key equal to 1
- stock-[index=2]: an item with the index parameter bound to a record key equal to 2
- ...
The Kafka Connector will then route the event to all matched items.
In addition, the following section defines how to map the record to the tabular form of Lightstreamer fields, by using the aforementioned Extraction Keys. In this case, the
VALUE
predefined constant extracts the value part of Kafka records.
<param name="field.stock_name">#{VALUE.name}</param>
<param name="field.last_price">#{VALUE.last_price}</param>
<param name="field.ask">#{VALUE.ask}</param>
<param name="field.ask_quantity">#{VALUE.ask_quantity}</param>
<param name="field.bid">#{VALUE.bid}</param>
<param name="field.bid_quantity">#{VALUE.bid_quantity}</param>
<param name="field.pct_change">#{VALUE.pct_change}</param>
<param name="field.min">#{VALUE.min}</param>
<param name="field.max">#{VALUE.max}</param>
<param name="field.ref_price">#{VALUE.ref_price}</param>
<param name="field.open_price">#{VALUE.open_price}</param>
<param name="field.item_status">#{VALUE.item_status}</param>
This way, the routed event is transformed into a flat structure, which can be forwarded to the clients.
-
-
Optionally, customize the
LS_HOME/adapters/lightstreamer-kafka-connector-<version>/log4j.properties
file (the current settings produce the quickstart-confluent.log
file).
You can get more details about all possible settings in the Configuration section.
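The topic and record mapping described in step 3 can be pictured with a short Python sketch. It only illustrates the idea and is not the connector's implementation: the helper names `expand_template`, `extract`, and `route`, the simplified expression parsing, and the sample record are all assumptions made for this example. The `stock-#{index=KEY}` template, the `stock-[index=N]` item names, and the `VALUE.*` expressions come from the configuration above.

```python
import re

ITEM_TEMPLATE = "stock-#{index=KEY}"
# A subset of the field mappings shown above (field name -> extraction expression).
FIELD_MAPPINGS = {
    "stock_name": "VALUE.name",
    "last_price": "VALUE.last_price",
    "ask": "VALUE.ask",
    "bid": "VALUE.bid",
}


def extract(expression, record):
    """Evaluate a simplified extraction expression such as KEY or VALUE.name."""
    root, *path = expression.split(".")
    node = record["key"] if root == "KEY" else record["value"]
    for attribute in path:
        node = node[attribute]
    return node


def expand_template(template, record):
    """Turn 'stock-#{index=KEY}' into 'stock-[index=<record key>]'."""
    prefix, params = re.fullmatch(r"(.*)#\{(.*)\}", template).groups()
    bound = []
    for param in params.split(","):
        name, expression = param.split("=")
        bound.append(f"{name}={extract(expression, record)}")
    return f"{prefix}[{','.join(bound)}]"


def route(record, subscribed_items):
    """Return (item, fields) if the record matches a subscribed item, else None."""
    item = expand_template(ITEM_TEMPLATE, record)
    if item not in subscribed_items:
        return None
    fields = {name: extract(expr, record) for name, expr in FIELD_MAPPINGS.items()}
    return item, fields


# Hypothetical record published to the "stocks" topic with key 1.
record = {
    "key": 1,
    "value": {"name": "Sample Stock", "last_price": 10.5, "ask": 10.6, "bid": 10.4},
}
print(route(record, {"stock-[index=1]", "stock-[index=2]"}))
```

A record whose key does not correspond to any subscribed item yields `None` here, which mirrors the idea that an event is only routed to items that clients have actually subscribed to.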
To start the Kafka Connector, run the following from the LS_HOME/bin/unix-like
directory:
$ ./background_start.sh
Then, point your browser to http://localhost:8080 and see a welcome page with some demos running out of the box.
- JDK (Java Development Kit) v17 or newer
- Docker
To build the Docker Image of the Lightstreamer Kafka Connector, follow these steps:
-
Copy the factory adapters.xml file into the examples/docker/resources folder.
-
Customize the file by editing the data provider block
QuickStartConfluentCloud
as explained in the previous Configure section.
-
Optionally, provide a minimal version of the
log4j.properties
file similar to the following:
log4j.logger.com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter=INFO, stdout
log4j.logger.org.apache.kafka=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.stdout.Target=System.out
# QuickStartConfluentCloud logger
log4j.logger.QuickStartConfluentCloud=INFO, stdout
and put it in the examples/docker/resources folder.
-
Run the following from the examples/docker directory:
$ ./build.sh
For more insights on creating a Docker Image for the Lightstreamer Kafka Connector, check out examples/docker.
To launch the container, run the following from the examples/docker directory:
$ docker run --name kafka-connector -d -p 8080:8080 lightstreamer-kafka-connector-<version>
Then, point your browser to http://localhost:8080 and see a welcome page with some demos running out of the box.
Once the Lightstreamer Kafka Connector is up and running—whether deployed manually or using Docker—it's time to publish events and connect a Lightstreamer consumer to experience a basic end-to-end streaming flow in action.
The examples/quickstart-producer
folder contains a simple native Kafka producer designed to publish simulated market events for the QuickStart app.
Before launching the producer, you first need to build it. Open a new shell from the folder and execute the following command:
$ cd examples/quickstart-producer
$ ./gradlew build
This command generates the quickstart-producer-all.jar
file under the build/libs
folder.
Next, create a properties file that includes encryption and authentication settings as follows:
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API.key>" password="<API.secret>";
sasl.mechanism=PLAIN
...
Replace <API.key>
and <API.secret>
with the API key and API secret linked to your Confluent Cloud account, which you can generate using the Confluent CLI or from the Confluent Cloud Console.
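If you prefer to generate the properties file rather than edit it by hand, a minimal Python sketch could look like the following. The function name `write_producer_config` and the output file name are hypothetical; the sketch writes only the three settings shown above and does not attempt to fill in whatever the elided part of the file contains.

```python
import tempfile
from pathlib import Path


def write_producer_config(path, api_key, api_secret):
    """Write the SASL_SSL/PLAIN client settings shown above to a properties file."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    lines = [
        "security.protocol=SASL_SSL",
        f"sasl.jaas.config={jaas}",
        "sasl.mechanism=PLAIN",
    ]
    Path(path).write_text("\n".join(lines) + "\n", encoding="utf-8")
    return lines


if __name__ == "__main__":
    # Placeholder credentials; replace them with your own API key and secret.
    with tempfile.TemporaryDirectory() as folder:
        target = Path(folder) / "producer.properties"
        for line in write_producer_config(target, "<API.key>", "<API.secret>"):
            print(line)
```

The resulting file path is what you would pass to the producer through the `--config-file` option.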
Now, launch the producer:
$ java -jar build/libs/quickstart-producer-all.jar --bootstrap-servers <kafka.connection.string> --topic stocks --config-file <path/to/config/file>
After starting the publisher, you can connect a client application to consume real-time data and display it in its frontend. Below, we'll demonstrate a browser-based example using HTML and JavaScript, and a Java example. However, you are encouraged to explore any of the Lightstreamer client SDKs for developing clients in other environments and languages, including iOS, Android, Python, and more.
Download the provided sample web client, based on HTML and JavaScript. Simply open the index.html
file and watch real-time updates populate the frontend immediately.
As shown in the source code, consuming live data from the Kafka Connector involves just a few steps:
-
Establishing a Connection: To connect to the Lightstreamer Kafka Connector, a
LightstreamerClient
object is created that connects to the server at http://localhost:8080
and specifies the adapter set KafkaConnector
, as configured on the server side through the id
attribute of the adapters_conf
root tag in the adapters.xml
file.
var lsClient = new LightstreamerClient("http://localhost:8080", "KafkaConnector");
...
lsClient.connect();
-
Setting up the Data Grid: To visualize real-time updates, a
StaticGrid
object is instantiated and configured to display data from a Subscription
into statically prepared HTML rows. This is a simple widget provided by the Lightstreamer client library for demonstration purposes. You are free to use any existing JavaScript framework or library to display the data.
var stocksGrid = new StaticGrid("stocks", true);
stocksGrid.setAutoCleanBehavior(true, false);
stocksGrid.addListener({
  onVisualUpdate: function (key, info, pos) {
    ...
    var stockIndex = key.substring(13, key.indexOf(']'));
    var color = (stockIndex % 2 == 1) ? "#fff" : "#e9fbf2";
    info.setAttribute("#fff7d5", color, "backgroundColor");
  }
});
-
Subscribing to Live Data: To create a subscription, a
Subscription
object is created and configured in MERGE
mode with a list of items and fields to subscribe to, extracted from the StaticGrid.
The subscription references the
QuickStartConfluentCloud
data adapter name, as configured on the server side through the name
attribute of the data_provider
element in the adapters.xml
file. The StaticGrid
is attached as a listener to the subscription to receive and display updates.
var stockSubscription = new Subscription("MERGE", stocksGrid.extractItemList(), stocksGrid.extractFieldList());
stockSubscription.setDataAdapter("QuickStartConfluentCloud");
stockSubscription.addListener(stocksGrid);
lsClient.subscribe(stockSubscription);
In addition to the browser-based consumer above, you can set up a Java consumer. The kafka-connector-utils
submodule hosts a simple Lightstreamer Java client that can be used to test the consumption of Kafka events from any Kafka topic.
Before launching the consumer, you first need to build it from the kafka-connector-project
folder with the command:
$ ./gradlew kafka-connector-utils:build
This command generates the lightstreamer-kafka-connector-utils-consumer-all-<version>.jar
file under the kafka-connector-project/kafka-connector-utils/build/libs
folder.
Then, launch it with:
$ java -jar kafka-connector-utils/build/libs/lightstreamer-kafka-connector-utils-consumer-all-<version>.jar --address http://localhost:8080 --adapter-set KafkaConnector --data-adapter QuickStartConfluentCloud --items stock-[index=1],stock-[index=2],stock-[index=3] --fields stock_name,ask,bid,min,max
As you can see, you need to specify a few parameters:
- --address: the Lightstreamer Server address
- --adapter-set: the name of the requested Adapter Set, which triggers Lightstreamer to activate the Kafka Connector deployed into the adapters folder
- --data-adapter: the name of the requested Data Adapter, which identifies the selected Kafka connection configuration
- --items: the list of items to subscribe to
- --fields: the list of requested fields for the items
As already anticipated, the Kafka Connector is a Lightstreamer Adapter Set, which means it is made up of a Metadata Adapter and one or more Data Adapters, whose settings are defined in the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/adapters.xml
file.
The following sections will guide you through the configuration details.
Mandatory. The id
attribute of the adapters_conf
root tag defines the Kafka Connector Identifier, which will be used by the Clients to request this Adapter Set while setting up the connection to a Lightstreamer Server through a LightstreamerClient object.
The factory value is set to KafkaConnector
for convenience, but you are free to change it as per your requirements.
Example:
<adapters_conf id="KafkaConnector">
Mandatory. The adapter_class
tag, specified inside the metadata_provider block, defines the Java class name of the Metadata Adapter.
The factory value is set to com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter
, which implements the internal business of the Kafka Connector.
It is possible to provide a custom implementation by extending this class: just package your new class in a jar file and deploy it along with all required dependencies into the LS_HOME/adapters/lightstreamer-kafka-connector-<version>/lib
folder.
See the Customize the Kafka Connector Metadata Adapter Class section for more details.
Example:
...
<metadata_provider>
...
<adapter_class>your.custom.class</adapter_class>
...
</metadata_provider>
...
Mandatory. The path of the reload4j configuration file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
).
The parameter is specified inside the metadata_provider block.
The factory value points to the predefined file LS_HOME/adapters/lightstreamer-kafka-connector-<version>/log4j.properties
.
Example:
...
<metadata_provider>
...
<param name="logging.configuration.path">log4j.properties</param>
...
</metadata_provider>
...
The Kafka Connector allows the configuration of separate independent connections to different Kafka brokers/clusters.
Every single connection is configured via the definition of its own Data Adapter through the data_provider block. At least one connection must be provided.
Since the Kafka Connector manages the physical connection to Kafka by wrapping an internal Kafka Consumer, several configuration settings in the Data Adapter are identical to those required by the usual Kafka Consumer configuration.
Optional. The name
attribute of the data_provider
tag defines the Kafka Connection Name, which will be used by the Clients to request real-time data from this specific Kafka connection through a Subscription object.
Furthermore, the name is also used to group all logging messages belonging to the same connection.
Tip
For every Data Adapter connection, add a new logger and its relative file appender to log4j.properties
, so that you can log to dedicated files all the interactions pertinent to the connection with the Kafka cluster and the message retrieval operations, along with their routing to the subscribed items.
For example, the factory logging configuration provides the logger QuickStartConfluentCloud
to print every log message relative to the QuickStartConfluentCloud
connection:
...
# QuickStartConfluentCloud logger
log4j.logger.QuickStartConfluentCloud=INFO, QuickStartConfluentCloudFile
log4j.appender.QuickStartConfluentCloudFile=org.apache.log4j.RollingFileAppender
log4j.appender.QuickStartConfluentCloudFile.layout=org.apache.log4j.PatternLayout
log4j.appender.QuickStartConfluentCloudFile.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.QuickStartConfluentCloudFile.File=../../logs/quickstart-confluent.log
Example:
<data_provider name="BrokerConnection">
Default value: DEFAULT
, but only one DEFAULT
configuration is permitted.
Mandatory. The adapter_class
tag defines the Java class name of the Data Adapter. DO NOT EDIT IT!
Factory value: com.lightstreamer.kafka.adapters.KafkaConnectorDataAdapter
.
Optional. Enable this connection configuration. Can be one of the following:
true
false
If disabled, Lightstreamer Server will automatically deny every subscription made to this connection.
Default value: true
.
Example:
<param name="enable">false</param>
Mandatory. The Kafka Cluster bootstrap server endpoint expressed as the list of host/port pairs used to establish the initial connection.
The parameter sets the value of the bootstrap.servers
key to configure the internal Kafka Consumer.
Example:
<param name="bootstrap.servers">broker:29092,broker:29093</param>
Optional. The name of the consumer group this connection belongs to.
The parameter sets the value for the group.id
key to configure the internal Kafka Consumer.
Default value: Kafka Connector Identifier + Connection Name + Randomly generated suffix.
Example:
<param name="group.id">kafka-connector-group</param>
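As a quick sanity check of the general parameters described so far, the following Python sketch reads an `adapters.xml` document and summarizes each configured connection. It is a hedged illustration only: the `summarize` function and the trimmed-down sample document are assumptions built from the examples in this section, not a copy of the factory file, and the checks cover just the rules stated above (mandatory `bootstrap.servers`, `enable` defaulting to `true`, connection name defaulting to `DEFAULT`).

```python
import xml.etree.ElementTree as ET

# Minimal sample assembled from the examples in this section (not the factory file).
SAMPLE = """
<adapters_conf id="KafkaConnector">
  <metadata_provider>
    <adapter_class>com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter</adapter_class>
    <param name="logging.configuration.path">log4j.properties</param>
  </metadata_provider>
  <data_provider name="QuickStartConfluentCloud">
    <adapter_class>com.lightstreamer.kafka.adapters.KafkaConnectorDataAdapter</adapter_class>
    <param name="bootstrap.servers">broker:29092,broker:29093</param>
    <param name="group.id">kafka-connector-group</param>
  </data_provider>
</adapters_conf>
"""


def summarize(xml_text):
    """Return the adapter set id and a summary of every data_provider block."""
    root = ET.fromstring(xml_text.strip())
    connections = {}
    for provider in root.findall("data_provider"):
        params = {p.get("name"): (p.text or "").strip() for p in provider.findall("param")}
        if "bootstrap.servers" not in params:
            raise ValueError("bootstrap.servers is mandatory for every connection")
        name = provider.get("name", "DEFAULT")  # documented default connection name
        connections[name] = {
            "enabled": params.get("enable", "true") == "true",
            "bootstrap_servers": params["bootstrap.servers"].split(","),
            "group_id": params.get("group.id"),  # None: the connector generates one
        }
    return root.get("id"), connections


adapter_set_id, connections = summarize(SAMPLE)
print(adapter_set_id, connections)
```

The adapter set id and the connection names printed here are the values a client passes to `LightstreamerClient` and to `setDataAdapter`, respectively.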
A TCP secure connection to Kafka is configured through parameters with the prefix encryption
.
Optional. Enable encryption of this connection. Can be one of the following:
true
false
Default value: false
.
Example:
<param name="encryption.enable">true</param>
Optional. The SSL protocol to be used. Can be one of the following:
TLSv1.2
TLSv1.3
Default value: TLSv1.3
when running on Java 11 or newer, TLSv1.2
otherwise.
Example:
<param name="encryption.protocol">TLSv1.2</param>
Optional. The list of enabled secure communication protocols.
Default value: TLSv1.2,TLSv1.3
when running on Java 11 or newer, TLSv1.2
otherwise.
Example:
<param name="encryption.enabled.protocols">TLSv1.3</param>
Optional. The list of enabled secure cipher suites.
Default value: all the available cipher suites in the running JVM.
Example:
<param name="encryption.cipher.suites">TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA</param>
Optional. Enable hostname verification. Can be one of the following:
true
false
Default value: false
.
Example:
<param name="encryption.hostname.verification.enable">true</param>
Optional. The path of the trust store file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
).
Example:
<param name="encryption.truststore.path">secrets/kafka-connector.truststore.jks</param>
Optional. The password of the trust store.
If not set, checking the integrity of the trust store file configured will not be possible.
Example:
<param name="encryption.truststore.password">kafka-connector-truststore-password</param>
Optional. Enable a key store. Can be one of the following:
true
false
A key store is required if mutual TLS is enabled on Kafka.
If enabled, the following parameters configure the key store settings:
encryption.keystore.path
encryption.keystore.password
encryption.keystore.key.password
Default value: false
.
Example:
<param name="encryption.keystore.enable">true</param>
Mandatory if key store is enabled. The path of the key store file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
).
Example:
<param name="encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
Optional. The password of the key store.
If not set, checking the integrity of the key store file configured will not be possible.
Example:
<param name="encryption.keystore.password">keystore-password</param>
Optional. The password of the private key in the key store file.
Example:
<param name="encryption.keystore.key.password">kafka-connector-private-key-password</param>
Check out the adapters.xml file of the Quick Start SSL app, where you can find an example of encryption configuration.
Broker authentication is configured through parameters with the prefix authentication
.
Optional. Enable the authentication of this connection against the Kafka Cluster. Can be one of the following:
true
false
Default value: false
.
Example:
<param name="authentication.enable">true</param>
Mandatory if authentication is enabled. The SASL mechanism type. The Kafka Connector accepts the following authentication mechanisms:
- PLAIN (the default value)
- SCRAM-SHA-256
- SCRAM-SHA-512
- GSSAPI
In the case of PLAIN
, SCRAM-SHA-256
, and SCRAM-SHA-512
mechanisms, the credentials must be configured through the following mandatory parameters (which are not allowed for GSSAPI
):
- authentication.username: the username
- authentication.password: the password
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">PLAIN</param>
<param name="authentication.username">authorized-kafka-user</param>
<param name="authentication.password">authorized-kafka-user-password</param>
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">SCRAM-SHA-256</param>
<param name="authentication.username">authorized-kafka-user</param>
<param name="authentication.password">authorized-kafka-user-password</param>
Example:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">SCRAM-SHA-512</param>
<param name="authentication.username">authorized-kafka-username</param>
<param name="authentication.password">authorized-kafka-username-password</param>
In the case of GSSAPI
authentication mechanism, the following parameters will be part of the authentication configuration:
- authentication.gssapi.key.tab.enable
  Optional. Enable the use of a keytab. Can be one of the following:
  true
  false
  Default value: false.
- authentication.gssapi.key.tab.path
  Mandatory if keytab is enabled. The path to the keytab file, relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>).
- authentication.gssapi.store.key.enable
  Optional. Enable storage of the principal key. Can be one of the following:
  true
  false
  Default value: false.
- authentication.gssapi.kerberos.service.name
  Mandatory. The name of the Kerberos service.
- authentication.gssapi.principal
  Mandatory if ticket cache is disabled. The name of the principal to be used.
- authentication.gssapi.ticket.cache.enable
  Optional. Enable the use of a ticket cache. Can be one of the following:
  true
  false
  Default value: false.
Example:
...
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">GSSAPI</param>
<param name="authentication.gssapi.key.tab.enable">true</param>
<param name="authentication.gssapi.key.tab.path">gssapi/kafka-connector.keytab</param>
<param name="authentication.gssapi.store.key.enable">true</param>
<param name="authentication.gssapi.kerberos.service.name">kafka</param>
<param name="authentication.gssapi.principal">[email protected]</param>
...
Example of configuration with the use of a ticket cache:
<param name="authentication.enable">true</param>
<param name="authentication.mechanism">GSSAPI</param>
<param name="authentication.gssapi.kerberos.service.name">kafka</param>
<param name="authentication.gssapi.ticket.cache.enable">true</param>
Check out the QuickStartConfluentCloud
factory configuration file, where you can find an example of an authentication configuration that uses SASL/PLAIN.
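The dependencies between the authentication parameters can be summarized as a small check. The Python sketch below is an illustration under stated assumptions, not the connector's own validation logic: the function name `check_authentication` and the wording of the messages are hypothetical, and `PLAIN` is treated as the fallback mechanism because the list above marks it as the default value.

```python
USER_PASSWORD_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}
CREDENTIALS = ("authentication.username", "authentication.password")


def check_authentication(params):
    """Return a list of problems found in the authentication.* parameters."""

    def enabled(name):
        # Boolean parameters default to false when absent.
        return params.get(name, "false") == "true"

    if not enabled("authentication.enable"):
        return []
    problems = []
    mechanism = params.get("authentication.mechanism", "PLAIN")
    if mechanism in USER_PASSWORD_MECHANISMS:
        problems += [f"{name} is mandatory for {mechanism}"
                     for name in CREDENTIALS if name not in params]
    elif mechanism == "GSSAPI":
        problems += [f"{name} is not allowed for GSSAPI"
                     for name in CREDENTIALS if name in params]
        if "authentication.gssapi.kerberos.service.name" not in params:
            problems.append("authentication.gssapi.kerberos.service.name is mandatory")
        if (enabled("authentication.gssapi.key.tab.enable")
                and "authentication.gssapi.key.tab.path" not in params):
            problems.append("authentication.gssapi.key.tab.path is mandatory when the keytab is enabled")
        if (not enabled("authentication.gssapi.ticket.cache.enable")
                and "authentication.gssapi.principal" not in params):
            problems.append("authentication.gssapi.principal is mandatory when the ticket cache is disabled")
    else:
        problems.append(f"unsupported mechanism: {mechanism}")
    return problems


# The ticket cache example from this section passes the check.
ticket_cache_config = {
    "authentication.enable": "true",
    "authentication.mechanism": "GSSAPI",
    "authentication.gssapi.kerberos.service.name": "kafka",
    "authentication.gssapi.ticket.cache.enable": "true",
}
print(check_authentication(ticket_cache_config))
```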
The Kafka Connector can deserialize Kafka records from the following formats:
- Apache Avro
- JSON
- String
- Integer
- Float
and other scalar types (see the complete list).
In particular, the Kafka Connector supports message validation for Avro and JSON, which can be specified through:
- Local schema files
- The Confluent Schema Registry
The Kafka Connector enables the independent deserialization of keys and values, allowing them to have different formats. Additionally:
- Message validation against the Confluent Schema Registry can be enabled separately for the key and value (through record.key.evaluator.schema.registry.enable and record.value.evaluator.schema.registry.enable)
- Message validation against local schema files must be specified separately for the key and the value (through record.key.evaluator.schema.path and record.value.evaluator.schema.path)
Important
For Avro, schema validation is mandatory, therefore either a local schema file must be provided or the Confluent Schema Registry must be enabled.
Optional. Specifies where to start consuming events from:
- LATEST: start consuming events from the end of the topic partition
- EARLIEST: start consuming events from the beginning of the topic partition
The parameter sets the value of the auto.offset.reset
key to configure the internal Kafka Consumer.
Default value: LATEST
.
Example:
<param name="record.consume.from">EARLIEST</param>
Optional. The number of threads to be used for concurrent processing of the incoming deserialized records. If set to -1
, the number of threads will be automatically determined based on the number of available CPU cores.
Default value: 1
.
Example:
<param name="record.consume.with.num.threads">4</param>
Optional but only effective if record.consume.with.num.threads
is set to a value greater than 1
(which includes the default value). The order strategy to be used for concurrent processing of the incoming deserialized records. Can be one of the following:
- ORDER_BY_PARTITION: maintain the order of records within each partition.
  If you have multiple partitions, records from different partitions can be processed concurrently by different threads, but the order of records from a single partition will always be preserved. This is the default and generally a good balance between performance and order.
- ORDER_BY_KEY: maintain the order among the records sharing the same key.
  Different keys can be processed concurrently by different threads. So, while all records with key "A" are processed in order, and all records with key "B" are processed in order, the processing of "A" and "B" records can happen concurrently and interleaved in time. There's no guaranteed order between records of different keys.
- UNORDERED: provide no ordering guarantees.
  Records from any partition and with any key can be processed by any thread at any time. This offers the highest throughput when a high number of subscriptions is involved, but the order in which records are delivered to Lightstreamer clients might not match the order they were written to Kafka. This is suitable for use cases where message order is not important.
Default value: ORDER_BY_PARTITION
.
Example:
<param name="record.consume.with.order.strategy">ORDER_BY_KEY</param>
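The ORDER_BY_KEY idea can be pictured with a small, purely illustrative sketch. It is not the connector's actual implementation: the class name OrderByKeySketch and the hash-based dispatch are assumptions made here only to show how records sharing a key can stay ordered while different keys are processed concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: not the connector's actual implementation.
final class OrderByKeySketch {

    // The same key always maps to the same worker, so records sharing a key
    // are processed sequentially, in submission order.
    static int workerFor(String key, int numThreads) {
        return Math.floorMod(key.hashCode(), numThreads);
    }

    public static void main(String[] args) throws InterruptedException {
        int numThreads = 4; // plays the role of record.consume.with.num.threads
        List<ExecutorService> workers = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            // One single-threaded executor per worker preserves per-worker order.
            workers.add(Executors.newSingleThreadExecutor());
        }

        // Sample records as {key, value} pairs.
        String[][] records = { {"A", "1"}, {"B", "1"}, {"A", "2"}, {"B", "2"} };
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            workers.get(workerFor(key, numThreads))
                   .submit(() -> System.out.println(key + " -> " + value));
        }

        for (ExecutorService worker : workers) {
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

In this sketch, "A -> 1" is always printed before "A -> 2", whereas the relative order of the "A" and "B" lines may change from run to run.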
Optional. The format to be used to deserialize respectively the key and value of a Kafka record. Can be one of the following:
AVRO
JSON
STRING
INTEGER
BOOLEAN
BYTE_ARRAY
BYTE_BUFFER
BYTES
DOUBLE
FLOAT
LONG
SHORT
UUID
Default value: STRING
.
Examples:
<param name="record.key.evaluator.type">INTEGER</param>
<param name="record.value.evaluator.type">JSON</param>
Mandatory if evaluator type is AVRO
and the Confluent Schema Registry is disabled. The path of the local schema file relative to the deployment folder (LS_HOME/adapters/lightstreamer-kafka-connector-<version>
) for message validation respectively of the key and the value.
Examples:
<param name="record.key.evaluator.schema.path">schema/record_key.avsc</param>
<param name="record.value.evaluator.schema.path">schema/record_value.avsc</param>
Mandatory if evaluator type is AVRO
and no local schema paths are specified. Enable the use of the Confluent Schema Registry for validation respectively of the key and the value. Can be one of the following:
true
false
Default value: false
.
Examples:
<param name="record.key.evaluator.schema.registry.enable">true</param>
<param name="record.value.evaluator.schema.registry.enable">true</param>
Optional. The error handling strategy to be used if an error occurs while extracting data from incoming deserialized records. Can be one of the following:
- IGNORE_AND_CONTINUE: ignore the error and continue to process the next record
- FORCE_UNSUBSCRIPTION: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection (see the Client Side Error Handling section)
Default value: IGNORE_AND_CONTINUE
.
Example:
<param name="record.extraction.error.strategy">FORCE_UNSUBSCRIPTION</param>
The Kafka Connector supports several routing and mapping strategies, which make it possible to convey Kafka event streams to a potentially huge number of devices connected to Lightstreamer with great flexibility.
The Data Extraction Language is the ad hoc tool provided for in-depth analysis of Kafka records to extract data that can be used for the following purposes:
- Mapping records to Lightstreamer fields
- Filtering the routing to the designated Lightstreamer items
To write an extraction expression, the Data Extraction Language provides a pretty minimal syntax with the following basic rules:
-
Expressions must be enclosed within
#{...}
-
Expressions use Extraction Keys, a set of predefined constants that reference specific parts of the record structure:
- #{KEY}: the key
- #{VALUE}: the value
- #{TOPIC}: the topic
- #{TIMESTAMP}: the timestamp
- #{PARTITION}: the partition
- #{OFFSET}: the offset
-
Expressions use the dot notation to access attributes or fields of record keys and record values serialized in JSON or Avro formats:
KEY.attribute1Name.attribute2Name...
VALUE.attribute1Name.attribute2Name...
Important
Currently, it is required that the top-level element of either a record key or record value is:
Such a constraint may be removed in a future version of the Kafka Connector.
-
Expressions use the square notation to access:
- Indexed attributes:
KEY.attribute1Name[i].attribute2Name...
VALUE.attribute1Name[i].attribute2Name...
where i is a 0-indexed value.
- Key-based attributes:
KEY.attribute1Name['keyName'].attribute2Name...
VALUE.attribute1Name['keyName'].attribute2Name...
where keyName is a string value.
-
Tip
For JSON format, accessing a child attribute using either dot notation or square bracket notation is equivalent:
VALUE.myProperty.myChild.childProperty
VALUE.myProperty['myChild'].childProperty
-
Expressions must evaluate to a scalar value
In case of non-scalar value, an error will be thrown during the extraction process and handled as per the configured strategy.
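As a rough mental model of the dot and square notations, the following sketch walks a path over plain Java maps and lists. It is only an illustration under stated assumptions: the class ExtractionSketch, its extract method, and the sample record are hypothetical, only the VALUE key is handled, and the connector's real evaluator works on deserialized JSON or Avro structures rather than on these collections.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: mimics dot and square notation on plain collections.
final class ExtractionSketch {

    // One path step: .name, [index] or ['key'].
    private static final Pattern STEP =
            Pattern.compile("\\.(\\w+)|\\[(\\d+)\\]|\\['([^']+)'\\]");

    // Evaluates a path such as "VALUE.quotes[0].price" against the record value.
    static Object extract(String path, Object recordValue) {
        if (!path.startsWith("VALUE")) {
            throw new IllegalArgumentException("This sketch only supports VALUE: " + path);
        }
        String steps = path.substring("VALUE".length());
        Matcher m = STEP.matcher(steps);
        Object current = recordValue;
        int end = 0;
        while (m.find()) {
            if (m.start() != end) {
                throw new IllegalArgumentException("Malformed path: " + path);
            }
            if (m.group(2) != null) {
                // Indexed attribute: [i]
                current = ((List<?>) current).get(Integer.parseInt(m.group(2)));
            } else {
                // Dot notation (.name) or key-based attribute (['name'])
                String name = m.group(1) != null ? m.group(1) : m.group(3);
                current = ((Map<?, ?>) current).get(name);
            }
            end = m.end();
        }
        if (end != steps.length()) {
            throw new IllegalArgumentException("Malformed path: " + path);
        }
        if (current instanceof Map || current instanceof List) {
            // Expressions must evaluate to a scalar value.
            throw new IllegalStateException("Non-scalar value for: " + path);
        }
        return current;
    }

    public static void main(String[] args) {
        // Sample record value, invented for this example.
        Map<String, Object> value = Map.of(
                "myProperty", Map.of("myChild", Map.of("childProperty", "hello")),
                "quotes", List.of(Map.of("price", 10.5), Map.of("price", 11.0)));

        System.out.println(extract("VALUE.myProperty.myChild.childProperty", value));
        System.out.println(extract("VALUE.myProperty['myChild'].childProperty", value));
        System.out.println(extract("VALUE.quotes[1].price", value));
    }
}
```

The first two calls return the same value, mirroring the equivalence of the two notations for JSON, while a path that stops at myProperty would raise an error because it does not resolve to a scalar.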
To configure a simple routing of Kafka event streams to Lightstreamer items, use at least one map.TOPIC_NAME.to parameter. The general format is:
<param name="map.TOPIC_NAME.to">item1,item2,itemN,...</param>
which defines the mapping between the source Kafka topic (TOPIC_NAME
) and the target items (item1
, item2
, itemN
, etc.).
This configuration enables the implementation of various routing scenarios, as shown by the following examples:
-
One-to-one
<param name="map.sample-topic.to">sample-item</param>
This is the most straightforward scenario: every record published to the Kafka topic sample-topic is simply routed to the Lightstreamer item sample-item. Therefore, messages are immediately broadcast as real-time updates to all clients subscribed to that item.
-
Many-to-one
<param name="map.sample-topic1.to">sample-item</param>
<param name="map.sample-topic2.to">sample-item</param>
<param name="map.sample-topic3.to">sample-item</param>
With this scenario, it is possible to broadcast to all clients subscribed to a single item (sample-item) every message published to different topics (sample-topic1, sample-topic2, sample-topic3).
-
One-to-many
The one-to-many scenario is also supported, though it's often unnecessary. Lightstreamer already provides full control over individual items, such as differentiating access authorization for various users or subscribing with different maximum update frequencies, without requiring data replication across multiple items.
<param name="map.sample-topic.to">sample-item1,sample-item2,sample-item3</param>
Every record published to the Kafka topic sample-topic will be routed to the Lightstreamer items sample-item1, sample-item2, and sample-item3.
Optional. Enable the TOPIC_NAME part of the map.TOPIC_NAME.to parameter to be treated as a regular expression rather than as a literal topic name.
This allows for more flexible routing, where messages from multiple topics matching a specific pattern can be directed to the same Lightstreamer item(s) or item template(s).
Can be one of the following:
true
false
Default value: false
.
Example:
<param name="map.topic_\d+.to">item</param>
<param name="map.regex.enable">true</param>
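To get a feel for which topic names a pattern such as topic_\d+ would select, the check can be tried out with the standard java.util.regex API. This sketch assumes whole-name matching, which is an assumption rather than something stated in this guide; the class name TopicRegexSketch and the sample topic names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only: whole-name matching is an assumption.
final class TopicRegexSketch {

    // Returns true when the entire topic name matches the pattern.
    static boolean matches(String topicPattern, String topicName) {
        return Pattern.matches(topicPattern, topicName);
    }

    public static void main(String[] args) {
        // Same pattern as in the map.topic_\d+.to example.
        String topicPattern = "topic_\\d+";
        String[] topics = { "topic_1", "topic_42", "topic_a", "my_topic_1" };
        for (String topic : topics) {
            System.out.println(topic + " -> " + matches(topicPattern, topic));
        }
    }
}
```

Under the whole-name assumption, topic_1 and topic_42 are selected, while topic_a and my_topic_1 are not.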
To forward real-time updates to the Lightstreamer clients, a Kafka record must be mapped to Lightstreamer fields, which define the schema of any Lightstreamer item.
To configure the mapping, you define the set of all subscribable fields through parameters with the prefix field.
:
<param name="field.fieldName1">extractionExpression1</param>
<param name="field.fieldName2">extractionExpression2</param>
...
<param name="field.fieldNameN">extractionExpressionN</param>
...
The configuration specifies that the field fieldNameX
will contain the value extracted from the deserialized Kafka record through the extractionExpressionX
, written using the Data Extraction Language. This approach makes it possible to transform a Kafka record of any complexity to the flat structure required by Lightstreamer.
The QuickStartConfluentCloud
factory configuration shows a basic example, where a simple direct mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the corresponding name. Of course, thanks to the Data Extraction Language, more complex mapping can be employed.
...
<param name="field.timestamp">#{VALUE.timestamp}</param>
<param name="field.time">#{VALUE.time}</param>
<param name="field.stock_name">#{VALUE.name}</param>
<param name="field.last_price">#{VALUE.last_price}</param>
<param name="field.ask">#{VALUE.ask}</param>
<param name="field.ask_quantity">#{VALUE.ask_quantity}</param>
<param name="field.bid">#{VALUE.bid}</param>
<param name="field.bid_quantity">#{VALUE.bid_quantity}</param>
<param name="field.pct_change">#{VALUE.pct_change}</param>
<param name="field.min">#{VALUE.min}</param>
<param name="field.max">#{VALUE.max}</param>
<param name="field.ref_price">#{VALUE.ref_price}</param>
<param name="field.open_price">#{VALUE.open_price}</param>
<param name="field.item_status">#{VALUE.item_status}</param>
...
Optional. Normally, if a field mapping fails during the extraction from the Kafka record because of an issue with the data, the entire record is discarded or the subscription is even terminated, depending on the record.extraction.error.strategy setting. By enabling this parameter, the connector becomes more resilient to such errors. If a field mapping fails, that specific field's value is simply omitted from the update sent to Lightstreamer clients, while the other successfully mapped fields from the same record are still delivered. This allows for partial updates even in the presence of data inconsistencies or transient extraction issues.
Can be one of the following:
true
false
Default value: false
.
Example:
<param name="fields.skip.failed.mapping.enable">true</param>
Besides mapping topics to statically predefined items, the Kafka Connector allows you to configure the item templates, which specify the rules needed to decide if a message can be forwarded to the items specified by the clients, thus enabling a filtered routing. The item template leverages the Data Extraction Language to extract data from Kafka records and match them against the parameterized subscribed items.
To configure an item template, use the item-template.TEMPLATE_NAME
parameter:
<param name="item-template.TEMPLATE_NAME">ITEM_PREFIX-EXPRESSIONS</param>
Then, map one or more topics to the template by referencing it in the map.TOPIC_NAME.to parameter:
<param name="map.TOPIC_NAME.to">item-template.TEMPLATE_NAME</param>
Tip
It is allowed to mix references to simple item names and item templates in the same topic mapping configuration:
<param name="map.sample-topic.to">item-template.template1,item1,item2</param>
The item template is made of:
- ITEM_PREFIX: the prefix of the item name
- EXPRESSIONS: a sequence of extraction expressions, which define filtering rules specified as:
#{paramName1=<extractionExpression1>,paramName2=<extractionExpression2>,...}
where paramNameX is a bind parameter to be specified by the clients and whose actual value will be extracted from the deserialized Kafka record by evaluating the <extractionExpressionX> expression (written using the Data Extraction Language).
To activate the filtered routing, the Lightstreamer clients must subscribe to a parameterized item that specifies a filtering value for every bind parameter defined in the template:
ITEM_PREFIX-[paramName1=filterValue_1,paramName2=filterValue_2,...]
Upon consuming a message, the Kafka Connector expands every item template addressed by the record topic by evaluating each extraction expression and binding the extracted value to the associated parameter. The expanded template will result as:
ITEM_PREFIX-[paramName1=extractedValue_1,paramName2=extractedValue_2,...]
Finally, the message will be mapped and routed only if the subscribed item completely matches the expanded template or, more formally, the following is true:
filterValue_X == extractedValue_X for every paramName_X
Consider the following configuration:
<param name="item-template.by-name">user-#{firstName=VALUE.name,lastName=VALUE.surname}</param>
<param name="item-template.by-age">user-#{age=VALUE.age}</param>
<param name="map.user.to">item-template.by-name,item-template.by-age</param>
which specifies how to route records published to the topic user to the item templates defined to extract some personal data.
Let's suppose we have three different Lightstreamer clients:
- Client A subscribes to the following parameterized items:
  - SA1 user-[firstName=James,lastName=Kirk] for receiving real-time updates relative to the user James Kirk.
  - SA2 user-[age=45] for receiving real-time updates relative to any 45 year-old user.
- Client B subscribes to the parameterized item SB1 user-[firstName=Montgomery,lastName=Scotty] for receiving real-time updates relative to the user Montgomery Scotty.
- Client C subscribes to the parameterized item SC1 user-[age=37] for receiving real-time updates relative to any 37 year-old user.
Now, let's see how filtered routing works for the following incoming Kafka records published to the topic user
:
- Record 1:
  { ... "name": "James", "surname": "Kirk", "age": 37, ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | --- | --- | --- | --- |
  | by-name | user-[firstName=James,lastName=Kirk] | SA1 | Client A |
  | by-age | user-[age=37] | SC1 | Client C |

- Record 2:
  { ... "name": "Montgomery", "surname": "Scotty", "age": 45, ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | --- | --- | --- | --- |
  | by-name | user-[firstName=Montgomery,lastName=Scotty] | SB1 | Client B |
  | by-age | user-[age=45] | SA2 | Client A |

- Record 3:
  { ... "name": "Nyota", "surname": "Uhura", "age": 37, ... }

  | Template | Expansion | Matched Subscribed Item | Routed to Client |
  | --- | --- | --- | --- |
  | by-name | user-[firstName=Nyota,lastName=Uhura] | None | None |
  | by-age | user-[age=37] | SC1 | Client C |
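The expand-then-compare logic walked through for Record 3 can be condensed into a minimal sketch. It is a simplified model, not the connector's code: the class ItemTemplateSketch and its methods are hypothetical, the values are assumed to have been extracted already, and plain string equality stands in for whatever matching the connector performs internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only: a simplified model of filtered routing.
final class ItemTemplateSketch {

    // Builds ITEM_PREFIX-[param1=value1,param2=value2,...] from already-extracted values.
    static String expand(String itemPrefix, Map<String, String> extractedValues) {
        String params = extractedValues.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
        return itemPrefix + "-[" + params + "]";
    }

    // A record is routed only when the subscribed item equals the expanded template.
    static boolean matches(String subscribedItem, String expandedTemplate) {
        return subscribedItem.equals(expandedTemplate);
    }

    public static void main(String[] args) {
        // Values assumed to be already extracted from Record 3.
        Map<String, String> byName = new LinkedHashMap<>();
        byName.put("firstName", "Nyota");
        byName.put("lastName", "Uhura");
        Map<String, String> byAge = new LinkedHashMap<>();
        byAge.put("age", "37");

        // Items SA1, SA2, SB1 and SC1 from the scenario.
        String[] subscribedItems = {
            "user-[firstName=James,lastName=Kirk]",
            "user-[age=45]",
            "user-[firstName=Montgomery,lastName=Scotty]",
            "user-[age=37]"
        };
        for (String item : subscribedItems) {
            boolean routed = matches(item, expand("user", byName))
                    || matches(item, expand("user", byAge));
            System.out.println(item + " -> " + routed);
        }
    }
}
```

Only user-[age=37] matches an expanded template here, which corresponds to Record 3 being routed to Client C alone.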
A Schema Registry is a centralized repository that manages and validates schemas, which define the structure of valid messages.
The Kafka Connector supports integration with the Confluent Schema Registry through the configuration of parameters with the prefix schema.registry
.
Mandatory if the Confluent Schema Registry is enabled. The URL of the Confluent Schema Registry.
Example:
<param name="schema.registry.url">http://localhost:8081</param>
An encrypted connection is enabled by specifying the https
protocol (see the next section).
Example:
<param name="schema.registry.url">https://localhost:8084</param>
The Basic HTTP authentication mechanism is supported through the configuration of parameters with the prefix schema.registry.basic.authentication.
Optional. Enable Basic HTTP authentication of this connection against the Schema Registry. Can be one of the following:
true
false
Default value: false
.
Example:
<param name="schema.registry.basic.authentication.enable">true</param>
Mandatory if Basic HTTP Authentication is enabled. The credentials.
- schema.registry.basic.authentication.username: the username
- schema.registry.basic.authentication.password: the password
Example:
<param name="schema.registry.basic.authentication.username">authorized-schema-registry-user</param>
<param name="schema.registry.basic.authentication.password">authorized-schema-registry-user-password</param>
A secure connection to the Confluent Schema Registry can be configured through parameters with the prefix schema.registry.encryption
, each one having the same meaning as the homologous parameters defined in the Encryption Parameters section:
- schema.registry.encryption.protocol (see encryption.protocol)
- schema.registry.encryption.enabled.protocols (see encryption.enabled.protocols)
- schema.registry.encryption.cipher.suites (see encryption.cipher.suites)
- schema.registry.encryption.truststore.path (see encryption.truststore.path)
- schema.registry.encryption.truststore.password (see encryption.truststore.password)
- schema.registry.encryption.hostname.verification.enable (see encryption.hostname.verification.enable)
- schema.registry.encryption.keystore.enable (see encryption.keystore.enable)
- schema.registry.encryption.keystore.path (see encryption.keystore.path)
- schema.registry.encryption.keystore.password (see encryption.keystore.password)
- schema.registry.encryption.keystore.key.password (see encryption.keystore.key.password)
Example:
<!-- Set the Confluent Schema Registry URL. The https protocol enables encryption parameters -->
<param name="schema.registry.url">https://localhost:8084</param>
<!-- Set general encryption settings -->
<param name="schema.registry.encryption.enabled.protocols">TLSv1.3</param>
<param name="schema.registry.encryption.cipher.suites">TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA</param>
<param name="schema.registry.encryption.hostname.verification.enable">true</param>
<!-- If required, configure the trust store to trust the Confluent Schema Registry certificates -->
<param name="schema.registry.encryption.truststore.path">secrets/kafka.connector.schema.registry.truststore.jks</param>
<!-- If mutual TLS is enabled on the Confluent Schema Registry, enable and configure the key store -->
<param name="schema.registry.encryption.keystore.enable">true</param>
<param name="schema.registry.encryption.keystore.path">secrets/kafka-connector.keystore.jks</param>
<param name="schema.registry.encryption.keystore.password">kafka-connector-password</param>
<param name="schema.registry.encryption.keystore.key.password">schemaregistry-private-key-password</param>
Check out the adapters.xml file of the Quick Start Schema Registry app, where you can find an example of Schema Registry settings.
When a client sends a subscription to the Kafka Connector, several error conditions can occur:
- Connection issues: the Kafka broker may be unreachable due to network problems or an incorrect configuration of the bootstrap.servers parameter.
- Non-existent topics: none of the Kafka topics mapped in the record routing configurations exist in the broker.
- Data extraction: issues may arise while extracting data from incoming records and the record.extraction.error.strategy parameter is set to FORCE_UNSUBSCRIPTION.
In these scenarios, the Kafka Connector triggers the unsubscription from all the items that were subscribed to the target connection. A client can be notified about the unsubscription event by implementing the onUnsubscription
event handler, as shown in the following Java code snippet:
subscription.addSubscriptionListener(new SubscriptionListener() {
    ...
    @Override
    public void onUnsubscription() {
        // Manage the unsubscription event.
    }
    ...
});
If you need to customize the Kafka Connector Metadata Adapter class (e.g., to implement custom authentication and authorization logic), you can provide your own implementation by extending the factory class com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter
. The class provides the following hook methods, which you can override to add your custom logic:
-
postInit: invoked after the initialization phase of the Kafka Connector Metadata Adapter has been completed
-
onSubscription: invoked to notify that a user has submitted a subscription
-
onUnsubscription: invoked to notify that a Subscription has been removed
To develop your extension, you need the Kafka Connector jar library, which is hosted on GitHub Packages.
For a Maven project, add the dependency to your pom.xml file:
<dependency>
<groupId>com.lightstreamer.kafka</groupId>
<artifactId>kafka-connector</artifactId>
<version>VERSION</version>
</dependency>
and follow these instructions to configure the repository and authentication.
For a Gradle project, edit your build.gradle file as follows:
-
Add the dependency:
dependencies {
    implementation group: 'com.lightstreamer.kafka', name: 'kafka-connector', version: '<version>'
}
-
Add the repository and specify your personal access token:
repositories {
    mavenCentral()
    maven {
        name = "GitHubPackages"
        url = uri("https://maven.pkg.github.com/lightstreamer/lightstreamer-kafka-connector")
        credentials {
            username = project.findProperty("gpr.user") ?: System.getenv("USERNAME")
            password = project.findProperty("gpr.key") ?: System.getenv("TOKEN")
        }
    }
}
In the examples/custom-kafka-connector-adapter folder, you can find a sample Gradle project you may use as a starting point to build and deploy your custom extension.
The Lightstreamer Kafka Connector is also available as a Sink Connector plugin to be installed into Kafka Connect.
In this scenario, an instance of the connector plugin acts as a Remote Adapter for the Lightstreamer server as depicted in the following picture:
The connector has been developed for Kafka Connect framework version 3.7 and requires JDK (Java Development Kit) v17 or newer.
Before running the connector, you first need to deploy a Proxy Adapter into the Lightstreamer server instance.
- JDK (Java Development Kit) v17 or newer
- Lightstreamer Broker (also referred to as Lightstreamer Server) v7.4.2 or newer. Follow the installation instructions in the
LS_HOME/GETTING_STARTED.TXT
file included in the downloaded package.
- Create a directory within LS_HOME/adapters (choose whatever name you prefer, for example kafka-connect-proxy).
- Copy the sample adapters.xml file to the kafka-connect-proxy directory.
- Edit the file as follows:
  - Update the id attribute of the adapters_conf root tag. This setting has the same role as the already documented Kafka Connector Identifier.
  - Update the name attribute of the data_provider tag. This setting has the same role as the already documented Kafka Connection Name.
  - Update the request_reply_port parameter with the listening TCP port:
    <param name="request_reply_port">6661</param>
  - If authentication is required:
    - Set the auth parameter to Y:
      <param name="auth">Y</param>
    - Add the following parameters with the selected credential settings:
      <param name="auth.credentials.1.user">USERNAME</param>
      <param name="auth.credentials.1.password">PASSWORD</param>
Note
As the id
attribute must be unique across all the Adapter Sets deployed in the same Lightstreamer instance, make sure there is no conflict with any previously installed adapters (for example, the factory adapters.xml file included in the Kafka Connector distribution package).
Finally, check that the Lightstreamer layout looks like the following:
LS_HOME/
...
├── adapters
│ ├── kafka-connect-proxy
│ │ ├── adapters.xml
│ └── welcome_res
...
├── audit
├── bin
...
To manually install the Kafka Connect Lightstreamer Sink Connector to a local Confluent Platform (version 7.6 or later) and run it in standalone mode:
- Download the connector zip file lightstreamer-kafka-connect-lightstreamer-<version>.zip from the Releases page. Alternatively, check out this repository and execute the following command from the kafka-connector-project folder:
  $ ./gradlew connectDistZip
  which generates the zip file under the kafka-connector-project/kafka-connector/build/distributions folder.
- Extract the zip file into the desired location. For example, you can copy the connector contents into a new directory named CONFLUENT_HOME/share/kafka/plugins.
- Edit the worker configuration properties file, ensuring you include the previous path in the plugin.path property, for example:
  plugin.path=/usr/local/share/kafka/plugins
  You may want to use the provided connect-standalone-local.properties file as a starting point.
- Edit the connector configuration properties file as detailed in the Configuration Reference section. You may want to use the provided quickstart-lightstreamer-local.properties or quickstart-lightstreamer-local.json files as a starting point. These files provide the set of pre-configured settings to feed Lightstreamer with stock market events, as already shown in the installation instructions for the Lightstreamer Kafka Connector.
- Launch the Lightstreamer Server instance already configured in the Lightstreamer Setup section.
- Start the Connect worker with:
  $ bin/connect-standalone.sh connect-standalone-local.properties quickstart-lightstreamer-local.properties
To verify that an event stream actually flows from Kafka to a Lightstreamer consumer, leveraging the same example already shown in the Start section:
- Attach a Lightstreamer consumer as specified in step 2 of the Start section.
- Make sure that a Schema Registry service is reachable from your local machine.
- Edit a producer.properties file as follows:
  # JSON serializer with support for the Schema Registry
  value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
  # Schema Registry URL
  schema.registry.url=http://<schema-registry-address>:<schema-registry-port>
  This configuration enables the producer to leverage the Schema Registry, which is required by Kafka Connect when a connector wants to deserialize JSON messages (unless an embedded schema is provided).
- Publish events as specified in step 3 of the Start section. This time, run the publisher passing the producer.properties file as a further argument:
  $ java -jar examples/quickstart-producer/build/libs/quickstart-producer-all.jar --bootstrap-servers <kafka.connection.string> --topic stocks --config-file producer.properties
- Check the consumed events. You should see real-time updates as shown in step 4 of the Start section.
If you want to build a local Docker image based on Kafka Connect with the connector plugin, check out the examples/docker-kafka-connect folder.
In addition, the examples/quickstart-kafka-connect folder shows how to use that image in Docker Compose through a Kafka Connect version of the Quick Start app.
The Kafka Connect Lightstreamer Sink Connector supports all the converters that come packaged with the Confluent Platform. These include:
- AvroConverter
io.confluent.connect.avro.AvroConverter
- ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
- JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
- JsonConverter
org.apache.kafka.connect.json.JsonConverter
- StringConverter
org.apache.kafka.connect.storage.StringConverter
- ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
It also supports the built-in primitive converters:
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.ShortConverter
The Kafka Connect Lightstreamer Sink Connector configuration properties are described below.
To use the connector, specify the following setting:
connector.class=com.lightstreamer.kafka.connect.LightstreamerSinkConnector
Due to the one-to-one relationship between a Proxy Adapter instance (deployed into the Lightstreamer server) and a Remote Adapter instance (a task), configuring more than one task in the tasks.max
configuration parameter is pointless.
The Lightstreamer server's Proxy Adapter address to connect to in the format host:port
.
- Type: string
- Default: none
- Importance: high
Example:
lightstreamer.server.proxy_adapter.address=lightstreamer.com:6661
The (optional) amount of time in milliseconds the connector will wait for the socket connection to be established to the Lightstreamer server's Proxy Adapter before terminating the task. Specify 0 for infinite timeout.
- Type: int
- Default: 5000 (5 seconds)
- Valid Values: [0,...]
- Importance: low
Example:
lightstreamer.server.proxy_adapter.socket.connection.setup.timeout.ms=15000
The (optional) max number of retries to establish a connection to the Lightstreamer server's Proxy Adapter.
- Type: int
- Default: 1
- Valid Values: [0,...]
- Importance: medium
Example:
lightstreamer.server.proxy_adapter.socket.connection.setup.max.retries=5
The (optional) amount of time in milliseconds to wait before retrying to establish a new connection to the Lightstreamer server's Proxy Adapter in case of failure. Only applicable if
lightstreamer.server.proxy_adapter.socket.connection.setup.max.retries
> 0.
- Type: long
- Default: 5000 (5 seconds)
- Valid Values: [0,...]
- Importance: low
Example:
lightstreamer.server.proxy_adapter.socket.connection.setup.retry.delay.ms=15000
The username to use for authenticating to the Lightstreamer server's Proxy Adapter. This setting requires authentication to be enabled in the configuration of the Proxy Adapter.
- Type: string
- Default: none
- Importance: medium
Example:
lightstreamer.server.proxy_adapter.username=lightstreamer_user
The password to use for authenticating to the Lightstreamer server's Proxy Adapter. This setting requires authentication to be enabled in the configuration of the Proxy Adapter.
- Type: string
- Default: none
- Importance: medium
Example:
lightstreamer.server.proxy_adapter.password=lightstreamer_password
The (optional) error handling strategy to be used if an error occurs while extracting data from incoming deserialized records. Can be one of the following:
- TERMINATE_TASK: terminate the task immediately
- IGNORE_AND_CONTINUE: ignore the error and continue to process the next record
- FORWARD_TO_DLQ: forward the record to the dead letter queue
In particular, the FORWARD_TO_DLQ value requires a dead letter queue to be configured; otherwise it will fall back to TERMINATE_TASK.
- Type: string
- Default: TERMINATE_TASK
- Valid Values: [IGNORE_AND_CONTINUE, FORWARD_TO_DLQ, TERMINATE_TASK]
- Importance: medium
Example:
record.extraction.error.strategy=FORWARD_TO_DLQ
Important
This configuration implements the same concepts already presented in the Record Routing section.
Semicolon-separated list of mappings between source topics and Lightstreamer items. The list should describe a set of mappings in the form:
[topicName1]:[mappingList1];[topicName2]:[mappingList2];...[topicNameN]:[mappingListN]
where every specified topic ([topicNameX]
) is mapped to the item names or item templates specified as comma-separated list ([mappingListX]
).
- Type: string
- Default: none
- Valid Values: [topicName1]:[mappingList1];[topicName2]:[mappingList2];...
- Importance: high
Example:
topic.mappings=sample-topic:item-template.template1,item1,item2;order-topic:order-item
The configuration above specifies:
- A one-to-many mapping between the topic sample-topic and the Lightstreamer items item1 and item2
- Filtered routing through the reference to the item template template1 (not shown in the snippet)
- A one-to-one mapping between the topic order-topic and the Lightstreamer item order-item
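To make the structure of a topic.mappings value concrete, the following sketch splits it into a topic-to-targets map. It only illustrates the documented format: the class TopicMappingsSketch and its parse method are hypothetical, and the sketch assumes that topic names contain no colon or semicolon, which may not hold when regular expressions are enabled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not the connector's own parser.
final class TopicMappingsSketch {

    // Splits "[topic1]:[list1];[topic2]:[list2]" into topic -> list of targets.
    static Map<String, List<String>> parse(String topicMappings) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String entry : topicMappings.split(";")) {
            // Split on the first colon only: the left part is the topic name.
            String[] parts = entry.split(":", 2);
            result.put(parts[0].trim(), Arrays.asList(parts[1].trim().split(",")));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> mappings = parse(
                "sample-topic:item-template.template1,item1,item2;order-topic:order-item");
        mappings.forEach((topic, targets) ->
                System.out.println(topic + " -> " + targets));
    }
}
```

For the example value, sample-topic resolves to three targets (one item template reference and two item names) and order-topic to a single item.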
The (optional) flag to enable the topicName parts of the topic.mappings parameter to be treated as a regular expression rather than as a literal topic name.
- Type: boolean
- Default: false
- Importance: medium
Example:
topic.mappings.regex.enable=true
Important
This configuration implements the same concepts already presented in the Record Mapping section.
The list of mappings between Kafka records and Lightstreamer fields. The list should describe a set of subscribable fields in the following form:
[fieldName1]:[extractionExpression1],[fieldName2]:[extractionExpression2],...,[fieldNameN]:[extractionExpressionN]
where the Lightstreamer field [fieldNameX] will hold the data extracted from a deserialized Kafka record using the Data Extraction Language expression [extractionExpressionX].
- Type: list
- Default: none
- Valid Values: [fieldName1]:[extractionExpression1],[fieldName2]:[extractionExpression2],...
- Importance: high
Example:
record.mappings=index:#{KEY}, \
stock_name:#{VALUE.name}, \
last_price:#{VALUE.last_price}
The configuration above specifies the following mappings:
- The record key to the Lightstreamer field index
- The name attribute of the record value to the Lightstreamer field stock_name
- The last_price attribute of the record value to the Lightstreamer field last_price
By enabling this (optional) parameter, if a field mapping fails, that specific field's value will simply be omitted from the update sent to Lightstreamer clients, while other successfully mapped fields from the same record will still be delivered.
- Type: boolean
- Default: false
- Importance: medium
Example:
record.mappings.skip.failed.enable=true
Important
This configuration implements the same concepts already presented in the Filtered Record Routing section.
Semicolon-separated list of item templates, which specify the rules to enable the filtered routing. The list should describe a set of templates in the following form:
[templateName1]:[template1];[templateName2]:[template2];...;[templateNameN]:[templateN]
where [templateX] configures the item template [templateNameX], defining the general format of the items that Lightstreamer clients must subscribe to in order to receive updates.
A template is specified in the form:
item-prefix-#{paramName1=extractionExpression1,paramName2=extractionExpression2,...}
To map a topic to an item template, reference it using the item-template
prefix in the topic.mappings
configuration:
topic.mappings=some-topic:item-template.templateName1,item-template.templateName2,...
- Type: string
- Default: null
- Valid Values: [templateName1]:[template1];[templateName2]:[template2];...
- Importance: high
Example:
item.templates=by-name:user-#{firstName=VALUE.name,lastName=VALUE.surname}; \
by-age:user-#{age=VALUE.age}
topic.mappings=user:item-template.by-name,item-template.by-age
The configuration above specifies how to route records published to the topic user to the item templates by-name and by-age, which define the rules to extract some personal data by leveraging Data Extraction Language expressions.
The docs folder contains the complete Kafka Connector API Reference, which is useful for implementing custom authentication and authorization logic, as described in the Customize the Kafka Connector Metadata Adapter Class section.
To learn more about the Lightstreamer Broker and the Lightstreamer Kafka Connector, visit their respective product pages.
The examples folder contains all the examples referenced throughout this guide, along with additional resources tailored for specific Kafka broker vendors. Additionally, you can explore the Airport Demo for deeper insights into various usage and configuration options of the Lightstreamer Kafka Connector.
For more examples and live demos, visit our online showcase.