diff --git a/CHANGELOG.md b/CHANGELOG.md index 725aa30bc..2ece37bd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,14 @@ ## 0.32.0 * Dependency updates (JMX exporter 1.1.0) +* Added support for the [Strimzi Metrics Reporter](https://github.com/strimzi/metrics-reporter) metrics. + This is a Kafka plugin that exports metrics in Prometheus format without passing through JMX, and can be enabled by setting `bridge.metrics=strimziMetricsReporter`. + The JMX Exporter metrics can still be enabled by setting `bridge.metrics=jmxPrometheusExporter`. + +### Changes, deprecations and removals + +* `KAFKA_BRIDGE_METRICS_ENABLED` has been deprecated. + The equivalent configuration is `bridge.metrics=jmxPrometheusExporter`. ## 0.31.1 diff --git a/bin/docker/kafka_bridge_config_generator.sh b/bin/docker/kafka_bridge_config_generator.sh index 0ee64e420..a136ca53a 100755 --- a/bin/docker/kafka_bridge_config_generator.sh +++ b/bin/docker/kafka_bridge_config_generator.sh @@ -1,5 +1,9 @@ #!/usr/bin/env bash +if [ -n "$STRIMZI_METRICS" ]; then + BRIDGE_METRICS="bridge.metrics=${STRIMZI_METRICS}" +fi + if [ -n "$STRIMZI_TRACING" ]; then BRIDGE_TRACING="bridge.tracing=${STRIMZI_TRACING}" fi diff --git a/config/application.properties b/config/application.properties index 423b17886..192f54c13 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,5 +1,15 @@ #Bridge related settings bridge.id=my-bridge + +# uncomment the following line to enable JMX Exporter metrics, check the documentation for more details +#bridge.metrics=jmxPrometheusExporter + +# uncomment the following lines to enable Strimzi Reporter metrics, check the documentation for more details +#bridge.metrics=strimziMetricsReporter +#kafka.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +#kafka.prometheus.metrics.reporter.listener.enable=false +#kafka.prometheus.metrics.reporter.allowlist=.* + # uncomment the following line (bridge.tracing) to enable OpenTelemetry tracing, check the documentation for more details #bridge.tracing=opentelemetry diff --git a/documentation/assemblies/assembly-kafka-bridge-config.adoc b/documentation/assemblies/assembly-kafka-bridge-config.adoc index 9cb1238f0..2288a0857 100644 --- a/documentation/assemblies/assembly-kafka-bridge-config.adoc +++ b/documentation/assemblies/assembly-kafka-bridge-config.adoc @@ -14,5 +14,6 @@ Distributed tracing allows you to track the progress of transactions between app NOTE: Use the `KafkaBridge` resource to configure properties when you are xref:overview-components-running-kafka-bridge-cluster-{context}[running the Kafka Bridge on Kubernetes]. include::modules/proc-configuring-kafka-bridge.adoc[leveloffset=+1] -include::modules/proc-configuring-kafka-bridge-metrics.adoc[leveloffset=+1] -include::modules/proc-configuring-kafka-bridge-tracing.adoc[leveloffset=+1] \ No newline at end of file +include::modules/proc-configuring-kafka-bridge-jmx-metrics.adoc[leveloffset=+1] +include::modules/proc-configuring-kafka-bridge-smr-metrics.adoc[leveloffset=+1] +include::modules/proc-configuring-kafka-bridge-tracing.adoc[leveloffset=+1] diff --git a/documentation/modules/proc-configuring-kafka-bridge-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc similarity index 60% rename from documentation/modules/proc-configuring-kafka-bridge-metrics.adoc rename to documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc index 94a9c37dd..0c7744095 100644 --- a/documentation/modules/proc-configuring-kafka-bridge-metrics.adoc +++ b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc @@ -1,8 +1,8 @@ -[id='proc-configuring-kafka-bridge-metrics-{context}'] -= Configuring metrics +[id='proc-configuring-kafka-bridge-jmx-metrics-{context}'] += Configuring JMX Exporter metrics [role="_abstract"] -Enable metrics for the Kafka Bridge by setting the `KAFKA_BRIDGE_METRICS_ENABLED` environment variable. +Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration. .Prerequisites @@ -10,13 +10,13 @@ Enable metrics for the Kafka Bridge by setting the `KAFKA_BRIDGE_METRICS_ENABLED .Procedure -. Set the environment variable for enabling metrics to `true`. +. Set the `bridge.metrics` configuration to `jmxPrometheusExporter`. + -.Environment variable for enabling metrics +.Configuration for enabling metrics [source,properties] ---- -KAFKA_BRIDGE_METRICS_ENABLED=true +bridge.metrics=jmxPrometheusExporter ---- . Run the Kafka Bridge script to enable metrics. @@ -27,4 +27,4 @@ KAFKA_BRIDGE_METRICS_ENABLED=true ./bin/kafka_bridge_run.sh --config-file=/application.properties ---- + -With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format. \ No newline at end of file +With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format. diff --git a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc new file mode 100644 index 000000000..19fda8cc9 --- /dev/null +++ b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc @@ -0,0 +1,35 @@ +[id='proc-configuring-kafka-bridge-smr-metrics-{context}'] += Configuring Strimzi Metrics Reporter metrics + +[role="_abstract"] +Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration. + +.Prerequisites + +* xref:proc-downloading-kafka-bridge-{context}[The Kafka Bridge installation archive is downloaded]. + +.Procedure + +. Set the `bridge.metrics` configuration to `strimziMetricsReporter` and related configuration. ++ +.Configuration for enabling metrics + +[source,properties] +---- +bridge.metrics=strimziMetricsReporter +kafka.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +kafka.prometheus.metrics.reporter.listener.enable=false +kafka.prometheus.metrics.reporter.allowlist=.* +---- ++ +See the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter documentation] for more details. + +. Run the Kafka Bridge script to enable metrics. ++ +.Running the Kafka Bridge to enable metrics +[source,shell] +---- +./bin/kafka_bridge_run.sh --config-file=/application.properties +---- ++ +With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format. diff --git a/pom.xml b/pom.xml index 0e7fa507d..c3dfc4983 100644 --- a/pom.xml +++ b/pom.xml @@ -108,8 +108,9 @@ 1.1.0 3.3.0 10.12.2 - 2.2 5.8.2 + 2.2 + 4.11.0 3.3.0 3.0.0-M7 3.0.0-M7 @@ -126,14 +127,15 @@ 4.7.3 4.7.3.0 0.15.0 + 0.1.0 1.34.1 1.34.1-alpha 1.32.0-alpha 1.21.0-alpha 1.61.0 1.12.3 - 1.1.0 1.3.4 + 1.1.0 1.4 0.109.0 2.3.2 @@ -291,20 +293,35 @@ ${micrometer.version} - io.prometheus.jmx - collector - ${jmx-prometheus-collector.version} + io.strimzi + metrics-reporter + ${strimzi-metrics-reporter.version} io.prometheus prometheus-metrics-model ${prometheus-client.version} + + io.prometheus + prometheus-metrics-instrumentation-jvm + ${prometheus-client.version} + + + io.prometheus + prometheus-metrics-exporter-httpserver + ${prometheus-client.version} + io.prometheus prometheus-metrics-exposition-textformats ${prometheus-client.version} + + io.prometheus.jmx + collector + ${jmx-prometheus-collector.version} + commons-cli commons-cli @@ -416,6 +433,12 @@ ${hamcrest.version} test + + org.mockito + mockito-core + ${mockito.version} + test + io.strimzi strimzi-test-container @@ -579,6 +602,10 @@ io.grpc:grpc-netty-shaded:jar com.google.guava:guava + + io.strimzi:metrics-reporter + io.prometheus:prometheus-metrics-instrumentation-jvm + io.prometheus:prometheus-metrics-exporter-httpserver diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index f0553ce82..4619f42a7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -5,10 +5,13 @@ package io.strimzi.kafka.bridge; -import io.micrometer.core.instrument.MeterRegistry; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.ConfigRetriever; import io.strimzi.kafka.bridge.http.HttpBridge; +import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; +import io.strimzi.kafka.bridge.metrics.MetricsType; +import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry; import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.Future; import io.vertx.core.Promise; @@ -18,7 +21,6 @@ import io.vertx.micrometer.MetricsDomain; import io.vertx.micrometer.MicrometerMetricsOptions; import io.vertx.micrometer.VertxPrometheusOptions; -import io.vertx.micrometer.backends.BackendRegistries; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; @@ -46,8 +48,6 @@ public class Application { private static final Logger LOGGER = LogManager.getLogger(Application.class); - private static final String KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED"; - /** * Bridge entrypoint * @@ -56,27 +56,12 @@ public class Application { public static void main(String[] args) { LOGGER.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion()); try { - VertxOptions vertxOptions = new VertxOptions(); - JmxCollectorRegistry jmxCollectorRegistry = null; - if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { - LOGGER.info("Metrics enabled and exposed on the /metrics endpoint"); - // setup Micrometer metrics options - vertxOptions.setMetricsOptions(metricsOptions()); - jmxCollectorRegistry = getJmxCollectorRegistry(); - } - Vertx vertx = Vertx.vertx(vertxOptions); - // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance - MeterRegistry meterRegistry = BackendRegistries.getDefaultNow(); - MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry); - - CommandLine commandLine = new DefaultParser().parse(generateOptions(), args); - Map config = ConfigRetriever.getConfig(absoluteFilePath(commandLine.getOptionValue("config-file"))); BridgeConfig bridgeConfig = BridgeConfig.fromMap(config); LOGGER.info("Bridge configuration {}", bridgeConfig); - deployHttpBridge(vertx, bridgeConfig, metricsReporter).onComplete(done -> { + deployHttpBridge(bridgeConfig).onComplete(done -> { if (done.succeeded()) { // register tracing - if set, etc TracingUtil.initialize(bridgeConfig); @@ -88,40 +73,26 @@ public static void main(String[] args) { } } - /** - * Set up the Vert.x metrics options - * - * @return instance of the MicrometerMetricsOptions on Vert.x - */ - private static MicrometerMetricsOptions metricsOptions() { - Set set = new HashSet<>(); - set.add(MetricsDomain.NAMED_POOLS.name()); - set.add(MetricsDomain.VERTICLES.name()); - return new MicrometerMetricsOptions() - .setPrometheusOptions(new VertxPrometheusOptions().setEnabled(true)) - // define the labels on the HTTP server related metrics - .setLabels(EnumSet.of(Label.HTTP_PATH, Label.HTTP_METHOD, Label.HTTP_CODE)) - // disable metrics about pool and verticles - .setDisabledMetricsCategories(set) - .setJvmMetricsEnabled(true) - .setEnabled(true); - } - /** * Deploys the HTTP bridge into a new verticle * - * @param vertx Vertx instance * @param bridgeConfig Bridge configuration - * @param metricsReporter MetricsReporter instance for scraping metrics from different registries * @return Future for the bridge startup */ - private static Future deployHttpBridge(Vertx vertx, BridgeConfig bridgeConfig, MetricsReporter metricsReporter) { + private static Future deployHttpBridge(BridgeConfig bridgeConfig) + throws MalformedObjectNameException, IOException { Promise httpPromise = Promise.promise(); + Vertx vertx = createVertxInstance(bridgeConfig); + MetricsReporter metricsReporter = getMetricsReporter(bridgeConfig); HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter); + vertx.deployVerticle(httpBridge, done -> { if (done.succeeded()) { LOGGER.info("HTTP verticle instance deployed [{}]", done.result()); + if (metricsReporter != null) { + LOGGER.info("Metrics of type '{}' enabled and exposed on /metrics endpoint", bridgeConfig.getMetrics()); + } httpPromise.complete(httpBridge); } else { LOGGER.error("Failed to deploy HTTP verticle instance", done.cause()); @@ -132,6 +103,46 @@ private static Future deployHttpBridge(Vertx vertx, BridgeConfig bri return httpPromise.future(); } + private static Vertx createVertxInstance(BridgeConfig bridgeConfig) { + VertxOptions vertxOptions = new VertxOptions(); + if (bridgeConfig.getMetrics() != null) { + vertxOptions.setMetricsOptions(metricsOptions()); // enable Vertx metrics + } + Vertx vertx = Vertx.vertx(vertxOptions); + return vertx; + } + + /** + * Set up the Vert.x metrics options + * + * @return instance of the MicrometerMetricsOptions on Vert.x + */ + private static MicrometerMetricsOptions metricsOptions() { + Set set = new HashSet<>(); + set.add(MetricsDomain.NAMED_POOLS.name()); + set.add(MetricsDomain.VERTICLES.name()); + return new MicrometerMetricsOptions() + .setPrometheusOptions(new VertxPrometheusOptions().setEnabled(true)) + // define the labels on the HTTP server related metrics + .setLabels(EnumSet.of(Label.HTTP_PATH, Label.HTTP_METHOD, Label.HTTP_CODE)) + // disable metrics about pool and verticles + .setDisabledMetricsCategories(set) + .setJvmMetricsEnabled(true) + .setEnabled(true); + } + + private static MetricsReporter getMetricsReporter(BridgeConfig bridgeConfig) + throws MalformedObjectNameException, IOException { + if (bridgeConfig.getMetrics() != null) { + if (bridgeConfig.getMetrics().equals(MetricsType.JMX_EXPORTER.toString())) { + return new MetricsReporter(getJmxCollectorRegistry()); + } else if (bridgeConfig.getMetrics().equals(MetricsType.STRIMZI_REPORTER.toString())) { + return new MetricsReporter(new StrimziCollectorRegistry()); + } + } + return null; + } + /** * Return a JmxCollectorRegistry instance with the YAML configuration filters * @@ -139,8 +150,7 @@ private static Future deployHttpBridge(Vertx vertx, BridgeConfig bri * @throws MalformedObjectNameException * @throws IOException */ - private static JmxCollectorRegistry getJmxCollectorRegistry() - throws MalformedObjectNameException, IOException { + private static JmxCollectorRegistry getJmxCollectorRegistry() throws MalformedObjectNameException, IOException { InputStream is = Application.class.getClassLoader().getResourceAsStream("jmx_metrics_config.yaml"); if (is == null) { return null; @@ -160,7 +170,6 @@ private static JmxCollectorRegistry getJmxCollectorRegistry() * @return command line options */ private static Options generateOptions() { - Option configFileOption = Option.builder() .required(true) .hasArg(true) diff --git a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java deleted file mode 100644 index ab8e2f315..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge; - -import io.micrometer.core.instrument.Meter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.prometheus.PrometheusMeterRegistry; -import io.micrometer.prometheus.PrometheusNamingConvention; - -/** - * Used for scraping and reporting metrics in Prometheus format - */ -public class MetricsReporter { - - private final JmxCollectorRegistry jmxCollectorRegistry; - private final MeterRegistry meterRegistry; - - /** - * Constructor - * - * @param jmxCollectorRegistry JmxCollectorRegistry instance for scraping metrics from JMX endpoints - * @param meterRegistry MeterRegistry instance for scraping metrics exposed through Vert.x - */ - public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry meterRegistry) { - this.jmxCollectorRegistry = jmxCollectorRegistry; - this.meterRegistry = meterRegistry; - if (this.meterRegistry instanceof PrometheusMeterRegistry) { - this.meterRegistry.config().namingConvention(new MetricsNamingConvention()); - } - } - - private static class MetricsNamingConvention extends PrometheusNamingConvention { - @Override - public String name(String name, Meter.Type type, String baseUnit) { - String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; - return super.name(metricName, type, baseUnit); - } - } - - /** - * @return JmxCollectorRegistry instance for scraping metrics from JMX endpoints - */ - public JmxCollectorRegistry getJmxCollectorRegistry() { - return jmxCollectorRegistry; - } - - /** - * @return MeterRegistry instance for scraping metrics exposed through Vert.x - */ - public MeterRegistry getMeterRegistry() { - return meterRegistry; - } - - /** - * Scrape metrics on the provided registries returning them in the Prometheus format - * - * @return metrics in Prometheus format as String - */ - public String scrape() { - StringBuilder sb = new StringBuilder(); - if (jmxCollectorRegistry != null) { - sb.append(jmxCollectorRegistry.scrape()); - } - if (meterRegistry instanceof PrometheusMeterRegistry) { - sb.append(((PrometheusMeterRegistry) meterRegistry).scrape()); - } - return sb.toString(); - } -} diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java index 919c7f924..d80470732 100644 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java @@ -6,14 +6,19 @@ package io.strimzi.kafka.bridge.config; import io.strimzi.kafka.bridge.http.HttpConfig; +import io.strimzi.kafka.bridge.metrics.MetricsType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; /** * Bridge configuration properties */ public class BridgeConfig extends AbstractConfig { + private static final Logger LOGGER = LogManager.getLogger(BridgeConfig.class); /** Prefix for all the specific bridge configuration parameters */ public static final String BRIDGE_CONFIG_PREFIX = "bridge."; @@ -21,6 +26,9 @@ public class BridgeConfig extends AbstractConfig { /** Bridge identification number */ public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id"; + /** Metrics system to be used in the bridge */ + public static final String METRICS_TYPE = BRIDGE_CONFIG_PREFIX + "metrics"; + /** Tracing system to be used in the bridge */ public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing"; @@ -63,13 +71,26 @@ public HttpConfig getHttpConfig() { public static BridgeConfig fromMap(Map map) { KafkaConfig kafkaConfig = KafkaConfig.fromMap(map); HttpConfig httpConfig = HttpConfig.fromMap(map); - + validateMetricsType(map); return new BridgeConfig(map.entrySet().stream() .filter(e -> e.getKey().startsWith(BridgeConfig.BRIDGE_CONFIG_PREFIX)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), kafkaConfig, httpConfig); } + private static void validateMetricsType(Map map) { + String metricsTypeValue = (String) map.get(METRICS_TYPE); + if (metricsTypeValue != null) { + if (!metricsTypeValue.equals(MetricsType.JMX_EXPORTER.toString()) + && !metricsTypeValue.equals(MetricsType.STRIMZI_REPORTER.toString())) { + throw new IllegalArgumentException( + String.format("Invalid %s configuration, choose one of %s and %s", + METRICS_TYPE, MetricsType.JMX_EXPORTER, MetricsType.STRIMZI_REPORTER) + ); + } + } + } + @Override public String toString() { return "BridgeConfig(" + @@ -90,6 +111,19 @@ public String getBridgeID() { } } + /** + * @return the metric system to be used in the bridge + */ + public String getMetrics() { + final String envVarValue = System.getenv("KAFKA_BRIDGE_METRICS_ENABLED"); + if (envVarValue != null) { + LOGGER.warn("KAFKA_BRIDGE_METRICS_ENABLED is deprecated, use bridge.metrics configuration"); + } + return (String) Optional.ofNullable(config.get(BridgeConfig.METRICS_TYPE)) + .orElse(Boolean.parseBoolean(envVarValue) + ? MetricsType.JMX_EXPORTER.toString() : null); + } + /** * @return the tracing system to be used in the bridge */ diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index def1e5e75..7357a10e5 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -15,10 +15,10 @@ import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.IllegalEmbeddedFormatException; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.http.converter.JsonUtils; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.file.FileSystem; @@ -166,7 +166,9 @@ public void start(Promise startPromise) { routerBuilder.operation(this.OPENAPI.getOperationId().toString()).handler(this.OPENAPI); routerBuilder.operation(this.OPENAPIV2.getOperationId().toString()).handler(this.OPENAPIV2); routerBuilder.operation(this.OPENAPIV3.getOperationId().toString()).handler(this.OPENAPIV3); - routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS); + if (metricsReporter != null) { + routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS); + } routerBuilder.operation(this.INFO.getOperationId().toString()).handler(this.INFO); if (this.bridgeConfig.getHttpConfig().isCorsEnabled()) { routerBuilder.rootHandler(getCorsHandler()); @@ -180,10 +182,11 @@ public void start(Promise startPromise) { // handling validation errors and not existing endpoints this.router.errorHandler(HttpResponseStatus.BAD_REQUEST.code(), this::errorHandler); this.router.errorHandler(HttpResponseStatus.NOT_FOUND.code(), this::errorHandler); + this.router.errorHandler(HttpResponseStatus.NOT_IMPLEMENTED.code(), this::errorHandler); - if (this.metricsReporter.getMeterRegistry() != null) { + if (this.metricsReporter != null && this.metricsReporter.getVertxRegistry() != null) { // exclude to report the HTTP server metrics for the /metrics endpoint itself - this.metricsReporter.getMeterRegistry().config().meterFilter( + this.metricsReporter.getVertxRegistry().config().meterFilter( MeterFilter.deny(meter -> "/metrics".equals(meter.getTag(Label.HTTP_PATH.toString()))) ); } @@ -610,6 +613,8 @@ private void errorHandler(RoutingContext routingContext) { } } else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) { message = HttpResponseStatus.NOT_FOUND.reasonPhrase(); + } else if (routingContext.statusCode() == HttpResponseStatus.NOT_IMPLEMENTED.code()) { + message = HttpResponseStatus.NOT_IMPLEMENTED.reasonPhrase(); } HttpBridgeError error = new HttpBridgeError(routingContext.statusCode(), message); diff --git a/src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java similarity index 54% rename from src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java rename to src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java index 27107d3d6..72bbc35ed 100644 --- a/src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java +++ b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java @@ -3,7 +3,7 @@ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.kafka.bridge; +package io.strimzi.kafka.bridge.metrics; import io.prometheus.jmx.JmxCollector; import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; @@ -11,48 +11,50 @@ import javax.management.MalformedObjectNameException; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; /** - * Allow to collect JMX metrics exposing them in the Prometheus format + * Allow to collect JMX metrics exposing them in the Prometheus format. */ public class JmxCollectorRegistry { - private final PrometheusRegistry collectorRegistry; - private final PrometheusTextFormatWriter textFormatter = new PrometheusTextFormatWriter(true); + private final PrometheusRegistry registry; + private final PrometheusTextFormatWriter textFormatter; /** - * Constructor + * Constructor. * * @param yamlConfig YAML configuration string with metrics filtering rules * @throws MalformedObjectNameException Throws MalformedObjectNameException */ public JmxCollectorRegistry(String yamlConfig) throws MalformedObjectNameException { - new JmxCollector(yamlConfig).register(); - collectorRegistry = PrometheusRegistry.defaultRegistry; + // note that Prometheus default registry is a singleton, so it is shared with JmxCollector + this(new JmxCollector(yamlConfig), PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true)); } /** - * Constructor + * Constructor. * - * @param yamlFileConfig file containing the YAML configuration with metrics filtering rules - * @throws MalformedObjectNameException Throws MalformedObjectNameException - * @throws IOException Throws IOException + * @param jmxCollector JMX collector registry + * @param registry Prometheus collector registry + * @param textFormatter Prometheus text formatter */ - public JmxCollectorRegistry(File yamlFileConfig) throws MalformedObjectNameException, IOException { - new JmxCollector(yamlFileConfig).register(); - collectorRegistry = PrometheusRegistry.defaultRegistry; + /* test */ JmxCollectorRegistry(JmxCollector jmxCollector, + PrometheusRegistry registry, + PrometheusTextFormatWriter textFormatter) { + jmxCollector.register(); + this.registry = registry; + this.textFormatter = textFormatter; } /** - * @return Content that should be included in the response body for an endpoint designated for - * Prometheus to scrape from. + * @return Content that should be included in the response body for + * an endpoint designated for Prometheus to scrape from. */ public String scrape() { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { - textFormatter.write(stream, collectorRegistry.scrape()); + textFormatter.write(stream, registry.scrape()); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java new file mode 100644 index 000000000..f81139a67 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java @@ -0,0 +1,94 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.metrics; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.micrometer.prometheus.PrometheusNamingConvention; +import io.vertx.micrometer.backends.BackendRegistries; + +/** + * Used for scraping and reporting metrics in Prometheus format. + */ +public class MetricsReporter { + private final JmxCollectorRegistry jmxRegistry; + private final StrimziCollectorRegistry strimziRegistry; + private final PrometheusMeterRegistry vertxRegistry; + + /** + * Constructor. + * + * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints + */ + public MetricsReporter(JmxCollectorRegistry jmxRegistry) { + // default registry is null if metrics are not enabled in the VertxOptions instance + this(jmxRegistry, null, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow()); + } + + /** + * Constructor. + * + * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter + */ + public MetricsReporter(StrimziCollectorRegistry strimziRegistry) { + // default registry is null if metrics are not enabled in the VertxOptions instance + this(null, strimziRegistry, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow()); + } + + /** + * Constructor. + * + * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints + * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter + * @param vertxRegistry Registry instance for scraping metrics exposed through Vert.x + */ + public MetricsReporter(JmxCollectorRegistry jmxRegistry, + StrimziCollectorRegistry strimziRegistry, + PrometheusMeterRegistry vertxRegistry) { + this.jmxRegistry = jmxRegistry; + this.strimziRegistry = strimziRegistry; + this.vertxRegistry = vertxRegistry; + if (vertxRegistry != null) { + this.vertxRegistry.config().namingConvention(new MetricsNamingConvention()); + } + } + + private static class MetricsNamingConvention extends PrometheusNamingConvention { + @Override + public String name(String name, Meter.Type type, String baseUnit) { + String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; + return super.name(metricName, type, baseUnit); + } + } + + /** + * @return Registry instance for scraping metrics exposed through Vert.x. + * This is null if metrics are not enabled in the VertxOptions instance. + */ + public MeterRegistry getVertxRegistry() { + return vertxRegistry; + } + + /** + * Scrape metrics on the provided registries returning them in the Prometheus format + * + * @return metrics in Prometheus format as String + */ + public String scrape() { + StringBuilder sb = new StringBuilder(); + if (jmxRegistry != null) { + sb.append(jmxRegistry.scrape()); + } + if (strimziRegistry != null) { + sb.append(strimziRegistry.scrape()); + } + if (vertxRegistry != null) { + sb.append(vertxRegistry.scrape()); + } + return sb.toString(); + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java new file mode 100644 index 000000000..97d13d465 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java @@ -0,0 +1,30 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.bridge.metrics; + +/** + * Metrics type. + */ +public enum MetricsType { + /** JMX Prometheus Exporter. */ + JMX_EXPORTER("jmxPrometheusExporter"), + + /** Strimzi Metrics Reporter. */ + STRIMZI_REPORTER("strimziMetricsReporter"); + + private final String text; + + /** + * @param text + */ + MetricsType(final String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java new file mode 100644 index 000000000..e21a13216 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java @@ -0,0 +1,55 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.metrics; + +import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; +import io.prometheus.metrics.model.registry.PrometheusRegistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Allow to collect Strimzi Reporter metrics exposing them in the Prometheus format. + */ +public class StrimziCollectorRegistry { + private final PrometheusRegistry registry; + private final PrometheusTextFormatWriter textFormatter; + + /** + * Constructor. + */ + public StrimziCollectorRegistry() { + // note that Prometheus default registry is a singleton, so it is shared with Strimzi Metrics Reporter + this(PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true)); + } + + /** + * Constructor. + * + * @param registry Prometheus collector registry + * @param textFormatter Prometheus text formatter + */ + /* test */ StrimziCollectorRegistry(PrometheusRegistry registry, + PrometheusTextFormatWriter textFormatter) { + this.registry = registry; + this.textFormatter = textFormatter; + } + + /** + * @return Content that should be included in the response body for + * an endpoint designated for Prometheus to scrape from. + */ + public String scrape() { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + textFormatter.write(stream, registry.scrape()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return stream.toString(StandardCharsets.UTF_8); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java index a237e2fc6..fe341b7d3 100644 --- a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java +++ b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java @@ -2,9 +2,9 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ - package io.strimzi.kafka.bridge.config; +import io.strimzi.kafka.bridge.metrics.MetricsType; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -14,13 +14,15 @@ import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Some config related classes unit tests */ public class ConfigTest { - @Test public void testConfig() { Map map = new HashMap<>(); @@ -64,8 +66,8 @@ public void testHidingPassword() { BridgeConfig bridgeConfig = BridgeConfig.fromMap(map); assertThat(bridgeConfig.getKafkaConfig().getConfig().size(), is(6)); - assertThat(bridgeConfig.getKafkaConfig().toString().contains("ssl.truststore.password=" + storePassword), is(false)); - assertThat(bridgeConfig.getKafkaConfig().toString().contains("ssl.truststore.password=[hidden]"), is(true)); + assertThat(bridgeConfig.getKafkaConfig().toString(), not(containsString("ssl.truststore.password=" + storePassword))); + assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("ssl.truststore.password=[hidden]")); } @Test @@ -81,4 +83,52 @@ public void testHttpDefaults() { assertThat(bridgeConfig.getHttpConfig().isConsumerEnabled(), is(true)); assertThat(bridgeConfig.getHttpConfig().isProducerEnabled(), is(true)); } + + @Test + public void testJmxExporterMetricsType() { + Map map = Map.of( + "bridge.id", "my-bridge", + "kafka.bootstrap.servers", "localhost:9092", + "bridge.metrics", "jmxPrometheusExporter", + "http.host", "0.0.0.0", + "http.port", "8080" + ); + + BridgeConfig bridgeConfig = BridgeConfig.fromMap(map); + assertThat(bridgeConfig.getMetrics(), is(MetricsType.JMX_EXPORTER.toString())); + } + + @Test + public void testStrimziReporterMetricsType() { + Map map = Map.of( + "bridge.id", "my-bridge", + "kafka.bootstrap.servers", "localhost:9092", + "bridge.metrics", "strimziMetricsReporter", + "kafka.metric.reporters", "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter", + "kafka.prometheus.metrics.reporter.listener.enable", "false", + "kafka.prometheus.metrics.reporter.allowlist", "kafka_log.*,kafka_network.*", + "http.host", "0.0.0.0", + "http.port", "8080" + ); + + BridgeConfig bridgeConfig = BridgeConfig.fromMap(map); + assertThat(bridgeConfig.getMetrics(), is(MetricsType.STRIMZI_REPORTER.toString())); + assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter")); + assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.listener.enable=false")); + assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=kafka_log.*,kafka_network.*")); + } + + @Test + public void testInvalidMetricsType() { + Map map = Map.of( + "bridge.id", "my-bridge", + "kafka.bootstrap.servers", "localhost:9092", + "bridge.metrics", "invalidType", + "http.host", "0.0.0.0", + "http.port", "8080" + ); + + Exception e = assertThrows(IllegalArgumentException.class, () -> BridgeConfig.fromMap(map)); + assertThat(e.getMessage(), is("Invalid bridge.metrics configuration, choose one of jmxPrometheusExporter and strimziMetricsReporter")); + } } diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java index cd48f7fdf..c34082c14 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java @@ -4,15 +4,14 @@ */ package io.strimzi.kafka.bridge.http; -import io.micrometer.core.instrument.MeterRegistry; import io.netty.handler.codec.http.HttpResponseStatus; -import io.strimzi.kafka.bridge.JmxCollectorRegistry; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; import io.strimzi.kafka.bridge.config.KafkaConsumerConfig; import io.strimzi.kafka.bridge.config.KafkaProducerConfig; import io.strimzi.kafka.bridge.http.services.ConsumerService; +import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; import io.strimzi.kafka.bridge.utils.Urls; import io.strimzi.test.container.StrimziKafkaContainer; import io.vertx.core.Vertx; @@ -78,7 +77,6 @@ public class ConsumerGeneratedNameIT { private static HttpBridge httpBridge; private static WebClient client; private static BridgeConfig bridgeConfig; - private static MeterRegistry meterRegistry = null; private static JmxCollectorRegistry jmxCollectorRegistry = null; ConsumerService consumerService() { @@ -97,7 +95,7 @@ static void beforeAll(VertxTestContext context) { if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) { bridgeConfig = BridgeConfig.fromMap(config); - httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry)); + httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry)); LOGGER.info("Deploying in-memory bridge"); vertx.deployVerticle(httpBridge, context.succeeding(id -> context.completeNow())); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java index a951f5a01..de94b53e2 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java @@ -7,7 +7,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.strimzi.kafka.bridge.BridgeContentType; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; import io.strimzi.kafka.bridge.config.KafkaConsumerConfig; @@ -15,6 +14,8 @@ import io.strimzi.kafka.bridge.http.model.HttpBridgeError; import io.strimzi.kafka.bridge.http.services.ConsumerService; import io.strimzi.kafka.bridge.http.services.ProducerService; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; +import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry; import io.strimzi.kafka.bridge.utils.Urls; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; @@ -164,7 +165,7 @@ private void startBridge(VertxTestContext context, Map config) t CompletableFuture startBridge = new CompletableFuture<>(); if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) { BridgeConfig bridgeConfig = BridgeConfig.fromMap(config); - HttpBridge httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(null, null)); + HttpBridge httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry())); LOGGER.info("Deploying in-memory bridge"); vertx.deployVerticle(httpBridge, context.succeeding(id -> startBridge.complete(true))); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java index b6c3f6be9..be28c7977 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java @@ -5,14 +5,13 @@ package io.strimzi.kafka.bridge.http; -import io.micrometer.core.instrument.MeterRegistry; import io.netty.handler.codec.http.HttpResponseStatus; import io.strimzi.kafka.bridge.BridgeContentType; -import io.strimzi.kafka.bridge.JmxCollectorRegistry; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; import io.strimzi.kafka.bridge.facades.AdminClientFacade; +import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; import io.strimzi.kafka.bridge.utils.Urls; import io.strimzi.test.container.StrimziKafkaContainer; import io.vertx.core.Vertx; @@ -58,7 +57,6 @@ public class HttpCorsIT { static BridgeConfig bridgeConfig; static StrimziKafkaContainer kafkaContainer; - static MeterRegistry meterRegistry = null; static JmxCollectorRegistry jmxCollectorRegistry = null; static AdminClientFacade adminClientFacade; @@ -288,7 +286,7 @@ private void configureBridge(boolean corsEnabled, String methodsAllowed) { config.put(HttpConfig.HTTP_CORS_ALLOWED_METHODS, methodsAllowed != null ? methodsAllowed : "GET,POST,PUT,DELETE,OPTIONS,PATCH"); bridgeConfig = BridgeConfig.fromMap(config); - httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry)); + httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry)); } } } diff --git a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java index 0bc149bd1..bbc93de2e 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java @@ -5,12 +5,12 @@ package io.strimzi.kafka.bridge.http; import io.strimzi.kafka.bridge.BridgeContentType; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.clients.BasicKafkaClient; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaProducerConfig; import io.strimzi.kafka.bridge.facades.AdminClientFacade; import io.strimzi.kafka.bridge.http.base.HttpBridgeITAbstract; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; import io.strimzi.kafka.bridge.utils.Urls; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; @@ -75,7 +75,7 @@ static void beforeAll(VertxTestContext context) { if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) { bridgeConfig = BridgeConfig.fromMap(cfg); - httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry)); + httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry)); LOGGER.info("Deploying in-memory bridge"); vertx.deployVerticle(httpBridge, context.succeeding(id -> context.completeNow())); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java index b915547ac..29eac8856 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java @@ -70,15 +70,15 @@ void healthyTest(VertxTestContext context) throws InterruptedException { @Test void metricsTest(VertxTestContext context) { baseService() - .getRequest("/metrics") - .send(ar -> { - context.verify(() -> { - assertThat(ar.succeeded(), is(true)); - assertThat(ar.result().statusCode(), is(HttpResponseStatus.OK.code())); - assertThat(ar.result().getHeader("Content-Type"), is("text/plain; version=0.0.4; charset=utf-8")); - context.completeNow(); - }); + .getRequest("/metrics") + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + assertThat(ar.result().statusCode(), is(HttpResponseStatus.OK.code())); + assertThat(ar.result().getHeader("Content-Type"), is("text/plain; version=0.0.4; charset=utf-8")); + context.completeNow(); }); + }); } @Test diff --git a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java index 8926f01f0..721fe73df 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java @@ -5,9 +5,6 @@ package io.strimzi.kafka.bridge.http.base; -import io.micrometer.core.instrument.MeterRegistry; -import io.strimzi.kafka.bridge.JmxCollectorRegistry; -import io.strimzi.kafka.bridge.MetricsReporter; import io.strimzi.kafka.bridge.clients.BasicKafkaClient; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; @@ -20,6 +17,9 @@ import io.strimzi.kafka.bridge.http.services.ConsumerService; import io.strimzi.kafka.bridge.http.services.ProducerService; import io.strimzi.kafka.bridge.http.services.SeekService; +import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry; +import io.strimzi.kafka.bridge.metrics.MetricsReporter; +import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry; import io.strimzi.kafka.bridge.utils.Urls; import io.strimzi.test.container.StrimziKafkaContainer; import io.vertx.core.Vertx; @@ -96,8 +96,6 @@ public abstract class HttpBridgeITAbstract { protected static AdminClientFacade adminClientFacade; protected static HttpBridge httpBridge; protected static BridgeConfig bridgeConfig; - - protected static MeterRegistry meterRegistry = null; protected static JmxCollectorRegistry jmxCollectorRegistry = null; protected BaseService baseService() { @@ -127,7 +125,8 @@ static void beforeAll(VertxTestContext context) { if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) { bridgeConfig = BridgeConfig.fromMap(config); - httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry)); + + httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry())); LOGGER.info("Deploying in-memory bridge"); vertx.deployVerticle(httpBridge, context.succeeding(id -> context.completeNow())); diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java new file mode 100644 index 000000000..007cefa71 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java @@ -0,0 +1,73 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.bridge.metrics; + +import io.prometheus.jmx.JmxCollector; +import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import io.prometheus.metrics.model.snapshots.MetricSnapshots; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class JmxCollectorRegistryTest { + @Test + void shouldReturnFormattedMetrics() throws IOException { + JmxCollector mockJmxCollector = mock(JmxCollector.class); + + PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class); + MetricSnapshots mockSnapshots = mock(MetricSnapshots.class); + when(mockPromRegistry.scrape()).thenReturn(mockSnapshots); + + PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class); + doAnswer(invocation -> { + ByteArrayOutputStream stream = invocation.getArgument(0); + stream.write("test_metric\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(mockPromFormatter).write(any(), any()); + + JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter); + + String result = collectorRegistry.scrape(); + assertThat(result, containsString("test_metric")); + assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length())); + } + + @Test + void shouldHandleIoException() throws IOException { + JmxCollector mockJmxCollector = mock(JmxCollector.class); + + PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class); + MetricSnapshots mockSnapshots = mock(MetricSnapshots.class); + when(mockPromRegistry.scrape()).thenReturn(mockSnapshots); + + PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class); + doThrow(new IOException("Test exception")) + .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots)); + + JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter); + + RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape()); + assertThat(exception.getMessage(), containsString("Test exception")); + } + + @Test + void shouldThrowWithInvalidYaml() { + assertThrows(ClassCastException.class, () -> new JmxCollectorRegistry("invalid")); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java new file mode 100644 index 000000000..c68ca2309 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java @@ -0,0 +1,113 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.bridge.metrics; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class MetricsReporterTest { + @Test + void shouldReturnMetrics() { + JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class); + when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n"); + + StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class); + when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n"); + + PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class); + when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n"); + MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class); + when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig); + + MetricsReporter metricsReporter = new MetricsReporter( + mockJmxRegistry, mockStrimziRegistry, mockVertxRegistry); + + String result = metricsReporter.scrape(); + + assertThat(result, containsString("jmx_metrics")); + assertThat(result, containsString("strimzi_metrics")); + assertThat(result, containsString("vertx_metrics")); + + verify(mockJmxRegistry).scrape(); + verify(mockStrimziRegistry).scrape(); + verify(mockVertxRegistry).scrape(); + } + + @Test + void shouldReturnMetricsWithoutStrimziRegistry() { + JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class); + when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n"); + + PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class); + when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n"); + MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class); + when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig); + + MetricsReporter metricsReporter = new MetricsReporter(mockJmxRegistry, null, mockVertxRegistry); + + String result = metricsReporter.scrape(); + + assertThat(result, containsString("jmx_metrics")); + assertThat(result, containsString("vertx_metrics")); + + verify(mockJmxRegistry).scrape(); + verify(mockVertxRegistry).scrape(); + } + + @Test + void shouldReturnMetricsWithoutJmxRegistry() { + StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class); + when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n"); + + PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class); + when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n"); + MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class); + when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig); + + MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry); + + String result = metricsReporter.scrape(); + + assertThat(result, containsString("strimzi_metrics")); + assertThat(result, containsString("vertx_metrics")); + + verify(mockStrimziRegistry).scrape(); + verify(mockVertxRegistry).scrape(); + } + + @Test + void testGetVertxRegistry() { + StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class); + PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class); + MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class); + when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig); + + MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry); + + MeterRegistry result = metricsReporter.getVertxRegistry(); + + assertThat(result, is(mockVertxRegistry)); + } + + @Test + void testNamingConventionAppliedForPrometheusRegistry() { + StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class); + PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class); + MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class); + when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig); + + new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry); + + verify(mockVertxRegistry).config(); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java new file mode 100644 index 000000000..bab4d7eb3 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java @@ -0,0 +1,63 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.bridge.metrics; + +import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import io.prometheus.metrics.model.snapshots.MetricSnapshots; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class StrimziCollectorRegistryTest { + @Test + void shouldReturnFormattedMetrics() throws IOException { + PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class); + MetricSnapshots mockSnapshots = mock(MetricSnapshots.class); + when(mockPromRegistry.scrape()).thenReturn(mockSnapshots); + + PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class); + doAnswer(invocation -> { + ByteArrayOutputStream stream = invocation.getArgument(0); + stream.write("test_metric\n".getBytes(StandardCharsets.UTF_8)); + return null; + }).when(mockPromFormatter).write(any(), any()); + + StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter); + + String result = collectorRegistry.scrape(); + assertThat(result, containsString("test_metric")); + assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length())); + } + + @Test + void shouldHandleIoException() throws IOException { + PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class); + MetricSnapshots mockSnapshots = mock(MetricSnapshots.class); + when(mockPromRegistry.scrape()).thenReturn(mockSnapshots); + + PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class); + doThrow(new IOException("Test exception")) + .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots)); + + StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter); + + RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape()); + assertThat(exception.getMessage(), containsString("Test exception")); + } +}