Skip to content

Commit

Permalink
[jaegerreceiver] Remove unused remote sampling server (#36971)
Browse files Browse the repository at this point in the history
#### Description

The RemoteSampling config of the receiver has been disabled in the past
(it just logs a warning now), but the code for the actual sampling
server was still around. This PR removes that code.

While the endpoint is defined in the private config
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b53b5ad2e857367170c0ba04dfca8ad9886f0a87/receiver/jaegerreceiver/trace_receiver.go#L51

it has been removed from the public config and the private part is never
populated
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b53b5ad2e857367170c0ba04dfca8ad9886f0a87/receiver/jaegerreceiver/factory.go#L107-L109

#### Testing

CI

Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
yurishkuro and codeboten authored Jan 9, 2025
1 parent c14c96b commit 7865587
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 199 deletions.
24 changes: 12 additions & 12 deletions receiver/jaegerreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type RemoteSamplingConfig struct {

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.ServerConfig `mapstructure:"grpc"`
ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"`
ThriftBinary *ProtocolUDP `mapstructure:"thrift_binary"`
ThriftCompact *ProtocolUDP `mapstructure:"thrift_compact"`
GRPC *configgrpc.ServerConfig `mapstructure:"grpc"`
ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"`
ThriftBinaryUDP *ProtocolUDP `mapstructure:"thrift_binary"`
ThriftCompactUDP *ProtocolUDP `mapstructure:"thrift_compact"`
}

// ProtocolUDP is the configuration for a UDP protocol.
Expand Down Expand Up @@ -82,8 +82,8 @@ var (
func (cfg *Config) Validate() error {
if cfg.GRPC == nil &&
cfg.ThriftHTTP == nil &&
cfg.ThriftBinary == nil &&
cfg.ThriftCompact == nil {
cfg.ThriftBinaryUDP == nil &&
cfg.ThriftCompactUDP == nil {
return errors.New("must specify at least one protocol when using the Jaeger receiver")
}

Expand All @@ -99,14 +99,14 @@ func (cfg *Config) Validate() error {
}
}

if cfg.ThriftBinary != nil {
if err := checkPortFromEndpoint(cfg.ThriftBinary.Endpoint); err != nil {
if cfg.ThriftBinaryUDP != nil {
if err := checkPortFromEndpoint(cfg.ThriftBinaryUDP.Endpoint); err != nil {
return fmt.Errorf("invalid port number for the Thrift UDP Binary endpoint: %w", err)
}
}

if cfg.ThriftCompact != nil {
if err := checkPortFromEndpoint(cfg.ThriftCompact.Endpoint); err != nil {
if cfg.ThriftCompactUDP != nil {
if err := checkPortFromEndpoint(cfg.ThriftCompactUDP.Endpoint); err != nil {
return fmt.Errorf("invalid port number for the Thrift UDP Compact endpoint: %w", err)
}
}
Expand Down Expand Up @@ -145,10 +145,10 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
cfg.ThriftHTTP = nil
}
if !protocols.IsSet(protoThriftBinary) {
cfg.ThriftBinary = nil
cfg.ThriftBinaryUDP = nil
}
if !protocols.IsSet(protoThriftCompact) {
cfg.ThriftCompact = nil
cfg.ThriftCompactUDP = nil
}

return nil
Expand Down
16 changes: 8 additions & 8 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: ":3456",
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:456",
ServerConfigUDP: ServerConfigUDP{
QueueSize: 100_000,
Expand All @@ -51,7 +51,7 @@ func TestLoadConfig(t *testing.T) {
SocketBufferSize: 65_536,
},
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:789",
ServerConfigUDP: ServerConfigUDP{
QueueSize: 1_000,
Expand All @@ -76,11 +76,11 @@ func TestLoadConfig(t *testing.T) {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: "localhost:14268",
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "localhost:6831",
ServerConfigUDP: defaultServerConfigUDP(),
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "localhost:6832",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -97,7 +97,7 @@ func TestLoadConfig(t *testing.T) {
Transport: confignet.TransportTypeTCP,
},
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "localhost:6831",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "thrift-udp-compact-no-port",
apply: func(cfg *Config) {
cfg.ThriftCompact = &ProtocolUDP{
cfg.ThriftCompactUDP = &ProtocolUDP{
Endpoint: "localhost:",
}
},
Expand All @@ -192,7 +192,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "thrift-udp-binary-no-port",
apply: func(cfg *Config) {
cfg.ThriftBinary = &ProtocolUDP{
cfg.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "localhost:",
}
},
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestInvalidConfig(t *testing.T) {
{
desc: "port-outside-of-range",
apply: func(cfg *Config) {
cfg.ThriftBinary = &ProtocolUDP{
cfg.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "localhost:65536",
}
},
Expand Down
24 changes: 3 additions & 21 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func createDefaultConfig() component.Config {
ThriftHTTP: &confighttp.ServerConfig{
Endpoint: defaultHTTPEndpoint,
},
ThriftBinary: &ProtocolUDP{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: defaultThriftBinaryEndpoint,
ServerConfigUDP: defaultServerConfigUDP(),
},
ThriftCompact: &ProtocolUDP{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: defaultThriftCompactEndpoint,
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -85,28 +85,10 @@ func createTracesReceiver(

rCfg := cfg.(*Config)

var config configuration
// Set ports
if rCfg.Protocols.GRPC != nil {
config.GRPCServerConfig = *rCfg.Protocols.GRPC
}

if rCfg.Protocols.ThriftHTTP != nil {
config.HTTPServerConfig = *rCfg.ThriftHTTP
}

if rCfg.Protocols.ThriftBinary != nil {
config.AgentBinaryThrift = *rCfg.ThriftBinary
}

if rCfg.Protocols.ThriftCompact != nil {
config.AgentCompactThrift = *rCfg.ThriftCompact
}

if rCfg.RemoteSampling != nil {
set.Logger.Warn("You are using a deprecated no-op `remote_sampling` option which will be removed soon; use a `jaegerremotesampling` extension instead")
}

// Create the receiver.
return newJaegerReceiver(set.ID, &config, nextConsumer, set)
return newJaegerReceiver(set.ID, rCfg.Protocols, nextConsumer, set)
}
12 changes: 6 additions & 6 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) {
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPCServerConfig.NetAddr.Endpoint, "grpc port should be default")
assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPC.NetAddr.Endpoint, "grpc port should be default")
}

func TestCreateTLSGPRCEndpoint(t *testing.T) {
Expand Down Expand Up @@ -144,33 +144,33 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) {
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "localhost:14268", r.(*jReceiver).config.HTTPServerConfig.Endpoint, "http port should be default")
assert.Equal(t, "localhost:14268", r.(*jReceiver).config.ThriftHTTP.Endpoint, "http port should be default")
}

func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

cfg.(*Config).Protocols.ThriftBinary = &ProtocolUDP{
cfg.(*Config).Protocols.ThriftBinaryUDP = &ProtocolUDP{
Endpoint: "0.0.0.0:6832",
}
set := receivertest.NewNopSettings()
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.AgentBinaryThrift.Endpoint, "thrift port should be default")
assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.ThriftBinaryUDP.Endpoint, "thrift port should be default")
}

func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

cfg.(*Config).Protocols.ThriftCompact = &ProtocolUDP{
cfg.(*Config).Protocols.ThriftCompactUDP = &ProtocolUDP{
Endpoint: "0.0.0.0:6831",
}
set := receivertest.NewNopSettings()
r, err := factory.CreateTraces(context.Background(), set, cfg, nil)

assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default")
assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.ThriftCompactUDP.Endpoint, "thrift port should be default")
}
2 changes: 0 additions & 2 deletions receiver/jaegerreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 13 additions & 70 deletions receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ package jaegerreceiver

import (
"context"
"fmt"
"net"
"net/http"
"testing"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
"github.com/jaegertracing/jaeger/model"
jaegerconvert "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/agent"
jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/stretchr/testify/assert"
Expand All @@ -26,7 +23,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/receivertest"
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
Expand All @@ -37,17 +33,17 @@ var jaegerAgent = component.NewIDWithName(metadata.Type, "agent_test")

func TestJaegerAgentUDP_ThriftCompact(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
testJaegerAgent(t, addr, &configuration{
AgentCompactThrift: ProtocolUDP{
testJaegerAgent(t, addr, Protocols{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
})
}

func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {
config := &configuration{
AgentCompactThrift: ProtocolUDP{
config := Protocols{
ThriftCompactUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:999999",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -63,8 +59,8 @@ func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {

func TestJaegerAgentUDP_ThriftBinary(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
testJaegerAgent(t, addr, &configuration{
AgentBinaryThrift: ProtocolUDP{
testJaegerAgent(t, addr, Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -75,8 +71,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
// This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above.
addr := testutil.GetAvailableLocalAddress(t)

config := &configuration{
AgentBinaryThrift: ProtocolUDP{
config := Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: addr,
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -85,7 +81,7 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
require.NoError(t, err)

assert.NoError(t, jr.startAgent(componenttest.NewNopHost()), "Start failed")
assert.NoError(t, jr.startAgent(), "Start failed")
t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })

l, err := net.Listen("udp", addr)
Expand All @@ -97,8 +93,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
}

func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
config := &configuration{
AgentBinaryThrift: ProtocolUDP{
config := Protocols{
ThriftBinaryUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:999999",
ServerConfigUDP: defaultServerConfigUDP(),
},
Expand All @@ -112,60 +108,7 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
require.NoError(t, jr.Shutdown(context.Background()))
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
server := grpc.NewServer(opts...)
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
beforeServe(server)
go func() {
err := server.Serve(lis)
assert.NoError(t, err)
}()
return server, lis.Addr()
}

type mockSamplingHandler struct{}

func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func TestJaegerHTTP(t *testing.T) {
s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
defer s.GracefulStop()

endpoint := testutil.GetAvailableLocalAddress(t)
config := &configuration{
AgentHTTPEndpoint: endpoint,
}
set := receivertest.NewNopSettings()
jr, err := newJaegerReceiver(jaegerAgent, config, nil, set)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) })

assert.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost()), "Start failed")

// allow http server to start
assert.Eventually(t, func() bool {
var conn net.Conn
conn, err = net.Dial("tcp", endpoint)
if err == nil && conn != nil {
conn.Close()
return true
}
return false
}, 10*time.Second, 5*time.Millisecond, "failed to wait for the port to be open")

resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
assert.NotNil(t, resp)
defer resp.Body.Close()
assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
}

func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {
func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig Protocols) {
// 1. Create the Jaeger receiver aka "server"
sink := new(consumertest.TracesSink)
set := receivertest.NewNopSettings()
Expand All @@ -184,7 +127,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configu
require.NoError(t, err, "Start failed")

// 2. Then send spans to the Jaeger receiver.
jexp, err := newClientUDP(agentEndpoint, jr.config.AgentBinaryThrift.Endpoint != "")
jexp, err := newClientUDP(agentEndpoint, jr.config.ThriftBinaryUDP != nil)
require.NoError(t, err, "Failed to create the Jaeger OpenTelemetry exporter for the live application")

// 3. Now finally send some spans
Expand Down
Loading

0 comments on commit 7865587

Please sign in to comment.