Update some RST documentation syntax #2429

Open · wants to merge 13 commits into base: master
34 changes: 21 additions & 13 deletions .github/workflows/python-package.yml
@@ -41,7 +41,7 @@ jobs:
- name: Build artifacts
run: python -m build
- name: Upload built artifacts for testing
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ env.sdist-artifact }}
# NOTE: Exact expected file names are specified here
@@ -66,12 +66,8 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
include:
- python-version: "pypy3.9"
experimental: true
- python-version: "~3.13.0-0"
experimental: true
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -111,15 +107,15 @@ jobs:
KAFKA_VERSION: ${{ env.KAFKA_LATEST }}

test-kafka:
name: Tests for Kafka ${{ matrix.kafka-version }}
name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }})
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
kafka-version:
- "0.8.2.2"
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
@@ -128,6 +124,18 @@ jobs:
- "2.4.0"
- "2.5.0"
- "2.6.0"
python-version: ['3.12']
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
python-version: "3.12"
- kafka-version: '0.8.2.2'
experimental: false
python-version: "3.10"
env:
PYTHON_LATEST: ${{ matrix.python-version }}
continue-on-error: ${{ matrix.experimental }}
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -141,7 +149,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_LATEST }}
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: |
requirements-dev.txt
@@ -187,11 +195,11 @@ jobs:
environment: pypi
if: github.event_name == 'release' && github.event.action == 'created'
steps:
- name: Download the sdist artifact
uses: actions/download-artifact@v3
- name: Download the artifacts
uses: actions/download-artifact@v4
with:
name: artifact
path: dist
name: ${{ env.sdist-artifact }}
path: dist/${{ env.sdist-name }}
- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -413,7 +413,7 @@ Some of the major changes include:
* SASL authentication is working (we think)
* Removed several circular references to improve gc on close()

Thanks to all contributors -- the state of the kafka-python community is strong!
Thanks to all contributors -- the state of the kafka-python-ng community is strong!

Detailed changelogs are listed below:

2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ cov-local: build-integration
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
# PyPi homepage (https://pypi.python.org/pypi/kafka-python)
# PyPi homepage (https://pypi.python.org/pypi/kafka-python-ng)
check-readme:
python setup.py check -rms

195 changes: 119 additions & 76 deletions README.rst
@@ -2,27 +2,27 @@ Kafka Python client
------------------------

.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
:target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg
:target: https://pypi.python.org/pypi/kafka-python-ng
.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python.svg
:target: https://pypistats.org/packages/kafka-python
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg
:target: https://pypistats.org/packages/kafka-python-ng
.. image:: https://img.shields.io/pypi/v/kafka-python.svg
:target: https://pypi.org/project/kafka-python
.. image:: https://img.shields.io/pypi/implementation/kafka-python
:target: https://github.com/dpkp/kafka-python/blob/master/setup.py
:target: https://pypi.org/project/kafka-python-ng
.. image:: https://img.shields.io/pypi/implementation/kafka-python-ng
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py



Python client for the Apache Kafka distributed stream processing system.
kafka-python is designed to function much like the official java client, with a
kafka-python-ng is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., consumer iterators).

kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with
kafka-python-ng is best used with newer brokers (0.9+), but is backwards-compatible with
older versions (to 0.8.0). Some features will only be enabled on newer brokers.
For example, fully coordinated consumer groups -- i.e., dynamic partition
assignment to multiple consumers in the same group -- requires use of 0.9+ kafka
@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
achieve something similar by manually assigning different partitions to each
consumer instance with config management tools like chef, ansible, etc. This
approach will work fine, though it does not support rebalancing on failures.
See <https://kafka-python.readthedocs.io/en/master/compatibility.html>

See https://kafka-python.readthedocs.io/en/master/compatibility.html

for more details.

Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

>>> pip install kafka-python

.. code-block:: bash

    $ pip install kafka-python-ng



KafkaConsumer
@@ -48,89 +54,123 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
... print (msg)
.. code-block:: python

>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
... print (msg)
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic')
    for msg in consumer:
        print (msg)
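
A minimal sketch of reading the ConsumerRecord attributes listed above
(topic, partition, offset, key, and value) one by one:

.. code-block:: python

    # a sketch: each ConsumerRecord exposes basic message attributes
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_favorite_topic')
    for msg in consumer:
        print(msg.topic, msg.partition, msg.offset, msg.key, msg.value)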

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
.. code-block:: python

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
    # join a consumer group for dynamic partition assignment and offset commits
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
    for msg in consumer:
        print (msg)

>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
.. code-block:: python

>>> # Get consumer metrics
>>> metrics = consumer.metrics()
    # manually assign the partition list for the consumer
    from kafka import TopicPartition
    consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
    consumer.assign([TopicPartition('foobar', 2)])
    msg = next(consumer)

.. code-block:: python

    # Deserialize msgpack-encoded values (requires the msgpack package)
    import msgpack
    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
    consumer.subscribe(['msgpackfoo'])
    for msg in consumer:
        assert isinstance(msg.value, dict)

.. code-block:: python

    # Access record headers. The returned value is a list of tuples
    # with str, bytes for key and value
    for msg in consumer:
        print (msg.headers)

.. code-block:: python

    # Get consumer metrics
    metrics = consumer.metrics()


KafkaProducer
*************

KafkaProducer is a high-level, asynchronous message producer. The class is
intended to operate as similarly as possible to the official java client.
See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

for more details.

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
... producer.send('foobar', b'some_message_bytes')
.. code-block:: python

    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:1234')
    for _ in range(100):
        producer.send('foobar', b'some_message_bytes')

.. code-block:: python

    # Block until a single message is sent (or timeout)
    future = producer.send('foobar', b'another_message')
    result = future.get(timeout=60)
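
The resolved future above is a RecordMetadata; as a sketch, its topic,
partition, and offset fields report where the broker stored the message:

.. code-block:: python

    # a sketch: inspect where the broker stored the message
    record_metadata = future.get(timeout=60)
    print(record_metadata.topic)      # 'foobar'
    print(record_metadata.partition)  # e.g. 0
    print(record_metadata.offset)     # e.g. 42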

.. code-block:: python

    # Block until all pending messages are at least put on the network
    # NOTE: This does not guarantee delivery or success! It is really
    # only useful if you configure internal batching using linger_ms
    producer.flush()

.. code-block:: python

>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
    # Use a key for hashed-partitioning
    producer.send('foobar', key=b'foo', value=b'bar')

>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
.. code-block:: python

>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
    # Serialize json messages
    import json
    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
.. code-block:: python

>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
    # Serialize string keys
    producer = KafkaProducer(key_serializer=str.encode)
    producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
.. code-block:: python

>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
    # Compress messages
    producer = KafkaProducer(compression_type='gzip')
    for i in range(1000):
        producer.send('foobar', b'msg %d' % i)

>>> # Get producer performance metrics
>>> metrics = producer.metrics()
.. code-block:: python

    # Include record headers. The format is list of tuples with string key
    # and bytes value.
    producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

.. code-block:: python

    # Get producer performance metrics
    metrics = producer.metrics()


Thread safety
@@ -146,31 +186,34 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports the following compression formats:
kafka-python-ng supports the following compression formats:

- gzip
- LZ4
- Snappy
- Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries.
See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.

See https://kafka-python.readthedocs.io/en/master/install.html for more information.
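
As a sketch, a codec is selected per producer via ``compression_type``;
the non-gzip codecs assume their third-party package (e.g. lz4,
python-snappy, or zstandard) is installed per the install docs linked above:

.. code-block:: python

    # a sketch: compression_type accepts 'gzip', 'snappy', 'lz4', or 'zstd'
    from kafka import KafkaProducer

    producer = KafkaProducer(compression_type='lz4')
    producer.send('foobar', b'compressed message bytes')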



Optimized CRC32 Validation
**************************

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure
python implementation for compatibility. To improve performance for high-throughput
applications, kafka-python will use `crc32c` for optimized native code if installed.
See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions.
See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.

See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
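
A minimal sketch of opting in, assuming the crc32c package linked above:

.. code-block:: bash

    $ pip install crc32c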


Protocol
********

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
A secondary goal of kafka-python-ng is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. The protocol support is
leveraged to enable a KafkaClient.check_version() method that
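
A minimal sketch of that version probe, assuming a broker reachable on
localhost:9092:

.. code-block:: python

    # a sketch: probe a broker and report the version it appears to run
    from kafka import KafkaClient

    client = KafkaClient(bootstrap_servers='localhost:9092')
    print(client.check_version())  # e.g. (2, 6, 0)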