Skip to content

Commit

Permalink
Update the Pubsub UserAgent format (close #362)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Nov 8, 2023
1 parent 7110b3f commit d3a5801
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ package model {
googleProjectId: String,
backoffPolicy: GooglePubSubBackoffPolicyConfig,
startupCheckInterval: FiniteDuration,
retryInterval: FiniteDuration
retryInterval: FiniteDuration,
gcpUserAgent: GcpUserAgent
) extends SinkConfig
final case class Kafka(
maxBytes: Int,
Expand Down Expand Up @@ -169,6 +170,7 @@ package model {
sink: SinkConfig,
buffer: BufferConfig
)
final case class GcpUserAgent(productName: String)

final case class StatsdConfig(
enabled: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ abstract class ConfigSpec extends Specification {
rpcTimeoutMultiplier = 2
),
startupCheckInterval = 1.second,
retryInterval = 10.seconds
retryInterval = 10.seconds,
gcpUserAgent = GcpUserAgent(productName = "Snowplow OSS")
)
case "sqs" =>
Sqs(
Expand Down
3 changes: 3 additions & 0 deletions pubsub/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ collector {

startupCheckInterval = 1 second
retryInterval = 10 seconds
gcpUserAgent {
productName = "Snowplow OSS"
}
}

buffer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,14 @@ object GooglePubSubSink {
val credentialsProvider = NoCredentialsProvider.create()
(channelProvider, credentialsProvider)
}
publisher <- createPublisher(googlePubSubConfig.googleProjectId, topicName, batching, retry, customProviders)
publisher <- createPublisher(
googlePubSubConfig.googleProjectId,
topicName,
batching,
retry,
customProviders,
googlePubSubConfig.gcpUserAgent
)
sink = new GooglePubSubSink(
maxBytes,
publisher,
Expand All @@ -187,8 +194,6 @@ object GooglePubSubSink {
_ = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval)
} yield sink

private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"

/**
* Instantiates a Publisher on a topic with the given configuration options.
* This can fail if the publisher can't be created.
Expand All @@ -199,20 +204,24 @@ object GooglePubSubSink {
topicName: String,
batchingSettings: BatchingSettings,
retrySettings: RetrySettings,
customProviders: Option[(TransportChannelProvider, CredentialsProvider)]
customProviders: Option[(TransportChannelProvider, CredentialsProvider)],
gcpUserAgent: GcpUserAgent
): Either[Throwable, Publisher] = {
val builder = Publisher
.newBuilder(ProjectTopicName.of(projectId, topicName))
.setBatchingSettings(batchingSettings)
.setRetrySettings(retrySettings)
.setHeaderProvider(FixedHeaderProvider.create("User-Agent", UserAgent))
.setHeaderProvider(FixedHeaderProvider.create("User-Agent", createUserAgent(gcpUserAgent)))
customProviders.foreach {
case (channelProvider, credentialsProvider) =>
builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider)
}
Either.catchNonFatal(builder.build()).leftMap(e => new RuntimeException("Couldn't build PubSub publisher", e))
}

private[sinks] def createUserAgent(gcpUserAgent: GcpUserAgent): String =
s"${gcpUserAgent.productName}/collector (GPN:Snowplow;)"

private def batchingSettings(bufferConfig: BufferConfig): BatchingSettings =
BatchingSettings
.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import java.util.regex.Pattern

import com.snowplowanalytics.snowplow.collectors.scalastream.model._

import org.specs2.mutable.Specification

class GcpUserAgentSpec extends Specification {

"createUserAgent" should {
"create user agent string correctly" in {
val gcpUserAgent = GcpUserAgent(productName = "Snowplow OSS")
val resultUserAgent = GooglePubSubSink.createUserAgent(gcpUserAgent)
val expectedUserAgent = s"Snowplow OSS/collector (GPN:Snowplow;)"

val userAgentRegex = Pattern.compile(
"""(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]"""
)
val matcher = userAgentRegex.matcher(resultUserAgent)
val matched = if (matcher.find()) Some(matcher.group(1)) else None
val expectedMatched = "Snowplow;"

resultUserAgent must beEqualTo(expectedUserAgent)
matched must beSome(expectedMatched)
}
}
}

0 comments on commit d3a5801

Please sign in to comment.