From f4403a7e3fb0b71effe4c05a9c2c78204d25f499 Mon Sep 17 00:00:00 2001 From: "Anton Bryzgalov @ CoinStats" Date: Fri, 17 Jun 2022 22:08:40 +0400 Subject: [PATCH] Added support for AWS Kinesis Sponsored by CoinStats.app --- .../jobs/exporters/kinesis_item_exporter.py | 82 +++++++++++++++++++ ethereumetl/cli/stream.py | 1 + .../streaming/item_exporter_creator.py | 8 ++ setup.py | 4 + 4 files changed, 95 insertions(+) create mode 100644 blockchainetl/jobs/exporters/kinesis_item_exporter.py diff --git a/blockchainetl/jobs/exporters/kinesis_item_exporter.py b/blockchainetl/jobs/exporters/kinesis_item_exporter.py new file mode 100644 index 000000000..6c9c717cb --- /dev/null +++ b/blockchainetl/jobs/exporters/kinesis_item_exporter.py @@ -0,0 +1,82 @@ +# MIT License +# +# Copyright (c) 2022 CoinStats LLC +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import json +import typing as t +import uuid +from itertools import zip_longest + +import boto3 + +_KINESIS_BATCH_LIMIT = 500 + + +def _uuid_partition_key(_: dict) -> str: + return uuid.uuid4().hex + + +class KinesisItemExporter: + def __init__( + self, + stream_name: str, + partition_key_callable: t.Callable[[dict], str] = _uuid_partition_key, + ): + import boto3 + self._stream_name = stream_name + self._partition_key_callable = partition_key_callable + self._kinesis_client = None # initialized in .open + + def open(self) -> None: + self._kinesis_client = boto3.client('kinesis') + + def export_items(self, items: t.Iterable[dict]) -> None: + sentinel = object() + chunks = zip_longest( + *(iter(items),) * _KINESIS_BATCH_LIMIT, + fillvalue=sentinel, + ) + for chunk in chunks: + self._kinesis_client.put_records( + StreamName=self._stream_name, + Records=[ + { + 'Data': _serialize_item(item), + 'PartitionKey': self._partition_key_callable(item), + } + for item in chunk + if item is not sentinel + ], + ) + + def export_item(self, item: dict) -> None: + self._kinesis_client.put_record( + StreamName=self._stream_name, + Data=_serialize_item(item), + PartitionKey=self._partition_key_callable(item), + ) + + def close(self): + pass + + +def _serialize_item(item: dict) -> bytes: + return json.dumps(item).encode() diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py index 1ff978e04..217208dba 100644 --- a/ethereumetl/cli/stream.py +++ b/ethereumetl/cli/stream.py @@ -42,6 +42,7 @@ 'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum; ' 'or GCS bucket e.g. gs://your-bucket-name; ' 'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 ' + 'or Kinesis, e.g. kinesis://your-data-stream-name' 'If not specified will print to console') @click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block') @click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str, diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py index 400ad7efd..18bdb22b4 100644 --- a/ethereumetl/streaming/item_exporter_creator.py +++ b/ethereumetl/streaming/item_exporter_creator.py @@ -51,6 +51,11 @@ def create_item_exporter(output): batch_max_latency=2, batch_max_messages=1000, enable_message_ordering=enable_message_ordering) + elif item_exporter_type == ItemExporterType.KINESIS: + from blockchainetl.jobs.exporters.kinesis_item_exporter import KinesisItemExporter + item_exporter = KinesisItemExporter( + stream_name=output[len('kinesis://'):], + ) elif item_exporter_type == ItemExporterType.POSTGRES: from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table @@ -109,6 +114,8 @@ def get_bucket_and_path_from_gcs_output(output): def determine_item_exporter_type(output): if output is not None and output.startswith('projects'): return ItemExporterType.PUBSUB + if output is not None and output.startswith('kinesis://'): + return ItemExporterType.KINESIS if output is not None and output.startswith('kafka'): return ItemExporterType.KAFKA elif output is not None and output.startswith('postgresql'): @@ -123,6 +130,7 @@ def determine_item_exporter_type(output): class ItemExporterType: PUBSUB = 'pubsub' + KINESIS = 'kinesis' POSTGRES = 'postgres' GCS = 'gcs' CONSOLE = 'console' diff --git a/setup.py b/setup.py index 3804545c1..611923d03 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,10 @@ def read(fname): # that's why we lock the version here 'libcst==0.3.21' ], + 'streaming-kinesis': [ + 'boto3==1.24.11', + 'botocore==1.27.11', + ], 'dev': [ 'pytest~=4.3.0' ]