SMT for json parsing (#214)
* smt-nested-json-as-map

- parse json objects into Maps rather than Structs prior to handing to the iceberg connector, for users with unstructured json data.
tabmatfournier authored Apr 4, 2024
1 parent 5ab5c53 commit 303435a
Showing 8 changed files with 1,130 additions and 0 deletions.
74 changes: 74 additions & 0 deletions kafka-connect-transforms/README.md
@@ -47,3 +47,77 @@ It will promote the `before` or `after` element fields to top level and add the
| Property | Description |
|---------------------|-----------------------------------------------------------------------------------|
| cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` |

# JsonToMapTransform
_(Experimental)_

The `JsonToMapTransform` SMT parses String record values as JSON object payloads and infers a schema. The default behavior of the
iceberg-kafka-connect connector for schema-less data (e.g. the Map produced by the Kafka-supplied JsonConverter) is to convert Maps into Iceberg
Structs. This is fine when the JSON is well-structured, but when JSON objects have dynamically
changing keys, it leads to an explosion of columns in the Iceberg table due to repeated schema evolution.

This SMT is useful when the JSON is not well-structured, as a way to get data into Iceberg where
it can be further processed by query engines into a more manageable form. It converts nested objects to
Maps and declares the Map type in the Schema. The connector respects the Schema and creates Iceberg tables with Iceberg
Map (String) columns for the JSON objects.

Note:

- You must use `StringConverter` as the `value.converter` setting for your connector, not `JsonConverter` (see the example configuration after this list).
- It expects JSON objects (`{...}`) in those strings.
- Message keys, tombstones, and headers are not transformed and are passed along as-is by the SMT.
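
For reference, a minimal sink configuration using this SMT might look like the following sketch. The connector name, topic, and the
transform alias `jsonToMap` are placeholders, other sink properties are omitted, and the `connector.class` shown assumes the standard
sink connector class from this repository. The `json.root` property is described under Configuration below.

```json
{
  "name": "iceberg-sink-json",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "events",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "jsonToMap",
    "transforms.jsonToMap.type": "io.tabular.iceberg.connect.transforms.JsonToMapTransform",
    "transforms.jsonToMap.json.root": "false"
  }
}
```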

## Configuration

| Property             | Description (default value)                                           |
|----------------------|------------------------------------------------------------------------|
| json.root            | (false) Boolean value; if true, start at the root of the JSON object    |

Setting `transforms.IDENTIFIER_HERE.json.root` to `true` is meant for the most inconsistent data. The SMT will construct a Struct with a single field
called `payload` whose Schema is `Map<String, String>`.

If `transforms.IDENTIFIER_HERE.json.root` is `false` (the default), the SMT will construct a Struct with inferred schemas for primitive and
array fields. Nested objects become fields of type `Map<String, String>`.

Keys with empty arrays and empty objects are filtered out of the final schema. Arrays are typed unless the
JSON arrays contain mixed types, in which case they are converted to arrays of strings.

Example JSON:

```json
{
"key": 1,
"array": [1,"two",3],
"empty_obj": {},
"nested_obj": {"some_key": ["one", "two"]}
}
```

This will become the following if `json.root` is `true`:

```
SinkRecord.schema:
  "payload" : (Optional) Map<String, String>
SinkRecord.value (Struct):
  "payload" : Map(
    "key" : "1",
    "array" : "[1,"two",3]",
    "empty_obj" : "{}",
    "nested_obj" : "{"some_key":["one","two"]}"
  )
```

and the following if `json.root` is `false`:

```
SinkRecord.schema:
  "key" : (Optional) Int32,
  "array" : (Optional) Array<String>,
  "nested_obj" : (Optional) Map<String, String>
SinkRecord.value (Struct):
  "key" : 1,
  "array" : ["1", "two", "3"],
  "nested_obj" : Map("some_key" : "["one", "two"]")
```
29 changes: 29 additions & 0 deletions kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapException.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

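/** Thrown when a record value is not a String or cannot be parsed as a JSON object. */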
class JsonToMapException extends RuntimeException {
JsonToMapException(String errorMessage) {
super(errorMessage);
}

JsonToMapException(String errorMessage, Throwable err) {
super(errorMessage, err);
}
}
155 changes: 155 additions & 0 deletions kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapTransform.java
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

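/**
 * SMT that parses String record values as JSON objects. When {@code json.root=true}, the entire
 * object becomes a single optional {@code Map<String, String>} field named {@code payload};
 * otherwise a schema is inferred for top-level primitive and array fields, and nested objects
 * become {@code Map<String, String>} fields.
 */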
public class JsonToMapTransform<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String JSON_LEVEL = "json.root";

private static final ObjectReader mapper = new ObjectMapper().reader();

private boolean startAtRoot = false;

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
JSON_LEVEL,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.MEDIUM,
"Boolean value to start at root. False is one level in from the root");

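// Schema used when json.root=true: a single optional Map<String, String> field named "payload".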
private static final String ALL_JSON_SCHEMA_FIELD = "payload";
private static final Schema JSON_MAP_SCHEMA =
SchemaBuilder.struct()
.field(
ALL_JSON_SCHEMA_FIELD,
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build())
.build();

@Override
public R apply(R record) {
if (record.value() == null) {
return record;
} else {
return process(record);
}
}

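// Parses the String value as a JSON object and dispatches on the json.root setting.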
private R process(R record) {
if (!(record.value() instanceof String)) {
throw new JsonToMapException("record value is not a string, use StringConverter");
}

String json = (String) record.value();
JsonNode obj;

try {
obj = mapper.readTree(json);
} catch (Exception e) {
throw new JsonToMapException(
String.format(
"record.value is not valid json for record.value: %s", collectRecordDetails(record)),
e);
}

if (!(obj instanceof ObjectNode)) {
throw new JsonToMapException(
String.format(
"Expected json object for record.value after parsing: %s",
collectRecordDetails(record)));
}

if (startAtRoot) {
return singleField(record, (ObjectNode) obj);
}
return structRecord(record, (ObjectNode) obj);
}

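// json.root=true: wrap the entire parsed object in a single map-valued "payload" field.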
private R singleField(R record, ObjectNode obj) {
Struct struct =
new Struct(JSON_MAP_SCHEMA)
.put(ALL_JSON_SCHEMA_FIELD, JsonToMapUtils.populateMap(obj, Maps.newHashMap()));
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
JSON_MAP_SCHEMA,
struct,
record.timestamp(),
record.headers());
}

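// json.root=false: infer a struct schema from the top-level fields; nested objects become maps.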
private R structRecord(R record, ObjectNode contents) {
SchemaBuilder builder = SchemaBuilder.struct();
contents.fields().forEachRemaining(entry -> JsonToMapUtils.addField(entry, builder));
Schema schema = builder.build();
Struct value = JsonToMapUtils.addToStruct(contents, schema, new Struct(schema));
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
schema,
value,
record.timestamp(),
record.headers());
}

private String collectRecordDetails(R record) {
if (record instanceof SinkRecord) {
SinkRecord sinkRecord = (SinkRecord) record;
return String.format(
"topic %s partition %s offset %s",
sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
} else {
return String.format("topic %s partition %S", record.topic(), record.kafkaPartition());
}
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
startAtRoot = config.getBoolean(JSON_LEVEL);
}
}