Kafka #75

Draft · wants to merge 6 commits into master
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -97,6 +97,7 @@ thredds = "stac_generator.plugins.inputs.thredds:ThreddsInput"
text_file = "stac_generator.plugins.inputs.text_file:TextFileInput"
solr = "stac_generator.plugins.inputs.solr:SolrInput"
elasticsearch = "stac_generator.plugins.inputs.elasticsearch:ElasticsearchInput"
kafka = "stac_generator.plugins.inputs.kafka:KafkaInput"

[tool.poetry.plugins."stac_generator.outputs"]
standard_out = "stac_generator.plugins.outputs.standard_out:StandardOutOutput"
@@ -110,6 +111,7 @@ rabbitmq = "stac_generator.plugins.outputs.rabbit_mq:RabbitMQOutput"
rabbitmq_bulk = "stac_generator.plugins.bulk_outputs.rabbit_mq:RabbitMQBulkOutput"
intake_esm = "stac_generator.plugins.outputs.intake_esm:IntakeESMOutput"
stac_fastapi = "stac_generator.plugins.outputs.stac_fastapi:STACFastAPIOutput"
kafka = "stac_generator.plugins.outputs.kafka:KafkaOutput"

[tool.poetry.plugins."stac_generator.mappings"]
ceda = "stac_generator.plugins.mappings.ceda:CEDAMapping"
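Note: the stac_generator resolves these plugin names through entry points at runtime. A minimal sketch of the lookup, assuming the inputs group is named "stac_generator.inputs" to match the outputs group shown above:

# Hypothetical sketch: resolving the "kafka" entry point to its plugin class.
from importlib.metadata import entry_points

eps = entry_points(group="stac_generator.inputs")  # group name assumed
kafka_ep = next(ep for ep in eps if ep.name == "kafka")
KafkaInput = kafka_ep.load()  # imports stac_generator.plugins.inputs.kafka:KafkaInput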
4 changes: 1 addition & 3 deletions stac_generator/core/generator.py
@@ -192,7 +192,7 @@ def _process(self, body: dict, **kwargs) -> None:
:param kwargs:
"""

def process(self, uri: str, **kwargs) -> None:
def process(self, body: dict, **kwargs) -> None:
"""
Run generator.

@@ -201,6 +201,4 @@ def process(self, uri: str, **kwargs) -> None:
"""
kwargs["TYPE"] = self.TYPE

body = {"uri": uri}

self._process(body, **kwargs)
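With this change, inputs construct the message body themselves and every call site passes a dict. A minimal sketch of the new calling convention (the uri and the extra keyword argument are illustrative):

# Old: generator.process(uri)
# New: the body dict is built by the input plugin.
body = {"uri": "https://example.org/data/file.nc"}
generator.process(body)                  # TYPE is injected into kwargs
generator.process(body, extra_term="x")  # hypothetical kwarg, forwarded to _process()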
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/elasticsearch.py
@@ -116,7 +116,7 @@ def run(self, generator: BaseGenerator):
for bucket in aggregation["buckets"]:
uri = bucket["key"]["uri"]
if self.should_process(uri):
generator.process(**bucket["key"])
generator.process(bucket["key"])
total_generated += 1

if "after_key" not in aggregation.keys():
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/file_system.py
@@ -69,7 +69,7 @@ def run(self, generator: BaseGenerator):
filename = os.path.abspath(os.path.join(root, file))

if self.should_process(filename):
generator.process(filename)
generator.process({"uri": filename})
logger.debug(f"Input processing: {filename}")
else:
logger.debug(f"Input skipping: {filename}")
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/intake_esm.py
@@ -111,7 +111,7 @@ def run(self, generator: BaseGenerator):
uri = getattr(row, self.object_attr)

if self.should_process(uri):
generator.process(uri)
generator.process({"uri": uri})
LOGGER.debug(f"Input processing: {uri}")
else:
LOGGER.debug(f"Input skipping: {uri}")
83 changes: 83 additions & 0 deletions stac_generator/plugins/inputs/kafka.py
@@ -0,0 +1,83 @@
# encoding: utf-8
"""
Kafka
-----

An input plugin which polls a Kafka event stream.

**Plugin name:** ``kafka``

.. list-table::
    :header-rows: 1

    * - Option
      - Value Type
      - Description
    * - ``config``
      - ``dict``
      - ``REQUIRED`` Configuration for the `Kafka consumer <https://docs.confluent.io/kafka-clients/python/current/overview.html>`_
    * - ``topics``
      - ``list``
      - ``REQUIRED`` The topics to poll for messages.
    * - ``timeout``
      - ``float``
      - ``REQUIRED`` The timeout, in seconds, for each poll of the event stream.

Example configuration:

.. code-block:: yaml

    inputs:
        - method: kafka
          config:
            'bootstrap.servers': 'host1:9092,host2:9092'
          topics:
            - stac
          timeout: 1.0
"""

import json
import logging

from confluent_kafka import Consumer, KafkaError, KafkaException

from stac_generator.core.generator import BaseGenerator
from stac_generator.core.input import BaseInput

LOGGER = logging.getLogger(__name__)


class KafkaInput(BaseInput):
    """
    Use a Kafka event stream as input, collecting messages to pass to
    the processor.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.timeout = kwargs["timeout"]
        self.topics = kwargs["topics"]
        self.consumer = Consumer(kwargs["config"])

    def run(self, generator: BaseGenerator):
        try:
            self.consumer.subscribe(self.topics)
            while True:
                msg = self.consumer.poll(timeout=self.timeout)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        LOGGER.error(
                            "%s [%d] reached end at offset %d",
                            msg.topic(),
                            msg.partition(),
                            msg.offset(),
                        )
                    else:
                        raise KafkaException(msg.error())

                else:
                    data = json.loads(msg.value().decode("utf-8"))
                    generator.process(data)
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()
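Messages on the subscribed topics are JSON-decoded and handed directly to generator.process(), so each payload must be a JSON object the generator can use as its body. A minimal producer sketch (broker, topic, and uri are illustrative):

# Hypothetical sketch: publishing a message that KafkaInput can consume.
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "host1:9092"})
payload = json.dumps({"uri": "https://example.org/data/file.nc"})
producer.produce("stac", value=payload.encode("utf-8"))
producer.flush()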
3 changes: 1 addition & 2 deletions stac_generator/plugins/inputs/object_store.py
@@ -96,8 +96,7 @@ def run(self, generator: BaseGenerator):
):

generator.process(
f"{self.endpoint_url}/{bucket.name}/{obj.key}",
client=self.client,
{"uri": f"{self.endpoint_url}/{bucket.name}/{obj.key}"}
)
total_files += 1

7 changes: 4 additions & 3 deletions stac_generator/plugins/inputs/rabbit_mq.py
@@ -310,14 +310,15 @@ def callback(
self.acknowledge_message(ch, method.delivery_tag, connection)
return

# Extract uri
uri = message.pop("uri")
# Extract body
body = message.pop("body")
uri = body["uri"]

if self.should_process(uri):
LOGGER.info("Input processing: %s message: %s", uri, message)

self.acknowledge_message(ch, method.delivery_tag, connection)
generator.process(uri, **message)
generator.process(body, **message["kwargs"])

else:
LOGGER.info("Input skipping: %s", uri)
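The callback now unpacks a nested payload rather than a flat message. A sketch of the message shape this assumes (values illustrative):

# Hypothetical RabbitMQ message after JSON decoding:
message = {
    "body": {"uri": "https://example.org/data/file.nc"},  # passed to generator.process()
    "kwargs": {},  # forwarded to the generator as keyword arguments
}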
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/solr.py
@@ -106,4 +106,4 @@ def run(self, generator: BaseGenerator):
# by replacing '.' with '/' up until the filename
uri = uri.replace(".", "/", uri.split("|")[0].count(".") - 1)

generator.process(uri)
generator.process({"uri": uri})
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/text_file.py
@@ -72,7 +72,7 @@ def run(self, generator: BaseGenerator):
unique_lines.add(line)
data = json.loads(line)
try:
generator.process(**data)
generator.process(data)
except Exception:
errors.write(line)
errors.write(traceback.format_exc())
2 changes: 1 addition & 1 deletion stac_generator/plugins/inputs/thredds.py
@@ -138,7 +138,7 @@ def process_ds(self, ds, generator: BaseGenerator):
filepath = get_sub_attr(ds, self.object_attr)

if self.should_process(filepath):
generator.process(filepath)
generator.process({"uri": filepath})
logger.debug(f"Input processing: {filepath}")
else:
logger.debug(f"Input skipping: {filepath}")
73 changes: 73 additions & 0 deletions stac_generator/plugins/outputs/kafka.py
@@ -0,0 +1,73 @@
# encoding: utf-8
"""
Kafka
-----

An output backend which posts the generated metadata to a Kafka event stream.

**Plugin name:** ``kafka``

.. list-table::
    :header-rows: 1

    * - Option
      - Value Type
      - Description
    * - ``config``
      - ``dict``
      - ``REQUIRED`` Configuration for the `Kafka producer <https://docs.confluent.io/kafka-clients/python/current/overview.html>`_
    * - ``topic``
      - ``str``
      - ``REQUIRED`` The topic to post the message to.
    * - ``key_term``
      - ``str``
      - Term to be used as the Kafka message's key. Defaults to ``uri``.

Example configuration:

.. code-block:: yaml

    outputs:
        - method: kafka
          config:
            'bootstrap.servers': 'host1:9092,host2:9092'
          topic: stac
          key_term: item_id
"""
__author__ = "Richard Smith"
__date__ = "01 Jun 2021"
__copyright__ = "Copyright 2018 United Kingdom Research and Innovation"
__license__ = "BSD - see LICENSE file in top-level package directory"
__contact__ = "[email protected]"

import json

from confluent_kafka import Producer

from stac_generator.core.output import BaseOutput


class KafkaOutput(BaseOutput):
    """
    Output backend which posts generated messages to a Kafka event stream.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Default the message key term if one was not supplied
        if not hasattr(self, "key_term"):
            self.key_term = "uri"
        self.producer = Producer(self.config)

    def export(self, data: dict, **kwargs) -> None:
        """
        Post the message to the Kafka server.

        :param data: Data from extraction processes
        :param kwargs: Not used
        """
        key = data.get(self.key_term)
        message = json.dumps(data).encode("utf8")
        self.producer.produce(self.topic, key=key, value=message)
        self.producer.flush()
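Each generated record is serialised to JSON and keyed by key_term. A minimal consumer sketch for inspecting the output (broker and group id are illustrative; the topic matches the example configuration above):

# Hypothetical sketch: reading messages published by KafkaOutput.
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "host1:9092",
    "group.id": "stac-debug",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["stac"])
msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    print(msg.key(), json.loads(msg.value().decode("utf-8")))
consumer.close()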