Skip to content

Commit

Permalink
Merge pull request #37 from manolama/tweaks
Browse files Browse the repository at this point in the history
Add validation around null tag strings and drop the measurements with a
  • Loading branch information
manolama authored Dec 11, 2020
2 parents 12014f5 + d758ec1 commit 60cc8e4
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ buildscript {
classpath group: 'com.bmuschko', name: 'gradle-clover-plugin', version: '2.2.2'
classpath group: 'com.github.ben-manes', name: 'gradle-versions-plugin', version: '0.20.0'
classpath group: 'me.champeau.gradle', name: 'jmh-gradle-plugin', version: '0.4.8'
classpath group: 'org.owasp', name: 'dependency-check-gradle', version: '5.2.2'
classpath group: 'org.owasp', name: 'dependency-check-gradle', version: '5.3.2.1'
classpath group: 'gradle.plugin.com.github.spotbugs', name: 'spotbugs-gradle-plugin', version: '2.0.0'
classpath group: 'org.ajoberstar.reckon', name: 'reckon-gradle', version: '0.9.0'
}
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/io/ultrabrew/metrics/util/Strings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2020, Oath Inc.
// Licensed under the terms of the Apache License 2.0 license. See LICENSE file in Ultrabrew Metrics
// for terms.
package io.ultrabrew.metrics.util;

public class Strings {

///CLOVER:OFF
private Strings() {
// static class
}
///CLOVER:ON

/**
* Whether or not the string is empty or null.
* @param s A string to test.
* @return True if the string is empty or null, false if not.
*/
public static boolean isNullOrEmpty(String s) {
return s == null || s.isEmpty();
}
}
19 changes: 19 additions & 0 deletions core/src/test/java/io/ultrabrew/metrics/util/StringsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2020, Oath Inc.
// Licensed under the terms of the Apache License 2.0 license. See LICENSE file in Ultrabrew Metrics
// for terms.
package io.ultrabrew.metrics.util;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.junit.jupiter.api.Test;

public class StringsTest {

@Test
public void testIsNullOrEmpty() {
assertTrue(Strings.isNullOrEmpty(null));
assertTrue(Strings.isNullOrEmpty(""));
assertFalse(Strings.isNullOrEmpty("foo"));
}
}
2 changes: 1 addition & 1 deletion examples/undertow-httphandler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ dependencies {
compile platform(group: 'org.apache.logging.log4j', name: 'log4j-bom', version: '2.13.3')
compile group: 'org.apache.logging.log4j', name: 'log4j-core'
compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl'
compile group: 'io.undertow', name: 'undertow-core', version: '2.1.3.Final'
compile group: 'io.undertow', name: 'undertow-core', version: '2.2.3.Final'
}
28 changes: 28 additions & 0 deletions reporter-influxdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# InfluxReporter

This reporter will push batches of metrics to an InfluxDB V1x HTTP API server as JSON object. Note that a database name must be included.

Configuration parameters include:

* **baseUri** - *(required)* A hostname including protocol, host and optional port. E.g. `http://localhost:4242`. Note that the host must start with a protocol of either `http://` or `https://`.
* **endpoint** - *(Default: `/write?db=`)* A string with the endpoint to post results to. Note that the endpoint must start with a forward slash and can be `/?db=`. If this parameter is set then the `database` parameter will be ignored and must be supplied as a query parameter in the endpoint string.
* **database** - *(required)* A string denoting the InfluxDB database to send measurements to.
* **bufferSize** - *(Default: `64 * 1024`)* The maximum size of the buffer before it's flushed.
* **windowSize** - (Default: `1`) How often to report to the API in seconds.

To instantiate and run the reporter execute:

```Java
InfluxDBReporter.Builder reporter_builder =
InfluxDBReporter.builder()
.withBaseUri(URI.create("http://localhost:4242")))
.withDatabase("Ultrabrew")
.withWindowSize(60);

InfluxDBReporter reporter = reporter_builder.build();
MetricRegistry metricRegistry = new MetricRegistry();
metric_registry.addReporter(reporter);

// on program shutdown
reporter.close();
```
2 changes: 1 addition & 1 deletion reporter-influxdb/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dependencies {
compile project(':core')
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.6'
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.13'
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.ultrabrew.metrics.reporters.influxdb;

import io.ultrabrew.metrics.util.Strings;
import java.util.Arrays;
import java.io.IOException;
import java.net.URI;
import java.nio.BufferOverflowException;
Expand All @@ -17,17 +19,21 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides methods to access InfluxDB.
*/
public class InfluxDBClient {

private static final Logger LOGGER = LoggerFactory.getLogger(InfluxDBClient.class);

private static final String UTF_8 = StandardCharsets.UTF_8.name();
private static final byte WHITESPACE = ' ';
private static final byte COMMA = ',';
private static final byte EQUALS = '=';
private static final byte NEWLINE = '\n';
private static final byte[] NULL_STRING = new byte[] { 'N', 'U', 'L', 'L' };

private final ByteBuffer byteBuffer;
private final URI dbUri;
Expand Down Expand Up @@ -57,12 +63,29 @@ private CloseableHttpClient getHttpClient() {
private void doWrite(final String measurement, final String[] tags, final String[] fields,
final long timestamp)
throws IOException {
if (Strings.isNullOrEmpty(measurement)) {
LOGGER.warn("Null or empty measurement.");
return;
}
int rollback = byteBuffer.position();
byteBuffer.put(measurement.getBytes(UTF_8));
for (int i = 0; i < tags.length; i += 2) {
if (Strings.isNullOrEmpty(tags[i])) {
LOGGER.warn("Null or empty tag key in tags array: {} for measurement {}",
Arrays.toString(tags), measurement);
byteBuffer.position(rollback);
return;
}
if (Strings.isNullOrEmpty(tags[i + 1])) {
// TODO - Some users want this, some don't. Set a flag in the builder.
//LOGGER.warn("Null or empty tag value in tags array: {} for measurement {}",
// Arrays.toString(tags), measurement);
}
byteBuffer.put(COMMA)
.put(tags[i].getBytes(UTF_8))
.put(EQUALS)
.put(tags[i + 1].getBytes(UTF_8));
.put(Strings.isNullOrEmpty(tags[i + 1]) ? NULL_STRING :
tags[i + 1].getBytes(UTF_8));
}
byteBuffer.put(WHITESPACE);

Expand All @@ -71,6 +94,18 @@ private void doWrite(final String measurement, final String[] tags, final String
if (!f) {
byteBuffer.put(COMMA);
}
if (Strings.isNullOrEmpty(fields[i])) {
LOGGER.warn("Null or empty field name in array: {} for measurement {}",
Arrays.toString(fields), measurement);
byteBuffer.position(rollback);
return;
}
if (Strings.isNullOrEmpty(fields[i + 1])) {
LOGGER.warn("Null or empty field value in array: {} for measurement {}",
Arrays.toString(fields), measurement);
byteBuffer.position(rollback);
return;
}
byteBuffer.put(fields[i].getBytes(UTF_8))
.put(EQUALS)
.put(fields[i + 1].getBytes(UTF_8));
Expand Down Expand Up @@ -133,4 +168,5 @@ public void flush() throws IOException {
byteBuffer.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.ultrabrew.metrics.data.CursorEntry;
import io.ultrabrew.metrics.data.Type;
import io.ultrabrew.metrics.reporters.TimeWindowReporter;
import io.ultrabrew.metrics.util.Strings;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
Expand Down Expand Up @@ -82,6 +83,7 @@ private String[] buildFields(CursorEntry cursor) {
public static class Builder extends TimeWindowReporterBuilder<Builder, InfluxDBReporter> {

private URI baseUri = null;
private String endpoint = null;
private String database = null;
private int windowSeconds = 1;
private int bufferSize = 64 * 1024;
Expand All @@ -98,6 +100,18 @@ public Builder withBaseUri(final URI baseUri) {
this.baseUri = baseUri;
return this;
}

/**
* An optional endpoint to build the URI that, when provided, overrides the
* default `/write?db=$database`. You must supply the full URI string, e.g.
* `/api/put/influx/write?db=mydatabase`.
* @param endpoint An optional non-null endpoint to apply to the base URI.
* @return The builder.
*/
public Builder withEndpoint(final String endpoint) {
this.endpoint = endpoint;
return this;
}

/**
* Set the database name measurements are to be written to.
Expand Down Expand Up @@ -137,9 +151,14 @@ public InfluxDBReporter build() {
if (baseUri == null) {
throw new IllegalArgumentException("Invalid baseUri");
}
if (database == null || database.isEmpty()) {
if (Strings.isNullOrEmpty(database) &&
Strings.isNullOrEmpty(endpoint)) {
throw new IllegalArgumentException("Invalid database");
}
if (!Strings.isNullOrEmpty(endpoint)) {
return new InfluxDBReporter(baseUri.resolve(endpoint), windowSeconds,
bufferSize, defaultAggregators, metricAggregators);
}
return new InfluxDBReporter(baseUri.resolve("/write?db=" + database), windowSeconds,
bufferSize, defaultAggregators, metricAggregators);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,37 @@ public void testWriteSplitting() throws IOException {
}};
}

@Test
public void testNullStrings() throws IOException {
new Expectations() {{
httpClient.execute((HttpUriRequest) any);
result = closeableHttpResponse;
closeableHttpResponse.getEntity();
result = new BasicHttpEntity();
closeableHttpResponse.getStatusLine();
result = statusLine;
statusLine.getStatusCode();
result = 200;
}};
client.write(null, new String[] { "host", "web01" }, new String[] { "temp", "80" }, 1534055562000000003L);
client.write("cpu_load_short", new String[] { null, "web01" }, new String[] { "temp", "80" }, 1534055562000000003L);
// one good one in the middle
client.write("cpu_load_short", new String[] { "host", "web01" }, new String[] { "temp", "80" }, 1534055562000000003L);
// this is ok too
client.write("cpu_load_short", new String[] { "host", null }, new String[] { "temp", "80" }, 1534055562000000003L);
client.write("cpu_load_short", new String[] { "host", "web01" }, new String[] { null, "80" }, 1534055562000000003L);
client.write("cpu_load_short", new String[] { "host", "web01" }, new String[] { "temp", null }, 1534055562000000003L);
client.flush();
new Verifications() {{
HttpPost request;
httpClient.execute(request = withCapture());
times = 1;
assertEquals("cpu_load_short,host=web01 temp=80 1534055562000000003\n" +
"cpu_load_short,host=NULL temp=80 1534055562000000003\n",
EntityUtils.toString(request.getEntity()));
}};
}

@Test
public void testTooLargeMeasurement() throws IOException {
InfluxDBClient c = new InfluxDBClient(URI.create("http://localhost:8086/write?db=test"), 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import mockit.Verifications;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -64,6 +65,42 @@ public void testSetBufferSize() {
ByteBuffer buffer = Deencapsulation.getField(c, "byteBuffer");
assertEquals(12765, buffer.capacity());
}

@Test
public void testSeEndpoint(@Mocked CloseableHttpClient httpClient,
@Mocked CloseableHttpResponse closeableHttpResponse, @Mocked StatusLine statusLine)
throws InterruptedException, IOException {
new Expectations() {{
httpClient.execute((HttpUriRequest) any);
result = closeableHttpResponse;
closeableHttpResponse.getStatusLine();
result = statusLine;
statusLine.getStatusCode();
result = 200;
}};

MetricRegistry registry = new MetricRegistry();
InfluxDBReporter r = InfluxDBReporter.builder()
.withBaseUri(TEST_URI)
.withDatabase("test") // ignored
.withEndpoint("/my/change?db=foo")
.withBufferSize(12765)
.build();

registry.addReporter(r);

Counter counter = registry.counter("counter");
counter.inc("tag", "value");

Thread.sleep(3000);

new Verifications() {{
HttpPost request;
httpClient.execute(request = withCapture());
times = 1;
assertEquals(TEST_URI + "/my/change?db=foo", request.getURI().toString());
}};
}

@Test
public void testReporting(@Mocked CloseableHttpClient httpClient,
Expand Down
21 changes: 14 additions & 7 deletions reporter-opentsdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@ Configuration parameters include:
* **endpoint** - *(Default: `/api/put`)* A string with the endpoint to post results to. Note that the endpoint must start with a forward slash and can be `/`.
* **batchSize** - *(Default: `64`)* The maximum number of measurements to flush in each batch.
* **timestampsInMilliseconds** - (Default: `false`) Whether or not to post timestamps in seconds `false` or milliseconds `true`.
* **windowSize** - (Default: `1`) How often to report to the API in seconds.

To instantiate and run the reporter execute:

```Java
OpenTSDBConfig config = OpenTSDBConfig.newBuilder()
.setHost("http://localhost:4242")
.build();

// 60 second reporting window
OpenTSDBReporter reporter = new OpenTSDBReporter(config, 60);
OpenTSDBReporter.Builder reporter_builder =
OpenTSDBReporter.builder()
.withBaseUri(URI.create("http://localhost:4242")))
.withBatchSize(64)
.withWindowSize(60);
if (!Strings.isNullOrEmpty(tsdb.getConfig().getString(TSD_ENDPOINT))) {
reporter_builder.withApiEndpoint("/proxy/opentsdb/api/put);
}
OpenTSDBReporter opentsdb_reporter = reporter_builder.build();
MetricRegistry metricRegistry = new MetricRegistry();
metricRegistry.addReporter(reporter);
metric_registry.addReporter(opentsdb_reporter);
// on program shutdown
opentsdb_reporter.close();
```
2 changes: 1 addition & 1 deletion reporter-opentsdb/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dependencies {
compile project(':core')
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.6'
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.13'
}
Loading

0 comments on commit 60cc8e4

Please sign in to comment.