diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java
index 502a01bd..dfc87a57 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java
@@ -106,8 +106,9 @@ public boolean isCommitTimedOut() {
       return false;
     }
 
-    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
-      LOG.info("Commit timeout reached");
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Now: {}, start: {}, timeout: {}", currentTime, startTime, config.commitTimeoutMs());
       return true;
     }
     return false;
@@ -125,14 +126,14 @@ public boolean isCommitReady(int expectedPartitionCount) {
             .sum();
 
     if (receivedPartitionCount >= expectedPartitionCount) {
-      LOG.debug(
+      LOG.info(
           "Commit {} ready, received responses for all {} partitions",
           currentCommitId,
           receivedPartitionCount);
       return true;
     }
 
-    LOG.debug(
+    LOG.info(
         "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
         currentCommitId,
         receivedPartitionCount,
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
index 0fcc0bb7..b7204c85 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
@@ -97,10 +97,12 @@ public void process() {
     if (commitState.isCommitIntervalReached()) {
       // send out begin commit
       commitState.startNewCommit();
+      LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
       Event event =
           new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
       send(event);
-      LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString());
+      LOG.info("Sent workers commit trigger with commit-id={}", commitState.currentCommitId().toString());
+
     }
 
     consumeAvailable(POLL_DURATION, this::receive);
@@ -127,6 +129,7 @@ private boolean receive(Envelope envelope) {
 
   private void commit(boolean partialCommit) {
     try {
+      LOG.info("Processing commit after responses for {}, isPartialCommit {}", commitState.currentCommitId(), partialCommit);
       doCommit(partialCommit);
     } catch (Exception e) {
       LOG.warn("Commit failed, will try again next cycle", e);
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
index 7fb9d899..dcf57abb 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
@@ -134,7 +134,7 @@ private void routeRecordStatically(SinkRecord record) {
 
   private void routeRecordDynamically(SinkRecord record) {
     String routeField = config.tablesRouteField();
-    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+    Preconditions.checkNotNull(routeField, String.format("Route field cannot be null with dynamic routing at topic: %s, partition: %d, offset: %d", record.topic(), record.kafkaPartition(), record.kafkaOffset()));
 
     String routeValue = extractRouteValue(record.value(), routeField);
     if (routeValue != null) {
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java
index 2bbf0788..84f54843 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java
@@ -47,7 +47,7 @@ public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
   }
 
   public RecordWriter createWriter(
-    String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
     TableIdentifier identifier = TableIdentifier.parse(tableName);
     Table table;
     try {
@@ -67,47 +67,52 @@ public RecordWriter createWriter(
 
   @VisibleForTesting
   Table autoCreateTable(String tableName, SinkRecord sample) {
-    StructType structType;
-    if (sample.valueSchema() == null) {
-      structType =
-          SchemaUtils.inferIcebergType(sample.value(), config)
-              .orElseThrow(() -> new DataException("Unable to create table from empty object"))
-              .asStructType();
-    } else {
-      structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
-    }
+    try {
+      StructType structType;
+      if (sample.valueSchema() == null) {
+        structType =
+            SchemaUtils.inferIcebergType(sample.value(), config)
+                .orElseThrow(() -> new DataException("Unable to create table from empty object"))
+                .asStructType();
+      } else {
+        structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
+      }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
-    TableIdentifier identifier = TableIdentifier.parse(tableName);
+      org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+      TableIdentifier identifier = TableIdentifier.parse(tableName);
 
-    List<String> partitionBy = config.tableConfig(tableName).partitionBy();
-    PartitionSpec spec;
-    try {
-      spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+      PartitionSpec spec;
+      try {
+        spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      } catch (Exception e) {
+        LOG.error(
+            "Unable to create partition spec {}, table {} will be unpartitioned",
+            partitionBy,
+            identifier,
+            e);
+        spec = PartitionSpec.unpartitioned();
+      }
+
+      PartitionSpec partitionSpec = spec;
+      AtomicReference<Table> result = new AtomicReference<>();
+      Tasks.range(1)
+          .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
+          .run(
+              notUsed -> {
+                try {
+                  result.set(catalog.loadTable(identifier));
+                } catch (NoSuchTableException e) {
+                  result.set(
+                      catalog.createTable(
+                          identifier, schema, partitionSpec, config.autoCreateProps()));
+                  LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+                }
+              });
+      return result.get();
     } catch (Exception e) {
-      LOG.error(
-          "Unable to create partition spec {}, table {} will be unpartitioned",
-          partitionBy,
-          identifier,
-          e);
-      spec = PartitionSpec.unpartitioned();
+      LOG.error("Error creating new table {} from record at topic: {}, partition: {}, offset: {}", tableName, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+      throw e;
     }
-
-    PartitionSpec partitionSpec = spec;
-    AtomicReference<Table> result = new AtomicReference<>();
-    Tasks.range(1)
-        .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
-        .run(
-            notUsed -> {
-              try {
-                result.set(catalog.loadTable(identifier));
-              } catch (NoSuchTableException e) {
-                result.set(
-                    catalog.createTable(
-                        identifier, schema, partitionSpec, config.autoCreateProps()));
-                LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
-              }
-            });
-    return result.get();
   }
 }