Skip to content

Commit

Permalink
feat: Add JSON Rpc api support (#111)
Browse files Browse the repository at this point in the history
* add jsonrpc api server

* support json rpc
  • Loading branch information
fyInALT authored Apr 8, 2024
1 parent e0531e5 commit dc05876
Show file tree
Hide file tree
Showing 10 changed files with 561 additions and 80 deletions.
46 changes: 32 additions & 14 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ type OperatorStatus struct {
type Aggregator struct {
logger logging.Logger

serverIpPortAddr string
grpcServerIpPortAddr string
serverIpPortAddr string
grpcServerIpPortAddr string
jsonRpcServerIpPortAddr string

avsWriter chainio.AvsWriterer

service *AggregatorService
legacyRpc *rpc.LegacyRpcHandler
gRpc *rpc.GRpcHandler
service *AggregatorService
legacyRpc *rpc.LegacyRpcHandler
gRpc *rpc.GRpcHandler
jsonrpcServer *rpc.JsonRpcServer
}

// NewAggregator creates a new Aggregator with the provided config.
Expand All @@ -105,20 +107,28 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {

legacyRpc := rpc.NewLegacyRpcHandler(c.Logger, service)

var grpc_server *rpc.GRpcHandler
var grpcServer *rpc.GRpcHandler
if c.AggregatorGRPCServerIpPortAddr != "" {
c.Logger.Infof("Create grpc server in %s", c.AggregatorGRPCServerIpPortAddr)
grpc_server = rpc.NewGRpcHandler(c.Logger, service)
grpcServer = rpc.NewGRpcHandler(c.Logger, service)
}

var jsonrpcServer *rpc.JsonRpcServer
if c.AggregatorJSONRPCServerIpPortAddr != "" {
c.Logger.Infof("Create json rpc server in %s", c.AggregatorJSONRPCServerIpPortAddr)
jsonrpcServer = rpc.NewJsonRpcServer(c.Logger, service)
}

return &Aggregator{
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
grpcServerIpPortAddr: c.AggregatorGRPCServerIpPortAddr,
avsWriter: avsWriter,
service: service,
legacyRpc: legacyRpc,
gRpc: grpc_server,
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
grpcServerIpPortAddr: c.AggregatorGRPCServerIpPortAddr,
jsonRpcServerIpPortAddr: c.AggregatorJSONRPCServerIpPortAddr,
avsWriter: avsWriter,
service: service,
legacyRpc: legacyRpc,
gRpc: grpcServer,
jsonrpcServer: jsonrpcServer,
}, nil
}

Expand Down Expand Up @@ -151,6 +161,10 @@ func (agg *Aggregator) startRpcServer(ctx context.Context) {
if agg.gRpc != nil {
go agg.gRpc.StartServer(ctx, agg.grpcServerIpPortAddr)
}

if agg.jsonrpcServer != nil {
go agg.jsonrpcServer.StartServer(ctx, agg.jsonRpcServerIpPortAddr)
}
}

func (agg *Aggregator) wait() {
Expand All @@ -164,6 +178,10 @@ func (agg *Aggregator) wait() {
agg.gRpc.Wait()
}

if agg.jsonrpcServer != nil {
agg.jsonrpcServer.Wait()
}

agg.logger.Info("The aggregator is exited")
}

Expand Down
213 changes: 213 additions & 0 deletions aggregator/rpc/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package rpc

import (
"context"
"fmt"
"sync"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/alt-research/avs/api/grpc/aggregator"
"github.com/alt-research/avs/core/message"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/node"
gethrpc "github.com/ethereum/go-ethereum/rpc"
)

type JsonRpcServer struct {
logger logging.Logger
handler JsonRpcHandler
vhosts []string
cors []string
wg *sync.WaitGroup
}

func NewJsonRpcServer(logger logging.Logger, aggreagtor AggregatorRpcHandler) *JsonRpcServer {
return &JsonRpcServer{
logger: logger,
handler: JsonRpcHandler{
logger: logger,
aggreagtor: aggreagtor,
},
wg: &sync.WaitGroup{},
}
}

func (s *JsonRpcServer) GetAPI() gethrpc.API {
return gethrpc.API{
Namespace: "aggregator",
Service: &s.handler,
}
}

func (s *JsonRpcServer) StartServer(ctx context.Context, serverIpPortAddr string) {
s.logger.Info("Start JSON RPC Server", "addr", serverIpPortAddr)

rpcAPI := []gethrpc.API{s.GetAPI()}

srv := gethrpc.NewServer()
err := node.RegisterApis(rpcAPI, []string{"aggregator"}, srv)
if err != nil {
s.logger.Fatalf("Could not register API: %w", err)
}
handler := node.NewHTTPHandlerStack(srv, s.cors, s.vhosts, nil)

httpServer, addr, err := node.StartHTTPEndpoint(serverIpPortAddr, gethrpc.DefaultHTTPTimeouts, handler)
if err != nil {
s.logger.Fatalf("Could not start RPC api: %v", err)
}

extapiURL := fmt.Sprintf("http://%v/", addr)
s.logger.Info("HTTP endpoint opened", "url", extapiURL)

serverErr := make(chan error, 1)

s.wg.Add(1)
defer s.wg.Done()

select {
case <-ctx.Done():
s.logger.Info("Stop JSON RPC Server by Done")
err := httpServer.Shutdown(context.Background())
if err != nil {
s.logger.Errorf("Stop JSON RPC Server by error: %v", err.Error())
}
case err = <-serverErr:
}

if err != nil {
s.logger.Error("JSON RPC Server serve stopped by error", "err", err)
} else {
s.logger.Info("JSON RPC Server serve stopped")
}
}

func (s *JsonRpcServer) Wait() {
s.wg.Wait()
}

type JsonRpcHandler struct {
logger logging.Logger
aggreagtor AggregatorRpcHandler
}

type InitOperatorResponse struct {
Ok bool `json:"ok"`
Reason string `json:"reason"`
}

func (h *JsonRpcHandler) InitOperator(
ctx context.Context,
layer1ChainId uint32,
chainId uint32,
operatorId hexutil.Bytes,
operatorAddress string,
operatorStateRetrieverAddr string,
registryCoordinatorAddr string,
) (InitOperatorResponse, error) {
req, err := message.NewInitOperatorRequest(&aggregator.InitOperatorRequest{
Layer1ChainId: layer1ChainId,
ChainId: chainId,
OperatorId: operatorId,
OperatorAddress: operatorAddress,
OperatorStateRetrieverAddr: operatorStateRetrieverAddr,
RegistryCoordinatorAddr: registryCoordinatorAddr,
})
if err != nil {
return InitOperatorResponse{}, fmt.Errorf("initOperator parse request falied: %v", err)
}

res, err := h.aggreagtor.InitOperator(req)
if err != nil {
return InitOperatorResponse{}, fmt.Errorf("initOperator process request falied: %v", err)
}

resp := InitOperatorResponse{
Ok: res.Ok,
Reason: res.Res,
}

return resp, nil
}

type AlertTaskInfo struct {
// The hash of alert
AlertHash hexutil.Bytes `json:"alert_hash"`
// QuorumNumbers of task
QuorumNumbers []uint8 `json:"quorum_numbers"`
// QuorumThresholdPercentages of task
QuorumThresholdPercentages []uint8 `json:"quorum_threshold_percentages"`
// TaskIndex
TaskIndex uint32 `json:"task_index"`
// ReferenceBlockNumber
ReferenceBlockNumber uint64 `json:"reference_block_number"`
}

func (h *JsonRpcHandler) CreateTask(
ctx context.Context,
alertHash hexutil.Bytes,
) (AlertTaskInfo, error) {
req, err := message.NewCreateTaskRequest(&aggregator.CreateTaskRequest{
AlertHash: alertHash,
})
if err != nil {
return AlertTaskInfo{}, fmt.Errorf("createTask parse request falied: %v", err)
}

res, err := h.aggreagtor.CreateTask(req)
if err != nil {
return AlertTaskInfo{}, fmt.Errorf("createTask process request falied: %v", err)
}

info := res.Info.ToPbType()

resp := AlertTaskInfo{
AlertHash: info.AlertHash,
QuorumNumbers: info.QuorumNumbers,
QuorumThresholdPercentages: info.QuorumThresholdPercentages,
TaskIndex: info.TaskIndex,
ReferenceBlockNumber: info.ReferenceBlockNumber,
}

return resp, nil
}

type SignedTaskRespResponse struct {
// If need reply
Reply bool `json:"reply"`
// The tx hash of send
TxHash []byte `json:"tx_hash"`
}

func (h *JsonRpcHandler) ProcessSignedTaskResponse(
ctx context.Context,
alertInfo AlertTaskInfo,
operatorRequestSignature hexutil.Bytes,
operatorId hexutil.Bytes,
) (SignedTaskRespResponse, error) {
req, err := message.NewSignedTaskRespRequest(&aggregator.SignedTaskRespRequest{
Alert: &aggregator.AlertTaskInfo{
AlertHash: alertInfo.AlertHash,
QuorumNumbers: alertInfo.QuorumNumbers,
QuorumThresholdPercentages: alertInfo.QuorumThresholdPercentages,
TaskIndex: alertInfo.TaskIndex,
ReferenceBlockNumber: alertInfo.ReferenceBlockNumber,
},
OperatorRequestSignature: operatorRequestSignature,
OperatorId: operatorId,
})
if err != nil {
return SignedTaskRespResponse{}, fmt.Errorf("processSignedTaskResponse parse request falied: %v", err)
}

res, err := h.aggreagtor.ProcessSignedTaskResponse(req)
if err != nil {
return SignedTaskRespResponse{}, fmt.Errorf("processSignedTaskResponse process request falied: %v", err)
}

resp := SignedTaskRespResponse{
Reply: res.Reply,
TxHash: res.TxHash[:],
}

return resp, nil
}
3 changes: 3 additions & 0 deletions config-files/aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ aggregator_server_ip_port_address: localhost:8090
# address which the aggregator grpc listens on for operator signed messages
aggregator_grpc_server_ip_port_address: localhost:8190

# address which the aggregator json rpc listens on for operator signed messages
aggregator_jsonrpc_server_ip_port_address: localhost:8290

# the layer1 chain id the avs contracts in
layer1_chain_id: 31337

Expand Down
1 change: 1 addition & 0 deletions core/config/avs_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type NodeConfig struct {
EcdsaPrivateKeyStorePath string `yaml:"ecdsa_private_key_store_path"`
AggregatorServerIpPortAddress string `yaml:"aggregator_server_ip_port_address"`
AggregatorGRPCServerIpPortAddress string `yaml:"aggregator_grpc_server_ip_port_address"`
AggregatorJSONRPCServerIpPortAddr string `yaml:"aggregator_jsonrpc_server_ip_port_address"`
EigenMetricsIpPortAddress string `yaml:"eigen_metrics_ip_port_address"`
EnableMetrics bool `yaml:"enable_metrics"`
NodeApiIpPortAddress string `yaml:"node_api_ip_port_address"`
Expand Down
Loading

0 comments on commit dc05876

Please sign in to comment.