diff --git a/operator/rpc_client.go b/operator/rpc_client.go index 7b2bfb1..90c531c 100644 --- a/operator/rpc_client.go +++ b/operator/rpc_client.go @@ -1,7 +1,10 @@ package operator import ( + "crypto/tls" "fmt" + "io" + "net" "net/rpc" "strings" "time" @@ -48,13 +51,19 @@ func NewAggregatorRpcClient(config config.NodeConfig, operatorId sdktypes.Operat }, nil } -func (c *AggregatorRpcClient) dialAggregatorRpcClient() error { - client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr) +func (c *AggregatorRpcClient) dialAggregatorRpcClient() (err error) { + var conn io.ReadWriteCloser + conn, err = tls.Dial("tcp", c.aggregatorIpPortAddr, &tls.Config{}) if err != nil { - return err + if _, ok := err.(tls.RecordHeaderError); !ok { + return err + } + // retry connect without tls + conn, err = net.Dial("tcp", c.aggregatorIpPortAddr) } + client := rpc.NewClient(conn) c.rpcClient = client - return nil + return } // CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. @@ -78,7 +87,7 @@ func (c *AggregatorRpcClient) InitOperatorToAggregator() error { RegistryCoordinatorAddr: c.RegistryCoordinatorAddr, } - c.logger.Info("Create task header to aggregator", "req", fmt.Sprintf("%#v", req)) + c.logger.Info("Init operator to aggregator", "req", fmt.Sprintf("%#v", req)) for i := 0; i < 5; i++ { err := c.rpcClient.Call("Aggregator.InitOperator", req, &reply) @@ -125,7 +134,7 @@ func (c *AggregatorRpcClient) CreateAlertTaskToAggregator(alertHash [32]byte) (* AlertHash: alertHash, } - c.logger.Info("Create task header to aggregator", "req", fmt.Sprintf("%#v", req)) + c.logger.Info("Create task to aggregator", "req", fmt.Sprintf("%#v", req)) for i := 0; i < 5; i++ { err := c.rpcClient.Call("Aggregator.CreateTask", req, &reply) @@ -135,16 +144,16 @@ func (c *AggregatorRpcClient) CreateAlertTaskToAggregator(alertHash [32]byte) (* return nil, err } } else { - c.logger.Info("create task response header accepted by aggregator.", "reply", reply) + c.logger.Info("create task accepted by aggregator.", "reply", reply) c.metrics.IncNumTasksAcceptedByAggregator() return &reply.Info, nil } c.logger.Infof("Retrying in 2 seconds") time.Sleep(2 * time.Second) } - c.logger.Errorf("Could not send signed task response to aggregator. Tried 5 times.") + c.logger.Errorf("Could not create task to aggregator. Tried 5 times.") - return nil, fmt.Errorf("Could not send signed task response to aggregator") + return nil, fmt.Errorf("Could not create task to aggregator") } // SendSignedTaskResponseToAggregator sends a signed task response to the aggregator.