From da581a538c1db8fd72e5d30dd14bbe657ec318ae Mon Sep 17 00:00:00 2001 From: tchataigner Date: Fri, 30 Aug 2024 18:47:03 +0200 Subject: [PATCH] feat: docker & k8s (#12) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: docker & k8s Signed-off-by: Thomas Chataigner * feat: wip docker * feat: wip docker flexible lc * feat: simplify k8s conf * feat: refactor aptos proof server to one bin * feat: one server bin eth + k8s aptos * feat: ethereum client configuration * chore: lint * ci: revise docker publish * refactor: base review integrated * chore: lint * refactor: router for proof server + health check * refactor: ethereum health check * refactor: probes * refactor: multiple routes eth proof server * refactor: fix compilation * refactor: accept octet stream * refactor: change handling request proof server * refactor: all routes working * refactor: not using serde json * refactor: only one request * refactor: health does not count as increment * fix: fix middleware * refactor: working aptos proof_server * chore: use let-else more effectively (#197) * refactor: replicas --------- Signed-off-by: Thomas Chataigner Co-authored-by: François Garillot <4142+huitseeker@users.noreply.github.com> --- .github/workflows/docker-publish.yml | 82 ++++ .gitignore | 10 + aptos/Cargo.lock | 16 + aptos/Cargo.toml | 2 + aptos/README.md | 41 +- aptos/docs/src/run/setup_proof_server.md | 4 +- aptos/proof-server/Cargo.toml | 10 +- aptos/proof-server/benches/proof_server.rs | 78 ++-- aptos/proof-server/src/bin/client.rs | 111 +++-- aptos/proof-server/src/bin/proof_server.rs | 436 ++++++++++++++++++ aptos/proof-server/src/bin/server_primary.rs | 216 --------- .../proof-server/src/bin/server_secondary.rs | 124 ----- aptos/proof-server/src/lib.rs | 2 +- aptos/proof-server/src/types/proof_server.rs | 90 +++- aptos/proof-server/src/utils.rs | 28 -- docker/.dockerignore | 2 + docker/Dockerfile | 78 ++++ docker/README.md | 59 +++ docker/compose/.example.env | 5 + docker/compose/docker-compose-aptos.yml | 12 + docker/compose/docker-compose-ethereum.yml | 14 + .../compose/docker-compose-proof-servers.yml | 24 + docker/k8s/README.md | 50 ++ .../proof-server/proof-server-deployment.yaml | 42 ++ docker/k8s/proof-server/proof-server-hpa.yaml | 18 + .../proof-server/proof-server-service.yaml | 12 + ethereum/Cargo.lock | 16 + ethereum/Cargo.toml | 2 + ethereum/core/src/merkle/utils/mod.rs | 1 - ethereum/docs/src/run/setup_proof_server.md | 6 +- ethereum/light-client/Cargo.toml | 10 +- ethereum/light-client/src/bin/client.rs | 2 + ethereum/light-client/src/bin/proof_server.rs | 323 +++++++++++++ .../light-client/src/bin/server_primary.rs | 90 ---- .../light-client/src/bin/server_secondary.rs | 77 ---- ethereum/light-client/src/client/beacon.rs | 11 + .../light-client/src/client/checkpoint.rs | 11 + ethereum/light-client/src/client/error.rs | 2 + ethereum/light-client/src/client/mod.rs | 17 + .../light-client/src/client/proof_server.rs | 215 ++++----- ethereum/light-client/src/client/storage.rs | 11 + ethereum/light-client/src/client/utils.rs | 30 ++ ethereum/light-client/src/lib.rs | 2 - ethereum/light-client/src/utils.rs | 29 -- fixture-generator/Cargo.lock | 16 + 45 files changed, 1634 insertions(+), 803 deletions(-) create mode 100644 .github/workflows/docker-publish.yml create mode 100644 aptos/proof-server/src/bin/proof_server.rs delete mode 100644 aptos/proof-server/src/bin/server_primary.rs delete mode 100644 aptos/proof-server/src/bin/server_secondary.rs create mode 100644 
docker/.dockerignore create mode 100644 docker/Dockerfile create mode 100644 docker/README.md create mode 100644 docker/compose/.example.env create mode 100644 docker/compose/docker-compose-aptos.yml create mode 100644 docker/compose/docker-compose-ethereum.yml create mode 100644 docker/compose/docker-compose-proof-servers.yml create mode 100644 docker/k8s/README.md create mode 100644 docker/k8s/proof-server/proof-server-deployment.yaml create mode 100644 docker/k8s/proof-server/proof-server-hpa.yaml create mode 100644 docker/k8s/proof-server/proof-server-service.yaml create mode 100644 ethereum/light-client/src/bin/proof_server.rs delete mode 100644 ethereum/light-client/src/bin/server_primary.rs delete mode 100644 ethereum/light-client/src/bin/server_secondary.rs create mode 100644 ethereum/light-client/src/client/utils.rs delete mode 100644 ethereum/light-client/src/utils.rs diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 00000000..33a0145b --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,82 @@ +# Source: https://raw.githubusercontent.com/foundry-rs/foundry/master/.github/workflows/docker-publish.yml +name: docker + +on: + workflow_dispatch: + inputs: + light-client: + description: 'aptos or ethereum' + type: choice + options: + - aptos + - ethereum + +env: + REGISTRY: ghcr.io +jobs: + container: + runs-on: ubuntu-latest + # https://docs.github.com/en/actions/reference/authentication-in-a-workflow + permissions: + id-token: write + packages: write + contents: read + timeout-minutes: 120 + steps: + - name: Checkout repository + id: checkout + uses: actions/checkout@v4 + + - name: Install Docker BuildX + uses: docker/setup-buildx-action@v2 + id: buildx + with: + install: true + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + # Ensure this doesn't trigger on PR's + if: github.event_name != 'pull_request' + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.REPO_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@v4 + with: + images: "argumentcomputer/${{ inputs.light-client }}" + + # Creates an additional 'latest' + - name: Finalize Docker Metadata + id: docker_tagging + run: | + echo "Neither scheduled nor manual release from main branch. Just tagging as branch name" + echo "docker_tags=argumentcomputer/${{ inputs.light-client }}:${GITHUB_REF##*/}" >> $GITHUB_OUTPUT + + # Log docker metadata to explicitly know what is being pushed + - name: Inspect Docker Metadata + run: | + echo "TAGS -> ${{ steps.docker_tagging.outputs.docker_tags }}" + echo "LABELS -> ${{ steps.meta.outputs.labels }}" + + # Build and push Docker image + # https://github.com/docker/build-push-action + # https://github.com/docker/build-push-action/blob/master/docs/advanced/cache.md + - name: Build and push Docker image + uses: docker/build-push-action@v3 + with: + context: . 
+ file: ./docker/Dockerfile + push: true + tags: ${{ steps.docker_tagging.outputs.docker_tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + build-args: | + LIGHT_CLIENT=${{ inputs.light-client }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index d9247537..dc156aa2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,16 @@ +# Rust assets **/target +Cargo.lock + +# IDE config .idea +# Secrets +*secret* +*.env +!.example.env + +# Contract assets ethereum/move/build/* ethereum/move/gas-profiling/* diff --git a/aptos/Cargo.lock b/aptos/Cargo.lock index aa44f5da..8f8e5b44 100644 --- a/aptos/Cargo.lock +++ b/aptos/Cargo.lock @@ -2233,6 +2233,20 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.14", + "instant", + "pin-project-lite", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -8084,6 +8098,8 @@ dependencies = [ "anyhow", "aptos-lc", "aptos-lc-core", + "axum 0.7.5", + "backoff", "bcs 0.1.4", "chrono", "clap 4.5.16", diff --git a/aptos/Cargo.toml b/aptos/Cargo.toml index e73185ec..e4fda640 100644 --- a/aptos/Cargo.toml +++ b/aptos/Cargo.toml @@ -25,6 +25,8 @@ aptos-temppath = { git = "https://github.com/aptos-labs/aptos-core/", tag = "apt aptos-types = { git = "https://github.com/aptos-labs/aptos-core/", tag = "aptos-node-v1.14.0" } aptos-vm = { git = "https://github.com/aptos-labs/aptos-core/", tag = "aptos-node-v1.14.0" } aptos-vm-genesis = { git = "https://github.com/aptos-labs/aptos-core/", tag = "aptos-node-v1.14.0" } +axum = "0.7.5" +backoff = { version = "0.4.0", features = ["tokio"] } # From https://github.com/aptos-labs/aptos-core/blob/aptos-node-v1.14.0/Cargo.toml#L485 bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" } bls12_381 = { git = "https://github.com/argumentcomputer/bls12_381.git", branch = "zkvm" } diff --git a/aptos/README.md b/aptos/README.md index 84994bab..372ac957 100644 --- a/aptos/README.md +++ b/aptos/README.md @@ -1,12 +1,8 @@ ## Aptos Light Client -This is a light client for the Aptos blockchain. It is written in Rust and lives in the workspace defined in this -directory. -In this README we will go over a few details that need to be known before hopping into development. +This is a light client for the Aptos blockchain. It is written in Rust and lives in the workspace defined in this directory. In this README we will go over a few details that need to be known before hopping into development. -For a more detailed overview of the Light Client and its components, and how to run and benchmark it, you can refer to -the -mdBook. To read it run: +For a more detailed overview of the Light Client and its components, and how to run and benchmark it, you can refer to the mdBook. To read it run: ```bash cd docs && \ @@ -19,31 +15,36 @@ Then navigate to [`localhost:3000`](http://localhost:3000). The workspace is divided into the following: -- `proof-server`: The server layer on top of the proving library. It exposes a REST API to generate proofs for the - light client. -- `light-client`: The main library that contains the light client implementation. 
It is in charge of producing proofs - regarding the consensus of the chain and inclusion of some account values in a Merkle Tree. +- +`proof-server`: The server layer on top of the proving library. It exposes a REST API to generate proofs for the light client. +- +`light-client`: The main library that contains the light client implementation. It is in charge of producing proofs regarding the consensus of the chain and inclusion of some account values in a Merkle Tree. - `core`: The core library that contains the data structures and utilities used by the light client. - `aptos-programs`: A library that exposes the Sphinx programs used to generate proofs for our light client.* - `programs/*`: Actual implementations of the Sphinx programs. ## Development -When developing, you might have to update the programs' implementation. The -programs implementations are located in `./programs/*` and the compiled binaries -are located in `./aptos-programs/artifacts`. Currently, artifacts binaries are -generated in two ways: +When developing, you might have to update the programs' implementation. The programs implementations are located in +`./programs/*` and the compiled binaries are located in +`./aptos-programs/artifacts`. Currently, artifacts binaries are generated in two ways: -- Automated: There is a build script located at `./aptos-programs/build.rs` that - will compile all the programs and place them in the `./aptos-programs/artifacts` +- Automated: There is a build script located at + `./aptos-programs/build.rs` that will compile all the programs and place them in the `./aptos-programs/artifacts` folder. To enable this feature, it is needed to set the environment variable `LC_PROGRAM_AUTOBUILD=1`. -- Manual: You can also compile the programs manually using `make` by running the following - command in the `./aptos-programs` folder: +- Manual: You can also compile the programs manually using `make` by running the following command in the + `./aptos-programs` folder: ```shell make ``` +## Running the Project + +To run all the Light Client components, you can either run them manually (refer to [the README in the `proof-server` +crate](./proof-server/README.md)) +or leverage our docker files (see [the README in the `docker` folder](../docker/README.md)). + ## Benchmarks -For more information about how to run the benchmarks, please refer to the dedicated section of the mdBook. Otherwise, -the READMEs can be found in the [`docs/src/benchmark`](./docs/src/benchmark/overview.md) folder. \ No newline at end of file +For more information about how to run the benchmarks, please refer to the dedicated section of the mdBook. Otherwise, the READMEs can be found in the [ +`docs/src/benchmark`](./docs/src/benchmark/overview.md) folder. 
\ No newline at end of file diff --git a/aptos/docs/src/run/setup_proof_server.md b/aptos/docs/src/run/setup_proof_server.md index 9a5fa11e..6cdbcbff 100644 --- a/aptos/docs/src/run/setup_proof_server.md +++ b/aptos/docs/src/run/setup_proof_server.md @@ -38,7 +38,7 @@ Now that our deployment machine is properly configured, we can run the secondary ```bash git clone git@github.com:argumentcomputer/zk-light-clients.git && \ cd zk-light-clients/aptos/proof-server && \ - SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable -C opt-level=3" cargo run --release --bin server_secondary -- -a + SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable -C opt-level=3" cargo run --release --bin proof_server -- --mode "single" -a ``` ## Deploy the primary server @@ -48,5 +48,5 @@ Finally, once the primary server is configured in the same fashion, run it: ```bash git clone git@github.com:argumentcomputer/zk-light-clients.git && \ cd zk-light-clients/aptos/proof-server && \ - SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable -C opt-level=3" cargo run --release --bin server_primary -- -a --snd-addr + SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable -C opt-level=3" cargo run --release --bin proof_server -- --mode "split" -a --snd-addr ``` diff --git a/aptos/proof-server/Cargo.toml b/aptos/proof-server/Cargo.toml index 71dfb78b..b5aba083 100644 --- a/aptos/proof-server/Cargo.toml +++ b/aptos/proof-server/Cargo.toml @@ -12,12 +12,8 @@ name = "client" path = "src/bin/client.rs" [[bin]] -name = "server_primary" -path = "src/bin/server_primary.rs" - -[[bin]] -name = "server_secondary" -path = "src/bin/server_secondary.rs" +name = "proof_server" +path = "src/bin/proof_server.rs" [dependencies] # local @@ -26,6 +22,8 @@ aptos-lc-core = { path = "../core" } # workspace anyhow = { workspace = true } +axum = { workspace = true } +backoff = { workspace = true, features = ["tokio"] } bcs = { workspace = true } chrono = { workspace = true } clap = { workspace = true } diff --git a/aptos/proof-server/benches/proof_server.rs b/aptos/proof-server/benches/proof_server.rs index 8854bdcc..949f4df9 100644 --- a/aptos/proof-server/benches/proof_server.rs +++ b/aptos/proof-server/benches/proof_server.rs @@ -3,9 +3,9 @@ use anyhow::anyhow; use bcs::from_bytes; +use proof_server::error::ClientError; use proof_server::types::aptos::{AccountInclusionProofResponse, EpochChangeProofResponse}; -use proof_server::types::proof_server::{EpochChangeData, InclusionData, Request}; -use proof_server::utils::{read_bytes, write_bytes}; +use proof_server::types::proof_server::{EpochChangeData, InclusionData, ProvingMode, Request}; use serde::Serialize; use sphinx_sdk::artifacts::try_install_plonk_bn254_artifacts; use std::env; @@ -337,9 +337,7 @@ async fn bench_proving_inclusion(final_snark: bool) -> Result Result Result Result Result<()> { let aptos_node_url = Arc::new(aptos_node_url); debug!("Initializing client"); + // Try to connect to proof server. + connect_to_proof_server(&proof_server_address).await?; // Initialize the client. let (client_state, verififer_state) = init(&proof_server_address, &aptos_node_url).await?; debug!("Client initialized successfully"); @@ -228,6 +231,27 @@ async fn main() -> Result<()> { } } +/// This method tries to connect to the proof server and returns an error if it fails. +/// +/// # Errors +/// This method returns an error if the connection to the proof server fails. 
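+/// The connection attempt is retried with `backoff::ExponentialBackoff::default()`.
+/// A bounded variant could be configured like this (a sketch, not what the client
+/// currently uses):
+/// ```ignore
+/// let policy = backoff::ExponentialBackoff {
+///     max_elapsed_time: Some(std::time::Duration::from_secs(60)),
+///     ..Default::default()
+/// };
+/// ```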
+async fn connect_to_proof_server(proof_server_address: &str) -> Result<(), ClientError> { + debug!("Connecting to the proof server at {}", proof_server_address); + + let res = backoff::future::retry(ExponentialBackoff::default(), || async { + Ok(TcpStream::connect(proof_server_address).await?) + }) + .await; + + if let Err(err) = res { + return Err(ClientError::Internal { + source: format!("Failed to connect to the proof server: {}", err).into(), + }); + } + + Ok(()) +} + /// Method to initialize the client. It fetches the initial data from the Aptos node and generates /// the initial state for the client and the verifier. While initializing the client, it handles the /// generation of both proof as it would happen in the worst-case scenario. @@ -414,11 +438,7 @@ async fn request_prover( request: &Request, ) -> Result, ClientError> { debug!("Connecting to the proof server at {}", proof_server_address); - let mut stream = TcpStream::connect(&proof_server_address) - .await - .map_err(|err| ClientError::Internal { - source: format!("Error while connecting to proof server: {err}").into(), - })?; + let client = reqwest::Client::new(); debug!("Successfully connected to the proof server"); info!("Sending request to prover: {}", request); @@ -426,16 +446,23 @@ async fn request_prover( let request_bytes = bcs::to_bytes(request).map_err(|err| ClientError::Internal { source: err.into() })?; - write_bytes(&mut stream, &request_bytes) + let response = client + .post(proof_server_address) + .header("Accept", "application/octet-stream") + .body(request_bytes) + .send() .await .map_err(|err| ClientError::Request { - endpoint: "prover".into(), + endpoint: proof_server_address.into(), source: err.into(), })?; - read_bytes(&mut stream) + let response_bytes = response + .bytes() .await - .map_err(|err| ClientError::Internal { source: err.into() }) + .map_err(|err| ClientError::Internal { source: err.into() })?; + + Ok(response_bytes.to_vec()) } /// This method verifies the validator verifier predicate, ie: that the validator committee that @@ -510,10 +537,17 @@ async fn epoch_change_proving_task( // Request a proof generation for the latest epoch change. debug!("Sending epoch change proof request to the prover"); - let request = Request::ProveEpochChange(epoch_change_proof_data.clone().into()); + let request = Request::ProveEpochChange(Box::new(( + get_proving_mode(), + epoch_change_proof_data.clone().into(), + ))); let epoch_change_proof: SphinxProofWithPublicValues = bcs::from_bytes( - &request_prover(&proof_server_address, &request).await?, + &request_prover( + &format!("http://{}/epoch/proof", proof_server_address), + &request, + ) + .await?, ) .map_err(|err| ClientError::ResponsePayload { endpoint: format!("{}", &request), @@ -566,13 +600,16 @@ async fn epoch_change_verifying_task( info!("Starting epoch change verification task"); // Verifying the received epoch change proof and the validator verifier hash. let request = Request::VerifyEpochChange(epoch_change_proof.clone()); - let epoch_change_proof_verified = *request_prover(&proof_server_address, &request) - .await? - .first() - .ok_or_else(|| ClientError::ResponsePayload { - endpoint: format!("{}", &request), - source: "No response from prover".into(), - })?; + let epoch_change_proof_verified = *request_prover( + &format!("http://{}/epoch/verify", proof_server_address), + &request, + ) + .await? 
+    .first()
+    .ok_or_else(|| ClientError::ResponsePayload {
+        endpoint: format!("{}", &request),
+        source: "No response from prover".into(),
+    })?;

    if epoch_change_proof_verified != 1 {
        return Err(ClientError::Verification(String::from(
@@ -614,9 +651,14 @@ async fn inclusion_proving_task(
    let inclusion_proof_data = fetch_inclusion_proof_data(&aptos_node_url).await?;

    debug!("Sending account inclusion proof request to the prover");
-    let request = Request::ProveInclusion(inclusion_proof_data.into());
+    let request =
+        Request::ProveInclusion(Box::new((get_proving_mode(), inclusion_proof_data.into())));
    let account_inclusion_proof: SphinxProofWithPublicValues = bcs::from_bytes(
-        &request_prover(&proof_server_address, &request).await?,
+        &request_prover(
+            &format!("http://{}/inclusion/proof", proof_server_address),
+            &request,
+        )
+        .await?,
    )
    .map_err(|err| ClientError::ResponsePayload {
        endpoint: format!("{}", &request),
@@ -647,13 +689,16 @@ async fn inclusion_verifying_task(
    info!("Verifying account inclusion proof");
    // Verifying the received account inclusion proof and the validator verifier hash.
    let request = Request::VerifyInclusion(account_inclusion_proof.clone());
-    let inclusion_proof_verified = *request_prover(&proof_server_address, &request)
-        .await?
-        .first()
-        .ok_or_else(|| ClientError::ResponsePayload {
-            endpoint: format!("{}", &request),
-            source: "No response from prover".into(),
-        })?;
+    let inclusion_proof_verified = *request_prover(
+        &format!("http://{}/inclusion/verify", proof_server_address),
+        &request,
+    )
+    .await?
+    .first()
+    .ok_or_else(|| ClientError::ResponsePayload {
+        endpoint: format!("{}", &request),
+        source: "No response from prover".into(),
+    })?;

    if inclusion_proof_verified != 1 {
        return Err(ClientError::Verification(String::from(
@@ -759,3 +804,9 @@ async fn verifier_task(
        }
    }
}
+
+fn get_proving_mode() -> ProvingMode {
+    // Read the proving mode for the light client from the `MODE` environment variable.
+    let mode_str: String = env::var("MODE").unwrap_or_else(|_| "STARK".into());
+    ProvingMode::try_from(mode_str.as_str()).expect("MODE should be STARK or SNARK")
+}
diff --git a/aptos/proof-server/src/bin/proof_server.rs b/aptos/proof-server/src/bin/proof_server.rs
new file mode 100644
index 00000000..67898a77
--- /dev/null
+++ b/aptos/proof-server/src/bin/proof_server.rs
@@ -0,0 +1,436 @@
+// Copyright (c) Argument Computer Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+//! # Proof server
+//!
+//! Server capable of handling proof generation and verification for both the inclusion and the
+//! epoch change programs. It can run as a single instance serving every request, or in split
+//! mode, where epoch change requests are forwarded to a secondary instance.
+//!
+//! ## Usage
+//!
+//! For a detailed usage guide, please refer to the dedicated README in `aptos/docs/src/run/setup_proof_server.md`.
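+//!
+//! ## Example
+//!
+//! A minimal sketch of running the server and probing it over HTTP (addresses and flags
+//! mirror the CLI below; request bodies are `bcs`-encoded `Request` values):
+//!
+//! ```bash
+//! proof_server --mode single --addr 127.0.0.1:6379
+//! # In another shell: 200 when the server is idle, 409 while proofs are in flight.
+//! curl -s -o /dev/null -w "%{http_code}\n" http://127.0.0.1:6379/health
+//! ```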
+
+use anyhow::{Error, Result};
+use aptos_lc::{epoch_change, inclusion};
+use axum::body::Body;
+use axum::extract::State;
+use axum::http::header::CONTENT_TYPE;
+use axum::http::{Response, StatusCode};
+use axum::middleware::Next;
+use axum::response::IntoResponse;
+use axum::routing::{get, post};
+use axum::Router;
+use clap::{Parser, ValueEnum};
+use log::{error, info};
+use proof_server::types::proof_server::{EpochChangeData, ProvingMode};
+use proof_server::types::proof_server::{InclusionData, Request};
+use sphinx_sdk::{ProverClient, SphinxProvingKey, SphinxVerifyingKey};
+use std::cmp::PartialEq;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use tokio::{net::TcpListener, task::spawn_blocking};
+
+#[derive(ValueEnum, Clone, Debug, Eq, PartialEq)]
+enum Mode {
+    Single,
+    Split,
+}
+
+/// Proof server exposing a REST API to generate and verify proofs for the Aptos
+/// light client. In split mode, epoch change proving is forwarded to a secondary
+/// instance of this same server.
+///
+/// Request bodies are raw bytes (`application/octet-stream`) that must be
+/// deserializable into `proof_server::types::proof_server::Request` by the `bcs`
+/// crate, so it's recommended to simply use that (pub) type when producing
+/// request data. Proof and verification responses are likewise `bcs`-encoded bytes.
+#[derive(Parser)]
+struct Cli {
+    /// Address of this server. E.g. 127.0.0.1:4321
+    #[arg(short, long)]
+    addr: String,
+
+    /// Address of the secondary server. E.g. 127.0.0.1:4321
+    #[arg(short, long)]
+    snd_addr: Option<String>,
+
+    /// Mode of operation: either 'single' or 'split'
+    #[arg(short, long)]
+    mode: Mode,
+}
+
+#[derive(Clone)]
+struct ServerState {
+    prover_client: Arc<ProverClient>,
+    inclusion_pk: Arc<SphinxProvingKey>,
+    inclusion_vk: Arc<SphinxVerifyingKey>,
+    epoch_pk: Arc<SphinxProvingKey>,
+    epoch_vk: Arc<SphinxVerifyingKey>,
+    snd_addr: Arc<Option<String>>,
+    mode: Mode,
+    active_requests: Arc<AtomicUsize>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let Cli {
+        addr,
+        snd_addr,
+        mode,
+    } = Cli::parse();
+
+    if mode == Mode::Split && snd_addr.is_none() {
+        return Err(Error::msg(
+            "Secondary server address is required in split mode",
+        ));
+    }
+
+    env_logger::init();
+
+    let prover_client = Arc::new(ProverClient::default());
+    let (inclusion_pk, inclusion_vk) = inclusion::generate_keys(&prover_client);
+    let (epoch_pk, epoch_vk) = epoch_change::generate_keys(&prover_client);
+
+    let state = ServerState {
+        prover_client,
+        inclusion_pk: Arc::new(inclusion_pk),
+        inclusion_vk: Arc::new(inclusion_vk),
+        epoch_pk: Arc::new(epoch_pk),
+        epoch_vk: Arc::new(epoch_vk),
+        snd_addr: Arc::new(snd_addr),
+        mode,
+        active_requests: Arc::new(AtomicUsize::new(0)),
+    };
+
+    let app = Router::new()
+        .route("/health", get(health_check))
+        .route("/inclusion/proof", post(inclusion_proof))
+        .route("/epoch/proof", post(epoch_proof))
+        .route("/epoch/verify", post(epoch_verify))
+        .route("/inclusion/verify", post(inclusion_verify))
+        .layer(axum::middleware::from_fn_with_state(
+            state.clone(),
+            count_requests_middleware,
+        ))
+        .with_state(state);
+
+    info!("Server running on {}", addr);
+
+    let listener = TcpListener::bind(addr).await?;
+
+    axum::serve(listener, app).await?;
+
+    Ok(())
+}
+
+async fn inclusion_proof(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = bcs::from_bytes::<Request>(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+
+    let Request::ProveInclusion(boxed) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        info!("Start proving");
+
+        let (proof_type, inclusion_data) = boxed.as_ref();
+        let InclusionData {
+            sparse_merkle_proof_assets,
+            transaction_proof_assets,
+            validator_verifier_assets,
+        } = inclusion_data;
+        let stdin = inclusion::generate_stdin(
+            sparse_merkle_proof_assets,
+            transaction_proof_assets,
+            validator_verifier_assets,
+        );
+
+        let prover_client = state.prover_client.clone();
+        let pk = state.inclusion_pk.clone();
+
+        let proof_handle = if proof_type == &ProvingMode::SNARK {
+            spawn_blocking(move || prover_client.prove(&pk, stdin).plonk().run())
+        } else {
+            spawn_blocking(move || prover_client.prove(&pk, stdin).run())
+        };
+
+        let proof = proof_handle
+            .await
+            .map_err(|_| {
+                error!("Failed to handle generate inclusion proof task");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?
+            .map_err(|err| {
+                error!("Failed to generate inclusion proof: {err}");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+        info!("Proof generated. Serializing");
+        bcs::to_bytes(&proof).map_err(|err| {
+            error!("Failed to serialize inclusion proof: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn inclusion_verify(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = bcs::from_bytes::<Request>(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+
+    let Request::VerifyInclusion(proof) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        info!("Start verifying inclusion proof");
+
+        let is_valid = state
+            .prover_client
+            .verify(&proof, &state.inclusion_vk)
+            .is_ok();
+
+        info!("Inclusion verification result: {}", is_valid);
+
+        bcs::to_bytes(&is_valid).map_err(|_| {
+            error!("Failed to serialize inclusion verification result");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn epoch_proof(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = bcs::from_bytes::<Request>(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+
+    let Request::ProveEpochChange(boxed) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        match state.mode {
+            Mode::Single => {
+                let (proof_type, epoch_change_data) = boxed.as_ref();
+
+                let EpochChangeData {
+                    trusted_state,
+                    epoch_change_proof,
+                } = epoch_change_data;
+
+                let stdin = epoch_change::generate_stdin(trusted_state, epoch_change_proof);
+                info!("Start proving epoch change");
+
+                let prover_client = state.prover_client.clone();
+                let pk = state.epoch_pk.clone();
+
+                let proof_handle = if proof_type == &ProvingMode::SNARK {
+                    spawn_blocking(move || prover_client.prove(&pk, stdin).plonk().run())
+                } else {
+                    spawn_blocking(move || prover_client.prove(&pk, stdin).run())
+                };
+                let proof = proof_handle
+                    .await
+                    .map_err(|_| {
+                        error!("Failed to handle generate epoch change proof task");
+                        StatusCode::INTERNAL_SERVER_ERROR
+                    })?
+                    .map_err(|err| {
+                        error!("Failed to generate epoch change proof: {err}");
+                        StatusCode::INTERNAL_SERVER_ERROR
+                    })?;
+
+                info!("Epoch change proof generated. Serializing");
+                bcs::to_bytes(&proof).map_err(|err| {
+                    error!("Failed to serialize epoch change proof: {err}");
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })
+            }
+            Mode::Split => {
+                let snd_addr = state.snd_addr.as_ref().clone().unwrap();
+                forward_request(bytes.to_vec(), &snd_addr).await
+            }
+        }
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn epoch_verify(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    info!("Start verifying epoch change proof");
+
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = bcs::from_bytes::<Request>(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+
+    let Request::VerifyEpochChange(proof) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        let is_valid = state.prover_client.verify(&proof, &state.epoch_vk).is_ok();
+
+        info!("Epoch change verification result: {}", is_valid);
+
+        bcs::to_bytes(&is_valid).map_err(|_| {
+            error!("Failed to serialize epoch change verification result");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn forward_request(
+    secondary_request_bytes: Vec<u8>,
+    snd_addr: &str,
+) -> Result<Vec<u8>, StatusCode> {
+    info!("Connecting to the secondary server");
+    let client = reqwest::Client::new();
+    info!("Sending secondary request");
+    let res_bytes = client
+        // The secondary instance runs the same router, so epoch change proving
+        // is served at /epoch/proof.
+        .post(format!("http://{}/epoch/proof", snd_addr))
+        .body(secondary_request_bytes)
+        .send()
+        .await
+        .map_err(|err| {
+            error!("Failed to send request to secondary server: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .bytes()
+        .await
+        .map_err(|err| {
+            error!("Failed to receive response from secondary server: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    info!("Response received. Sending it to the client");
+
+    Ok(res_bytes.to_vec())
+}
+
+async fn health_check(State(state): State<ServerState>) -> impl IntoResponse {
+    let active_requests = state.active_requests.load(Ordering::SeqCst);
+    if active_requests > 0 {
+        StatusCode::CONFLICT
+    } else {
+        StatusCode::OK
+    }
+}
+
+async fn count_requests_middleware(
+    State(state): State<ServerState>,
+    req: axum::http::Request<Body>,
+    next: Next,
+) -> Result<Response<Body>, StatusCode> {
+    // Health checks do not count toward active requests.
+    let is_not_health = req.uri().path() != "/health";
+    if is_not_health {
+        // Increment the active requests counter.
+        state.active_requests.fetch_add(1, Ordering::SeqCst);
+    }
+
+    // Proceed with the request.
+    let response = next.run(req).await;
+
+    // Decrement the active requests counter if not a health check.
+    if is_not_health {
+        state.active_requests.fetch_sub(1, Ordering::SeqCst);
+    }
+
+    Ok(response)
+}
diff --git a/aptos/proof-server/src/bin/server_primary.rs b/aptos/proof-server/src/bin/server_primary.rs
deleted file mode 100644
index 24e68859..00000000
--- a/aptos/proof-server/src/bin/server_primary.rs
+++ /dev/null
@@ -1,216 +0,0 @@
-// Copyright (c) Argument Computer Corporation
-// SPDX-License-Identifier: Apache-2.0
-
-//! # Primary server
-//!
-//! The primary server is responsible for handling requests regarding inclusion proofs. It is
-//! capable of generating and verifying such proofs. Epoch change requests are offloaded to a
-//! secondary server to ease the computational load necessary to handle inclusion proof requests.
-//!
-//! ## Usage
-//!
-//! For a detailed usage guide, please refer to the dedicated README in `aptos/docs/src/run/setup_proof_server.md`.
-
-use anyhow::{Error, Result};
-use aptos_lc::inclusion;
-use clap::Parser;
-use log::{error, info};
-use sphinx_sdk::ProverClient;
-use std::sync::Arc;
-use tokio::{
-    net::{TcpListener, TcpStream},
-    task::spawn_blocking,
-};
-
-use proof_server::{
-    types::proof_server::{InclusionData, Request, SecondaryRequest},
-    utils::{read_bytes, write_bytes},
-};
-
-/// Server responsible from handling requests for proof generation and verification
-/// of inclusion and epoch changes.
-///
-/// Requests regarding epoch changes are offloaded to a secondary server in order
-/// to ease the computation load necessary to handle requests regarding inclusion
-/// proofs.
-///
-/// 1. Making requests to this server
-///
-/// From the client's perspective, before writing the request bytes on the stream,
-/// the size of the request must be written as a big-endian 32 bits unsigned integer
-/// so the server knows the size of the buffer it needs to allocate and how many
-/// bytes is should read next.
-///
-/// The request bytes must be deserializable into `proof_server::Request` by the
-/// `bcs` crate, so it's recommended to simply use that (pub) type when producing
-/// request data.
-///
-/// Since request sizes must be expressible in 32 bits, the actual request payload
-/// is bound by 4 GB, which should be way more than enough for the use case at hand.
-///
-/// 2. Handling responses from this server
-///
-/// Proofs are provided following the same logic as above: their sizes (in number
-/// of bytes) must be read before reading the proof bytes themselves.
-///
-/// Verification responses are the same, with their payload only being a boolean.
-#[derive(Parser)]
-struct Cli {
-    /// Address of this server. E.g. 127.0.0.1:1234
-    #[arg(short, long)]
-    addr: String,
-
-    /// Address of the secondary server. E.g.
127.0.0.1:4321 - #[arg(long)] - snd_addr: String, -} - -#[tokio::main] -async fn main() -> Result<()> { - let Cli { addr, snd_addr } = Cli::parse(); - - env_logger::init(); - - let listener = TcpListener::bind(addr).await?; - info!("Server is running on {}", listener.local_addr()?); - - let snd_addr = Arc::new(snd_addr); - let prover_client = Arc::new(ProverClient::default()); - let (pk, vk) = inclusion::generate_keys(&prover_client); - let (pk, vk) = (Arc::new(pk), Arc::new(vk)); - - loop { - let (mut client_stream, _) = listener.accept().await?; - info!("Received a connection"); - - // cheap `Arc` clones - let snd_addr = snd_addr.clone(); - let prover_client = prover_client.clone(); - let pk = pk.clone(); - let vk = vk.clone(); - - tokio::spawn(async move { - info!("Awaiting request"); - let request_bytes = read_bytes(&mut client_stream).await?; - info!("Request received"); - - info!("Deserializing request"); - match bcs::from_bytes::(&request_bytes) { - Ok(Request::ProveInclusion(inclusion_data)) => { - let InclusionData { - sparse_merkle_proof_assets, - transaction_proof_assets, - validator_verifier_assets, - } = inclusion_data; - let stdin = inclusion::generate_stdin( - &sparse_merkle_proof_assets, - &transaction_proof_assets, - &validator_verifier_assets, - ); - info!("Start proving"); - let proof_handle = - spawn_blocking(move || prover_client.prove(&pk, stdin).run()); - let proof = proof_handle.await??; - info!("Proof generated. Serializing"); - let proof_bytes = bcs::to_bytes(&proof)?; - info!("Sending proof"); - write_bytes(&mut client_stream, &proof_bytes).await?; - info!("Proof sent"); - } - Ok(Request::VerifyInclusion(proof)) => { - write_bytes( - &mut client_stream, - &bcs::to_bytes(&prover_client.verify(&proof, &vk).is_ok())?, - ) - .await?; - } - Ok(Request::ProveEpochChange(epoch_change_data)) => { - info!("Connecting to the secondary server"); - let mut secondary_stream = TcpStream::connect(&*snd_addr).await?; - let secondary_request = SecondaryRequest::Prove(epoch_change_data); - info!("Serializing secondary request"); - let secondary_request_bytes = bcs::to_bytes(&secondary_request)?; - info!("Sending secondary request"); - write_bytes(&mut secondary_stream, &secondary_request_bytes).await?; - info!("Awaiting proof"); - let proof_bytes = read_bytes(&mut secondary_stream).await?; - info!("Proof received. Sending it to the primary server"); - write_bytes(&mut client_stream, &proof_bytes).await?; - info!("Proof sent"); - } - Ok(Request::VerifyEpochChange(proof)) => { - info!("Connecting to the secondary server"); - let mut secondary_stream = TcpStream::connect(&*snd_addr).await?; - let secondary_request = SecondaryRequest::Verify(proof); - info!("Serializing secondary request"); - let secondary_request_bytes = bcs::to_bytes(&secondary_request)?; - info!("Sending secondary request"); - write_bytes(&mut secondary_stream, &secondary_request_bytes).await?; - info!("Awaiting verification"); - let verified = read_bytes(&mut secondary_stream).await?; - info!("Verification finished. 
Sending result"); - write_bytes(&mut client_stream, &verified).await?; - info!("Verification result sent"); - } - Ok(Request::SnarkProveInclusion(inclusion_data)) => { - let InclusionData { - sparse_merkle_proof_assets, - transaction_proof_assets, - validator_verifier_assets, - } = inclusion_data; - let stdin = inclusion::generate_stdin( - &sparse_merkle_proof_assets, - &transaction_proof_assets, - &validator_verifier_assets, - ); - info!("Start proving"); - let proof_handle = - spawn_blocking(move || prover_client.prove(&pk, stdin).plonk().run()); - let proof = proof_handle.await??; - info!("Proof generated. Serializing"); - let proof_bytes = bcs::to_bytes(&proof)?; - info!("Sending proof"); - write_bytes(&mut client_stream, &proof_bytes).await?; - info!("Proof sent"); - } - Ok(Request::SnarkVerifyInclusion(proof)) => { - write_bytes( - &mut client_stream, - &bcs::to_bytes(&prover_client.verify(&proof, &vk).is_ok())?, - ) - .await?; - } - Ok(Request::SnarkProveEpochChange(epoch_change_data)) => { - info!("Connecting to the secondary server"); - let mut secondary_stream = TcpStream::connect(&*snd_addr).await?; - let secondary_request = SecondaryRequest::SnarkProve(epoch_change_data); - info!("Serializing secondary request"); - let secondary_request_bytes = bcs::to_bytes(&secondary_request)?; - info!("Sending secondary request"); - write_bytes(&mut secondary_stream, &secondary_request_bytes).await?; - info!("Awaiting proof"); - let proof_bytes = read_bytes(&mut secondary_stream).await?; - info!("Proof received. Sending it to the primary server"); - write_bytes(&mut client_stream, &proof_bytes).await?; - info!("Proof sent"); - } - Ok(Request::SnarkVerifyEpochChange(proof)) => { - info!("Connecting to the secondary server"); - let mut secondary_stream = TcpStream::connect(&*snd_addr).await?; - let secondary_request = SecondaryRequest::SnarkVerify(proof); - info!("Serializing secondary request"); - let secondary_request_bytes = bcs::to_bytes(&secondary_request)?; - info!("Sending secondary request"); - write_bytes(&mut secondary_stream, &secondary_request_bytes).await?; - info!("Awaiting verification"); - let verified = read_bytes(&mut secondary_stream).await?; - info!("Verification finished. Sending result"); - write_bytes(&mut client_stream, &verified).await?; - info!("Verification result sent"); - } - Err(e) => error!("Failed to deserialize request object: {e}"), - } - Ok::<(), Error>(()) - }); - } -} diff --git a/aptos/proof-server/src/bin/server_secondary.rs b/aptos/proof-server/src/bin/server_secondary.rs deleted file mode 100644 index 84b4399e..00000000 --- a/aptos/proof-server/src/bin/server_secondary.rs +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) Argument Computer Corporation -// SPDX-License-Identifier: Apache-2.0 - -//! # Server secondary -//! -//! Server capable of handling proof generation and verification regarding epoch changes. Such -//! requests are expected to come from the primary server. -//! -//! ## Usage -//! -//! For a detailed usage guide, please refer to the dedicated README in `aptos/docs/src/run/setup_proof_server.md`. - -use anyhow::{Error, Result}; -use aptos_lc::epoch_change; -use clap::Parser; -use log::info; -use sphinx_sdk::ProverClient; -use std::sync::Arc; -use tokio::{net::TcpListener, task::spawn_blocking}; - -use proof_server::{ - types::proof_server::{EpochChangeData, SecondaryRequest}, - utils::{read_bytes, write_bytes}, -}; - -/// Server capable of handling proof generation and verification regarding epoch -/// changes. 
Such requests are expected to come from the primary server. -/// -/// Making requests to this server and handling responses from it follows the -/// same logic from the primary server: -/// -/// * Request data must be preceded by its size in bytes -/// * Proof responses will follow the same logic -/// * Verification responses will follow the same logic. -/// -/// The request bytes must be deserializable into `proof_server::SecondaryRequest` -/// by the `bcs` crate, so it's recommended to simply use that (pub) type when -/// producing request data. -#[derive(Parser)] -struct Cli { - /// Address of this server. E.g. 127.0.0.1:4321 - #[arg(short, long)] - addr: String, -} - -#[tokio::main] -async fn main() -> Result<()> { - let Cli { addr } = Cli::parse(); - - env_logger::init(); - - let listener = TcpListener::bind(addr).await?; - - info!("Server is running on {}", listener.local_addr()?); - - let prover_client = Arc::new(ProverClient::default()); - let (pk, vk) = epoch_change::generate_keys(&prover_client); - let (pk, vk) = (Arc::new(pk), Arc::new(vk)); - - loop { - let (mut primary_stream, _) = listener.accept().await?; - info!("Received a connection"); - - // cheap `Arc` clones - let prover_client = prover_client.clone(); - let pk = pk.clone(); - let vk = vk.clone(); - - tokio::spawn(async move { - info!("Awaiting request data"); - let request_bytes = read_bytes(&mut primary_stream).await?; - info!("Request data received"); - - info!("Deserializing request data"); - match bcs::from_bytes::(&request_bytes)? { - SecondaryRequest::Prove(EpochChangeData { - trusted_state, - epoch_change_proof, - }) => { - let stdin = epoch_change::generate_stdin(&trusted_state, &epoch_change_proof); - info!("Start proving"); - let proof_handle = - spawn_blocking(move || prover_client.prove(&pk, stdin).run()); - let proof = proof_handle.await??; - info!("Proof generated. Serializing"); - let proof_bytes = bcs::to_bytes(&proof)?; - info!("Sending proof to the primary server"); - write_bytes(&mut primary_stream, &proof_bytes).await?; - info!("Proof sent"); - } - SecondaryRequest::Verify(proof) => { - write_bytes( - &mut primary_stream, - &bcs::to_bytes(&prover_client.verify(&proof, &vk).is_ok())?, - ) - .await?; - } - SecondaryRequest::SnarkProve(EpochChangeData { - trusted_state, - epoch_change_proof, - }) => { - let stdin = epoch_change::generate_stdin(&trusted_state, &epoch_change_proof); - info!("Start proving"); - let proof_handle = - spawn_blocking(move || prover_client.prove(&pk, stdin).plonk().run()); - let proof = proof_handle.await??; - info!("Proof generated. Serializing"); - let proof_bytes = bcs::to_bytes(&proof)?; - info!("Sending proof to the primary server"); - write_bytes(&mut primary_stream, &proof_bytes).await?; - info!("Proof sent"); - } - SecondaryRequest::SnarkVerify(proof) => { - write_bytes( - &mut primary_stream, - &bcs::to_bytes(&prover_client.verify(&proof, &vk).is_ok())?, - ) - .await?; - } - } - Ok::<(), Error>(()) - }); - } -} diff --git a/aptos/proof-server/src/lib.rs b/aptos/proof-server/src/lib.rs index d77d47ab..5b5c6909 100644 --- a/aptos/proof-server/src/lib.rs +++ b/aptos/proof-server/src/lib.rs @@ -14,7 +14,7 @@ //! Aptos Public Full Node and the proof server. //! - [primary server](./bin/server_primary.rs): The main entrypoint for our proof server, in charge //! of load balancing the incoming requests and handling proofs about account inclusion. -//! - [secondary server](./bin/server_secondary.rs): A secondary server that is in charge of handling +//! 
- [secondary server](./bin/proof_server.rs): A secondary server that is in charge of handling
+//!   requests about epoch changes.

/// Module containing the errors that can be thrown while using the client and the proof server.
diff --git a/aptos/proof-server/src/types/proof_server.rs b/aptos/proof-server/src/types/proof_server.rs
index dc0a735e..0cf7c345 100644
--- a/aptos/proof-server/src/types/proof_server.rs
+++ b/aptos/proof-server/src/types/proof_server.rs
@@ -1,6 +1,7 @@
 // Copyright (c) Argument Computer Corporation
 // SPDX-License-Identifier: Apache-2.0

+use anyhow::{anyhow, Result};
 use aptos_lc::inclusion::{
     SparseMerkleProofAssets, TransactionProofAssets, ValidatorVerifierAssets,
 };
@@ -8,6 +9,73 @@
 use serde::{Deserialize, Serialize};
 use sphinx_sdk::SphinxProofWithPublicValues;
 use std::fmt::Display;

+/// The proving mode for the prover.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
+pub enum ProvingMode {
+    STARK,
+    SNARK,
+}
+
+impl ProvingMode {
+    /// Returns a boolean indicating if the proving mode is STARK.
+    ///
+    /// # Returns
+    ///
+    /// A boolean indicating if the proving mode is STARK.
+    pub const fn is_stark(&self) -> bool {
+        matches!(self, ProvingMode::STARK)
+    }
+
+    /// Returns a serialized representation of the enum.
+    ///
+    /// # Returns
+    ///
+    /// A u8 representing the enum.
+    pub const fn to_bytes(&self) -> u8 {
+        match self {
+            ProvingMode::STARK => 0,
+            ProvingMode::SNARK => 1,
+        }
+    }
+
+    /// Returns a ProvingMode from a serialized representation.
+    ///
+    /// # Arguments
+    ///
+    /// * `bytes` - The serialized representation of the enum.
+    ///
+    /// # Returns
+    ///
+    /// The ProvingMode.
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
+        match bytes[0] {
+            0 => Ok(ProvingMode::STARK),
+            1 => Ok(ProvingMode::SNARK),
+            _ => Err(anyhow!("Invalid proving mode")),
+        }
+    }
+}
+
+impl From<ProvingMode> for String {
+    fn from(mode: ProvingMode) -> String {
+        match mode {
+            ProvingMode::STARK => "STARK".to_string(),
+            ProvingMode::SNARK => "SNARK".to_string(),
+        }
+    }
+}
+
+impl TryFrom<&str> for ProvingMode {
+    type Error = anyhow::Error;
+
+    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+        match value {
+            "STARK" => Ok(ProvingMode::STARK),
+            "SNARK" => Ok(ProvingMode::SNARK),
+            _ => Err(anyhow!("Invalid proving mode")),
+        }
+    }
+}
+
 /// Data structure used as a payload to request an epoch change proof generation from the proof
 /// server.
 #[derive(Serialize, Deserialize)]
@@ -29,14 +97,10 @@ pub struct InclusionData {
 /// one using the [`SphinxProof`] type and another using the [`SphinxGroth16Proof`] type.
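+/// Example of producing a request payload (a sketch; `epoch_change_data` is a
+/// placeholder `EpochChangeData` value):
+/// ```ignore
+/// let req = Request::ProveEpochChange(Box::new((ProvingMode::SNARK, epoch_change_data)));
+/// let bytes = bcs::to_bytes(&req)?;
+/// ```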
#[derive(Serialize, Deserialize)] pub enum Request { - ProveInclusion(InclusionData), - ProveEpochChange(EpochChangeData), + ProveInclusion(Box<(ProvingMode, InclusionData)>), + ProveEpochChange(Box<(ProvingMode, EpochChangeData)>), VerifyInclusion(SphinxProofWithPublicValues), VerifyEpochChange(SphinxProofWithPublicValues), - SnarkProveInclusion(InclusionData), - SnarkProveEpochChange(EpochChangeData), - SnarkVerifyInclusion(SphinxProofWithPublicValues), - SnarkVerifyEpochChange(SphinxProofWithPublicValues), } impl Display for &Request { @@ -46,20 +110,6 @@ impl Display for &Request { Request::ProveEpochChange(_) => write!(f, "ProveEpochChange"), Request::VerifyInclusion(_) => write!(f, "VerifyInclusion"), Request::VerifyEpochChange(_) => write!(f, "VerifyEpochChange"), - Request::SnarkProveInclusion(_) => write!(f, "SnarkProveInclusion"), - Request::SnarkProveEpochChange(_) => write!(f, "SnarkProveEpochChange"), - Request::SnarkVerifyInclusion(_) => write!(f, "SnarkVerifyInclusion"), - Request::SnarkVerifyEpochChange(_) => write!(f, "SnarkVerifyEpochChange"), } } } - -/// Secondary request type for the proof server. It is used to convey request from the primary -/// server to the secondary one. -#[derive(Serialize, Deserialize)] -pub enum SecondaryRequest { - Prove(EpochChangeData), - Verify(SphinxProofWithPublicValues), - SnarkProve(EpochChangeData), - SnarkVerify(SphinxProofWithPublicValues), -} diff --git a/aptos/proof-server/src/utils.rs b/aptos/proof-server/src/utils.rs index bc6a82f8..bef2fb64 100644 --- a/aptos/proof-server/src/utils.rs +++ b/aptos/proof-server/src/utils.rs @@ -2,36 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; use url::Url; -/// Auxiliary function to write bytes on a stream. Before actually writing the -/// bytes, it writes the number of bytes to be written as a big-endian `u32`. -/// -/// # Errors -/// This function errors if the number of bytes can't fit in a `u32` -pub async fn write_bytes(stream: &mut TcpStream, bytes: &[u8]) -> Result<()> { - stream.write_u32(u32::try_from(bytes.len())?).await?; - stream.write_all(bytes).await?; - stream.flush().await?; - Ok(()) -} - -/// Auxiliary function to read bytes on a stream. Before actually reading the -/// bytes, it reads the number of bytes to be read as a big-endian `u32`. -/// -/// # Important -/// The number of bytes read must fit in a `u32` thus the amount of data read in -/// a single call to this function is 4 GB. -pub async fn read_bytes(stream: &mut TcpStream) -> Result> { - let size = stream.read_u32().await?; - let mut bytes = vec![0; size as usize]; - let num_read = stream.read_exact(&mut bytes).await?; - assert_eq!(num_read, bytes.len()); - Ok(bytes) -} - /// Validates and formats a URL. If the URL is relative, it will be prefixed /// with `http://`. 
pub fn validate_and_format_url(url: &str) -> Result<Url> {
diff --git a/docker/.dockerignore b/docker/.dockerignore
new file mode 100644
index 00000000..03e1d325
--- /dev/null
+++ b/docker/.dockerignore
@@ -0,0 +1,2 @@
+**/target/
+**/Cargo.lock
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 00000000..f2504f66
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,78 @@
+# Use an existing docker image as a base
+FROM rust:latest AS builder
+
+ARG LIGHT_CLIENT=aptos
+
+# Install git and build dependencies
+RUN apt-get update && apt-get install -y git build-essential pkg-config libssl-dev libudev-dev cmake clang
+
+# Configure git to use the GitHub token for authentication
+RUN --mount=type=secret,id=github_token \
+    git config --global url."https://$(cat /run/secrets/github_token)@github.com/".insteadOf ssh://git@github.com && \
+    git config --global url."https://$(cat /run/secrets/github_token)@github.com".insteadOf https://github.com
+
+# Install Rust nightly
+RUN rustup install nightly
+
+# Set context in /tmp
+WORKDIR /tmp
+
+# Download Go
+RUN wget https://go.dev/dl/go1.22.4.linux-amd64.tar.gz
+
+# Install Go
+RUN tar -C /usr/local -xzf go1.22.4.linux-amd64.tar.gz
+
+# Set PATH environment variable
+ENV PATH=$PATH:/usr/local/go/bin
+
+# Set workdir to /app
+WORKDIR /app
+
+# Clone Sphinx
+RUN git clone https://github.com/lurk-lab/sphinx.git
+
+# Set context in sphinx/cli
+WORKDIR /app/sphinx/cli
+
+# Install the Sphinx CLI & the Succinct toolchain
+RUN cargo install --locked --force --path .
+RUN cargo prove install-toolchain
+
+# Set context in /app/$LIGHT_CLIENT
+WORKDIR /app/$LIGHT_CLIENT
+
+# Copy light client folder
+COPY ./$LIGHT_CLIENT .
+
+# Determine the package name based on the value of LIGHT_CLIENT
+RUN if [ "$LIGHT_CLIENT" = "aptos" ]; then \
+      PACKAGE_NAME="proof-server"; \
+    else \
+      PACKAGE_NAME="ethereum-lc"; \
+    fi && \
+    RUSTFLAGS="--cfg tokio_unstable -C target-cpu=native" cargo +nightly build --release --bins -p $PACKAGE_NAME
+
+# Start a new stage from scratch
+FROM debian:stable-slim
+
+ARG LIGHT_CLIENT=aptos
+
+# Install SSL libraries
+RUN apt update && apt install -y openssl libssl-dev
+
+# Set workdir to /app
+WORKDIR /app
+
+# Copy only the binaries from the builder stage
+COPY --from=builder /app/$LIGHT_CLIENT/target/release/client /app/$LIGHT_CLIENT/target/release/proof_server ./
+
+# Create group and user
+RUN groupadd execgroup && useradd -m -g execgroup execuser
+
+# Give ownership of files in /app to execuser
+RUN chown -R execuser:execgroup /app
+
+# Switch to execuser
+USER execuser
+
diff --git a/docker/README.md b/docker/README.md
new file mode 100644
index 00000000..2b13b7f3
--- /dev/null
+++ b/docker/README.md
@@ -0,0 +1,59 @@
+# Light Client container configuration
+
+To ease the execution and deployment of our Light Client implementation, we provide a set of Docker configuration files that help to build and run the necessary components.
+
+> Note: The following indications are meant to run the Light Client
+> using [docker compose](https://docs.docker.com/compose/).
+> A set of instructions exists for k8s in [its dedicated README](k8s/README.md).
+
+## Notice
+
+⚠️ The following commands should be run from the root of the repository.
+
+## Dockerfile
+
+The Dockerfile leverages multi-stage builds to create an image that contains both binaries (`client` and `proof_server`). The same image is used across the three containers to run the two proof servers and the client.
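+
+For instance, building the image standalone could look like this (a sketch; it assumes a GitHub token stored at `~/.github_token`, which the Dockerfile consumes as the `github_token` secret):
+
+```bash
+docker build -f docker/Dockerfile \
+  --build-arg LIGHT_CLIENT=aptos \
+  --secret id=github_token,src=$HOME/.github_token \
+  -t argumentcomputer/aptos-light-client:latest .
+```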
+
+> Note: For more information about those binaries and how they interact, please refer
+> to [the `proof-server` README](../aptos/proof-server/README.md).
+
+### Setting up the Environment Variables
+
+The project uses environment variables to configure the proof servers and the client. These variables are defined in a `.env` file located in the `docker/compose` directory. An example `.env` file is provided as `.example.env`. Some environment variables are common to all Light Clients and others are specific to one implementation.
+
+**Common variables**
+
+- `PRIMARY_ADDR`: The address of the primary server (e.g., `0.0.0.0`).
+- `PRIMARY_PORT`: The port number for the primary server (e.g., `6379`).
+- `SECONDARY_ADDR`: The address of the secondary server (e.g., `0.0.0.0`).
+- `SECONDARY_PORT`: The port number for the secondary server (e.g., `6380`).
+
+**Aptos variables**
+
+- `APTOS_NODE_URL`: The URL of the Aptos node (e.g., `127.0.0.1:8080`).
+
+**Ethereum variables**
+
+- `CHECKPOINT_PROVIDER_ADDRESS`: The address of the checkpoint provider for the Ethereum client (e.g., `https://sync-mainnet.beaconcha.in`).
+- `BEACON_NODE_ADDRESS`: The address of the Beacon node used to query consensus data (e.g., `https://www.lightclientdata.org`).
+- `RPC_PROVIDER_ADDRESS`: The address that the client will use to fetch execution data through RPC. It can be accessed through various RPC providers such as [Infura](https://docs.infura.io/api/networks/polygon-pos/json-rpc-methods/eth_getproof) or [Chainstack](https://docs.chainstack.com/reference/getproof).
+
+### Running the Docker Compose
+
+Once the Docker image is built and the environment variables are set, you can start the proof servers and the client using Docker Compose with the following command:
+
+```bash
+docker compose -f docker/compose/docker-compose-proof-servers.yml -f docker/compose/docker-compose-<light-client>.yml build --build-arg LIGHT_CLIENT=<light-client> && \
+  docker compose -f docker/compose/docker-compose-proof-servers.yml -f docker/compose/docker-compose-<light-client>.yml up
+```
+
+This command will start the containers as defined in the compose files. The proof servers and the client will start running, and you can interact with them as defined in the project documentation.
diff --git a/docker/compose/.example.env b/docker/compose/.example.env
new file mode 100644
index 00000000..bfb1b04f
--- /dev/null
+++ b/docker/compose/.example.env
@@ -0,0 +1,5 @@
+PRIMARY_ADDR=0.0.0.0
+PRIMARY_PORT=6379
+SECONDARY_ADDR=0.0.0.0
+SECONDARY_PORT=6380
+APTOS_NODE_URL=127.0.0.1:8080
\ No newline at end of file
diff --git a/docker/compose/docker-compose-aptos.yml b/docker/compose/docker-compose-aptos.yml
new file mode 100644
index 00000000..e5f274da
--- /dev/null
+++ b/docker/compose/docker-compose-aptos.yml
@@ -0,0 +1,12 @@
+services:
+  client:
+    build:
+      context: ../..
+      dockerfile: docker/Dockerfile
+    command: sh -c "/app/client --proof-server-address server-primary:${PRIMARY_PORT} --aptos-node-url ${APTOS_NODE_URL}"
+    environment:
+      - RUST_LOG=debug
+    restart: always
+    depends_on:
+      - server-primary
+      - server-secondary
\ No newline at end of file
diff --git a/docker/compose/docker-compose-ethereum.yml b/docker/compose/docker-compose-ethereum.yml
new file mode 100644
index 00000000..4134ff05
--- /dev/null
+++ b/docker/compose/docker-compose-ethereum.yml
@@ -0,0 +1,14 @@
+services:
+  client:
+    build:
+      context: ../..
+      dockerfile: docker/Dockerfile
+      args:
+        - LIGHT_CLIENT=ethereum
+    command: sh -c "/app/client -p server-primary:${PRIMARY_PORT} -c ${CHECKPOINT_PROVIDER_ADDRESS} -b ${BEACON_NODE_ADDRESS} -r ${RPC_PROVIDER_ADDRESS}"
+    environment:
+      - RUST_LOG=debug
+    restart: always
+    depends_on:
+      - server-primary
+      - server-secondary
\ No newline at end of file
diff --git a/docker/compose/docker-compose-proof-servers.yml b/docker/compose/docker-compose-proof-servers.yml
new file mode 100644
index 00000000..6de0a7aa
--- /dev/null
+++ b/docker/compose/docker-compose-proof-servers.yml
@@ -0,0 +1,24 @@
+services:
+  server-primary:
+    build:
+      context: ../..
+      dockerfile: docker/Dockerfile
+    command: sh -c "/app/proof_server --mode split --addr ${PRIMARY_ADDR}:${PRIMARY_PORT} --snd-addr server-secondary:${SECONDARY_PORT}"
+    environment:
+      - RUST_LOG=debug
+    restart: always
+    depends_on:
+      - server-secondary
+    ports:
+      - "${PRIMARY_PORT}:${PRIMARY_PORT}"
+
+  server-secondary:
+    build:
+      context: ../..
+      dockerfile: docker/Dockerfile
+    command: sh -c "/app/proof_server --mode single --addr ${SECONDARY_ADDR}:${SECONDARY_PORT}"
+    environment:
+      - RUST_LOG=debug
+    restart: always
+    ports:
+      - "${SECONDARY_PORT}:${SECONDARY_PORT}"
\ No newline at end of file
diff --git a/docker/k8s/README.md b/docker/k8s/README.md
new file mode 100644
index 00000000..a9533b93
--- /dev/null
+++ b/docker/k8s/README.md
@@ -0,0 +1,50 @@
+# Kubernetes Configuration for Light Client
+
+This project consists of three main components for the proof server: `Deployment`, `Service`, and
+`HorizontalPodAutoscaler`. Each of these components has a corresponding configuration file. The Docker image used
+for the proof server is built using the Dockerfile located in the parent directory.
+
+## Prerequisites
+
+Before you begin, ensure that you have installed Docker and Minikube. You can find the installation instructions at the
+following links:
+
+- Docker: [https://docs.docker.com/engine/install/](https://docs.docker.com/engine/install/)
+- Minikube: [https://minikube.sigs.k8s.io/docs/start/?arch=%2Flinux%2Fx86-64%2Fstable%2Fbinary+download](https://minikube.sigs.k8s.io/docs/start/?arch=%2Flinux%2Fx86-64%2Fstable%2Fbinary+download)
+
+## Running the Project
+
+Navigate to the root of the repository and follow these steps:
+
+1. Start Minikube with Docker as the driver:
+
+```bash
+minikube start --driver=docker
+```
+
+2. Set the Docker context to Minikube:
+
+```bash
+eval $(minikube -p minikube docker-env)
+```
+
+3. Build the Docker image using the following command:
+
+```bash
+docker build -t argumentcomputer/<light-client>-light-client:latest -f ./docker/Dockerfile --build-arg LIGHT_CLIENT=<light-client> .
+```
+
+4. Apply the Kubernetes configuration files:
+
+**Aptos**
+
+```bash
+minikube kubectl -- apply -f docker/k8s/proof-server/proof-server-deployment.yaml && \
+  minikube kubectl -- apply -f docker/k8s/proof-server/proof-server-service.yaml && \
+  minikube kubectl -- apply -f docker/k8s/proof-server/proof-server-hpa.yaml && \
+  minikube kubectl -- apply -f docker/k8s/aptos-client/client-deployment.yaml
+```
+
+After running these commands, the `client` and
+`proof-server` should be up and running in your Minikube environment.
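+
+To check that the proof server responds once the pods are ready, one can port-forward the `ClusterIP` service and
+query the health route. This is a hypothetical smoke test; the local port is arbitrary:
+
+```bash
+minikube kubectl -- port-forward service/proof-server-service 8080:80 &
+# Returns 200 OK when the server is idle, and 409 CONFLICT while a proof is being generated.
+curl -i http://localhost:8080/health
+```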
\ No newline at end of file diff --git a/docker/k8s/proof-server/proof-server-deployment.yaml b/docker/k8s/proof-server/proof-server-deployment.yaml new file mode 100644 index 00000000..7194492d --- /dev/null +++ b/docker/k8s/proof-server/proof-server-deployment.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: proof-server-deployment +spec: + replicas: 1 # Initial number of replicas, can be adjusted by HPA + selector: + matchLabels: + app: proof-server + template: + metadata: + labels: + app: proof-server + spec: + containers: + - name: proof-server + image: argumentcomputer/aptos-light-client:latest + imagePullPolicy: IfNotPresent + command: [ "sh", "-c", "/app/proof_server --mode \"single\" --addr ${CONTAINER_ADDR}:${CONTAINER_PORT}" ] + env: + - name: RUST_LOG + value: "debug" + - name: CONTAINER_ADDR + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: CONTAINER_PORT + value: "8080" # Internal port for the server + ports: + - containerPort: 8080 # Ensure this matches CONTAINER_PORT + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 10 \ No newline at end of file diff --git a/docker/k8s/proof-server/proof-server-hpa.yaml b/docker/k8s/proof-server/proof-server-hpa.yaml new file mode 100644 index 00000000..76b64ace --- /dev/null +++ b/docker/k8s/proof-server/proof-server-hpa.yaml @@ -0,0 +1,18 @@ +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: proof-server-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: proof-server-deployment + minReplicas: 2 + maxReplicas: 5 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 50 \ No newline at end of file diff --git a/docker/k8s/proof-server/proof-server-service.yaml b/docker/k8s/proof-server/proof-server-service.yaml new file mode 100644 index 00000000..95dc8da6 --- /dev/null +++ b/docker/k8s/proof-server/proof-server-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: proof-server-service +spec: + selector: + app: proof-server + ports: + - protocol: TCP + port: 80 # External port + targetPort: 8080 # Internal container port + type: ClusterIP \ No newline at end of file diff --git a/ethereum/Cargo.lock b/ethereum/Cargo.lock index 2857f43c..598d5a10 100644 --- a/ethereum/Cargo.lock +++ b/ethereum/Cargo.lock @@ -491,6 +491,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -1419,6 +1433,8 @@ name = "ethereum-lc" version = "1.0.1" dependencies = [ "anyhow", + "axum", + "backoff", "clap", "env_logger", "ethereum-lc-core", diff --git a/ethereum/Cargo.toml b/ethereum/Cargo.toml index 6a239fba..2f9040ea 100644 --- a/ethereum/Cargo.toml +++ b/ethereum/Cargo.toml @@ -10,6 +10,8 @@ repository = "https://github.com/argumentcomputer/zk-light-clients" [workspace.dependencies] anyhow = "1.0.86" +axum = "0.7.5" +backoff = { version = "0.4.0", features = ["tokio"] } clap = "4.5.8" env_logger = "0.11.3" ethereum_ssz = "0.5.4" diff --git a/ethereum/core/src/merkle/utils/mod.rs 
b/ethereum/core/src/merkle/utils/mod.rs
index a66627fd..75160fb5 100644
--- a/ethereum/core/src/merkle/utils/mod.rs
+++ b/ethereum/core/src/merkle/utils/mod.rs
@@ -1,6 +1,5 @@
 // Copyright (c) Argument Computer Corporation
 // SPDX-License-Identifier: Apache-2.0
-
 use std::mem::size_of;
 
 use crate::crypto::error::CryptoError;
diff --git a/ethereum/docs/src/run/setup_proof_server.md b/ethereum/docs/src/run/setup_proof_server.md
index 3f5aaa0c..1ab1a75b 100644
--- a/ethereum/docs/src/run/setup_proof_server.md
+++ b/ethereum/docs/src/run/setup_proof_server.md
@@ -35,8 +35,8 @@ Now that our deployment machine is properly configured, we can run the secondary
 
 ```bash
 git clone git@github.com:argumentcomputer/zk-light-clients.git && \
-  cd zk-light-clients/aptos/proof-server && \
-  SHARD_SIZE=4194304 SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native -C opt-level=3" cargo run --release --bin server_secondary -- -a <address>
+  cd zk-light-clients/ethereum/light-client && \
+  SHARD_SIZE=4194304 RUSTFLAGS="-C target-cpu=native -C opt-level=3" cargo run --release --bin proof_server -- --mode "single" -a <address>
 ```
 
 ## Deploy the primary server
@@ -46,5 +46,5 @@ Finally, once the primary server is configured in the same fashion, run it:
 ```bash
 git clone git@github.com:argumentcomputer/zk-light-clients.git && \
   cd zk-light-clients/ethereum/light-client && \
-  SHARD_SIZE=4194304 SHARD_BATCH_SIZE=0 RUSTFLAGS="-C target-cpu=native -C opt-level=3" cargo run --release --bin server_primary -- -a <address> --snd-addr <secondary address>
+  SHARD_SIZE=4194304 RUSTFLAGS="-C target-cpu=native -C opt-level=3" cargo run --release --bin proof_server -- --mode "split" -a <address> --snd-addr <secondary address>
 ```
diff --git a/ethereum/light-client/Cargo.toml b/ethereum/light-client/Cargo.toml
index f0741530..11150804 100644
--- a/ethereum/light-client/Cargo.toml
+++ b/ethereum/light-client/Cargo.toml
@@ -7,6 +7,8 @@ license = { workspace = true }
 
 [dependencies]
 anyhow = { workspace = true }
+axum = { workspace = true }
+backoff = { workspace = true, features = ["tokio"] }
 clap = { workspace = true, features = ["derive"] }
 env_logger = { workspace = true }
 ethers-core = { workspace = true }
@@ -29,12 +31,8 @@ name = "client"
 path = "src/bin/client.rs"
 
 [[bin]]
-name = "server_primary"
-path = "src/bin/server_primary.rs"
-
-[[bin]]
-name = "server_secondary"
-path = "src/bin/server_secondary.rs"
+name = "proof_server"
+path = "src/bin/proof_server.rs"
 
 [[bench]]
 name = "committee_change"
diff --git a/ethereum/light-client/src/bin/client.rs b/ethereum/light-client/src/bin/client.rs
index 96d635d5..847cd87c 100644
--- a/ethereum/light-client/src/bin/client.rs
+++ b/ethereum/light-client/src/bin/client.rs
@@ -270,6 +270,8 @@ async fn initialize_light_client(
         &rpc_provider_address,
     );
 
+    info!("Testing connection to endpoints...");
+
     info!("Fetching latest state checkpoint and bootstrap data...");
 
     // Fetch latest state checkpoint.
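The new `proof_server` binary (introduced in the next diff) exposes its proving and verification endpoints over plain HTTP instead of the previous length-prefixed TCP protocol. As a hypothetical smoke test, assuming a primary server listening on `127.0.0.1:6379` and a serialized request in `committee_request.bin` (both values are illustrative):

```bash
# Health reports 200 OK when the server is idle and 409 CONFLICT while a proof is in flight.
curl -i http://127.0.0.1:6379/health

# Proof requests and responses are raw bytes exchanged as octet streams.
curl -X POST http://127.0.0.1:6379/committee/proof \
  -H 'Content-Type: application/octet-stream' \
  --data-binary @committee_request.bin \
  --output committee_proof.bin
```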
diff --git a/ethereum/light-client/src/bin/proof_server.rs b/ethereum/light-client/src/bin/proof_server.rs
new file mode 100644
index 00000000..6bf9a35a
--- /dev/null
+++ b/ethereum/light-client/src/bin/proof_server.rs
@@ -0,0 +1,323 @@
+// Copyright (c) Argument Computer Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{Error, Result};
+use axum::body::Body;
+use axum::http::header::CONTENT_TYPE;
+use axum::http::Response;
+use axum::middleware::Next;
+use axum::{
+    extract::State,
+    http::StatusCode,
+    response::IntoResponse,
+    routing::{get, post},
+    Router,
+};
+use clap::{Parser, ValueEnum};
+use ethereum_lc::proofs::committee_change::CommitteeChangeProver;
+use ethereum_lc::proofs::inclusion::StorageInclusionProver;
+use ethereum_lc::proofs::Prover;
+use ethereum_lc::types::network::Request;
+use ethers_core::k256::elliptic_curve::ff::derive::bitvec::macros::internal::funty::Fundamental;
+use log::{error, info};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use tokio::net::TcpListener;
+use tokio::task::spawn_blocking;
+
+#[derive(ValueEnum, Clone, Debug, Eq, PartialEq)]
+enum Mode {
+    Single,
+    Split,
+}
+
+#[derive(Parser)]
+struct Cli {
+    /// Address of this server. E.g. 127.0.0.1:1234
+    #[arg(short, long)]
+    addr: String,
+
+    /// Required in 'split' mode, address of the secondary server. E.g. 127.0.0.1:4321
+    #[arg(short, long)]
+    snd_addr: Option<String>,
+
+    /// Mode of operation: either 'single' or 'split'
+    #[arg(short, long)]
+    mode: Mode,
+}
+
+#[derive(Clone)]
+struct ServerState {
+    committee_prover: Arc<CommitteeChangeProver>,
+    inclusion_prover: Arc<StorageInclusionProver>,
+    snd_addr: Arc<Option<String>>,
+    mode: Mode,
+    active_requests: Arc<AtomicUsize>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let Cli {
+        addr,
+        snd_addr,
+        mode,
+    } = Cli::parse();
+
+    if mode == Mode::Split && snd_addr.is_none() {
+        return Err(Error::msg(
+            "Secondary server address is required in split mode",
+        ));
+    }
+
+    env_logger::init();
+
+    let state = ServerState {
+        committee_prover: Arc::new(CommitteeChangeProver::new()),
+        inclusion_prover: Arc::new(StorageInclusionProver::new()),
+        snd_addr: Arc::new(snd_addr),
+        mode,
+        active_requests: Arc::new(AtomicUsize::new(0)),
+    };
+
+    let app = Router::new()
+        .route("/health", get(health_check))
+        .route("/inclusion/proof", post(inclusion_proof))
+        .route("/committee/proof", post(committee_proof))
+        .route("/committee/verify", post(committee_verify))
+        .route("/inclusion/verify", post(inclusion_verify))
+        .layer(axum::middleware::from_fn_with_state(
+            state.clone(),
+            count_requests_middleware,
+        ))
+        .with_state(state);
+
+    info!("Server running on {}", addr);
+
+    let listener = TcpListener::bind(addr).await?;
+    axum::serve(listener, app).await?;
+
+    Ok(())
+}
+
+async fn health_check(State(state): State<ServerState>) -> impl IntoResponse {
+    let active_requests = state.active_requests.load(Ordering::SeqCst);
+    if active_requests > 0 {
+        StatusCode::CONFLICT
+    } else {
+        StatusCode::OK
+    }
+}
+
+async fn inclusion_proof(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = Request::from_bytes(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+    let Request::ProveInclusion(boxed) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        let (proving_mode, inputs) = *boxed;
+        let proof_handle =
+            spawn_blocking(move || state.inclusion_prover.prove(&inputs, proving_mode));
+        let proof = proof_handle
+            .await
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+        proof
+            .to_bytes()
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn committee_proof(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = Request::from_bytes(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+    let Request::ProveCommitteeChange(boxed) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        match state.mode {
+            Mode::Single => {
+                let (proving_mode, inputs) = *boxed;
+                let proof_handle =
+                    spawn_blocking(move || state.committee_prover.prove(&inputs, proving_mode));
+                let proof = proof_handle
+                    .await
+                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+                proof
+                    .to_bytes()
+                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
+            }
+            Mode::Split => {
+                let snd_addr = state.snd_addr.as_ref().clone().unwrap();
+                forward_request(&bytes, &snd_addr).await
+            }
+        }
+    }?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn inclusion_verify(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = Request::from_bytes(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+    let Request::VerifyInclusion(proof) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        let is_valid = state.inclusion_prover.verify(&proof).is_ok();
+        vec![is_valid.as_u8()]
+    };
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn committee_verify(
+    State(state): State<ServerState>,
+    request: axum::extract::Request,
+) -> Result<Response<Body>, StatusCode> {
+    let bytes = axum::body::to_bytes(request.into_body(), usize::MAX)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let res = Request::from_bytes(&bytes);
+
+    if let Err(err) = res {
+        error!("Failed to deserialize request object: {err}");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let request = res.unwrap();
+    let Request::VerifyCommitteeChange(proof) = request else {
+        error!("Invalid request type");
+        return Err(StatusCode::BAD_REQUEST);
+    };
+    let res = {
+        let is_valid = state.committee_prover.verify(&proof).is_ok();
+        vec![is_valid.as_u8()]
+    };
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .body(Body::from(res))
+        .map_err(|err| {
+            error!("Could not construct response for client: {err}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(response)
+}
+
+async fn forward_request(request_bytes: &[u8], snd_addr: &str) -> Result<Vec<u8>, StatusCode> {
+    info!("Connecting to the secondary server");
+    let client = reqwest::Client::new();
+    let res = client
+        // The secondary server runs the same binary in single mode, so forward
+        // the request to its committee proof route.
+        .post(format!("http://{}/committee/proof", snd_addr))
+        .body(request_bytes.to_vec())
+        .header(CONTENT_TYPE, "application/octet-stream")
+        .send()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    res.bytes()
+        .await
+        .map(|b| b.to_vec())
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
+}
+
+async fn count_requests_middleware(
+    State(state): State<ServerState>,
+    req: axum::http::Request<Body>,
+    next: Next,
+) -> Result<Response<Body>, StatusCode> {
+    // Check if the request is for the health endpoint.
+    let is_health = req.uri().path() == "/health";
+    if !is_health {
+        // Increment the active requests counter.
+        state.active_requests.fetch_add(1, Ordering::SeqCst);
+    }
+
+    // Proceed with the request.
+    let response = next.run(req).await;
+
+    // Decrement the active requests counter if not a health check.
+    if !is_health {
+        state.active_requests.fetch_sub(1, Ordering::SeqCst);
+    }
+
+    Ok(response)
+}
diff --git a/ethereum/light-client/src/bin/server_primary.rs b/ethereum/light-client/src/bin/server_primary.rs
deleted file mode 100644
index eff0dcd8..00000000
--- a/ethereum/light-client/src/bin/server_primary.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright (c) Argument Computer Corporation
-// SPDX-License-Identifier: Apache-2.0
-
-use anyhow::{Error, Result};
-use clap::Parser;
-use ethereum_lc::proofs::committee_change::CommitteeChangeProver;
-use ethereum_lc::proofs::Prover;
-use ethereum_lc::types::network::Request;
-use ethereum_lc::utils::{read_bytes, write_bytes};
-use log::{error, info};
-use std::sync::Arc;
-use tokio::net::{TcpListener, TcpStream};
-use tokio::task::spawn_blocking;
-
-#[derive(Parser)]
-struct Cli {
-    /// Address of this server. E.g. 127.0.0.1:1234
-    #[arg(short, long)]
-    addr: String,
-
-    /// Address of the secondary server. E.g. 127.0.0.1:4321
-    #[arg(long)]
-    snd_addr: String,
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let Cli { addr, snd_addr } = Cli::parse();
-
-    env_logger::init();
-
-    let listener = TcpListener::bind(addr).await?;
-    info!("Server is running on {}", listener.local_addr()?);
-
-    let snd_addr = Arc::new(snd_addr);
-    let committee_prover = Arc::new(CommitteeChangeProver::new());
-
-    loop {
-        let (mut client_stream, _) = listener.accept().await?;
-        info!("Received a connection");
-
-        let committee_prover = committee_prover.clone();
-        let snd_addr = snd_addr.clone();
-
-        tokio::spawn(async move {
-            info!("Awaiting request");
-            let request_bytes = read_bytes(&mut client_stream).await?;
-            info!("Request received");
-
-            info!("Deserializing request");
-            match Request::from_bytes(&request_bytes) {
-                Ok(request) => match request {
-                    Request::ProveCommitteeChange(boxed) => {
-                        info!("Start proving");
-                        let proof_handle = spawn_blocking(move || {
-                            let (proving_mode, inputs) = *boxed;
-                            committee_prover.prove(&inputs, proving_mode)
-                        });
-                        let proof = proof_handle.await??;
-                        info!("Proof generated.
Serializing"); - let proof_bytes = proof.to_bytes()?; - info!("Sending proof"); - write_bytes(&mut client_stream, &proof_bytes).await?; - info!("Proof sent"); - } - Request::VerifyCommitteeChange(proof) => { - write_bytes( - &mut client_stream, - &[u8::from(committee_prover.verify(&proof).is_ok())], - ) - .await?; - } - Request::ProveInclusion(_) | Request::VerifyInclusion(_) => { - info!("Connecting to the secondary server"); - let mut secondary_stream = TcpStream::connect(&*snd_addr).await?; - info!("Sending secondary request"); - write_bytes(&mut secondary_stream, &request_bytes).await?; - info!("Awaiting response from secondary server"); - let response = read_bytes(&mut secondary_stream).await?; - info!("Received response from the secondary server. Sending result"); - write_bytes(&mut client_stream, &response).await?; - info!("Response forwarded"); - } - }, - Err(err) => error!("Failed to deserialize request object: {err}"), - } - Ok::<(), Error>(()) - }); - } -} diff --git a/ethereum/light-client/src/bin/server_secondary.rs b/ethereum/light-client/src/bin/server_secondary.rs deleted file mode 100644 index 64f622da..00000000 --- a/ethereum/light-client/src/bin/server_secondary.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) Argument Computer Corporation -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::{Error, Result}; -use clap::Parser; -use ethereum_lc::proofs::inclusion::StorageInclusionProver; -use ethereum_lc::proofs::Prover; -use ethereum_lc::types::network::Request; -use ethereum_lc::utils::{read_bytes, write_bytes}; -use log::{error, info}; -use std::sync::Arc; -use tokio::net::TcpListener; -use tokio::task::spawn_blocking; - -#[derive(Parser)] -struct Cli { - /// Address of this server. E.g. 127.0.0.1:4321 - #[arg(short, long)] - addr: String, -} - -#[tokio::main] -async fn main() -> Result<()> { - let Cli { addr } = Cli::parse(); - - env_logger::init(); - - let listener = TcpListener::bind(addr).await?; - - info!("Server is running on {}", listener.local_addr()?); - - let inclusion_prover = Arc::new(StorageInclusionProver::new()); - - loop { - let (mut client_stream, _) = listener.accept().await?; - info!("Received a connection"); - - let inclusion_prover = inclusion_prover.clone(); - - tokio::spawn(async move { - info!("Awaiting request"); - let request_bytes = read_bytes(&mut client_stream).await?; - info!("Request received"); - - info!("Deserializing request"); - match Request::from_bytes(&request_bytes) { - Ok(request) => match request { - Request::ProveInclusion(boxed) => { - info!("Start proving"); - let proof_handle = spawn_blocking(move || { - let (proving_mode, inputs) = *boxed; - inclusion_prover.prove(&inputs, proving_mode) - }); - let proof = proof_handle.await??; - info!("Proof generated. 
Serializing");
-                        let proof_bytes = proof.to_bytes()?;
-                        info!("Sending proof");
-                        write_bytes(&mut client_stream, &proof_bytes).await?;
-                        info!("Proof sent");
-                    }
-                    Request::VerifyInclusion(proof) => {
-                        write_bytes(
-                            &mut client_stream,
-                            &[u8::from(inclusion_prover.verify(&proof).is_ok())],
-                        )
-                        .await?;
-                    }
-                    _ => {
-                        error!("Received unexpected request object, secondary server only handles inclusion proofs")
-                    }
-                },
-                Err(err) => error!("Failed to deserialize request object: {err}"),
-            }
-            Ok::<(), Error>(())
-        });
-    }
-}
diff --git a/ethereum/light-client/src/client/beacon.rs b/ethereum/light-client/src/client/beacon.rs
index 71f06b7a..9147355f 100644
--- a/ethereum/light-client/src/client/beacon.rs
+++ b/ethereum/light-client/src/client/beacon.rs
@@ -9,6 +9,7 @@
 //! It maintains an internal HTTP client to handle communication with the Beacon Node.
 
 use crate::client::error::ClientError;
+use crate::client::utils::test_connection;
 use crate::types::beacon::update::UpdateResponse;
 use ethereum_lc_core::types::bootstrap::Bootstrap;
 use ethereum_lc_core::types::update::FinalityUpdate;
@@ -43,6 +44,16 @@ impl BeaconClient {
         }
     }
 
+    /// Test the connection to the beacon node.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating whether the connection was successful.
+    pub(crate) async fn test_endpoint(&self) -> Result<(), ClientError> {
+        // Try to connect to the beacon node server
+        test_connection(&self.beacon_node_address).await
+    }
+
     /// `get_bootstrap_data` makes an HTTP request to the Beacon Node API to get the bootstrap data.
     ///
     /// # Arguments
diff --git a/ethereum/light-client/src/client/checkpoint.rs b/ethereum/light-client/src/client/checkpoint.rs
index df275aa7..cdc0fd53 100644
--- a/ethereum/light-client/src/client/checkpoint.rs
+++ b/ethereum/light-client/src/client/checkpoint.rs
@@ -9,6 +9,7 @@
 //! It maintains an internal HTTP client to handle communication with the Checkpoint Provider API.
 
 use crate::client::error::ClientError;
+use crate::client::utils::test_connection;
 use crate::types::checkpoint::{Checkpoint, SlotsResponse};
 use reqwest::header::ACCEPT;
 use reqwest::Client;
@@ -30,6 +31,16 @@ impl CheckpointClient {
         }
     }
 
+    /// Test the connection to the checkpoint provider.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating whether the connection was successful.
+    pub(crate) async fn test_endpoint(&self) -> Result<(), ClientError> {
+        // Try to connect to the checkpoint provider
+        test_connection(&self.address).await
+    }
+
     /// `get_checkpoint` makes an HTTP request to the Checkpoint Provider API to get the checkpoint
     /// at the specified slot. If no particular slot is specified, returns the latest checkpoint.
     ///
diff --git a/ethereum/light-client/src/client/error.rs b/ethereum/light-client/src/client/error.rs
index 6dab4b88..1e492764 100644
--- a/ethereum/light-client/src/client/error.rs
+++ b/ethereum/light-client/src/client/error.rs
@@ -18,4 +18,6 @@ pub enum ClientError {
         #[source]
         source: Box<dyn std::error::Error + Sync + Send>,
     },
+    #[error("Could not connect to the given address, {address}")]
+    Connection { address: String },
 }
diff --git a/ethereum/light-client/src/client/mod.rs b/ethereum/light-client/src/client/mod.rs
index 6f2005d5..90237442 100644
--- a/ethereum/light-client/src/client/mod.rs
+++ b/ethereum/light-client/src/client/mod.rs
@@ -31,6 +31,7 @@ pub(crate) mod checkpoint;
 pub mod error;
 pub(crate) mod proof_server;
 pub mod storage;
+mod utils;
 
 /// The client for the light client. It is the entrypoint for any needed remote call.
#[derive(Debug, Clone)] @@ -68,6 +69,22 @@ impl Client { } } + /// Test the connection to all the endpoints. + /// + /// # Returns + /// + /// A result indicating whether the connections were successful. + pub async fn test_endpoints(&self) -> Result<(), ClientError> { + tokio::try_join!( + self.beacon_client.test_endpoint(), + self.checkpoint_client.test_endpoint(), + self.proof_server_client.test_endpoint(), + self.storage_client.test_endpoint() + )?; + + Ok(()) + } + /// `get_bootstrap_data` makes an HTTP request to the Beacon Node API to get the bootstrap data. /// /// # Arguments diff --git a/ethereum/light-client/src/client/proof_server.rs b/ethereum/light-client/src/client/proof_server.rs index 22dffe94..2f2c2fe2 100644 --- a/ethereum/light-client/src/client/proof_server.rs +++ b/ethereum/light-client/src/client/proof_server.rs @@ -7,21 +7,24 @@ //! connections to the Proof Server to generate and verify our proofs. use crate::client::error::ClientError; +use crate::client::utils::test_connection; use crate::proofs::committee_change::CommitteeChangeIn; use crate::proofs::inclusion::StorageInclusionIn; use crate::proofs::{ProofType, ProvingMode}; use crate::types::network::Request; -use crate::utils::{read_bytes, write_bytes}; use ethereum_lc_core::merkle::storage_proofs::EIP1186Proof; use ethereum_lc_core::types::store::LightClientStore; use ethereum_lc_core::types::update::Update; -use tokio::net::TcpStream; +use reqwest::header::CONTENT_TYPE; +use reqwest::Client; /// An internal client to handle communication with a Checkpoint Provider. #[derive(Debug, Clone)] pub(crate) struct ProofServerClient { /// The address of the Proof Server. address: String, + /// The inner HTTP client. + inner: Client, } impl ProofServerClient { @@ -37,9 +40,20 @@ impl ProofServerClient { pub(crate) fn new(proof_server_address: &str) -> Self { Self { address: proof_server_address.to_string(), + inner: Client::new(), } } + /// Test the connection to the proof server. + /// + /// # Returns + /// + /// A result indicating whether the connection was successful. + pub(crate) async fn test_endpoint(&self) -> Result<(), ClientError> { + // Try to connect to the proof server + test_connection(&self.address).await + } + /// Prove a sync committee change by executing the [`LightClientStore::process_light_client_update`] /// and proving its correct execution. 
 ///
@@ -58,37 +72,22 @@ impl ProofServerClient {
         store: LightClientStore,
         update: Update,
     ) -> Result<ProofType, ClientError> {
-        let mut stream =
-            TcpStream::connect(&self.address)
-                .await
-                .map_err(|err| ClientError::Request {
-                    endpoint: "ProofServer::ProveCommitteeChange".into(),
-                    source: err.into(),
-                })?;
+        let url = format!("http://{}/committee/proof", self.address);
+
         let inputs = CommitteeChangeIn::new(store, update);
         let request = Request::ProveCommitteeChange(Box::new((proving_mode, inputs)));
 
-        write_bytes(
-            &mut stream,
-            &request.to_bytes().map_err(|err| ClientError::Request {
-                endpoint: "ProofServer::ProveCommitteeChange".into(),
-                source: err.into(),
-            })?,
-        )
-        .await
-        .map_err(|err| ClientError::Request {
-            endpoint: "prover".into(),
-            source: err.into(),
-        })?;
-
-        let res = read_bytes(&mut stream)
-            .await
-            .map_err(|err| ClientError::Response {
-                endpoint: "ProofServer::ProveCommitteeChange".into(),
-                source: err.into(),
-            })?;
+        let response = self
+            .post_request(
+                &url,
+                request.to_bytes().map_err(|err| ClientError::Request {
+                    endpoint: "ProofServer::ProveCommitteeChange".into(),
+                    source: err.into(),
+                })?,
+            )
+            .await?;
 
-        ProofType::from_bytes(&res).map_err(|err| ClientError::Response {
+        ProofType::from_bytes(&response).map_err(|err| ClientError::Response {
             endpoint: "ProofServer::ProveCommitteeChange".into(),
             source: err.into(),
         })
@@ -107,44 +106,21 @@ impl ProofServerClient {
         &self,
         proof: ProofType,
     ) -> Result<bool, ClientError> {
-        let mut stream =
-            TcpStream::connect(&self.address)
-                .await
-                .map_err(|err| ClientError::Request {
-                    endpoint: "ProofServer::VerifyCommitteeChange".into(),
-                    source: err.into(),
-                })?;
+        let url = format!("http://{}/committee/verify", self.address);
 
         let request = Request::VerifyCommitteeChange(proof);
 
-        write_bytes(
-            &mut stream,
-            &request.to_bytes().map_err(|err| ClientError::Request {
-                endpoint: "ProofServer::VerifyCommitteeChange".into(),
-                source: err.into(),
-            })?,
-        )
-        .await
-        .map_err(|err| ClientError::Request {
-            endpoint: "prover".into(),
-            source: err.into(),
-        })?;
-
-        let res = read_bytes(&mut stream)
-            .await
-            .map_err(|err| ClientError::Response {
-                endpoint: "ProofServer::VerifyCommitteeChange".into(),
-                source: err.into(),
-            })?;
-
-        if res.len() != 1 {
-            return Err(ClientError::Response {
-                endpoint: "ProofServer::VerifyCommitteeChange".into(),
-                source: "Invalid response length".into(),
-            });
-        }
+        let response = self
+            .post_request(
+                &url,
+                request.to_bytes().map_err(|err| ClientError::Request {
+                    endpoint: "ProofServer::VerifyCommitteeChange".into(),
+                    source: err.into(),
+                })?,
+            )
+            .await?;
 
-        Ok(res[0] == 1)
+        Ok(response.first().unwrap_or(&0) == &1)
     }
 
     /// Prove the inclusion of a given value in the chain storage by executing [`EIP1186Proof::verify`]
@@ -167,37 +143,22 @@ impl ProofServerClient {
         update: Update,
         eip1186_proof: EIP1186Proof,
     ) -> Result<ProofType, ClientError> {
-        let mut stream =
-            TcpStream::connect(&self.address)
-                .await
-                .map_err(|err| ClientError::Request {
-                    endpoint: "ProofServer::ProveInclusion".into(),
-                    source: err.into(),
-                })?;
+        let url = format!("http://{}/inclusion/proof", self.address);
+
         let inputs = StorageInclusionIn::new(store, update, eip1186_proof);
         let request = Request::ProveInclusion(Box::new((proving_mode, inputs)));
 
-        write_bytes(
-            &mut stream,
-            &request.to_bytes().map_err(|err| ClientError::Request {
-                endpoint: "ProofServer::ProveInclusion".into(),
-                source: err.into(),
-            })?,
-        )
-        .await
-        .map_err(|err| ClientError::Request {
-            endpoint: "prover".into(),
-            source: err.into(),
-        })?;
-
-        let res = read_bytes(&mut stream)
-            .await
-            .map_err(|err| ClientError::Response {
-                endpoint: "ProofServer::ProveInclusion".into(),
-                source: err.into(),
-            })?;
+        let response = self
+            .post_request(
+                &url,
+                request.to_bytes().map_err(|err| ClientError::Request {
+                    endpoint: "ProofServer::ProveInclusion".into(),
+                    source: err.into(),
+                })?,
+            )
+            .await?;
 
-        ProofType::from_bytes(&res).map_err(|err| ClientError::Response {
+        ProofType::from_bytes(&response).map_err(|err| ClientError::Response {
             endpoint: "ProofServer::ProveInclusion".into(),
             source: err.into(),
         })
@@ -216,43 +177,55 @@ impl ProofServerClient {
         &self,
         proof: ProofType,
     ) -> Result<bool, ClientError> {
-        let mut stream =
-            TcpStream::connect(&self.address)
-                .await
-                .map_err(|err| ClientError::Request {
-                    endpoint: "ProofServer::VerifyInclusiona".into(),
-                    source: err.into(),
-                })?;
+        let url = format!("http://{}/inclusion/verify", self.address);
 
         let request = Request::VerifyInclusion(proof);
 
-        write_bytes(
-            &mut stream,
-            &request.to_bytes().map_err(|err| ClientError::Request {
-                endpoint: "ProofServer::VerifyInclusiona".into(),
-                source: err.into(),
-            })?,
-        )
-        .await
-        .map_err(|err| ClientError::Request {
-            endpoint: "prover".into(),
-            source: err.into(),
-        })?;
+        let response = self
+            .post_request(
+                &url,
+                request.to_bytes().map_err(|err| ClientError::Request {
+                    endpoint: "ProofServer::VerifyInclusion".into(),
+                    source: err.into(),
+                })?,
+            )
+            .await?;
 
-        let res = read_bytes(&mut stream)
-            .await
-            .map_err(|err| ClientError::Response {
-                endpoint: "ProofServer::VerifyInclusiona".into(),
-                source: err.into(),
-            })?;
+        Ok(response.first().unwrap_or(&0) == &1)
+    }
 
-        if res.len() != 1 {
-            return Err(ClientError::Response {
-                endpoint: "ProofServer::VerifyInclusiona".into(),
-                source: "Invalid response length".into(),
-            });
-        }
-
-        Ok(res[0] == 1)
+    /// Send a POST request to the given URL with the given request body.
+    ///
+    /// # Arguments
+    ///
+    /// * `url` - The URL to send the request to.
+    /// * `request` - The request body to send.
+    ///
+    /// # Returns
+    ///
+    /// The response from the server.
+    async fn post_request(&self, url: &str, request: Vec<u8>) -> Result<Vec<u8>, ClientError> {
+        // Call the endpoint.
+        let response = self
+            .inner
+            .post(url)
+            .body(request)
+            .header(CONTENT_TYPE, "application/octet-stream")
+            .send()
+            .await
+            .map_err(|err| ClientError::Request {
+                endpoint: url.into(),
+                source: Box::new(err),
+            })?;
+
+        // Read the response body as bytes.
+        response
+            .bytes()
+            .await
+            .map(|bytes| bytes.to_vec())
+            .map_err(|err| ClientError::Response {
+                endpoint: url.into(),
+                source: err.into(),
+            })
+    }
 }
diff --git a/ethereum/light-client/src/client/storage.rs b/ethereum/light-client/src/client/storage.rs
index 0fdf9cdb..4a301159 100644
--- a/ethereum/light-client/src/client/storage.rs
+++ b/ethereum/light-client/src/client/storage.rs
@@ -15,6 +15,7 @@
 //! authenticate the client with the RPC provider.
 
 use crate::client::error::ClientError;
+use crate::client::utils::test_connection;
 use crate::types::storage::GetProofResponse;
 use ethers_core::types::EIP1186ProofResponse;
 use getset::Getters;
@@ -48,6 +49,16 @@ impl StorageClient {
         }
     }
 
+    /// Test the connection to the RPC provider.
+    ///
+    /// # Returns
+    ///
+    /// A result indicating whether the connection was successful.
+    pub(crate) async fn test_endpoint(&self) -> Result<(), ClientError> {
+        // Try to connect to the RPC provider.
+        test_connection(&self.storage_provider_address).await
+    }
+
     /// `get_proof` makes an HTTP request to the RPC Provider API to get the proof of inclusion
     /// for the specified address and specified storage keys.
     ///
diff --git a/ethereum/light-client/src/client/utils.rs b/ethereum/light-client/src/client/utils.rs
new file mode 100644
index 00000000..834cccd8
--- /dev/null
+++ b/ethereum/light-client/src/client/utils.rs
@@ -0,0 +1,30 @@
+use crate::client::error::ClientError;
+use anyhow::Result;
+use backoff::ExponentialBackoff;
+use tokio::net::TcpStream;
+
+/// Tries to open a TCP connection to an endpoint, retrying with an
+/// exponential backoff policy.
+///
+/// # Arguments
+///
+/// * `address` - The address of the endpoint to connect to.
+///
+/// # Returns
+///
+/// Returns an error if the connection could not be established.
+pub(crate) async fn test_connection(address: &str) -> Result<(), ClientError> {
+    // Try to connect to the given address, retrying on failure.
+    let res = backoff::future::retry(ExponentialBackoff::default(), || async {
+        Ok(TcpStream::connect(address).await?)
+    })
+    .await;
+
+    if res.is_err() {
+        return Err(ClientError::Connection {
+            address: address.to_string(),
+        });
+    }
+
+    Ok(())
+}
diff --git a/ethereum/light-client/src/lib.rs b/ethereum/light-client/src/lib.rs
index fcff857f..42ac737f 100644
--- a/ethereum/light-client/src/lib.rs
+++ b/ethereum/light-client/src/lib.rs
@@ -25,7 +25,6 @@
 //! - [`client`] : The client that can be used to coordinate data fetching from the remote services.
 //! - [`proofs`]: The utilities to generate and verify proofs for the light client.
 //! - [`types`]: Types and utilities to leverage data from the remote services.
-//! - [`utils`]: Utilities to help with the light client.
 //!
 //! For more detailed information, users should refer to the specific documentation for each
 //! sub-module.
@@ -35,4 +34,3 @@ pub mod proofs;
 #[cfg(feature = "ethereum")]
 pub use ethereum_lc_core::test_utils;
 pub mod types;
-pub mod utils;
diff --git a/ethereum/light-client/src/utils.rs b/ethereum/light-client/src/utils.rs
deleted file mode 100644
index dad05b51..00000000
--- a/ethereum/light-client/src/utils.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-use anyhow::Result;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::TcpStream;
-
-/// Auxiliary function to write bytes on a stream. Before actually writing the
-/// bytes, it writes the number of bytes to be written as a big-endian `u32`.
-///
-/// # Errors
-/// This function errors if the number of bytes can't fit in a `u32`
-pub async fn write_bytes(stream: &mut TcpStream, bytes: &[u8]) -> Result<()> {
-    stream.write_u32(u32::try_from(bytes.len())?).await?;
-    stream.write_all(bytes).await?;
-    stream.flush().await?;
-    Ok(())
-}
-
-/// Auxiliary function to read bytes on a stream. Before actually reading the
-/// bytes, it reads the number of bytes to be read as a big-endian `u32`.
-///
-/// # Important
-/// The number of bytes read must fit in a `u32` thus the amount of data read in
-/// a single call to this function is 4 GB.
-pub async fn read_bytes(stream: &mut TcpStream) -> Result<Vec<u8>> {
-    let size = stream.read_u32().await?;
-    let mut bytes = vec![0; size as usize];
-    let num_read = stream.read_exact(&mut bytes).await?;
-    assert_eq!(num_read, bytes.len());
-    Ok(bytes)
-}
diff --git a/fixture-generator/Cargo.lock b/fixture-generator/Cargo.lock
index d78cdb07..d6345b33 100644
--- a/fixture-generator/Cargo.lock
+++ b/fixture-generator/Cargo.lock
@@ -2230,6 +2230,20 @@ version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973"
 
+[[package]]
+name = "backoff"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
+dependencies = [
+ "futures-core",
+ "getrandom 0.2.14",
+ "instant",
+ "pin-project-lite",
+ "rand 0.8.5",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.73"
@@ -3965,6 +3979,8 @@ name = "ethereum-lc"
 version = "1.0.1"
 dependencies = [
  "anyhow",
+ "axum 0.7.5",
+ "backoff",
  "clap 4.5.16",
  "env_logger",
  "ethereum-lc-core",