Skip to content

Commit

Permalink
Refactor in preparation for castorm#88 support of response-level offset
Browse files Browse the repository at this point in the history
  • Loading branch information
castorm committed Jan 4, 2021
1 parent a53dba2 commit 5d02dd4
Show file tree
Hide file tree
Showing 14 changed files with 520 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@

import lombok.experimental.UtilityClass;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collector;

import static java.util.stream.Collectors.toMap;

@UtilityClass
public class CollectorsUtils {
public class CollectionUtils {

public static <T, K, U> Collector<T, ?, LinkedHashMap<K, U>> toLinkedHashMap(
Function<? super T, ? extends K> keyMapper,
Expand All @@ -44,4 +46,10 @@ public class CollectorsUtils {
LinkedHashMap::new
);
}

public static <S, T> Map<S, T> merge(Map<S, T> mapA, Map<S, T> mapB) {
Map<S, T> merged = new HashMap<>(mapA);
mapB.forEach((key, value) -> merged.merge(key, value, (k, v) -> v));
return merged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Optional;

import static com.github.castorm.kafka.connect.common.CollectorsUtils.toLinkedHashMap;
import static com.github.castorm.kafka.connect.common.CollectionUtils.toLinkedHashMap;
import static java.util.function.Function.identity;

@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
* #L%
*/

import com.fasterxml.jackson.databind.JsonNode;
import com.github.castorm.kafka.connect.http.model.HttpResponse;
import com.github.castorm.kafka.connect.http.model.Offset;
import com.github.castorm.kafka.connect.http.record.model.KvRecord;
import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord;
import com.github.castorm.kafka.connect.http.response.spi.KvRecordHttpResponseParser;
import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser;
import lombok.RequiredArgsConstructor;
Expand All @@ -43,7 +43,7 @@ public class JacksonKvRecordHttpResponseParser implements KvRecordHttpResponsePa

private final Function<Map<String, ?>, JacksonKvRecordHttpResponseParserConfig> configFactory;

private JacksonRecordParser recordParser;
private JacksonResponseRecordParser responseParser;

private TimestampParser timestampParser;

Expand All @@ -54,27 +54,27 @@ public JacksonKvRecordHttpResponseParser() {
@Override
public void configure(Map<String, ?> configs) {
JacksonKvRecordHttpResponseParserConfig config = configFactory.apply(configs);
recordParser = config.getRecordParser();
responseParser = config.getResponseParser();
timestampParser = config.getTimestampParser();
}

@Override
public List<KvRecord> parse(HttpResponse response) {
return recordParser.getRecords(response.getBody())
return responseParser.getRecords(response.getBody())
.map(this::map)
.collect(toList());
}

private KvRecord map(JsonNode node) {
private KvRecord map(JacksonRecord record) {

Map<String, Object> offsets = recordParser.getOffsets(node);
Map<String, Object> offsets = record.getOffset();

String key = recordParser.getKey(node)
String key = ofNullable(record.getKey())
.map(Optional::of)
.orElseGet(() -> ofNullable(offsets.get("key")).map(String.class::cast))
.orElseGet(() -> generateConsistentKey(node));
.orElseGet(() -> generateConsistentKey(record.getBody()));

Optional<Instant> timestamp = recordParser.getTimestamp(node)
Optional<Instant> timestamp = ofNullable(record.getTimestamp())
.map(Optional::of)
.orElseGet(() -> ofNullable(offsets.get("timestamp")).map(String.class::cast))
.map(timestampParser::parse);
Expand All @@ -85,12 +85,12 @@ private KvRecord map(JsonNode node) {

return KvRecord.builder()
.key(key)
.value(recordParser.getValue(node))
.value(record.getBody())
.offset(offset)
.build();
}

private String generateConsistentKey(JsonNode node) {
return nameUUIDFromBytes(node.toString().getBytes()).toString();
private static String generateConsistentKey(String body) {
return nameUUIDFromBytes(body.getBytes()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ public class JacksonKvRecordHttpResponseParserConfig extends AbstractConfig {

private static final String RECORD_TIMESTAMP_PARSER_CLASS = "http.response.record.timestamp.parser";

private final JacksonRecordParser recordParser;
private final JacksonResponseRecordParser responseParser;
private final TimestampParser timestampParser;

JacksonKvRecordHttpResponseParserConfig(Map<String, ?> originals) {
super(config(), originals);
recordParser = new JacksonRecordParser();
JacksonSerializer serializer = new JacksonSerializer();
JacksonRecordParser recordParser = new JacksonRecordParser(serializer);
recordParser.configure(originals);
responseParser = new JacksonResponseRecordParser(recordParser, serializer);
responseParser.configure(originals);
timestampParser = getConfiguredInstance(RECORD_TIMESTAMP_PARSER_CLASS, TimestampParser.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.Configurable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
Expand All @@ -43,73 +40,59 @@ public class JacksonRecordParser implements Configurable {

private final Function<Map<String, ?>, JacksonRecordParserConfig> configFactory;

private final ObjectMapper objectMapper;
private final JacksonSerializer serializer;

private final JacksonPropertyResolver propertyResolver;

private JsonPointer recordsPointer;
private List<JsonPointer> keyPointer;
private Optional<JsonPointer> timestampPointer;
private Map<String, JsonPointer> offsetPointers;
private JsonPointer valuePointer;

public JacksonRecordParser() {
this(JacksonRecordParserConfig::new, new ObjectMapper(), new JacksonPropertyResolver());
this(new JacksonSerializer(new ObjectMapper()));
}

public JacksonRecordParser(JacksonSerializer serializer) {
this(JacksonRecordParserConfig::new, serializer);
}

@Override
public void configure(Map<String, ?> settings) {
JacksonRecordParserConfig config = configFactory.apply(settings);
recordsPointer = config.getRecordsPointer();
keyPointer = config.getKeyPointer();
valuePointer = config.getValuePointer();
offsetPointers = config.getOffsetPointers();
timestampPointer = config.getTimestampPointer();
}

Stream<JsonNode> getRecords(byte[] body) {
return propertyResolver.getArrayAt(deserialize(body), recordsPointer);
}

@Deprecated
/*
Replaced by Offset
/**
* @deprecated Replaced by Offset
*/
@Deprecated
Optional<String> getKey(JsonNode node) {
String key = keyPointer.stream()
.map(pointer -> propertyResolver.getObjectAt(node, pointer).asText())
.map(pointer -> serializer.getObjectAt(node, pointer).asText())
.filter(it -> !it.isEmpty())
.collect(joining("+"));
return key.isEmpty() ? Optional.empty() : Optional.of(key);
}

@Deprecated
/*
Replaced by Offset
/**
* @deprecated Replaced by Offset
*/
@Deprecated
Optional<String> getTimestamp(JsonNode node) {
return timestampPointer.map(pointer -> propertyResolver.getObjectAt(node, pointer).asText());
return timestampPointer.map(pointer -> serializer.getObjectAt(node, pointer).asText());
}

Map<String, Object> getOffsets(JsonNode node) {
Map<String, Object> getOffset(JsonNode node) {
return offsetPointers.entrySet().stream()
.collect(toMap(Entry::getKey, entry -> propertyResolver.getObjectAt(node, entry.getValue()).asText()));
.collect(toMap(Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
}

String getValue(JsonNode node) {

JsonNode value = propertyResolver.getObjectAt(node, valuePointer);

return value.isObject() ? serialize(value) : value.asText();
}

@SneakyThrows(IOException.class)
private JsonNode deserialize(byte[] body) {
return objectMapper.readTree(body);
}
JsonNode value = serializer.getObjectAt(node, valuePointer);

@SneakyThrows(IOException.class)
private String serialize(JsonNode node) {
return objectMapper.writeValueAsString(node);
return value.isObject() ? serializer.serialize(value) : value.asText();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.github.castorm.kafka.connect.http.response.jackson;

/*-
* #%L
* kafka-connect-http
* %%
* Copyright (C) 2020 CastorM
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.Configurable;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.github.castorm.kafka.connect.common.CollectionUtils.merge;
import static java.util.Collections.emptyMap;

@RequiredArgsConstructor
public class JacksonResponseRecordParser implements Configurable {

private final Function<Map<String, ?>, JacksonRecordParserConfig> configFactory;

private final JacksonRecordParser recordParser;

private final JacksonSerializer serializer;

private JsonPointer recordsPointer;

public JacksonResponseRecordParser() {
this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
}

public JacksonResponseRecordParser(JacksonRecordParser recordParser, JacksonSerializer serializer) {
this(JacksonRecordParserConfig::new, recordParser, serializer);
}

@Override
public void configure(Map<String, ?> settings) {
JacksonRecordParserConfig config = configFactory.apply(settings);
recordsPointer = config.getRecordsPointer();
}

Stream<JacksonRecord> getRecords(byte[] body) {

JsonNode jsonBody = serializer.deserialize(body);

Map<String, Object> responseOffset = getResponseOffset(jsonBody);

return serializer.getArrayAt(jsonBody, recordsPointer)
.map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset));
}

private Map<String, Object> getResponseOffset(JsonNode node) {
return emptyMap();
}

private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, Object> responseOffset) {
return JacksonRecord.builder()
.key(recordParser.getKey(jsonRecord).orElse(null))
.timestamp(recordParser.getTimestamp(jsonRecord).orElse(null))
.offset(merge(responseOffset, recordParser.getOffset(jsonRecord)))
.body(recordParser.getValue(jsonRecord))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -22,16 +22,37 @@

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.stream.Stream;

import static com.fasterxml.jackson.core.JsonPointer.compile;
import static java.util.stream.StreamSupport.stream;

class JacksonPropertyResolver {
@RequiredArgsConstructor
class JacksonSerializer {

private static final JsonPointer JSON_ROOT = compile("/");

private final ObjectMapper objectMapper;

public JacksonSerializer() {
this(new ObjectMapper());
}

@SneakyThrows(IOException.class)
JsonNode deserialize(byte[] body) {
return objectMapper.readTree(body);
}

@SneakyThrows(IOException.class)
String serialize(JsonNode node) {
return objectMapper.writeValueAsString(node);
}

JsonNode getObjectAt(JsonNode node, JsonPointer pointer) {
return getRequiredAt(node, pointer);
}
Expand Down
Loading

0 comments on commit 5d02dd4

Please sign in to comment.