Skip to content

Commit

Permalink
Merge pull request #23 from SciCatProject/kafka-ci
Browse files Browse the repository at this point in the history
Add Kafka connection test. (Part of integration test)
  • Loading branch information
nitrosx authored May 29, 2024
2 parents 7400c4b + fadb2ee commit eecc1ad
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:

intergration-tests:
name: Integration Tests
needs: tests
needs: [tests, formatting]
uses: ./.github/workflows/integration.yml
with:
python-version: '${{needs.formatting.outputs.min_python}}'
4 changes: 4 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ jobs:
python-version: ${{ inputs.python-version }}
- run: python -m pip install --upgrade pip
- run: python -m pip install -r requirements/ci.txt
- run: python -m pip install -e .
- run: docker-compose version
- run: docker-compose -f tests/docker-compose-file-writer.yml up -d
- run: scicat_ingestor -c resources/config.sample.json --verbose
- run: docker-compose -f tests/docker-compose-file-writer.yml down
4 changes: 1 addition & 3 deletions config.20240405.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
"kafka": {
"topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": [
"HOST:9092"
],
"bootstrap_servers": ["localhost:9093"],
"enable_auto_commit": true,
"auto_offset_reset": "earliest"
},
Expand Down
6 changes: 2 additions & 4 deletions resources/config.sample.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
{
"kafka": {
"topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
"topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": [
"HOST:9092"
],
"bootstrap_servers": ["localhost:9093"],
"individual_message_commit": false,
"enable_auto_commit": true,
"auto_offset_reset": "earliest"
Expand Down
4 changes: 2 additions & 2 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from scicat_logging import build_logger


def quit(logger: logging.Logger, unexpected: bool = True) -> None:
    """Log a final message and terminate the process.

    Parameters
    ----------
    logger:
        Logger used to record the shutdown message.
    unexpected:
        When True (the default) exit with status 1 so CI and shells see a
        failure; when False exit with status 0 (clean shutdown).
    """
    import sys

    logger.info("Exiting ingestor")
    sys.exit(1 if unexpected else 0)


def main() -> None:
Expand Down
22 changes: 14 additions & 8 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
def collect_consumer_options(options: kafkaOptions) -> dict:
    """Build a confluent-kafka consumer configuration dict from ``options``.

    Dataclass field names (snake_case) are translated into the dotted option
    names confluent-kafka expects (e.g. ``group_id`` -> ``group.id``).
    Fields that are not consumer options (``topics``,
    ``individual_message_commit``) are excluded.
    """
    # Translate snake_case dataclass fields to dotted consumer option names.
    config_dict = {
        key.replace("_", "."): value
        for key, value in asdict(options).items()
        if key not in ("topics", "individual_message_commit")
    }
    # Per-message commits require auto-commit to be off, so
    # individual_message_commit takes precedence over enable_auto_commit.
    config_dict["enable.auto.commit"] = (
        not options.individual_message_commit
    ) and options.enable_auto_commit
    if isinstance(bootstrap_servers := options.bootstrap_servers, list):
        # confluent-kafka expects a single comma-separated string of servers.
        config_dict["bootstrap.servers"] = ",".join(bootstrap_servers)
    else:
        config_dict["bootstrap.servers"] = bootstrap_servers

    return config_dict


def collect_kafka_topics(options: kafkaOptions) -> list[str]:
    """Return the Kafka topics as a list.

    ``options.topics`` may be either a list of topic names or a single
    comma-separated string; both forms are normalised to ``list[str]``.

    Raises
    ------
    TypeError
        If ``options.topics`` is neither a list nor a string.
    """
    topics = options.topics
    if isinstance(topics, str):
        return topics.split(",")
    if isinstance(topics, list):
        return topics
    raise TypeError("The topics must be a list or a comma-separated string.")


def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer:
    """Build a Kafka consumer and configure it according to the ``options``.

    Returns the validated, subscribed consumer, or ``None`` when the
    connection could not be validated.
    """
    consumer_options = collect_consumer_options(kafka_options)
    logger.info("Connecting to Kafka with the following parameters:")
    logger.info(consumer_options)
    consumer = Consumer(consumer_options)
    if not validate_consumer(consumer, logger):
        return None

    kafka_topics = collect_kafka_topics(kafka_options)
    logger.info(f"Subscribing to the following Kafka topics: {kafka_topics}")
    consumer.subscribe(kafka_topics)
    # Return the consumer that was validated and subscribed above.  The
    # original code built a *second* Consumer here, discarding the
    # subscription and leaking the first connection.
    return consumer

Expand All @@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
)
return False
else:
logger.info('Kafka consumer successfully instantiated')
logger.info("Kafka consumer successfully instantiated")
return True
71 changes: 71 additions & 0 deletions tests/docker-compose-file-writer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
---
# Kafka and file-writer services are copied from
# https://gitlab.esss.lu.se/ecdc/ess-dmsc/kafka-to-nexus/-/blob/main/integration-tests/docker-compose.yml
# Currently github-ci fails to run the original docker-compose.yml file in the
# ecdc repository, so we copied and modified the file here.
version: "3.5"

services:
  kafka:
    container_name: file-writer-kafka
    hostname: file-writer-kafka
    image: confluentinc/cp-kafka:7.4.3
    deploy:
      resources:
        limits:
          memory: 600M
    restart: always
    depends_on:
      - zookeeper
    ports:
      # Host-facing listener; must match the ingestor's bootstrap_servers.
      - "9093:9093"
    networks:
      - frontend
    environment:
      # Values are quoted so YAML never retypes them (bare 0 / -1 / 2181
      # parse as integers); Docker passes environment values as strings.
      KAFKA_ZOOKEEPER_CONNECT: "file-writer-zookeeper:2181"
      KAFKA_BROKER_ID: "0"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_MESSAGE_MAX_BYTES: "300000000"
      KAFKA_SOCKET_REQUEST_MAX_BYTES: "300000000"
      KAFKA_REPLICA_FETCH_MAX_BYTES: "300000000"
      # Keep data forever; required for tests involving fake "historical" data.
      KAFKA_LOG_RETENTION_MS: "-1"
      ## listeners
      KAFKA_LISTENERS: "INSIDE://:9092,OUTSIDE://:9093"
      KAFKA_ADVERTISED_LISTENERS: "INSIDE://file-writer-kafka:9092,OUTSIDE://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INSIDE"
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 5s
      timeout: 5s
      retries: 5
      start_period: 10s

  zookeeper:
    container_name: file-writer-zookeeper
    hostname: file-writer-zookeeper
    image: confluentinc/cp-zookeeper:7.4.3
    deploy:
      resources:
        limits:
          memory: 200M
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
    networks:
      - frontend

  filewriter:
    container_name: file-writer-file-writer
    image: registry.esss.lu.se/ecdc/ess-dmsc/docker-centos7-build-node:latest
    depends_on:
      kafka:
        condition: service_healthy
    tty: true
    networks:
      - frontend

networks:
  frontend:

0 comments on commit eecc1ad

Please sign in to comment.