Skip to content

Commit

Permalink
add init to aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
fyInALT committed Apr 2, 2024
1 parent cce652b commit 25c5411
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 22 deletions.
12 changes: 12 additions & 0 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -43,6 +44,11 @@ type FinishedTaskStatus struct {
TransactionIndex uint `json:"transactionIndex"`
}

type OperatorStatus struct {
LastTime int64 `json:"lastTime"`
OperatorId bls.OperatorId `json:"operatorId"`
}

// Aggregator sends tasks (numbers to square) onchain, then listens for operator signed TaskResponses.
// It aggregates responses signatures, and if any of the TaskResponses reaches the QuorumThresholdPercentage for each quorum
// (currently we only use a single quorum of the ERC20Mock token), it sends the aggregated TaskResponse and signature onchain.
Expand Down Expand Up @@ -90,6 +96,10 @@ type Aggregator struct {
finishedTasksMu sync.RWMutex
nextTaskIndex types.TaskIndex
nextTaskIndexMu sync.RWMutex

cfg *config.Config
operatorStatus map[common.Address]*OperatorStatus
operatorStatusMu sync.RWMutex
}

// NewAggregator creates a new Aggregator with the provided config.
Expand Down Expand Up @@ -134,6 +144,8 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
blsAggregationService: blsAggregationService,
tasks: make(map[types.TaskIndex]*message.AlertTaskInfo),
finishedTasks: make(map[[32]byte]*FinishedTaskStatus),
operatorStatus: make(map[common.Address]*OperatorStatus),
cfg: c,
}, nil
}

Expand Down
3 changes: 0 additions & 3 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ var (
)

func main() {

app := cli.NewApp()
app.Flags = config.Flags
app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)
Expand All @@ -37,7 +36,6 @@ func main() {
}

func aggregatorMain(ctx *cli.Context) error {

log.Println("Initializing Aggregator")
config, err := config.NewConfig(ctx)
if err != nil {
Expand All @@ -60,5 +58,4 @@ func aggregatorMain(ctx *cli.Context) error {
}

return nil

}
41 changes: 41 additions & 0 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/rpc"
"time"

"github.com/alt-research/avs/aggregator/types"
"github.com/alt-research/avs/core/message"
Expand Down Expand Up @@ -74,6 +75,46 @@ func (agg *Aggregator) GetFinishedTaskByAlertHash(alertHash [32]byte) *FinishedT
return agg.finishedTasks[alertHash]
}

// rpc endpoint which is called by operator
// will init operator, just for keep config valid
func (agg *Aggregator) InitOperator(req *message.InitOperatorRequest, reply *message.InitOperatorResponse) error {
agg.logger.Infof("Received InitOperator: %#v", req)

reply.Ok = false

if agg.cfg.OperatorStateRetrieverAddr != req.OperatorStateRetrieverAddr {
reply.Res = fmt.Sprintf("OperatorStateRetrieverAddr invaild, expect %s", agg.cfg.OperatorStateRetrieverAddr.Hex())
return nil
}

if agg.cfg.RegistryCoordinatorAddr != req.RegistryCoordinatorAddr {
reply.Res = fmt.Sprintf("RegistryCoordinatorAddr invaild, expect %s", agg.cfg.RegistryCoordinatorAddr.Hex())
return nil
}

if agg.cfg.Layer1ChainId != req.Layer1ChainId {
reply.Res = fmt.Sprintf("Layer1ChainId invaild, expect %d", agg.cfg.Layer1ChainId)
return nil
}

if agg.cfg.Layer2ChainId != req.ChainId {
reply.Res = fmt.Sprintf("Layer2ChainId invaild, expect %d", agg.cfg.Layer2ChainId)
return nil
}

agg.operatorStatusMu.Lock()
defer agg.operatorStatusMu.Unlock()

agg.operatorStatus[req.OperatorAddress] = &OperatorStatus{
LastTime: time.Now().Unix(),
OperatorId: req.OperatorId,
}

agg.logger.Infof("new operator status: %s", req.OperatorAddress.Hex())

return nil
}

// rpc endpoint which is called by operator
// will try to init the task, if currently had a same task for the alert,
// it will return the existing task.
Expand Down
7 changes: 7 additions & 0 deletions config-files/aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,12 @@
environment: production
eth_rpc_url: http://localhost:8545
eth_ws_url: ws://localhost:8545

# address which the aggregator listens on for operator signed messages
aggregator_server_ip_port_address: localhost:8090

# the layer1 chain id the avs contracts in
layer1_chain_id: 31337

# the layer2 chain id
layer2_chain_id: 20240219
2 changes: 2 additions & 0 deletions core/config/avs_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ type NodeConfig struct {
OperatorServerIpPortAddr string `yaml:"operator_server_ip_port_addr"`
MetadataURI string `yaml:"metadata_uri"`
OperatorSocket string `yaml:"operator_socket"`
Layer1ChainId uint32 `yaml:"layer1_chain_id"`
Layer2ChainId uint32 `yaml:"layer2_chain_id"`
}
18 changes: 18 additions & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"os"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -36,6 +37,8 @@ type Config struct {
OperatorStateRetrieverAddr common.Address
RegistryCoordinatorAddr common.Address
AggregatorServerIpPortAddr string
Layer1ChainId uint32
Layer2ChainId uint32
// json:"-" skips this field when marshaling (only used for logging to stdout), since SignerFn doesnt implement marshalJson
SignerFn signerv2.SignerFn `json:"-"`
PrivateKey *ecdsa.PrivateKey `json:"-"`
Expand All @@ -49,6 +52,8 @@ type ConfigRaw struct {
EthRpcUrl string `yaml:"eth_rpc_url"`
EthWsUrl string `yaml:"eth_ws_url"`
AggregatorServerIpPortAddr string `yaml:"aggregator_server_ip_port_address"`
Layer1ChainId uint32 `yaml:"layer1_chain_id"`
Layer2ChainId uint32 `yaml:"layer2_chain_id"`
}

// These are read from DeploymentFileFlag
Expand Down Expand Up @@ -116,6 +121,17 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
return nil, err
}

layer1ChainIdFromRpc, err := ethRpcClient.ChainID(context.Background())
if err != nil {
logger.Errorf("Cannot got chain id from eth rpc client", "err", err)
return nil, err
}

if layer1ChainIdFromRpc.Uint64() != uint64(configRaw.Layer1ChainId) {
logger.Errorf("The layer1 chain id not expect", "layer1 rpc", layer1ChainIdFromRpc, "config", configRaw.Layer1ChainId)
return nil, fmt.Errorf("layer1 chain id not expect")
}

ethWsClient, err := eth.NewClient(configRaw.EthWsUrl)
if err != nil {
logger.Errorf("Cannot create ws ethclient", "err", err)
Expand Down Expand Up @@ -168,6 +184,8 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PrivateKey: ecdsaPrivateKey,
TxMgr: txMgr,
AggregatorAddress: aggregatorAddr,
Layer1ChainId: configRaw.Layer1ChainId,
Layer2ChainId: configRaw.Layer2ChainId,
}
config.validate()
return config, nil
Expand Down
17 changes: 17 additions & 0 deletions core/message/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
csservicemanager "github.com/alt-research/avs/contracts/bindings/MachServiceManager"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"golang.org/x/crypto/sha3"

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
Expand Down Expand Up @@ -92,6 +93,22 @@ func (a AlertTaskInfo) ToIMachServiceManagerAlertHeader() csservicemanager.IMach
}
}

// The init operator request
type InitOperatorRequest struct {
Layer1ChainId uint32
ChainId uint32
OperatorId bls.OperatorId
OperatorAddress common.Address
OperatorStateRetrieverAddr common.Address
RegistryCoordinatorAddr common.Address
}

// The init operator response
type InitOperatorResponse struct {
Ok bool
Res string
}

// The Alert task create request
type CreateTaskRequest struct {
AlertHash [32]byte
Expand Down
48 changes: 38 additions & 10 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -157,6 +158,26 @@ func withEnvConfig(c config.NodeConfig) config.NodeConfig {
c.OperatorSocket = operatorSocket
}

layer1ChainId, ok := os.LookupEnv("LAYER1_CHAIN_ID")
if ok && layer1ChainId != "" {
layer1ChainId, err := strconv.Atoi(layer1ChainId)
if err != nil {
panic(fmt.Sprintf("layer1_chain_id parse error: %v", err))
}

c.Layer1ChainId = uint32(layer1ChainId)
}

layer2ChainId, ok := os.LookupEnv("LAYER2_CHAIN_ID")
if ok && layer2ChainId != "" {
layer2ChainId, err := strconv.Atoi(layer2ChainId)
if err != nil {
panic(fmt.Sprintf("layer2_chain_id parse error: %v", err))
}

c.Layer2ChainId = uint32(layer2ChainId)
}

configJson, err := json.MarshalIndent(c, "", " ")
if err != nil {
panic(err)
Expand Down Expand Up @@ -299,7 +320,14 @@ func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) {
AVS_NAME, logger, operatorAddress, quorumNames)
reg.MustRegister(economicMetricsCollector)

aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, logger, avsAndEigenMetrics)
// OperatorId is set in contract during registration so we get it after registering operator.
operatorId, err := sdkClients.AvsRegistryChainReader.GetOperatorId(&bind.CallOpts{}, operatorAddress)
if err != nil {
logger.Error("Cannot get operator id", "err", err)
return nil, err
}

aggregatorRpcClient, err := NewAggregatorRpcClient(c, operatorId, operatorAddress, logger, avsAndEigenMetrics)
if err != nil {
logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err)
return nil, err
Expand Down Expand Up @@ -331,17 +359,9 @@ func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) {
newTaskCreatedChan: newTaskCreatedChan,
serviceManagerAddr: common.HexToAddress(c.AVSRegistryCoordinatorAddress),
metadataURI: c.MetadataURI,
operatorId: [32]byte{0}, // this is set below

operatorId: operatorId,
}

// OperatorId is set in contract during registration so we get it after registering operator.
operatorId, err := sdkClients.AvsRegistryChainReader.GetOperatorId(&bind.CallOpts{}, operator.operatorAddr)
if err != nil {
logger.Error("Cannot get operator id", "err", err)
return nil, err
}
operator.operatorId = operatorId
logger.Info("Operator info",
"operatorId", operatorId,
"operatorAddr", operatorAddress,
Expand All @@ -368,6 +388,14 @@ func (o *Operator) Start(ctx context.Context) error {

o.logger.Infof("Starting operator.")

o.logger.Infof("Init operator to aggregator.")
err = o.aggregatorRpcClient.InitOperatorToAggregator()
if err != nil {
o.logger.Errorf("Init operator to aggregator failed: %v", err)
return err
}
o.logger.Infof("Init operator to aggregator succeeded.")

if o.config.EnableNodeApi {
o.nodeApi.Start()
}
Expand Down
Loading

0 comments on commit 25c5411

Please sign in to comment.