diff --git a/.dockerignore b/.dockerignore index 46c3ebed..dbc99a6b 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,6 @@ # Generated by Cargo # will have compiled files and executables -/target/ +target/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html diff --git a/.github/actions/deps/action.yaml b/.github/actions/deps/action.yaml index aff79b69..7b277b1b 100644 --- a/.github/actions/deps/action.yaml +++ b/.github/actions/deps/action.yaml @@ -18,15 +18,14 @@ runs: shell: bash run: | curl -Lo /tmp/protoc.zip \ - https://github.com/protocolbuffers/protobuf/releases/download/v22.0/protoc-22.0-linux-x86_64.zip + https://github.com/protocolbuffers/protobuf/releases/download/v25.2/protoc-25.2-linux-x86_64.zip unzip /tmp/protoc.zip -d ${HOME}/.local echo "PROTOC=${HOME}/.local/bin/protoc" >> $GITHUB_ENV export PATH="${PATH}:${HOME}/.local/bin" + - name: Rust cache uses: Swatinem/rust-cache@v2 - with: - cache-directories: | - proto/src/prost - - name: Compile Protobuf definitions + + - name: "Compile Protobuf definitions (needed by fmt, doc, etc.)" shell: bash run: cargo build -p tenderdash-proto diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index dd952922..89da0e52 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -10,7 +10,7 @@ jobs: security_audit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache cargo bin uses: actions/cache@v1 with: diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 41130bea..27309427 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,7 +22,7 @@ jobs: tenderdash: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: ./.github/actions/deps - name: Build source code shell: bash diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 00000000..54aff681 --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,59 @@ +--- +name: Docker + +on: + workflow_dispatch: + pull_request: + paths-ignore: + - "docs/**" + push: + paths-ignore: + - "docs/**" + branches: + - master + - "v*.*.*" + +jobs: + build: + strategy: + matrix: + os: [alpine, debian] + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@master + with: + platforms: amd64 + + - name: Set up Docker Build + uses: docker/setup-buildx-action@v2.4.1 + + # ARM build takes very long time, so we build PRs for AMD64 only + - name: Select target platform + id: select_platforms + run: | + if [[ "${GITHUB_EVENT_NAME}" == "pull_request" ]] ; then + echo "build_platforms=linux/amd64" >> $GITHUB_ENV + else + echo "build_platforms=linux/amd64,linux/arm64" >> $GITHUB_ENV + fi + + - name: Build Docker sample image + id: docker_build + uses: docker/build-push-action@v4.0.0 + with: + context: . + file: ./Dockerfile-${{ matrix.os }} + build-args: | + REVISION=${{ github.ref }} + platforms: ${{ env.build_platforms }} + push: false + cache-from: | + type=gha + cache-to: | + type=gha,mode=max + + - name: Show Docker image digest + run: echo ${{ steps.docker_build.outputs.digest }} diff --git a/.github/workflows/rust-clippy.yml b/.github/workflows/rust-clippy.yml index fec392e3..e4a947d0 100644 --- a/.github/workflows/rust-clippy.yml +++ b/.github/workflows/rust-clippy.yml @@ -25,8 +25,7 @@ jobs: actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status steps: - name: Checkout code - uses: actions/checkout@v2 - + uses: actions/checkout@v3 - name: Install Rust toolchain and deps uses: ./.github/actions/deps @@ -42,7 +41,7 @@ jobs: continue-on-error: true - name: Upload analysis results to GitHub - uses: github/codeql-action/upload-sarif@v1 + uses: github/codeql-action/upload-sarif@v2 with: sarif_file: rust-clippy-results.sarif wait-for-processing: true diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 79f8314a..4c03c7d3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -23,7 +23,7 @@ jobs: fmt: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: ./.github/actions/deps - name: Check code formatting shell: bash @@ -32,7 +32,7 @@ jobs: docs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: ./.github/actions/deps - name: Check documentation generation shell: bash diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fd7368df..206af614 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,8 +28,7 @@ jobs: RUST_TEST_TIME_INTEGRATION: "3000,6000" RUST_TEST_TIME_DOCTEST: "3000,6000" steps: - - uses: actions/checkout@v2 - + - uses: actions/checkout@v3 - uses: ./.github/actions/deps with: toolchain: nightly diff --git a/.gitignore b/.gitignore index 7dd10353..427f3b94 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,13 @@ Cargo.lock # These are log files emitted by model-based tests **/*.log +# These are intelj files +.idea + # Proptest regressions dumps **/*.proptest-regressions .vscode/launch.json *.profraw + +# Mac spotlight +.DS_Store diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..e69de29b diff --git a/Cargo.toml b/Cargo.toml index f1f5d87a..78f73332 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] - +resolver = "2" members = ["abci", "proto-compiler", "proto"] diff --git a/Dockerfile-alpine b/Dockerfile-alpine new file mode 100644 index 00000000..2f4f937e --- /dev/null +++ b/Dockerfile-alpine @@ -0,0 +1,56 @@ +# This is an example Dockerfile, demonstrating build process of rs-tenderdash-abci + +# rust:alpine3.17, published Mar 24, 2023 at 2:55 am +FROM rust:alpine3.17 + +RUN apk add --no-cache \ + git \ + wget \ + alpine-sdk \ + openssl-dev \ + libc6-compat \ + perl \ + unzip \ + bash + +SHELL ["/bin/bash", "-c"] + +# one of: aarch_64, x86_64 +# ARG PROTOC_ARCH=x86_64 +ARG TARGETPLATFORM +ARG BUILDPLATFORM + +# Install protoc - protobuf compiler +# The one shipped with Alpine does not work +RUN if [[ "$BUILDPLATFORM" == "linux/arm64" ]] ; then export PROTOC_ARCH=aarch_64; else export PROTOC_ARCH=x86_64 ; fi; \ + wget -q -O /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v25.2/protoc-25.2-linux-${PROTOC_ARCH}.zip && \ + unzip -qd /opt/protoc /tmp/protoc.zip && \ + rm /tmp/protoc.zip && \ + ln -s /opt/protoc/bin/protoc /usr/bin/ + +# Create a dummy package +RUN cargo init /usr/src/abci-app +WORKDIR /usr/src/abci-app + +# Let's display ABCI version instead of "hello world" +RUN sed -i'' -e 's/println!("Hello, world!");/println!("ABCI version: {}",tenderdash_abci::proto::ABCI_VERSION);/' src/main.rs + +# revspec or SHA of commit/branch/tag to use +ARG REVISION="refs/heads/master" + +# Add tenderdash-abci as a dependency and build the package +# +# Some notes here: +# 1. All these --mount... are to cache reusable info between runs. +# See https://doc.rust-lang.org/cargo/guide/cargo-home.html#caching-the-cargo-home-in-ci +# 2. We add `--config net.git-fetch-with-cli=true` to address ARM build issue, +# see https://github.com/rust-lang/cargo/issues/10781#issuecomment-1441071052 +# 3. To preserve space on github cache, we call `cargo clean`. +RUN --mount=type=cache,sharing=shared,target=${CARGO_HOME}/registry/index \ + --mount=type=cache,sharing=shared,target=${CARGO_HOME}/registry/cache \ + --mount=type=cache,sharing=shared,target=${CARGO_HOME}/git/db \ + cargo add --config net.git-fetch-with-cli=true \ + --git https://github.com/dashpay/rs-tenderdash-abci --rev "${REVISION}" tenderdash-abci && \ + cargo build --config net.git-fetch-with-cli=true && \ + cargo run --config net.git-fetch-with-cli=true && \ + cargo clean diff --git a/Dockerfile-debian b/Dockerfile-debian new file mode 100644 index 00000000..c4c3ef6d --- /dev/null +++ b/Dockerfile-debian @@ -0,0 +1,47 @@ +# This is an example Dockerfile, demonstrating build process of rs-tenderdash-abci + +# We use Debian base image, as Alpine has some segmentation fault issue +FROM rust:bullseye + +RUN --mount=type=cache,sharing=locked,target=/var/lib/apt/lists \ + --mount=type=cache,sharing=locked,target=/var/cache/apt \ + rm -f /etc/apt/apt.conf.d/docker-clean && \ + apt-get update -qq && \ + apt-get install -qq --yes \ + git \ + wget \ + bash + +# Install protoc - protobuf compiler +# The one shipped with Alpine does not work +RUN if [[ "$BUILDPLATFORM" == "linux/arm64" ]] ; then export PROTOC_ARCH=aarch_64; else export PROTOC_ARCH=x86_64 ; fi; \ + wget -q -O /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v25.2/protoc-25.2-linux-${PROTOC_ARCH}.zip && \ + unzip -qd /opt/protoc /tmp/protoc.zip && \ + rm /tmp/protoc.zip && \ + ln -s /opt/protoc/bin/protoc /usr/bin/ + +# Create a dummy package +RUN cargo init /usr/src/abci-app +WORKDIR /usr/src/abci-app + + +# revspec or SHA of commit/branch/tag to use +ARG REVISION="refs/heads/master" + +SHELL ["/bin/bash", "-c"] + +# Add tenderdash-abci as a dependency and build the package +# +# Some notes here: +# 1. All these --mount... are to cache reusable info between runs. +# See https://doc.rust-lang.org/cargo/guide/cargo-home.html#caching-the-cargo-home-in-ci +# 2. We add `--config net.git-fetch-with-cli=true` to address ARM build issue, +# see https://github.com/rust-lang/cargo/issues/10781#issuecomment-1441071052 +# 3. To preserve space on github cache, we call `cargo clean`. +RUN --mount=type=cache,sharing=shared,target=${CARGO_HOME}/registry/index \ + --mount=type=cache,sharing=shared,target=${CARGO_HOME}/registry/cache \ + --mount=type=cache,sharing=shared,target=${CARGO_HOME}/git/db \ + cargo add --config net.git-fetch-with-cli=true \ + --git https://github.com/dashpay/rs-tenderdash-abci --rev "${REVISION}" tenderdash-abci && \ + cargo build --config net.git-fetch-with-cli=true && \ + cargo clean diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 6d4b1606..ba3e3466 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -1,6 +1,6 @@ [package] +version = "0.14.0-dev.9" name = "tenderdash-abci" -version = "0.12.0-dev.1" edition = "2021" license = "Apache-2.0" readme = "README.md" @@ -13,29 +13,70 @@ description = """tenderdash-abci provides a simple framework with which to build low-level applications on top of Tenderdash.""" [features] -default = ["server", "docker-tests"] +default = [ + "server", + "docker-tests", + "crypto", + "tcp", + "unix", + "grpc", + "tracing-span", +] # docker-tests includes integration tests that require docker to be available docker-tests = ["server"] -server = ["tracing-subscriber/fmt"] +server = [ + "tracing-subscriber/fmt", + "dep:tokio", + "dep:tokio-util", + "dep:futures", +] + +grpc = ["tenderdash-proto/grpc"] +crypto = ["dep:lhash"] +tcp = ["server"] +unix = ["server"] +tracing-span = ["dep:uuid"] [[example]] name = "echo_socket" required-features = ["server"] [dependencies] +uuid = { version = "1.4.1", features = ["v4", "fast-rng"], optional = true } +tenderdash-proto = { path = "../proto" } bytes = { version = "1.0" } -prost = { version = "0.11" } -tenderdash-proto = { version = "0.12.0-dev.1", default-features = false, path = "../proto" } +prost = { version = "0.12" } tracing = { version = "0.1", default-features = false } -tracing-subscriber = { version = "0.3", optional = true, default-features = false } -thiserror = "1.0.39" +tracing-subscriber = { version = "0.3", optional = true, default-features = false, features = [ + "ansi", + "env-filter", +] } +thiserror = { version = "1.0.39" } url = { version = "2.3.1" } semver = { version = "1.0.17" } +lhash = { version = "1.0.1", features = ["sha256"], optional = true } +hex = { version = "0.4" } +tokio-util = { version = "0.7.8", features = [ + "net", + "codec", +], default-features = false, optional = true } +tokio = { version = "1.28", features = [ + "net", + "io-util", + "rt-multi-thread", + "sync", + "macros", +], default-features = false, optional = true } +futures = { version = "0.3.28", optional = true } [dev-dependencies] -anyhow = "1.0.69" -bincode = "2.0.0-rc.2" -blake2 = "0.10.6" -bollard = { version = "0.14.0" } +anyhow = { version = "1.0.69" } +bincode = { version = "2.0.0-rc.2" } +blake2 = { version = "0.10.6" } +bollard = { version = "0.16.0" } futures = { version = "0.3.26" } tokio = { version = "1", features = ["macros", "signal", "time", "io-std"] } +hex = { version = "0.4" } +lazy_static = { version = "1.4" } +# For tests of gRPC server +tonic = { version = "0.11" } diff --git a/abci/examples/echo_socket.rs b/abci/examples/echo_socket.rs index e7e43388..604910ea 100644 --- a/abci/examples/echo_socket.rs +++ b/abci/examples/echo_socket.rs @@ -1,9 +1,12 @@ -use tenderdash_abci::{proto::abci, start_server, Application}; +use lazy_static::lazy_static; +use tenderdash_abci::{proto::abci, Application, CancellationToken, ServerBuilder}; use tracing::info; use tracing_subscriber::filter::LevelFilter; const SOCKET: &str = "/tmp/abci.sock"; - +lazy_static! { + static ref CANCEL_TOKEN: CancellationToken = CancellationToken::new(); +} pub fn main() { let log_level = LevelFilter::DEBUG; tracing_subscriber::fmt().with_max_level(log_level).init(); @@ -12,9 +15,16 @@ pub fn main() { info!("This application listens on {SOCKET} and waits for incoming Tenderdash requests."); let socket = format!("unix://{}", SOCKET); - let server = start_server(&socket, EchoApp {}).expect("server failed"); + let app = EchoApp {}; + + let cancel = CANCEL_TOKEN.clone(); + let server = ServerBuilder::new(app, &socket) + .with_cancel_token(cancel) + .build() + .expect("server failed"); + loop { - match server.handle_connection() { + match server.next_client() { Ok(_) => {}, Err(e) => tracing::error!("error {}", e), }; @@ -30,7 +40,11 @@ impl Application for EchoApp { &self, request: abci::RequestEcho, ) -> Result { - info!("received echo"); + info!("received echo, cancelling"); + + let cancel = CANCEL_TOKEN.clone(); + cancel.cancel(); + Ok(abci::ResponseEcho { message: request.message, }) diff --git a/abci/src/application.rs b/abci/src/application.rs index 88812d94..07ae4da3 100644 --- a/abci/src/application.rs +++ b/abci/src/application.rs @@ -1,7 +1,6 @@ //! ABCI application interface. -use std::panic::RefUnwindSafe; -use tracing::debug; +use tracing::{debug, error}; use crate::proto::{ abci, @@ -139,7 +138,7 @@ pub trait Application { } } -pub trait RequestDispatcher: RefUnwindSafe { +pub trait RequestDispatcher { /// Executes the relevant application method based on the type of the /// request, and produces the corresponding response. /// @@ -149,11 +148,13 @@ pub trait RequestDispatcher: RefUnwindSafe { } // Implement `RequestDispatcher` for all `Application`s. -impl RequestDispatcher for A { +impl RequestDispatcher for A { fn handle(&self, request: abci::Request) -> Option { - tracing::trace!(?request, "received request"); + #[cfg(feature = "tracing-span")] + let _span = super::tracing_span::span(request.clone().value?); + tracing::trace!(?request, "received ABCI request"); - let response: Result = match request.value? { + let response: response::Value = match request.value? { request::Value::Echo(req) => self.echo(req).map(|v| v.into()), request::Value::Flush(req) => self.flush(req).map(|v| v.into()), request::Value::Info(req) => self.info(req).map(|v| v.into()), @@ -175,15 +176,15 @@ impl RequestDispatcher for A { request::Value::VerifyVoteExtension(req) => { self.verify_vote_extension(req).map(|v| v.into()) }, - }; + } + .unwrap_or_else(|e| e.into()); - let response = match response { - Ok(v) => v, - Err(e) => response::Value::from(e), + if let response::Value::Exception(_) = response { + tracing::error!(?response, "sending ABCI exception"); + } else { + tracing::trace!(?response, "sending ABCI response"); }; - tracing::trace!(?response, "sending response"); - Some(abci::Response { value: Some(response), }) @@ -199,6 +200,8 @@ impl RequestDispatcher for A { /// /// ## Examples /// +/// ### Using `check_version` in `Application::info` handler +/// /// ```should_panic /// use tenderdash_abci::{check_version, Application}; /// use tenderdash_abci::proto::abci::{RequestInfo, ResponseInfo, ResponseException}; @@ -225,42 +228,123 @@ pub fn check_version(tenderdash_version: &str) -> bool { match_versions(tenderdash_version, tenderdash_proto::ABCI_VERSION) } -fn match_versions(tenderdash_abci_requirement: &str, our_abci_version: &str) -> bool { - let our_version = - semver::Version::parse(our_abci_version).expect("cannot parse protobuf library version"); - - let require = String::from("^") + tenderdash_abci_requirement; - let td_version = - semver::VersionReq::parse(require.as_str()).expect("cannot parse tenderdash version"); - - debug!("ABCI version: required: {}, our: {}", require, our_version); - - td_version.matches(&our_version) +/// Check if Tenderdash provides ABCI interface compatible with our library. +/// +/// Tenderdash is compatible if its abci version matches the abci version of +/// linked protobuf data objects, eg. version provided in +/// `rs_tenderdash_abci_version` argument. The PATCH level can be ignored, as is +/// should be backwards-compatible. +/// +/// For example, Tenderdash abci version `1.23.2` should work with +/// rs-tenderdash-abci linked with abci version `1.23.1` and `1.22.1`, but not +/// with `1.24.1` or `0.23.1`. +fn match_versions(tenderdash_version: &str, rs_tenderdash_abci_version: &str) -> bool { + let rs_tenderdash_abci_version = semver::Version::parse(rs_tenderdash_abci_version) + .expect("cannot parse protobuf library version"); + let tenderdash_version = + semver::Version::parse(tenderdash_version).expect("cannot parse tenderdash version"); + + let requirement = match rs_tenderdash_abci_version.pre.as_str() { + "" => format!( + "^{}.{}", + rs_tenderdash_abci_version.major, rs_tenderdash_abci_version.minor + ), + pre => format!( + "^{}.{}.0-{}", + rs_tenderdash_abci_version.major, rs_tenderdash_abci_version.minor, pre + ), + }; + + let matcher = semver::VersionReq::parse(&requirement).expect("cannot parse tenderdash version"); + + match matcher.matches(&tenderdash_version) { + true => { + debug!( + "version match(rs-tenderdash-abci proto version: {}), tenderdash server proto version {} = {}", + rs_tenderdash_abci_version, tenderdash_version, requirement + ); + true + }, + false => { + error!( + "version mismatch(rs-tenderdash-abci proto version: {}), tenderdash server proto version {} != {}", + rs_tenderdash_abci_version, tenderdash_version, requirement + ); + false + }, + } } #[cfg(test)] mod tests { use super::match_versions; + fn setup_logs() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::new("trace")) + .try_init() + .ok(); + } + /// test_versions! {} (td_version, our_version, expected); } + // Test if various combinations of versions match + // + // ## Arguments + // + // * `td_version` - Tenderdash version, as returned by the Tenderdash + // * `our_version` - our version - version of rs-tenderdash-abci library + // * `expected` - expected result - true or false + // macro_rules! test_versions { ($($name:ident: $value:expr,)*) => { $( #[test] fn $name() { + setup_logs(); let (td, our, expect) = $value; - assert_eq!(match_versions(td, our),expect); + assert_eq!(match_versions(td, our),expect, + "tenderdash version: {}, rs-tenderdash-abci version: {}, expect: {}", td, our,expect); } )* } } test_versions! { - test_versions_td_newer: ("0.1.2-dev.1", "0.1.0", false), - test_versions_equal: ("0.1.0","0.1.0",true), - test_versions_td_older: ("0.1.0","0.1.2",true), - test_versions_equal_dev: ("0.1.0-dev.1","0.1.0-dev.1",true), - test_versions_our_newer_dev: ("0.1.0-dev.1", "0.1.0-dev.2",true), - test_versions_our_dev:("0.1.0","0.1.0-dev.1",false), + // rs-tenderdash-abci should be able to connect to any Tenderdash that is backwards-compatible + // It means that: + // * MAJOR of Tenderdash must match MAJOR of rs-tenderdash-abci + // * MINOR of Tenderdash must be greater or equal to MINOR of rs-tenderdash-abci + // * PATCH of Tenderdash can be anything + + // MAJOR 0 + + // vesions match + test_major_0: ("0.23.1", "0.23.1", true), + // tenderdash is newer than our library, but it's backwards-compatible + test_major_0_old_minor: ("0.23.1", "0.22.1", false), + // tenderdash patch level is higher than ours; it should not matter + test_major_0_new_patch: ("0.23.2", "0.23.1", true), + // tenderdash patch level is lower than ours; it should not matter + test_major_0_old_patch: ("0.23.0", "0.23.1", true), + // tenderdash is older than our library, it should not match + test_major_0_new_minor: ("0.23.1", "0.24.1", false), + test_major_0_new_major: ("0.23.1", "1.23.1", false), + + // MAJOR 1 + + test_major_1: ("1.23.1", "1.23.1", true), + // tenderdash is newer than our library, but it's backwards-compatible + test_major_1_old_minor: ("1.23.1", "1.22.1", true), + // tenderdash patch level is higher than ours; it should not matter + test_major_1_new_patch: ("1.23.2", "1.23.1", true), + // tenderdash patch level is lower than ours; it should not matter + test_major_1_old_patch: ("1.23.0", "1.23.1", true), + // tenderdash is older than our library, it should not match + test_major_1_new_minor: ("1.23.1", "1.24.1", false), + test_major_1_old_major: ("1.23.1", "0.23.1", false), + + test_dev_td_newer: ("0.1.2-dev.1", "0.1.0", false), + test_dev_equal: ("0.1.0-dev.1","0.1.0-dev.1",true), + test_dev_our_newer_dev: ("0.1.0-dev.1", "0.1.0-dev.2",false), } } diff --git a/abci/src/lib.rs b/abci/src/lib.rs index 656a6158..fc86059f 100644 --- a/abci/src/lib.rs +++ b/abci/src/lib.rs @@ -20,9 +20,17 @@ use std::io; pub use application::{check_version, Application, RequestDispatcher}; use prost::{DecodeError, EncodeError}; -pub use server::{start_server, Server}; +#[allow(deprecated)] +#[cfg(feature = "server")] +pub use server::{start_server, CancellationToken, Server, ServerBuilder, ServerRuntime}; pub use tenderdash_proto as proto; +#[cfg(feature = "crypto")] +pub mod signatures; +#[cfg(feature = "tracing-span")] +/// Create tracing::Span for better logging +pub mod tracing_span; + /// Errors that may happen during protobuf communication #[derive(Debug, thiserror::Error)] pub enum Error { @@ -34,4 +42,10 @@ pub enum Error { Decode(#[from] DecodeError), #[error("cannot encode protobuf message")] Encode(#[from] EncodeError), + #[error("cannot create canonical message: {0}")] + Canonical(String), + #[error("server terminated")] + Cancelled(), + #[error("async runtime error")] + Async(String), } diff --git a/abci/src/server.rs b/abci/src/server.rs index eb70d5bf..f9cd02fa 100644 --- a/abci/src/server.rs +++ b/abci/src/server.rs @@ -1,61 +1,62 @@ //! Tenderdash ABCI Server. mod codec; -mod tcp; -mod unix; +mod generic; +#[cfg(feature = "tcp")] use std::{ - io::{Read, Write}, net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}, - panic::RefUnwindSafe, str::FromStr, }; -use tracing::{error, info}; +use futures::Future; +#[cfg(feature = "tcp")] +use tokio::net::TcpListener; +#[cfg(feature = "unix")] +use tokio::net::UnixListener; +use tokio::{ + runtime::{Handle, Runtime}, + task::JoinHandle, +}; +pub use tokio_util::sync::CancellationToken; -use self::{tcp::TcpServer, unix::UnixSocketServer}; -use crate::{application::RequestDispatcher, proto::abci, server::codec::Codec, Error}; +use self::generic::GenericServer; +use crate::{application::RequestDispatcher, Error}; -/// The size of the read buffer for each incoming connection to the ABCI -/// server (1MB). -pub(crate) const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1024 * 1024; +#[cfg(not(any(feature = "tcp", feature = "unix")))] +compile_error!("At least one of `tcp` or `unix` features must be enabled"); /// ABCI Server handle. /// /// Use [`Server::handle_connection()`] to accept connection and process all /// traffic in this connection. Each incoming connection will be processed using /// `app`. -pub trait Server: RefUnwindSafe { +pub trait Server { /// Process one incoming connection. /// - /// Returns when the connection is terminated or RequestDispatcher returns - /// error. + /// Returns when the connection is terminated, [CancellationToken::cancel()] + /// is called or RequestDispatcher returns `None`. /// /// It is safe to call this method multiple times after it finishes; /// however, errors must be examined and handled, as the connection - /// should not terminate. - fn handle_connection(&self) -> Result<(), Error>; + /// should not terminate. One exception is [Error::Cancelled], which + /// means server shutdown was requested. + fn next_client(&self) -> Result<(), Error>; + + #[deprecated = "use `next_client()`"] + fn handle_connection(&self) -> Result<(), Error> { + self.next_client() + } } -/// Create new ABCI server and bind to provided address/port or socket. -/// -/// Use [`handle_connection()`] to accept connection and process all traffic in -/// this connection. Each incoming connection will be processed using `app`. -/// -/// # Arguments -/// -/// * `address` - address in URI format, pointing either to TCP address and port -/// (eg. `tcp://0.0.0.0:1234`, `tcp://[::1]:1234`) or Unix socket -/// (`unix:///var/run/abci.sock`) -/// * `app` - request dispatcher, most likely implementation of Application -/// trait -/// -/// -/// # Return +/// ABCI server builder that creates and starts ABCI server /// -/// Returns [`Server`] which provides [`handle_connection()`] method. Call it -/// in a loop to accept and process incoming connections. +/// Create new server with [`ServerBuilder::new()`], configure it as needed, and +/// finalize using [`ServerBuilder::build()`]. This will create and start new +/// ABCI server. /// -/// [`handle_connection()`]: Server::handle_connection() +/// Use [`Server::next_client()`] to accept connection from ABCI client +/// (Tenderdash) and start processing incoming requests. Each incoming +/// connection will be processed using `app`. /// /// # Examples /// @@ -64,75 +65,212 @@ pub trait Server: RefUnwindSafe { /// impl tenderdash_abci::Application for MyAbciApplication {}; /// let app = MyAbciApplication {}; /// let bind_address = "unix:///tmp/abci.sock"; -/// let server = tenderdash_abci::start_server(&bind_address, app).expect("server failed"); +/// let server = tenderdash_abci::ServerBuilder::new(app, &bind_address).build().expect("server failed"); /// loop { -/// server.handle_connection(); +/// if let Err(tenderdash_abci::Error::Cancelled()) = server.next_client() { +/// break; +/// } /// } /// ``` -pub fn start_server<'a, App: RequestDispatcher + 'a, Addr>( - bind_address: Addr, - app: App, -) -> Result, crate::Error> +pub struct ServerBuilder where - Addr: AsRef, + D: RequestDispatcher, { - let app_address = url::Url::parse(bind_address.as_ref()).expect("invalid app address"); - if app_address.scheme() != "tcp" && app_address.scheme() != "unix" { - panic!("app_address must be either tcp:// or unix://"); - } + app: D, + bind_address: String, + cancel: Option, + server_runtime: Option, +} - let server = match app_address.scheme() { - "tcp" => { - Box::new(TcpServer::bind(app, parse_tcp_uri(app_address))?) as Box - }, - "unix" => Box::new(UnixSocketServer::bind( +impl<'a, App: RequestDispatcher + 'a> ServerBuilder { + /// Create new server builder. + /// + /// # Arguments + /// + /// * `address` - address in URI format, pointing either to TCP address and + /// port (eg. `tcp://0.0.0.0:1234`, `tcp://[::1]:1234`) or Unix socket + /// (`unix:///var/run/abci.sock`) + /// * `app` - request dispatcher, most likely implementation of Application + /// trait + pub fn new(app: App, address: &str) -> Self { + Self { app, - app_address.path(), - DEFAULT_SERVER_READ_BUF_SIZE, - )?) as Box, - _ => panic!( - "listen address uses unsupported scheme `{}`", - app_address.scheme() - ), - }; - - Ok(server) -} + bind_address: address.to_string(), + cancel: None, + server_runtime: None, + } + } -/// handle_client accepts one client connection and handles received messages. -pub(crate) fn handle_client( - stream: S, - name: String, - app: &App, - read_buf_size: usize, -) -> Result<(), Error> -where - App: RequestDispatcher, - S: Read + Write, -{ - let mut codec = Codec::new(stream, read_buf_size); - info!("Listening for incoming requests from {}", name); + /// Build and start the ABCI server. + /// + /// # Return + /// + /// Returns [`Server`] which provides [`Server::next_client()`] + /// method. Call it in a loop to accept and process incoming + /// connections. + pub fn build(self) -> Result, crate::Error> { + let bind_address = + url::Url::parse(self.bind_address.as_ref()).expect("invalid bind address"); + if bind_address.scheme() != "tcp" && bind_address.scheme() != "unix" { + panic!("app_address must be either tcp:// or unix://"); + } + let server_runtime: ServerRuntime = self.server_runtime.unwrap_or_default(); - loop { - let Some(request) = codec.receive()? else { - error!("Client {} terminated stream", name); - return Ok(()) - }; + let _guard = server_runtime.handle.enter(); - let Some(response) = app.handle(request.clone()) else { - // `RequestDispatcher` decided to stop receiving new requests: - info!("ABCI Application is shutting down"); - return Ok(()); + // No cancel is defined, so we add some "mock" + let cancel = self.cancel.unwrap_or_default(); + + let server = match bind_address.scheme() { + #[cfg(feature = "tcp")] + "tcp" => Box::new(GenericServer::::bind( + self.app, + parse_tcp_uri(bind_address), + cancel, + server_runtime, + )?) as Box, + #[cfg(feature = "unix")] + "unix" => Box::new(GenericServer::::bind( + self.app, + bind_address.path(), + cancel, + server_runtime, + )?) as Box, + _ => panic!( + "listen address uses unsupported scheme `{}`", + bind_address.scheme() + ), }; - if let Some(abci::response::Value::Exception(ex)) = response.value.clone() { - error!(error = ex.error, ?request, "error processing request") + Ok(server) + } + /// Set a [CancellationToken] token to support graceful shutdown. + /// + /// Call [`CancellationToken::cancel()`] to stop the server gracefully. + /// + /// [`CancellationToken::cancel()`]: tokio_util::sync::CancellationToken::cancel() + pub fn with_cancel_token(self, cancel: CancellationToken) -> Self { + Self { + cancel: Some(cancel), + ..self } + } + /// Set tokio [Runtime](tokio::runtime::Runtime) to use. + /// + /// By default, current tokio runtime is used. If no runtime is active + /// ([Handle::try_current()] returns error), new multi-threaded runtime + /// is started. If this is not what you want, use + /// [ServerBuilder::with_runtime()] to provide handler to correct Tokio + /// runtime. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::{Handle, Runtime}; + /// use tenderdash_abci::{RequestDispatcher, ServerBuilder, CancellationToken, Application}; + /// + /// // Your custom RequestDispatcher implementation + /// struct MyApp; + /// + /// impl Application for MyApp {} + /// + /// // Create a Tokio runtime + /// let runtime = Runtime::new().unwrap(); + /// let runtime_handle = runtime.handle().clone(); + /// + /// // Create an instance of your RequestDispatcher implementation + /// let app = MyApp; + /// + /// // Create cancellation token + /// let cancel = CancellationToken::new(); + /// # cancel.cancel(); + /// // Create a ServerBuilder instance and set the runtime using with_runtime() + /// + /// let server = ServerBuilder::new(app, "tcp://0.0.0.0:17534") + /// .with_runtime(runtime_handle) + /// .with_cancel_token(cancel) + /// .build(); + /// ``` + /// + /// In this example, we first create a Tokio runtime and get its handle. + /// Then we create an instance of our `MyApp` struct that implements the + /// `RequestDispatcher` trait. We create a `ServerBuilder` instance by + /// calling `new()` with our `MyApp` instance and then use the + /// `with_runtime()` method to set the runtime handle. Finally, you can + /// continue building your server and eventually run it. + /// + /// [Handle::try_current()]: tokio::runtime::Handle::try_current() + pub fn with_runtime(self, runtime_handle: Handle) -> Self { + Self { + server_runtime: Some(ServerRuntime { + _runtime: None, + handle: runtime_handle, + }), + ..self + } + } +} + +/// Server runtime that must be alive for the whole lifespan of the server +pub struct ServerRuntime { + /// Runtime stored here to ensure it is never dropped + _runtime: Option, + pub handle: Handle, +} + +impl ServerRuntime { + pub fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } - codec.send(response)?; + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn(future) } } +impl Default for ServerRuntime { + /// Return default server runtime. + /// + /// If tokio runtime is already initialized and entered, returns handle to + /// it. Otherwise, creates new runtime and returns handle AND the + /// runtime itself. + fn default() -> Self { + match Handle::try_current() { + Ok(runtime_handle) => Self { + handle: runtime_handle, + _runtime: None, + }, + Err(_) => { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .expect("cannot create runtime"); + tracing::trace!("created new runtime"); + Self { + handle: rt.handle().clone(), + _runtime: Some(rt), + } + }, + } + } +} + +#[deprecated = "use `ServerBuilder::new(app, &bind_address).build()` instead"] +pub fn start_server<'a, App: RequestDispatcher + 'a, Addr>( + bind_address: Addr, + app: App, +) -> Result, crate::Error> +where + Addr: AsRef, +{ + ServerBuilder::new(app, bind_address.as_ref()).build() +} +#[cfg(feature = "tcp")] fn parse_tcp_uri(uri: url::Url) -> SocketAddr { let host = uri.host_str().unwrap(); // remove '[' and ']' from ipv6 address, as per https://github.com/servo/rust-url/issues/770 diff --git a/abci/src/server/codec.rs b/abci/src/server/codec.rs index ed4ccd94..8d531e33 100644 --- a/abci/src/server/codec.rs +++ b/abci/src/server/codec.rs @@ -4,124 +4,283 @@ //! //! [tsp]: https://github.com/tendermint/tendermint/blob/v0.34.x/spec/abci/client-server.md#tsp -use std::io::{self, Read, Write}; +use std::{fmt::Debug, sync::Arc}; use bytes::{Buf, BufMut, BytesMut}; -use prost::{DecodeError, EncodeError, Message}; +use futures::{SinkExt, StreamExt}; +use prost::Message; +use proto::abci::{Request, Response}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, +}; +use tokio_util::{ + codec::{Decoder, Encoder, Framed}, + net::Listener, +}; -use crate::{proto, Error}; +use super::ServerRuntime; +use crate::{proto, CancellationToken, Error}; /// The maximum number of bytes we expect in a varint. We use this to check if /// we're encountering a decoding error for a varint. pub const MAX_VARINT_LENGTH: usize = 16; -/// Allows for iteration over `S` to produce instances of `I`, as well as -/// sending instances of `O`. -pub struct Codec { - stream: S, - // Long-running read buffer - read_buf: BytesMut, - // Fixed-length read window - read_window: Vec, - write_buf: BytesMut, +pub struct Codec { + request_rx: Receiver, + response_tx: Sender, } -impl Codec { - pub fn new(stream: S, read_buf_size: usize) -> Self { +impl<'a> Codec { + pub(crate) fn new( + listener: Arc>, + cancel: CancellationToken, + runtime: &ServerRuntime, + ) -> Self + where + L: Listener + Send + Sync + 'static, + L::Addr: Send + Debug, + L::Io: Send, + { + let (request_tx, request_rx) = mpsc::channel::(1); + let (response_tx, response_rx) = mpsc::channel::(1); + + runtime + .handle + .spawn(Self::worker(listener, request_tx, response_rx, cancel)); + Self { - stream, - read_buf: BytesMut::new(), - read_window: vec![0_u8; read_buf_size], - write_buf: BytesMut::new(), + request_rx, + response_tx, } } -} -impl Codec -where - S: Read + Write, -{ - pub(crate) fn receive(&mut self) -> Result, Error> { - loop { - // Try to decode an incoming message from our buffer first - if let Some(incoming) = decode_length_delimited(&mut self.read_buf)? { - return Ok(Some(incoming)); - } + /// Worker that bridges data between async streams and sync processing code. + /// + /// ## Error handling + /// + /// Any error will cause disconnect + async fn worker( + listener: Arc>, + request_tx: Sender, + response_rx: Receiver, + cancel: CancellationToken, + ) where + L: Listener + Send + Sync, + L::Addr: Debug, + { + let mut listener = listener.lock().await; + tracing::trace!("listening for new connection"); - // If we don't have enough data to decode a message, try to read more - let bytes_read = self.stream.read(self.read_window.as_mut())?; - if bytes_read == 0 { - // The underlying stream terminated - return Ok(None); + let (stream, address) = tokio::select! { + conn = listener.accept() => match conn { + Ok(r) => r, + Err(error) => { + tracing::error!(?error, "cannot accept connection"); + cancel.cancel(); + return; + }, + }, + _ = cancel.cancelled() => return, + }; + + tracing::info!(?address, "accepted connection"); + + let stream = Box::pin(stream); + let codec = Framed::new(stream, Coder {}); + + Self::process_worker_queues(codec, request_tx, response_rx, cancel).await; + } + async fn process_worker_queues( + mut codec: Framed, + request_tx: Sender, + mut response_rx: Receiver, + cancel: CancellationToken, + ) { + loop { + tokio::select! { + // Only read next message if we have capacity in request_tx to process it. + // Otherwise, we might block the codec worker on request_tx.send() and never + // process the next message from the response_rx stream. + request = codec.next(), if request_tx.capacity() > 0 => match request { + Some(Ok(i)) => { + if let Err(error) = request_tx.try_send(i) { + tracing::error!(?error, "unable to forward request for processing"); + cancel.cancel(); + } + }, + Some(Err(error)) => { + tracing::error!(?error, "unable to parse request"); + cancel.cancel(); + }, + None => { + tracing::warn!("client connection terminated"); + cancel.cancel(); + }, + }, + response = response_rx.recv() => match response{ + Some(msg) => { + if let Err(error) = codec.send(msg).await { + tracing::error!(?error, "unable to send response to tenderdash"); + cancel.cancel(); + } + }, + None => { + tracing::warn!("client connection terminated"); + cancel.cancel(); + } + }, + _ = cancel.cancelled() => { + tracing::debug!("codec worker shutting down"); + return; // stop processing + } } - self.read_buf - .extend_from_slice(&self.read_window[..bytes_read]); } } - /// Send a message using this codec. - pub(crate) fn send(&mut self, message: proto::abci::Response) -> Result<(), Error> { - encode_length_delimited(message, &mut self.write_buf)?; - while !self.write_buf.is_empty() { - let bytes_written = self.stream.write(self.write_buf.as_ref())?; - - if bytes_written == 0 { - return Err(Error::Connection(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write to underlying stream", - ))); - } - self.write_buf.advance(bytes_written); + pub fn next(&mut self) -> Option { + self.request_rx.blocking_recv() + } + + pub fn send(&self, value: Response) -> Result<(), Error> { + self.response_tx + .blocking_send(value) + .map_err(|e| Error::Async(e.to_string())) + } +} + +pub struct Coder; + +impl Decoder for Coder { + type Error = Error; + type Item = proto::abci::Request; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let src_len = src.len(); + + let mut tmp = src.clone().freeze(); + let encoded_len = match prost::encoding::decode_varint(&mut tmp) { + Ok(len) => len, + // We've potentially only received a partial length delimiter + Err(_) if src_len <= MAX_VARINT_LENGTH => return Ok(None), + Err(e) => return Err(e.into()), + }; + let remaining = tmp.remaining() as u64; + if remaining < encoded_len { + // We don't have enough data yet to decode the entire message + Ok(None) + } else { + let delim_len = src_len - tmp.remaining(); + // We only advance the source buffer once we're sure we have enough + // data to try to decode the result. + src.advance(delim_len + (encoded_len as usize)); + + let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref()); + let res = proto::abci::Request::decode(&mut result_bytes)?; + + Ok(Some(res)) } + } +} - self.stream.flush()?; +impl Encoder for Coder { + type Error = Error; + fn encode( + &mut self, + message: proto::abci::Response, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { + let mut buf = BytesMut::new(); + message.encode(&mut buf)?; + + let buf = buf.freeze(); + prost::encoding::encode_varint(buf.len() as u64, dst); + dst.put(buf); Ok(()) } } -/// Encode the given message with a length prefix. -pub fn encode_length_delimited( - message: proto::abci::Response, - mut dst: &mut B, -) -> Result<(), EncodeError> -where - B: BufMut, -{ - let mut buf = BytesMut::new(); - message.encode(&mut buf)?; - - let buf = buf.freeze(); - prost::encoding::encode_varint(buf.len() as u64, &mut dst); - dst.put(buf); - Ok(()) -} +#[cfg(test)] +mod test { + use prost::Message; + use tenderdash_proto::abci; + use tokio::{io::AsyncWriteExt, sync::mpsc}; + use tokio_util::sync::CancellationToken; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + /// Test if a bug in the codec receiving 2 requests without a response in + /// between is fixed. + async fn test_codec_msg_msg_resp() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(true) + .try_init() + .ok(); + + let (request_tx, mut request_rx) = mpsc::channel::(1); + let (response_tx, response_rx) = mpsc::channel::(1); + let cancel = CancellationToken::new(); + + let (mut client, server) = tokio::io::duplex(10240); + + let codec = tokio_util::codec::Framed::new(server, super::Coder {}); + + let worker_cancel = cancel.clone(); + let hdl = tokio::spawn(super::Codec::process_worker_queues( + codec, + request_tx, + response_rx, + worker_cancel, + )); + + // We send 2 requests over the wire + for n_requests in 0..5 { + let encoded = abci::Request { + value: Some(abci::request::Value::Echo(abci::RequestEcho { + message: format!("hello {}", n_requests), + })), + } + .encode_length_delimited_to_vec(); + + client.write_all(&encoded).await.unwrap(); + } + + // Now, wait till the codec has processed the requests + // The bug we fixed was that the codec would not process the second request + // until a response was sent. + // If the bug is still present, the test should report error here. + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Then, we read one request + tracing::debug!("MAIN THREAD: reading request 1"); + request_rx.recv().await.expect("dequeue request 1"); + tracing::debug!("MAIN THREAD: dequeued request 1"); + + // Then, we send a response + tracing::debug!("MAIN THREAD: sending response 1"); + response_tx + .send(abci::Response { + value: Some(abci::response::Value::Echo(abci::ResponseEcho { + message: "hello".to_string(), + })), + }) + .await + .expect("enqueue response 1"); + tracing::debug!("MAIN THREAD: enqueued response 1"); + + // Then, we read second request + tracing::debug!("MAIN THREAD: reading request 2"); + request_rx.recv().await.expect("dequeue request 2"); + tracing::debug!("MAIN THREAD: dequeued request 2"); + + // Success :) -/// Attempt to decode a message of type `M` from the given source buffer. -pub fn decode_length_delimited( - src: &mut BytesMut, -) -> Result, DecodeError> { - let src_len = src.len(); - let mut tmp = src.clone().freeze(); - let encoded_len = match prost::encoding::decode_varint(&mut tmp) { - Ok(len) => len, - // We've potentially only received a partial length delimiter - Err(_) if src_len <= MAX_VARINT_LENGTH => return Ok(None), - Err(e) => return Err(e), - }; - let remaining = tmp.remaining() as u64; - if remaining < encoded_len { - // We don't have enough data yet to decode the entire message - Ok(None) - } else { - let delim_len = src_len - tmp.remaining(); - // We only advance the source buffer once we're sure we have enough - // data to try to decode the result. - src.advance(delim_len + (encoded_len as usize)); - - let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref()); - let res = proto::abci::Request::decode(&mut result_bytes)?; - - Ok(Some(res)) + // Cleanup + cancel.cancel(); + hdl.await.unwrap(); } } diff --git a/abci/src/server/generic.rs b/abci/src/server/generic.rs new file mode 100644 index 00000000..75b08d02 --- /dev/null +++ b/abci/src/server/generic.rs @@ -0,0 +1,139 @@ +//! Generic ABCI server +#[cfg(feature = "tcp")] +use std::net::ToSocketAddrs; +use std::{fmt::Debug, sync::Arc}; +#[cfg(feature = "unix")] +use std::{fs, path::Path}; + +#[cfg(feature = "tcp")] +use tokio::net::TcpListener; +#[cfg(feature = "unix")] +use tokio::net::UnixListener; +use tokio::sync::Mutex; +use tokio_util::net::Listener; +use tracing::info; + +use super::{codec::Codec, Server, ServerRuntime}; +use crate::{CancellationToken, Error, RequestDispatcher}; + +/// A TCP-based server for serving a specific ABCI application. +/// +/// Only one incoming connection is handled at a time. +pub(super) struct GenericServer { + app: App, + listener: Arc>, + cancel: CancellationToken, + runtime: ServerRuntime, +} + +impl GenericServer { + fn new(app: App, listener: L, cancel: CancellationToken, runtime: ServerRuntime) -> Self { + Self { + app, + listener: Arc::new(Mutex::new(listener)), + cancel, + runtime, + } + } +} + +#[cfg(feature = "tcp")] +impl GenericServer { + pub(super) fn bind( + app: App, + addr: Addr, + cancel: CancellationToken, + runtime: ServerRuntime, + ) -> Result + where + Addr: ToSocketAddrs, + { + let std_listener = std::net::TcpListener::bind(addr)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; + + // let listener = TcpListener::bind(addr).await?; + let local_addr = listener.local_addr()?; + info!( + "ABCI TCP server {} with proto {} running at {}", + env!("CARGO_PKG_VERSION"), + tenderdash_proto::ABCI_VERSION, + local_addr + ); + + let server = Self::new(app, listener, cancel, runtime); + Ok(server) + } +} + +#[cfg(feature = "unix")] +impl GenericServer { + pub(super) fn bind( + app: App, + addr: Addr, + cancel: CancellationToken, + runtime: ServerRuntime, + ) -> Result + where + Addr: AsRef, + { + let socket_file = addr.as_ref(); + fs::remove_file(socket_file).ok(); + + let listener = UnixListener::bind(addr)?; + + // let listener = TcpListener::bind(addr).await?; + let local_addr = listener.local_addr()?; + info!( + "ABCI Unix server {} with proto {} running at {:?}", + env!("CARGO_PKG_VERSION"), + tenderdash_proto::ABCI_VERSION, + local_addr + ); + + let server = Self::new(app, listener, cancel, runtime); + Ok(server) + } +} + +impl<'a, App: RequestDispatcher + 'a, L: Listener> Server for GenericServer +where + L: Listener + Send + Sync + 'static, + L::Addr: Send + Debug, + L::Io: Send, +{ + fn next_client(&self) -> Result<(), Error> { + // we create child token to stop the codec but not kill the app + let cancel_token = self.cancel.child_token(); + let listener = Arc::clone(&self.listener); + + let mut codec = Codec::new(listener, cancel_token.clone(), &self.runtime); + while !cancel_token.is_cancelled() { + let Some(request) = codec.next() else { + tracing::error!("client terminated stream"); + return Ok(()); + }; + + let Some(response) = self.app.handle(request.clone()) else { + // `RequestDispatcher` decided to stop receiving new requests: + info!("ABCI Application is shutting down"); + return Ok(()); + }; + + if let Some(crate::proto::abci::response::Value::Exception(ex)) = response.value.clone() + { + tracing::error!(error = ex.error, ?request, "error processing request") + }; + + codec.send(response)?; + } + + Err(Error::Cancelled()) + } +} + +impl Drop for GenericServer { + fn drop(&mut self) { + tracing::debug!("ABCI server shut down") + } +} diff --git a/abci/src/server/tcp.rs b/abci/src/server/tcp.rs deleted file mode 100644 index c37eaab0..00000000 --- a/abci/src/server/tcp.rs +++ /dev/null @@ -1,44 +0,0 @@ -//! ABCI application server interface. - -use std::net::{TcpListener, ToSocketAddrs}; - -use tracing::info; - -use super::{handle_client, Server, DEFAULT_SERVER_READ_BUF_SIZE}; -use crate::{Error, RequestDispatcher}; - -/// A TCP-based server for serving a specific ABCI application. -/// -/// Only one incoming connection is handled at a time. -pub(super) struct TcpServer { - app: App, - listener: TcpListener, -} - -impl TcpServer { - pub(super) fn bind(app: App, addr: Addr) -> Result, Error> - where - Addr: ToSocketAddrs, - { - let listener = TcpListener::bind(addr)?; - let local_addr = listener.local_addr()?; - info!( - "ABCI TCP server {} with proto {} running at {}", - env!("CARGO_PKG_VERSION"), - tenderdash_proto::ABCI_VERSION, - local_addr - ); - let server = TcpServer { app, listener }; - Ok(server) - } -} - -impl Server for TcpServer { - fn handle_connection(&self) -> Result<(), Error> { - let (stream, addr) = self.listener.accept()?; - let addr = addr.to_string(); - info!("Incoming connection from: {}", addr); - - handle_client(stream, addr, &self.app, DEFAULT_SERVER_READ_BUF_SIZE) - } -} diff --git a/abci/src/server/unix.rs b/abci/src/server/unix.rs deleted file mode 100644 index c2d62440..00000000 --- a/abci/src/server/unix.rs +++ /dev/null @@ -1,56 +0,0 @@ -//! ABCI application server interface. - -use std::{fs, os::unix::net::UnixListener, path::Path}; - -use tracing::info; - -use super::Server; -use crate::{Error, RequestDispatcher}; - -/// A Unix socket-based server for serving a specific ABCI application. -pub(super) struct UnixSocketServer { - app: App, - listener: UnixListener, - read_buf_size: usize, -} - -impl UnixSocketServer { - pub(super) fn bind

( - app: App, - socket_file: P, - read_buf_size: usize, - ) -> Result, Error> - where - P: AsRef, - { - let socket_file = socket_file.as_ref(); - fs::remove_file(socket_file).ok(); - - let listener = UnixListener::bind(socket_file)?; - info!( - "ABCI Unix server {} with proto {} running at {:?}", - env!("CARGO_PKG_VERSION"), - tenderdash_proto::ABCI_VERSION, - socket_file.to_str().expect("wrong socket path") - ); - - let server = UnixSocketServer { - app, - listener, - read_buf_size, - }; - Ok(server) - } -} - -impl Server for UnixSocketServer { - fn handle_connection(&self) -> Result<(), Error> { - // let listener = self.listener; - let stream = self.listener.accept()?; - let name = String::from(""); - - info!("Incoming Unix connection"); - - super::handle_client(stream.0, name, &self.app, self.read_buf_size) - } -} diff --git a/abci/src/signatures.rs b/abci/src/signatures.rs new file mode 100644 index 00000000..649fb9e1 --- /dev/null +++ b/abci/src/signatures.rs @@ -0,0 +1,603 @@ +//! Digital signature processing +//! +//! This module contains code for processing digital signatures, including +//! calculating message hash to be signed, and calculating signature digest. +//! +//! The code in this module is based on Tenderdash implementation. +//! +//! Two main traits are defined: +//! - [Signable] - for objects that can be signed/verified by Tenderdash. +//! - [Hashable] - for objects that can be serialized and hashed by Tenderdash. +//! +//! All [Signable] objects are also [Hashable], but not vice versa. +//! For example, [StateId] is [Hashable], but not [Signable], as it is only +//! part of some other signed objects. +//! +//! When signing or verifying signature, use [Signable::calculate_sign_hash] to +//! calculate signature digest and provide it as a digest directly to the +//! signature or verification function. + +use std::{ + string::{String, ToString}, + vec::Vec, +}; + +use bytes::BufMut; +use prost::Message; +use tenderdash_proto::types::CanonicalVote; + +use crate::{ + proto::types::{ + BlockId, CanonicalBlockId, CanonicalVoteExtension, Commit, SignedMsgType, StateId, Vote, + VoteExtension, VoteExtensionType, + }, + Error, +}; + +const VOTE_REQUEST_ID_PREFIX: &str = "dpbvote"; +const VOTE_EXTENSION_REQUEST_ID_PREFIX: &str = "dpevote"; + +/// Object that can be signed/verified by Tenderdash. +pub trait Signable: Hashable { + #[deprecated = "replaced by calculate_sign_hash() to unify naming between core, platform and tenderdash"] + fn sign_digest( + &self, + chain_id: &str, + quorum_type: u8, + quorum_hash: &[u8; 32], + height: i64, + round: i32, + ) -> Result, Error> { + self.calculate_sign_hash(chain_id, quorum_type, quorum_hash, height, round) + } + + /// Returns message hash that should be provided directly to a + /// signing/verification function. + fn calculate_sign_hash( + &self, + chain_id: &str, + quorum_type: u8, + quorum_hash: &[u8; 32], + height: i64, + round: i32, + ) -> Result, Error>; +} + +impl Signable for Commit { + fn calculate_sign_hash( + &self, + chain_id: &str, + quorum_type: u8, + quorum_hash: &[u8; 32], + + height: i64, + round: i32, + ) -> Result, Error> { + if self.quorum_hash.ne(quorum_hash) { + return Err(Error::Canonical("quorum hash mismatch".to_string())); + } + + let request_id = sign_request_id(VOTE_REQUEST_ID_PREFIX, height, round); + let sign_bytes_hash = self.calculate_msg_hash(chain_id, height, round)?; + + let digest = sign_hash( + quorum_type, + quorum_hash, + request_id[..] + .try_into() + .expect("invalid request ID length"), + &sign_bytes_hash, + ); + + // TODO: Remove once withdrawals are stable + tracing::trace!( + digest=hex::encode(&digest), + ?quorum_type, + quorum_hash=hex::encode(quorum_hash), + request_id=hex::encode(request_id), + commit=?self, "commit digest"); + + Ok(digest) + } +} + +impl Signable for CanonicalVote { + fn calculate_sign_hash( + &self, + chain_id: &str, + quorum_type: u8, + quorum_hash: &[u8; 32], + + height: i64, + round: i32, + ) -> Result, Error> { + let request_id = sign_request_id(VOTE_REQUEST_ID_PREFIX, height, round); + let sign_bytes_hash = self.calculate_msg_hash(chain_id, height, round)?; + + let digest = sign_hash( + quorum_type, + quorum_hash, + request_id[..] + .try_into() + .expect("invalid request ID length"), + &sign_bytes_hash, + ); + + // TODO: Remove once withdrawals are stable + tracing::trace!( + digest=hex::encode(&digest), + ?quorum_type, + quorum_hash=hex::encode(quorum_hash), + request_id=hex::encode(request_id), + vote=?self, "canonical vote digest"); + + Ok(digest) + } +} + +impl Signable for VoteExtension { + fn calculate_sign_hash( + &self, + chain_id: &str, + quorum_type: u8, + quorum_hash: &[u8; 32], + height: i64, + round: i32, + ) -> Result, Error> { + let (request_id, sign_bytes_hash) = match self.r#type() { + VoteExtensionType::ThresholdRecover => { + let request_id = sign_request_id(VOTE_EXTENSION_REQUEST_ID_PREFIX, height, round); + let sign_bytes_hash = self.calculate_msg_hash(chain_id, height, round)?; + + (request_id, sign_bytes_hash) + }, + + VoteExtensionType::ThresholdRecoverRaw => { + let mut sign_bytes_hash = self.extension.clone(); + sign_bytes_hash.reverse(); + + let request_id = self.sign_request_id.clone().unwrap_or_default(); + let request_id = if request_id.is_empty() { + sign_request_id(VOTE_EXTENSION_REQUEST_ID_PREFIX, height, round) + } else { + // we do double-sha256, and then reverse bytes + let mut request_id = lhash::sha256(&lhash::sha256(&request_id)); + request_id.reverse(); + request_id.to_vec() + }; + + (request_id, sign_bytes_hash) + }, + + VoteExtensionType::Default => unimplemented!( + "vote extension of type {:?} cannot be signed", + self.r#type() + ), + }; + let sign_hash = sign_hash( + quorum_type, + quorum_hash, + request_id[..] + .try_into() + .expect("invalid request ID length"), + &sign_bytes_hash, + ); + + // TODO: Remove once withdrawals are stable + tracing::trace!( + digest=hex::encode(&sign_hash), + ?quorum_type, + quorum_hash=hex::encode(quorum_hash), + request_id=hex::encode(request_id), + vote_extension=?self, "vote extension sign hash"); + + Ok(sign_hash) + } +} + +fn sign_request_id(prefix: &str, height: i64, round: i32) -> Vec { + let mut buf: Vec = Vec::from(prefix.as_bytes()); + buf.put_i64_le(height); + buf.put_i32_le(round); + + lhash::sha256(&buf).to_vec() +} + +fn sign_hash( + quorum_type: u8, + quorum_hash: &[u8; 32], + request_id: &[u8; 32], + sign_bytes_hash: &[u8], +) -> Vec { + let mut quorum_hash = quorum_hash.to_vec(); + quorum_hash.reverse(); + + let mut request_id = request_id.to_vec(); + request_id.reverse(); + + let mut sign_bytes_hash = sign_bytes_hash.to_vec(); + sign_bytes_hash.reverse(); + + let mut buf = Vec::::new(); + + buf.put_u8(quorum_type); + buf.append(&mut quorum_hash); + buf.append(&mut request_id); + buf.append(&mut sign_bytes_hash); + + let hash = lhash::sha256(&buf); + // Note: In bls-signatures for go, we do double-hashing, so we need to also do + // it here. See: https://github.com/dashpay/bls-signatures/blob/9329803969fd325dc0d5c9029ab15669d658ed5d/go-bindings/threshold.go#L62 + lhash::sha256(&hash).to_vec() +} + +/// Calculate hash (sha256) of the data, using algorithms used by +/// Tenderdash. +pub trait Hashable { + /// Generate hash of data to sign + fn calculate_msg_hash(&self, chain_id: &str, height: i64, round: i32) + -> Result, Error>; +} + +impl Hashable for T { + /// Generate hash of data, to be used in signature process. + /// + /// Generates hash of the m + fn calculate_msg_hash( + &self, + chain_id: &str, + height: i64, + round: i32, + ) -> Result, Error> { + let sb = self.sign_bytes(chain_id, height, round)?; + let result = lhash::sha256(&sb); + Ok(Vec::from(result)) + } +} + +/// Marshals data into bytes to be used in signature process. +/// +/// After marhaling, the bytes are hashed and then +trait SignBytes { + /// Marshal into byte buffer, representing bytes to be used in signature + /// process. + /// + /// See also: [SignDigest]. + fn sign_bytes(&self, chain_id: &str, height: i64, round: i32) -> Result, Error>; +} + +impl SignBytes for StateId { + fn sign_bytes(&self, _chain_id: &str, _height: i64, _round: i32) -> Result, Error> { + let mut buf = Vec::new(); + self.encode_length_delimited(&mut buf) + .map_err(Error::Encode)?; + + Ok(buf.to_vec()) + } +} + +impl SignBytes for BlockId { + fn sign_bytes(&self, _chain_id: &str, _height: i64, _round: i32) -> Result, Error> { + // determine if block id is zero + if self.hash.is_empty() + && (self.part_set_header.is_none() + || self.part_set_header.as_ref().unwrap().hash.is_empty()) + && self.state_id.is_empty() + { + return Ok(Vec::::new()); + } + + let part_set_header = self.part_set_header.clone().unwrap_or_default(); + + let block_id = CanonicalBlockId { + hash: self.hash.clone(), + part_set_header: Some(crate::proto::types::CanonicalPartSetHeader { + total: part_set_header.total, + hash: part_set_header.hash, + }), + }; + let mut buf = Vec::new(); + block_id + .encode_length_delimited(&mut buf) + .map_err(Error::Encode)?; + + Ok(buf) + } +} + +impl SignBytes for Vote { + fn sign_bytes(&self, chain_id: &str, height: i64, round: i32) -> Result, Error> { + if height != self.height || round != self.round { + return Err(Error::Canonical(String::from("vote height/round mismatch"))); + } + + let block_id = self + .block_id + .clone() + .ok_or(Error::Canonical(String::from("missing vote.block id")))?; + + let block_id_hash = block_id.calculate_msg_hash(chain_id, height, round)?; + let state_id_hash = block_id.state_id; + + let canonical = CanonicalVote { + block_id: block_id_hash, + state_id: state_id_hash, + chain_id: chain_id.to_string(), + height, + round: round as i64, + r#type: self.r#type, + }; + + canonical.sign_bytes(chain_id, height, round) + } +} + +impl SignBytes for Commit { + fn sign_bytes(&self, chain_id: &str, height: i64, round: i32) -> Result, Error> { + if height != self.height || round != self.round { + return Err(Error::Canonical(String::from( + "commit height/round mismatch", + ))); + } + + let block_id = self + .block_id + .clone() + .ok_or(Error::Canonical(String::from("missing vote.block id")))?; + + let state_id_hash = block_id.state_id.clone(); + let block_id_hash = block_id.calculate_msg_hash(chain_id, height, round)?; + + let canonical = CanonicalVote { + block_id: block_id_hash, + state_id: state_id_hash, + chain_id: chain_id.to_string(), + height, + round: round as i64, + r#type: SignedMsgType::Precommit.into(), + }; + + canonical.sign_bytes(chain_id, height, round) + } +} + +impl SignBytes for CanonicalVote { + fn sign_bytes(&self, chain_id: &str, height: i64, round: i32) -> Result, Error> { + if height != self.height || (round as i64) != self.round { + return Err(Error::Canonical(String::from( + "commit height/round mismatch", + ))); + } + + // we just use some rough guesstimate of intial capacity for performance + let mut buf = Vec::with_capacity(100); + + // Based on Tenderdash implementation in + // https://github.com/dashpay/tenderdash/blob/bcb623bcf002ac54b26ed1324b98116872dd0da7/proto/tendermint/types/types.go#L56 + + buf.put_i32_le(self.r#type().into()); // 4 bytes + buf.put_i64_le(height); // 8 bytes + buf.put_i64_le(round as i64); // 8 bytes + + buf.extend(&self.block_id); // 32 bytes + buf.extend(&self.state_id); // 32 bytes + if buf.len() != 4 + 8 + 8 + 32 + 32 { + return Err(Error::Canonical( + "cannot encode sign bytes: length of input data is invalid".to_string(), + )); + } + buf.put(chain_id.as_bytes()); + + // TODO: Remove once withdrawals are stable + tracing::trace!( + sign_bytes=hex::encode(&buf), + height,round, + vote=?self, "vote/commit sign bytes calculated"); + + Ok(buf.to_vec()) + } +} + +impl SignBytes for VoteExtension { + fn sign_bytes(&self, chain_id: &str, height: i64, round: i32) -> Result, Error> { + match self.r#type() { + VoteExtensionType::ThresholdRecover => { + let ve = CanonicalVoteExtension { + chain_id: chain_id.to_string(), + extension: self.extension.clone(), + height, + round: round as i64, + r#type: self.r#type, + }; + + Ok(ve.encode_length_delimited_to_vec()) + }, + VoteExtensionType::ThresholdRecoverRaw => Ok(self.extension.to_vec()), + _ => Err(Error::Canonical(format!( + "unimplemented: vote extension of type {:?} cannot be signed", + self.r#type() + ))), + } + } +} + +#[cfg(test)] +pub mod tests { + use std::{string::ToString, vec::Vec}; + + use super::SignBytes; + use crate::{ + proto::types::{ + Commit, PartSetHeader, SignedMsgType, Vote, VoteExtension, VoteExtensionType, + }, + signatures::Signable, + }; + + #[test] + /// Compare sign bytes for Vote with sign bytes generated by Tenderdash and + /// put into `expect_sign_bytes`. + fn vote_sign_bytes() { + let h = [1u8, 2, 3, 4].repeat(8); + + let state_id_hash = + hex::decode("d7509905b5407ee72dadd93b4ae70a24ad8a7755fc677acd2b215710a05cfc47") + .unwrap(); + let expect_sign_bytes = hex::decode("0100000001000000000000000200000000000000fb\ + 7c89bf010a91d50f890455582b7fed0c346e53ab33df7da0bcd85c10fa92ead7509905b5407ee72dadd93b\ + 4ae70a24ad8a7755fc677acd2b215710a05cfc47736f6d652d636861696e") + .unwrap(); + + let vote = Vote { + r#type: SignedMsgType::Prevote as i32, + height: 1, + round: 2, + block_id: Some(crate::proto::types::BlockId { + hash: h.clone(), + part_set_header: Some(PartSetHeader { + total: 1, + hash: h.clone(), + }), + state_id: state_id_hash, + }), + ..Default::default() + }; + let chain_id = "some-chain".to_string(); + let height = vote.height; + let round = vote.round; + + let actual = vote.sign_bytes(&chain_id, height, round).unwrap(); + + assert_eq!(expect_sign_bytes, actual); + } + + #[test] + fn commit_sign_bytes() { + let h = [1u8, 2, 3, 4].repeat(8); + + let state_id_hash = + hex::decode("d7509905b5407ee72dadd93b4ae70a24ad8a7755fc677acd2b215710a05cfc47") + .unwrap(); + let expect_sign_bytes = hex::decode("0200000001000000000000000200000000000000fb7c89bf010a91d5\ + 0f890455582b7fed0c346e53ab33df7da0bcd85c10fa92ead7509905b5407ee72dadd93b4ae70a24ad8a7755fc677acd2b215710\ + a05cfc47736f6d652d636861696e") + .unwrap(); + + let commit = Commit { + height: 1, + round: 2, + block_id: Some(crate::proto::types::BlockId { + hash: h.clone(), + part_set_header: Some(PartSetHeader { + total: 1, + hash: h.clone(), + }), + state_id: state_id_hash, + }), + ..Default::default() + }; + let chain_id = "some-chain".to_string(); + let height = commit.height; + let round = commit.round; + + let actual = commit.sign_bytes(&chain_id, height, round).unwrap(); + + assert_eq!(expect_sign_bytes, actual); + } + + #[test] + fn vote_extension_threshold_sign_bytes() { + let ve = VoteExtension { + extension: Vec::from([1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8]), + r#type: VoteExtensionType::ThresholdRecover.into(), + signature: Default::default(), + sign_request_id: None, + }; + + let chain_id = "some-chain".to_string(); + let height = 1; + let round = 2; + + let expect_sign_bytes = hex::decode( + "2a0a080102030405060708110100000000000000190200000000000000220a736f6d652d636861696e2801", + ) + .unwrap(); + + let actual = ve.sign_bytes(&chain_id, height, round).unwrap(); + + assert_eq!(expect_sign_bytes, actual); + } + + /// test vector for threshold-raw vote extensions + /// + /// Returns expected sig hash and vote extension + fn ve_threshold_raw() -> ([u8; 32], VoteExtension) { + let ve = VoteExtension { + extension: [1, 2, 3, 4, 5, 6, 7, 8].repeat(4), + r#type: VoteExtensionType::ThresholdRecoverRaw.into(), + signature: Default::default(), + sign_request_id: Some("dpevote-someSignRequestID".as_bytes().to_vec()), + }; + let expected_sign_hash: [u8; 32] = [ + 0xe, 0x88, 0x8d, 0xa8, 0x97, 0xf1, 0xc0, 0xfd, 0x6a, 0xe8, 0x3b, 0x77, 0x9b, 0x5, 0xdd, + 0x28, 0xc, 0xe2, 0x58, 0xf6, 0x4c, 0x86, 0x1, 0x34, 0xfa, 0x4, 0x27, 0xe1, 0xaa, 0xab, + 0x1a, 0xde, + ]; + + (expected_sign_hash, ve) + } + + #[test] + fn test_ve_threshold_raw_sign_bytes() { + let (_, ve) = ve_threshold_raw(); + let expected_sign_bytes = ve.extension.clone(); + + // chain_id, height and round are unused + let chain_id = String::new(); + let height = -1; + let round = -1; + + let actual = ve.sign_bytes(&chain_id, height, round).unwrap(); + + assert_eq!(expected_sign_bytes, actual); + } + + #[test] + fn test_sign_digest() { + let quorum_hash: [u8; 32] = + hex::decode("6A12D9CF7091D69072E254B297AEF15997093E480FDE295E09A7DE73B31CEEDD") + .unwrap() + .try_into() + .unwrap(); + + let request_id = super::sign_request_id(super::VOTE_REQUEST_ID_PREFIX, 1001, 0); + let request_id = request_id[..].try_into().unwrap(); + + let sign_bytes_hash = + hex::decode("0CA3D5F42BDFED0C4FDE7E6DE0F046CC76CDA6CEE734D65E8B2EE0E375D4C57D") + .unwrap(); + + let expect_sign_hash = + hex::decode("DA25B746781DDF47B5D736F30B1D9D0CC86981EEC67CBE255265C4361DEF8C2E") + .unwrap(); + + let sign_hash = super::sign_hash(100, &quorum_hash, request_id, &sign_bytes_hash); + assert_eq!(expect_sign_hash, sign_hash); // 194,4 + } + + #[test] + fn test_ve_threshold_raw_sign_digest() { + const QUORUM_TYPE: u8 = 106; + let quorum_hash: [u8; 32] = [8u8, 7, 6, 5, 4, 3, 2, 1] + .repeat(4) + .try_into() + .expect("invalid quorum hash length"); + let (expected_sign_hash, ve) = ve_threshold_raw(); + + // height, round, chain id are not used in sign digest for threshold-raw + let sign_hash = ve + .calculate_sign_hash("", QUORUM_TYPE, &quorum_hash, -1, -1) + .expect("sign digest failed"); + + assert_eq!(sign_hash, expected_sign_hash); + } +} diff --git a/abci/src/tracing_span.rs b/abci/src/tracing_span.rs new file mode 100644 index 00000000..96d58fad --- /dev/null +++ b/abci/src/tracing_span.rs @@ -0,0 +1,97 @@ +use tenderdash_proto::abci::request::Value; +use tracing::Level; + +const SPAN_NAME: &str = "abci"; +const LEVEL: Level = Level::ERROR; + +macro_rules! block_span { + ($request: expr, $endpoint:expr, $request_id:expr) => { + tracing::span!( + LEVEL, + SPAN_NAME, + endpoint = $endpoint, + request_id = $request_id, + height = $request.height, + round = $request.round + ) + }; +} +/// Creates a new span for tracing. +/// +/// This function creates a new `tracing::span::EnteredSpan` based on the +/// provided request. It uses the request to determine the endpoint and includes +/// a unique request ID in the span. +/// +/// The level of the span is set to ERROR, so it will be included on all log +/// levels. +/// +/// # Arguments +/// +/// * `request` - A value that can be converted into a `Value`. Depending on the +/// specific variant of `Value`, additional information like height, round, or +/// path might be included in the span. +/// +/// # Returns +/// +/// An entered span which represents an active or entered span state. +/// +/// # Examples +/// +/// ``` +/// # use tenderdash_proto::abci::{RequestInfo, request}; +/// # use tenderdash_abci::tracing_span::span; +/// +/// let request = request::Value::Info(RequestInfo::default()); +/// let span = span(request); +/// ``` +pub fn span(request: T) -> tracing::span::EnteredSpan +where + T: Into, +{ + let value = request.into(); + + let endpoint = abci_method_name(&value); + let request_id = uuid::Uuid::new_v4().to_string(); + + let span = match value { + Value::Info(_r) => tracing::span!(LEVEL, SPAN_NAME, endpoint, request_id), + Value::InitChain(_r) => { + tracing::span!(LEVEL, SPAN_NAME, endpoint, request_id) + }, + Value::PrepareProposal(r) => block_span!(r, endpoint, request_id), + Value::ProcessProposal(r) => block_span!(r, endpoint, request_id), + Value::ExtendVote(r) => block_span!(r, endpoint, request_id), + Value::VerifyVoteExtension(r) => block_span!(r, endpoint, request_id), + Value::FinalizeBlock(r) => block_span!(r, endpoint, request_id), + Value::CheckTx(_r) => { + tracing::span!(LEVEL, SPAN_NAME, endpoint, request_id) + }, + Value::Query(r) => { + tracing::span!(LEVEL, SPAN_NAME, endpoint, request_id, path = r.path) + }, + _ => tracing::span!(LEVEL, SPAN_NAME, endpoint, request_id), + }; + + span.entered() +} + +fn abci_method_name(request: &Value) -> String { + match request { + Value::ApplySnapshotChunk(_) => "ApplySnapshotChunk", + Value::CheckTx(_) => "CheckTx", + Value::Echo(_) => "Echo", + Value::ExtendVote(_) => "ExtendVote", + Value::FinalizeBlock(_) => "FinalizeBlock", + Value::Flush(_) => "Flush", + Value::Info(_) => "Info", + Value::InitChain(_) => "InitChain", + Value::ListSnapshots(_) => "ListSnapshots", + Value::LoadSnapshotChunk(_) => "LoadSnapshotChunk", + Value::OfferSnapshot(_) => "OfferSnapshot", + Value::PrepareProposal(_) => "PrepareProposal", + Value::ProcessProposal(_) => "ProcessProposal", + Value::Query(_) => "Query", + Value::VerifyVoteExtension(_) => "VerifyVoteExtension", + } + .to_string() +} diff --git a/abci/tests/common/docker.rs b/abci/tests/common/docker.rs index 61cc9216..1645dc44 100644 --- a/abci/tests/common/docker.rs +++ b/abci/tests/common/docker.rs @@ -6,7 +6,8 @@ use bollard::{ Docker, API_DEFAULT_VERSION, }; use futures::StreamExt; -use tokio::{io::AsyncWriteExt, runtime::Runtime, time::timeout}; +use tenderdash_abci::ServerRuntime; +use tokio::{io::AsyncWriteExt, time::timeout}; use tracing::{debug, error, info}; use url::Url; @@ -16,7 +17,7 @@ pub struct TenderdashDocker { name: String, docker: Docker, image: String, - runtime: Runtime, + runtime: ServerRuntime, } impl TenderdashDocker { /// new() creates and starts new Tenderdash docker container for provided @@ -31,29 +32,28 @@ impl TenderdashDocker { /// /// * `tag` - Docker tag to use; provide empty string to use default /// * `app_address` - address of ABCI app server; for example, - /// `tcp://172.17.0.1:4567`, `tcp://[::ffff:ac11:1]:5678` or - /// `unix:///path/to/file` + /// `tcp://172.17.0.1:4567`, `tcp://[::ffff:ac11:1]:5678`, + /// `grpc://172.17.01:5678` or `unix:///path/to/file` pub(crate) fn new( container_name: &str, tag: Option<&str>, app_address: &str, ) -> TenderdashDocker { // let tag = String::from(tenderdash_proto::VERSION); - let tag = match tag { - None => tenderdash_proto::meta::TENDERDASH_VERSION, - Some("") => tenderdash_proto::meta::TENDERDASH_VERSION, - Some(tag) => tag, + let tag = match tag.unwrap_or_default() { + "" => tenderdash_proto::meta::TENDERDASH_VERSION, + tag => tag, }; let app_address = url::Url::parse(app_address).expect("invalid app address"); - if app_address.scheme() != "tcp" && app_address.scheme() != "unix" { - panic!("app_address must be either tcp:// or unix://"); + if app_address.scheme() != "tcp" + && app_address.scheme() != "unix" + && app_address.scheme() != "grpc" + { + panic!("app_address must be either grpc://, tcp:// or unix://"); } - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("cannot initialize tokio runtime"); + let runtime = tenderdash_abci::ServerRuntime::default(); info!("Starting Tenderdash docker container"); @@ -153,14 +153,26 @@ impl TenderdashDocker { None }; - let app_address = app_address.to_string().replace("/", "\\/"); + let (abci, app_address) = match app_address.scheme() { + "grpc" => { + let address = app_address + .to_string() + .replace("grpc://", "") + .replace('/', "\\/"); + ("grpc", address) + }, + _ => ("socket", app_address.to_string().replace('/', "\\/")), + }; debug!("Tenderdash will connect to ABCI address: {}", app_address); let container_config = Config { image: Some(self.image.clone()), - env: Some(vec![format!("PROXY_APP={}", app_address)]), + env: Some(vec![ + format!("PROXY_APP={}", app_address), + format!("ABCI={}", abci), + ]), host_config: Some(HostConfig { - binds: binds, + binds, ..Default::default() }), ..Default::default() @@ -215,7 +227,7 @@ impl TenderdashDocker { let mut dest = tokio::io::BufWriter::new(stderror); let mut logs = docker.logs( - &id, + id, Some(bollard::container::LogsOptions { follow: false, stdout: true, @@ -264,11 +276,14 @@ impl Drop for TenderdashDocker { } } } + /// Use custom panic handler to dump logs on panic #[allow(dead_code)] pub fn setup_td_logs_panic(td_docker: &Arc) { let weak_ref = Arc::downgrade(td_docker); std::panic::set_hook(Box::new(move |_| { - weak_ref.upgrade().map(|td| td.print_logs()); + if let Some(td) = weak_ref.upgrade() { + td.print_logs() + } })); } diff --git a/abci/tests/grpc.rs b/abci/tests/grpc.rs new file mode 100644 index 00000000..b75516c3 --- /dev/null +++ b/abci/tests/grpc.rs @@ -0,0 +1,148 @@ +//! Test gRPC server for ABCI protocol. +//! +//! This test verifies that the gRPC server generated with tonic as part of the +//! tenderdash-proto crate can successfully connect to Tenderdash instance. +//! +//! This test should be implemented in the tenderdash-proto crate; however, it +//! is implemented here to use already existing docker container testing +//! logic. +#![cfg(feature = "grpc")] + +use std::sync::Arc; + +use tenderdash_abci::{ + proto::abci::{ + abci_application_server::AbciApplication, RequestEcho, RequestInfo, ResponseInfo, + }, + CancellationToken, +}; +mod common; +use tenderdash_abci::proto; +use tonic::{async_trait, Response, Status}; + +#[cfg(feature = "docker-tests")] +#[tokio::test] +/// Test server listening on ipv4 address. +/// +/// See [tcp_server_test()]. +async fn test_ipv4_server() { + // we assume the host uses default Docker network configuration, with the host + // using 172.17.0.1 + let bind_address = "172.17.0.1:1234".to_string(); + + grpc_server_test("v4", bind_address.as_str()).await; +} + +#[cfg(feature = "docker-tests")] +#[tokio::test] +/// Test server listening on ipv6 address. +/// +/// See [tcp_server_test()]. +async fn test_ipv6_server() { + // we assume the host uses default Docker network configuration, with the host + // using 172.17.0.1. This is IPv6 notation of the IPv4 address. + let bind_address = "[::ffff:ac11:1]:5678".to_string(); + + grpc_server_test("v6", bind_address.as_str()).await; +} + +#[cfg(feature = "docker-tests")] +/// Feature: ABCI App TCO server +/// +/// * Given that we have Tenderdash instance using TCP connection to communicate +/// with ABCI APP +/// * When we estabilish connection with Tenderdash +/// * Then Tenderdash sends Info request +async fn grpc_server_test(test_name: &str, bind_address: &str) { + use core::panic; + + use proto::abci::abci_application_server::AbciApplicationServer; + use tonic::transport::Server; + + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::new("debug")) + .with_ansi(true) + .try_init() + .ok(); + + let cancel = CancellationToken::new(); + let app = TestApp { + cancel: cancel.clone(), + }; + + let addr = bind_address.parse().expect("address must be valid"); + let server_cancel = cancel.clone(); + let server_handle = tokio::spawn(async move { + tracing::debug!("starting gRPC server"); + Server::builder() + .add_service(AbciApplicationServer::new(app)) + .serve_with_shutdown(addr, server_cancel.cancelled()) + .await + .expect("server failed"); + tracing::debug!("gRPC server stopped"); + }); + + let socket_uri = format!("grpc://{}", bind_address); + let container_name = format!("tenderdash_{}", test_name); + + let td = tokio::task::spawn_blocking(move || { + tracing::debug!("starting Tenderdash in Docker container"); + let td = Arc::new(common::docker::TenderdashDocker::new( + &container_name, + None, + &socket_uri, + )); + common::docker::setup_td_logs_panic(&td); + tracing::debug!("started Tenderdash in Docker container"); + + td + }) + .await + .expect("start tenderdash"); + + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => { + panic!("Test timed out"); + } + _ = cancel.cancelled() => { + tracing::debug!("CancellationToken cancelled"); + } + ret = server_handle => { + ret.expect("gRPC server failed"); + } + } + + tokio::task::spawn_blocking(move || drop(td)) + .await + .expect("tenderdash cleanup"); + + tracing::info!("Test finished successfully"); +} + +pub struct TestApp { + // when test succeeds, we cancel this token to finish it + cancel: CancellationToken, +} +#[async_trait] +impl AbciApplication for TestApp { + async fn echo( + &self, + request: tonic::Request, + ) -> Result, Status> { + tracing::info!(?request, "Echo called"); + Ok(Response::new(proto::abci::ResponseEcho { + message: request.into_inner().message, + })) + } + async fn info( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + tracing::info!("Info called, test successful"); + let resp = ResponseInfo { + ..Default::default() + }; + self.cancel.cancel(); + Ok(Response::new(resp)) + } +} diff --git a/abci/tests/kvstore.rs b/abci/tests/kvstore.rs index 9ca5a491..bcd41090 100644 --- a/abci/tests/kvstore.rs +++ b/abci/tests/kvstore.rs @@ -2,7 +2,6 @@ mod common; use std::{ collections::{BTreeMap, BTreeSet}, - mem, ops::Deref, sync::{RwLock, RwLockWriteGuard}, }; @@ -12,17 +11,25 @@ use blake2::{ digest::{consts::U32, FixedOutput}, Blake2b, Digest, }; +use lazy_static::lazy_static; use proto::abci::{self, ResponseException}; -use tenderdash_abci::{check_version, proto, start_server, Application, RequestDispatcher}; -use tracing::{debug, error}; +use tenderdash_abci::{check_version, proto, Application, CancellationToken}; +use tracing::error; use tracing_subscriber::filter::LevelFilter; const SOCKET: &str = "/tmp/abci.sock"; +lazy_static! { + static ref CANCEL_TOKEN: CancellationToken = CancellationToken::new(); +} + #[cfg(feature = "docker-tests")] +#[cfg(feature = "unix")] #[test] fn test_kvstore() { use std::{fs, os::unix::prelude::PermissionsExt}; + + use tenderdash_abci::ServerBuilder; tracing_subscriber::fmt() .with_max_level(LevelFilter::DEBUG) .init(); @@ -45,8 +52,12 @@ fn test_kvstore() { state_reference.insert("ayy".to_owned(), "lmao".to_owned()); let bind_address = format!("unix://{}", SOCKET); - let app = TestDispatcher::new(abci_app); - let server = start_server(&bind_address, app).expect("server failed"); + + let cancel = CANCEL_TOKEN.clone(); + let server = ServerBuilder::new(abci_app, &bind_address) + .with_cancel_token(cancel) + .build() + .expect("server failed"); let perms = fs::Permissions::from_mode(0o777); fs::set_permissions(SOCKET, perms).expect("set perms"); @@ -54,7 +65,10 @@ fn test_kvstore() { let socket_uri = bind_address.to_string(); let _td = common::docker::TenderdashDocker::new("tenderdash", None, &socket_uri); - assert!(matches!(server.handle_connection(), Ok(()))); + assert!(matches!( + server.next_client(), + Err(tenderdash_abci::Error::Cancelled()) + )); drop(server); let kvstore_app = kvstore.into_inner().expect("kvstore lock is poisoned"); @@ -62,33 +76,6 @@ fn test_kvstore() { assert_eq!(kvstore_app.last_block_height, 1); } -pub struct TestDispatcher<'a> { - abci_app: KVStoreABCI<'a>, -} - -impl<'a> TestDispatcher<'a> { - fn new(abci_app: KVStoreABCI<'a>) -> Self { - Self { abci_app } - } -} - -impl RequestDispatcher for TestDispatcher<'_> { - fn handle(&self, request: proto::abci::Request) -> Option { - debug!("Incoming request: {:?}", request); - - if let Some(proto::abci::request::Value::FinalizeBlock(req)) = request.value { - self.abci_app - .finalize_block(req) - .expect("finalize block failed"); - - // Shudown ABCI application after one block - return None; - } - // We use generic dispatcher implementation here - self.abci_app.handle(request) - } -} - /// An example storage. /// /// For clarity it separates commited data (application data with associated @@ -107,7 +94,7 @@ impl KVStore { } pub(crate) fn commit(&mut self) { - let pending_operations = mem::replace(&mut self.pending_operations, BTreeSet::new()); + let pending_operations = std::mem::take(&mut self.pending_operations); pending_operations .into_iter() .for_each(|op| op.apply(&mut self.persisted_state)); @@ -239,7 +226,9 @@ impl Application for KVStoreABCI<'_> { .collect::>>() else { error!("Cannot decode transactions"); - return Err(abci::ResponseException {error:"cannot decode transactions".to_string()}); + return Err(abci::ResponseException { + error: "cannot decode transactions".to_string(), + }); }; // Mark transactions that should be added to the proposed transactions @@ -265,7 +254,9 @@ impl Application for KVStoreABCI<'_> { let Some(tx_records) = tx_records_encoded else { error!("cannot encode transactions"); - return Err(ResponseException{error:"cannot encode transactions".to_string()}); + return Err(ResponseException { + error: "cannot encode transactions".to_string(), + }); }; // Put both local and proposed transactions into staging area @@ -298,7 +289,9 @@ impl Application for KVStoreABCI<'_> { .map(decode_transaction) .collect::>>() else { - return Err(ResponseException{error:"cannot decode transactions".to_string()}); + return Err(ResponseException { + error: "cannot decode transactions".to_string(), + }); }; let tx_results = tx_results_accept(td_proposed_transactions.len()); @@ -327,6 +320,7 @@ impl Application for KVStoreABCI<'_> { vote_extensions: vec![proto::abci::ExtendVoteExtension { r#type: proto::types::VoteExtensionType::ThresholdRecover as i32, extension: height, + sign_request_id: None, }], }) } @@ -359,6 +353,10 @@ impl Application for KVStoreABCI<'_> { kvstore_lock.commit(); + // we want to end the test and shutdown the server + let cancel = CANCEL_TOKEN.clone(); + cancel.cancel(); + Ok(Default::default()) } } diff --git a/abci/tests/tcp.rs b/abci/tests/tcp.rs index cf55cee2..8b439768 100644 --- a/abci/tests/tcp.rs +++ b/abci/tests/tcp.rs @@ -14,7 +14,7 @@ use tenderdash_abci::proto; fn test_ipv4_server() { // we assume the host uses default Docker network configuration, with the host // using 172.17.0.1 - let bind_address = format!("tcp://172.17.0.1:1234"); + let bind_address = "tcp://172.17.0.1:1234".to_string(); tcp_server_test("v4", bind_address.as_str()); } @@ -27,7 +27,7 @@ fn test_ipv4_server() { fn test_ipv6_server() { // we assume the host uses default Docker network configuration, with the host // using 172.17.0.1. This is IPv6 notation of the IPv4 address. - let bind_address = format!("tcp://[::ffff:ac11:1]:5678"); + let bind_address = "tcp://[::ffff:ac11:1]:5678".to_string(); tcp_server_test("v6", bind_address.as_str()); } @@ -40,17 +40,19 @@ fn test_ipv6_server() { /// * When we estabilish connection with Tenderdash /// * Then Tenderdash sends Info request fn tcp_server_test(test_name: &str, bind_address: &str) { - use tenderdash_abci::start_server; - use tracing_subscriber::filter::LevelFilter; + use tenderdash_abci::ServerBuilder; tracing_subscriber::fmt() - .with_max_level(LevelFilter::DEBUG) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(true) .try_init() .ok(); let app = TestDispatcher {}; - let server = start_server(&bind_address, app).expect("server failed"); + let server = ServerBuilder::new(app, bind_address) + .build() + .expect("server failed"); let socket_uri = bind_address.to_string(); let container_name = format!("tenderdash_{}", test_name); @@ -62,7 +64,9 @@ fn tcp_server_test(test_name: &str, bind_address: &str) { common::docker::setup_td_logs_panic(&td); - assert!(matches!(server.handle_connection(), Ok(()))); + let result = server.next_client(); + tracing::debug!(?result, "connection handled"); + assert!(matches!(result, Ok(()))); } pub struct TestDispatcher {} @@ -74,6 +78,7 @@ impl RequestDispatcher for TestDispatcher { request.value, Some(proto::abci::request::Value::Info(_)) )); + tracing::info!("info request received"); None } } diff --git a/abci/tests/unix.rs b/abci/tests/unix.rs index 1adc804d..36baa16d 100644 --- a/abci/tests/unix.rs +++ b/abci/tests/unix.rs @@ -4,12 +4,13 @@ use tenderdash_abci::RequestDispatcher; mod common; use std::{fs, os::unix::prelude::PermissionsExt}; -use tenderdash_abci::{proto, start_server}; +use tenderdash_abci::proto; use tracing_subscriber::filter::LevelFilter; const SOCKET: &str = "/tmp/abci.sock"; #[cfg(feature = "docker-tests")] +#[cfg(feature = "unix")] #[test] /// Feature: ABCI App socket server /// @@ -18,6 +19,8 @@ const SOCKET: &str = "/tmp/abci.sock"; /// * When we estabilish connection with Tenderdash /// * Then Tenderdash sends Info request fn test_unix_socket_server() { + use tenderdash_abci::ServerBuilder; + tracing_subscriber::fmt() .with_max_level(LevelFilter::DEBUG) .init(); @@ -25,7 +28,10 @@ fn test_unix_socket_server() { let bind_address = format!("unix://{}", SOCKET); let app = TestDispatcher {}; - let server = start_server(&bind_address, app).expect("server failed"); + + let server = ServerBuilder::new(app, &bind_address) + .build() + .expect("server failed"); let perms = fs::Permissions::from_mode(0o777); fs::set_permissions(SOCKET, perms).expect("set perms"); @@ -38,7 +44,7 @@ fn test_unix_socket_server() { common::docker::setup_td_logs_panic(&td); - assert!(matches!(server.handle_connection(), Ok(()))); + assert!(matches!(server.next_client(), Ok(()))); } /// Returns error containing string [`INFO_CALLED_ERROR`] when Tenderdash calls diff --git a/proto-compiler/Cargo.toml b/proto-compiler/Cargo.toml index a2945851..9dd520ec 100644 --- a/proto-compiler/Cargo.toml +++ b/proto-compiler/Cargo.toml @@ -1,7 +1,7 @@ [package] +version = "0.14.0-dev.9" name = "tenderdash-proto-compiler" -version = "0.1.0" -authors = ["Informal Systems "] +authors = ["Informal Systems ", "Dash Core Group"] edition = "2021" description = "Internal tool to download and build tenderdash protobuf definitions; used by proto/build.rs" publish = false @@ -10,8 +10,22 @@ publish = false [dependencies] walkdir = { version = "2.3" } -prost-build = { version = "0.11" } -git2 = { version = "0.16" } +prost-build = { version = "0.12" } tempfile = { version = "3.2.0" } -subtle-encoding = { version = "0.5" } regex = { "version" = "1.7.1" } +# Use of native-tls-vendored should build vendored openssl, which is required for Alpine build +ureq = { "version" = "2.6.2" } +zip = { version = "0.6.4", default-features = false, features = ["deflate"] } +fs_extra = { version = "1.3.0" } +tonic-build = { version = "0.11.0", optional = true } + + +[features] +default = [] +# Enable gRPC support; needed by server and client features. +# Conflicts with no_std +grpc = ["dep:tonic-build"] +# Build the gRPC server. Requires tenderdash-proto/std feature. +server = ["grpc"] +# Build the gRPC client. Requires tenderdash-proto/std feature. +client = ["grpc"] diff --git a/proto-compiler/src/constants.rs b/proto-compiler/src/constants.rs index e7e6a649..25f27f3a 100644 --- a/proto-compiler/src/constants.rs +++ b/proto-compiler/src/constants.rs @@ -54,7 +54,6 @@ const DERIVE_FROM_STR: &str = r#"#[derive(derive_more::FromStr)]"#; /// here: pub static CUSTOM_TYPE_ATTRIBUTES: &[(&str, &str)] = &[ (".tendermint.libs.bits.BitArray", SERIALIZED), - (".tendermint.types.EvidenceParams", SERIALIZED), (".tendermint.types.BlockIDFlag", PRIMITIVE_ENUM), (".tendermint.types.Block", SERIALIZED), (".tendermint.types.Data", SERIALIZED), @@ -90,6 +89,16 @@ pub static CUSTOM_TYPE_ATTRIBUTES: &[(&str, &str)] = &[ (".tendermint.types.TxProof", SERIALIZED), (".tendermint.crypto.Proof", SERIALIZED), (".tendermint.abci.Response.value", DERIVE_FROM), + (".tendermint.abci.Request.value", DERIVE_FROM), + // Consensus params + (".tendermint.types.ConsensusParams", SERIALIZED), + (".tendermint.types.ABCIParams", SERIALIZED), + (".tendermint.types.BlockParams", SERIALIZED), + (".tendermint.types.EvidenceParams", SERIALIZED), + (".tendermint.types.ValidatorParams", SERIALIZED), + (".tendermint.types.VersionParams", SERIALIZED), + (".tendermint.types.SynchronyParams", SERIALIZED), + (".tendermint.types.TimeoutParams", SERIALIZED), ]; /// Custom field attributes applied on top of protobuf fields in (a) struct(s) @@ -98,10 +107,6 @@ pub static CUSTOM_TYPE_ATTRIBUTES: &[(&str, &str)] = &[ /// The first item is a path as defined in the prost_build::Config::btree_map /// here: pub static CUSTOM_FIELD_ATTRIBUTES: &[(&str, &str)] = &[ - ( - ".tendermint.types.EvidenceParams.max_bytes", - QUOTED_WITH_DEFAULT, - ), (".tendermint.version.Consensus.block", QUOTED), (".tendermint.version.Consensus.app", QUOTED_WITH_DEFAULT), (".tendermint.abci.ResponseInfo.data", DEFAULT), @@ -199,4 +204,22 @@ pub static CUSTOM_FIELD_ATTRIBUTES: &[(&str, &str)] = &[ (".tendermint.crypto.Proof.total", QUOTED), (".tendermint.crypto.Proof.aunts", VEC_BASE64STRING), (".tendermint.crypto.Proof.leaf_hash", BASE64STRING), + // Consensus params + ( + ".tendermint.types.BlockParams.max_bytes", + QUOTED_WITH_DEFAULT, + ), + (".tendermint.types.BlockParams.max_gas", QUOTED_WITH_DEFAULT), + ( + ".tendermint.types.EvidenceParams.max_age_num_blocks", + QUOTED_WITH_DEFAULT, + ), + ( + ".tendermint.types.EvidenceParams.max_bytes", + QUOTED_WITH_DEFAULT, + ), + ( + ".tendermint.types.VersionParams.app_version", + QUOTED_WITH_DEFAULT, + ), ]; diff --git a/proto-compiler/src/functions.rs b/proto-compiler/src/functions.rs index 80b232d6..33b6a565 100644 --- a/proto-compiler/src/functions.rs +++ b/proto-compiler/src/functions.rs @@ -5,163 +5,150 @@ use std::{ path::{Path, PathBuf}, }; -use git2::{ - build::{CheckoutBuilder, RepoBuilder}, - AutotagOption, Commit, FetchOptions, Oid, Reference, Repository, -}; -use subtle_encoding::hex; use walkdir::WalkDir; use crate::constants::DEFAULT_TENDERDASH_COMMITISH; -/// Clone or open+fetch a repository and check out a specific commitish -/// In case of an existing repository, the origin remote will be set to `url`. -pub fn fetch_commitish(dir: &Path, url: &str, commitish: &str) { - let mut dotgit = dir.to_path_buf(); - dotgit.push(".git"); - let repo = if dotgit.is_dir() { - fetch_existing(dir, url) - } else { - clone_new(dir, url) - }; - checkout_commitish(&repo, commitish) -} +/// Check out a specific commitish of the tenderdash repository. +/// +/// As this tool is mainly used by build.rs script, we rely +/// on cargo to decide wherther or not to call it. It means +/// we will not be called too frequently, so the fetch will +/// not happen too often. +pub fn fetch_commitish(tenderdash_dir: &Path, cache_dir: &Path, url: &str, commitish: &str) { + let url = format!("{url}/archive/{commitish}.zip"); -fn clone_new(dir: &Path, url: &str) -> Repository { println!( - " [info] => Cloning {} into {} folder", + " [info] => Downloading and extracting {} into {}", url, - dir.to_string_lossy() + tenderdash_dir.to_string_lossy() ); - let mut fo = FetchOptions::new(); - fo.download_tags(AutotagOption::All); - fo.update_fetchhead(true); + // ensure cache dir exists + if !cache_dir.is_dir() { + std::fs::create_dir_all(cache_dir).expect("cannot create cache directory"); + } - let mut builder = RepoBuilder::new(); - builder.fetch_options(fo); + let archive_file = cache_dir.join(format!("tenderdash-{}.zip", commitish)); + // Unzip Tenderdash sources to tmpdir and move to target/tenderdash + let tmpdir = tempfile::tempdir().expect("cannot create temporary dir to extract archive"); + download_and_unzip(&url, archive_file.as_path(), tmpdir.path()); - builder.clone(url, dir).unwrap() -} + // Downloaded zip contains subdirectory like tenderdash-0.12.0-dev.2. We need to + // move its contents to target/tederdash, so that we get correct paths like + // target/tenderdash/version/version.go + let src_dir = find_subdir(tmpdir.path(), "tenderdash-"); -fn fetch_existing(dir: &Path, url: &str) -> Repository { - println!( - " [info] => Fetching from {} into existing {} folder", - url, - dir.to_string_lossy() - ); - let repo = Repository::open(dir).unwrap(); + let options = fs_extra::dir::CopyOptions::new().content_only(true); - let mut fo = git2::FetchOptions::new(); - fo.download_tags(git2::AutotagOption::All); - fo.update_fetchhead(true); + fs_extra::dir::create(tenderdash_dir, true).expect("cannot create destination directory"); + fs_extra::dir::move_dir(src_dir, tenderdash_dir, &options) + .expect("cannot move tenderdash directory"); +} - let mut remote = repo - .find_remote("origin") - .unwrap_or_else(|_| repo.remote("origin", url).unwrap()); - if remote.url().is_none() || remote.url().unwrap() != url { - repo.remote_set_url("origin", url).unwrap(); - } - println!(" [info] => Fetching repo using remote `origin`"); - let specs: &[&str] = &[]; - remote.fetch(specs, Some(&mut fo), None).unwrap(); +/// Download file from URL and unzip it to `dest_dir` +fn download_and_unzip(url: &str, archive_file: &Path, dest_dir: &Path) { + const RETRIES: usize = 2; - let stats = remote.stats(); - if stats.local_objects() > 0 { + for retry in 1..=RETRIES { println!( - " [info] => Received {}/{} objects in {} bytes (used {} local objects)", - stats.indexed_objects(), - stats.total_objects(), - stats.received_bytes(), - stats.local_objects() + " [info] => Download and extract tenderdash sources, attempt {}/{}", + retry, RETRIES ); - } else { + + if !archive_file.is_file() { + println!(" [info] => Downloading {}", url); + download(url, archive_file) + .unwrap_or_else(|e| println!(" [error] => Cannot download archive: {:?}", e)); + } else { + println!( + " [info] => Archive file {} already exists, skipping download", + archive_file.display() + ); + } + println!( - " [info] => Received {}/{} objects in {} bytes", - stats.indexed_objects(), - stats.total_objects(), - stats.received_bytes() + " [info] => Extracting downloaded archive {}", + archive_file.display() ); - } - - Repository::open(dir).unwrap() -} + match unzip(archive_file, dest_dir) { + Ok(_) => break, + Err(e) => { + println!( + " [error] => Cannot unzip archive: {}: {:?}", + archive_file.display(), + e + ); + }, + } -fn checkout_commitish(repo: &Repository, commitish: &str) { - let (reference, commit) = find_reference_or_commit(repo, commitish); + // remove invalid file + std::fs::remove_file(archive_file) + .unwrap_or_else(|_| println!(" [warn] => Cannot remove file: {:?}", archive_file)); + } println!( - " [info] => Checking out repo in detached HEAD mode:\n \ - [info] => id: {},\n \ - [info] => author: {},\n \ - [info] => committer: {},\n \ - [info] => summary: {}", - commit.id(), - commit.author(), - commit.committer(), - commit.summary().unwrap_or(""), + " [info] => Extracted tenderdash sources to {}", + dest_dir.display() ); +} - match reference { - None => repo.set_head_detached(commit.id()).unwrap(), - Some(reference) => { - println!(" [info] => name: {}", reference.shorthand().unwrap()); - repo.set_head(reference.name().unwrap()).unwrap(); - }, +/// Download file from URL +fn download(url: &str, archive_file: &Path) -> Result<(), String> { + let mut file = + File::create(archive_file).map_err(|e| format!("cannot create file: {:?}", e))?; + let rb = ureq::get(url) + .call() + .map_err(|e| format!("cannot download archive from: {}: {:?}", url, e))?; + + let mut reader = rb.into_reader(); + std::io::copy(&mut reader, &mut file).map_err(|e| { + format!( + "cannot save downloaded data to: {:?}: {:?}", + archive_file, e + ) + })?; + + file.flush() + .map_err(|e| format!("cannot flush downloaded file: {:?}: {:?}", archive_file, e)) +} + +// Unzip archive; when return false, it means that the archive file does not +// exist or is corrupted and should be downloaded again +fn unzip(archive_file: &Path, dest_dir: &Path) -> Result<(), String> { + if !archive_file.is_file() { + // no archive file, so we request another download + return Err("archive file does not exist".to_string()); } + let file = File::open(archive_file).expect("cannot open downloaded zip"); + let mut archive = + zip::ZipArchive::new(&file).map_err(|e| format!("cannot open zip archive: {:?}", e))?; + + archive + .extract(dest_dir) + .map_err(|e| format!("cannot extract archive: {:?}", e))?; - let mut checkout_options = CheckoutBuilder::new(); - checkout_options - .force() - .remove_untracked(true) - .remove_ignored(true) - .use_theirs(true); - repo.checkout_head(Some(&mut checkout_options)).unwrap(); + Ok(()) } -fn find_reference_or_commit<'a>( - repo: &'a Repository, - commitish: &str, -) -> (Option>, Commit<'a>) { - let mut tried_origin = false; // we tried adding 'origin/' to the commitish - - let mut try_reference = repo.resolve_reference_from_short_name(commitish); - if try_reference.is_err() { - // Local branch might be missing, try the remote branch - try_reference = repo.resolve_reference_from_short_name(&format!("origin/{commitish}")); - tried_origin = true; - if try_reference.is_err() { - // Remote branch not found, last chance: try as a commit ID - // Note: Oid::from_str() currently does an incorrect conversion and cuts the - // second half of the ID. We are falling back on Oid::from_bytes() - // for now. - let commitish_vec = hex::decode(commitish).unwrap_or_else(|_| { - hex::decode_upper(commitish).expect( - "TENDERDASH_COMMITISH refers to non-existing or invalid git branch/tag/commit", - ) - }); - return ( - None, - repo.find_commit(Oid::from_bytes(commitish_vec.as_slice()).unwrap()) - .unwrap(), - ); - } +/// Find a subdirectory of a parent path which has provided name prefix +fn find_subdir(parent: &Path, name_prefix: &str) -> PathBuf { + let dir_content = fs_extra::dir::get_dir_content(parent).expect("cannot ls tmp dir"); + let mut src_dir = String::new(); + for directory in dir_content.directories { + let directory = Path::new(&directory) + .file_name() + .expect("cannot extract dir name"); + + if directory.to_string_lossy().starts_with(name_prefix) { + src_dir = directory.to_string_lossy().into(); + break; + }; } - - let mut reference = try_reference.unwrap(); - if reference.is_branch() { - if tried_origin { - panic!("[error] => local branch names with 'origin/' prefix not supported"); - } - try_reference = repo.resolve_reference_from_short_name(&format!("origin/{commitish}")); - reference = try_reference.unwrap(); - if reference.is_branch() { - panic!("[error] => local branch names with 'origin/' prefix not supported"); - } + if src_dir.is_empty() { + panic!("cannot find extracted Tenderdash sources") } - - let commit = reference.peel_to_commit().unwrap(); - (Some(reference), commit) + parent.join(src_dir) } /// Copy generated files to target folder @@ -235,8 +222,32 @@ pub fn abci_version>(dir: T) -> String { .to_string() } +pub fn tenderdash_version>(dir: T) -> String { + let mut file_path = dir.as_ref().to_path_buf(); + file_path.push("version/version.go"); + + let contents = read_to_string(&file_path).expect("cannot read version/version.go"); + use regex::Regex; + + let re = Regex::new(r##"(?m)^\s+TMVersionDefault\s*=\s*"([^"]+)"\s+*$"##).unwrap(); + let captures = re + .captures(&contents) + .expect("cannot find TMVersionDefault in version/version.go"); + + captures + .get(1) + .expect("TMVersionDefault not found in version/version.go") + .as_str() + .to_string() +} + /// Create tenderdash.rs with library information -pub fn generate_tenderdash_lib(prost_dir: &Path, tenderdash_lib_target: &Path, abci_version: &str) { +pub fn generate_tenderdash_lib( + prost_dir: &Path, + tenderdash_lib_target: &Path, + abci_ver: &str, + td_ver: &str, +) { let mut file_names = WalkDir::new(prost_dir) .into_iter() .filter_map(|e| e.ok()) @@ -292,13 +303,14 @@ pub mod meta {{ /// Semantic version of ABCI protocol pub const ABCI_VERSION: &str = \"{}\"; /// Version of Tenderdash server used to generate protobuf configs - pub const TENDERDASH_VERSION: &str = env!(\"CARGO_PKG_VERSION\"); + pub const TENDERDASH_VERSION: &str = \"{}\"; }} ", content, crate::constants::TENDERDASH_REPO, tenderdash_commitish(), - abci_version, + abci_ver, + td_ver, ); let mut file = @@ -313,3 +325,34 @@ pub(crate) fn tenderdash_commitish() -> String { Err(_) => DEFAULT_TENDERDASH_COMMITISH.to_string(), } } + +/// Save the commitish of last successful download to a file in a state file, +/// located in the `dir` directory and named `download.state`. +pub(crate) fn save_state(dir: &Path, commitish: &str) { + let state_file = PathBuf::from(&dir).join("download.state"); + + std::fs::write(&state_file, commitish) + .map_err(|e| { + println!( + "[warn] => Failed to write download.state file {}: {}", + state_file.display(), + e + ); + }) + .ok(); +} + +/// Check if the state file contains the same commitish as the one we are trying +/// to download. State file should be located in the `dir` and named +/// `download.state` +pub(crate) fn check_state(dir: &Path, commitish: &str) -> bool { + let state_file = PathBuf::from(&dir).join("download.state"); + + match read_to_string(state_file) { + Ok(content) => { + println!("[info] => Detected Tenderdash version: {}.", content.trim()); + content.eq(commitish) + }, + Err(_) => false, + } +} diff --git a/proto-compiler/src/lib.rs b/proto-compiler/src/lib.rs index 1d6ad542..483ec259 100644 --- a/proto-compiler/src/lib.rs +++ b/proto-compiler/src/lib.rs @@ -1,41 +1,40 @@ -//! Convert Tenderdash ABCI++ protocol buffers (protobuf3) definitions to Rust. -//! -//! This tool is for internal use. Used by tenderdash-proto/build.rs. -use std::{ - env::{self, var}, - path::PathBuf, -}; +use std::{env::var, path::PathBuf}; use tempfile::tempdir; mod functions; use functions::{ abci_version, copy_files, fetch_commitish, find_proto_files, generate_tenderdash_lib, - tenderdash_commitish, + tenderdash_commitish, tenderdash_version, }; mod constants; use constants::{CUSTOM_FIELD_ATTRIBUTES, CUSTOM_TYPE_ATTRIBUTES, TENDERDASH_REPO}; -/// Internal tool to download and compile protobuf definitions for Tenderdash. +use crate::functions::{check_state, save_state}; + +/// Import and compile protobuf definitions for Tenderdash. /// /// Checkouts tenderdash repository to ../target/tenderdash and generates /// Rust protobuf definitions in ../proto/src/prost/ and /// ../proto/src/tenderdash.rs pub fn proto_compile() { let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let tenderdash_lib_target = root .join("..") .join("proto") .join("src") .join("tenderdash.rs"); - let target_dir = root.join("..").join("proto").join("src").join("prost"); + + let prost_out_dir = root.join("..").join("proto").join("src").join("prost"); + let out_dir = var("OUT_DIR") .map(PathBuf::from) .or_else(|_| tempdir().map(|d| d.into_path())) .unwrap(); - let cargo_target_dir = match env::var("CARGO_TARGET_DIR") { + let cargo_target_dir = match std::env::var("CARGO_TARGET_DIR") { Ok(s) => PathBuf::from(s), Err(_) => root.join("..").join("target"), }; @@ -50,13 +49,37 @@ pub fn proto_compile() { let thirdparty_dir = root.join("third_party"); let commitish = tenderdash_commitish(); - println!("[info] => Fetching {TENDERDASH_REPO} at {commitish} into {tenderdash_dir:?}"); - fetch_commitish(&PathBuf::from(&tenderdash_dir), TENDERDASH_REPO, &commitish); // This panics if it fails. + // check if this commitish is already downloaded + let download = std::fs::metadata(tenderdash_dir.join("proto")).is_err() + || !check_state(&prost_out_dir, &commitish); + + if download { + println!("[info] => Fetching {TENDERDASH_REPO} at {commitish} into {tenderdash_dir:?}."); + fetch_commitish( + &PathBuf::from(&tenderdash_dir), + &cargo_target_dir, + TENDERDASH_REPO, + &commitish, + ); // This panics if it fails. + } else { + println!("[info] => Skipping download."); + } + + // We need all files in proto/tendermint/abci, plus .../types/canonical.proto + // for signature verification let proto_paths = vec![tenderdash_dir.join("proto").join("tendermint").join("abci")]; let proto_includes_paths = vec![tenderdash_dir.join("proto"), thirdparty_dir]; // List available proto files - let protos = find_proto_files(proto_paths); + let mut protos = find_proto_files(proto_paths); + // On top of that, we add canonical.proto, required to verify signatures + protos.push( + tenderdash_dir + .join("proto") + .join("tendermint") + .join("types") + .join("canonical.proto"), + ); let mut pb = prost_build::Config::new(); @@ -81,15 +104,25 @@ pub fn proto_compile() { ); println!("[info] => Determining ABCI protocol version."); - let abci_ver = abci_version(tenderdash_dir); + let abci_ver = abci_version(&tenderdash_dir); + let tenderdash_ver = tenderdash_version(tenderdash_dir); println!("[info] => Creating structs."); + + #[cfg(feature = "grpc")] + tonic_build::configure() + .generate_default_stubs(true) + .compile_with_config(pb, &protos, &proto_includes_paths) + .unwrap(); + + #[cfg(not(feature = "grpc"))] pb.compile_protos(&protos, &proto_includes_paths).unwrap(); println!("[info] => Removing old structs and copying new structs."); - copy_files(&out_dir, &target_dir); // This panics if it fails. + copy_files(&out_dir, &prost_out_dir); // This panics if it fails. - generate_tenderdash_lib(&out_dir, &tenderdash_lib_target, &abci_ver); + generate_tenderdash_lib(&out_dir, &tenderdash_lib_target, &abci_ver, &tenderdash_ver); + save_state(&prost_out_dir, &commitish); println!("[info] => Done!"); } diff --git a/proto/Cargo.toml b/proto/Cargo.toml index fdf6120f..e2824006 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -1,6 +1,6 @@ [package] +version = "0.14.0-dev.9" name = "tenderdash-proto" -version = "0.12.0-dev.1" edition = "2021" license = "Apache-2.0" repository = "https://github.com/dashpay/rs-tenderdash-abci/tree/main/proto" @@ -22,30 +22,53 @@ description = """ [package.metadata.docs.rs] all-features = true +[features] +# Features configuration. +# +# Note that, due to the way build.rs scripts work, change of features does not trigger +# regeneration of protobuf files. This means you need to be extra careful when changing +# features, as you might end up with outdated and/or conflicting generated files. +# +# Sometimes cleaning the build cache with `cargo clean` might be necessary to solve +# issues related to outdated generated files. +default = ["grpc"] + +# Enable standard library support +std = ["prost/std", "prost-types/std"] +# Build gRPC server +grpc = [ + "std", + "tenderdash-proto-compiler/server", + "tenderdash-proto-compiler/client", + "dep:tonic", +] + [dependencies] -prost = { version = "0.11", default-features = false } -prost-types = { version = "0.11", default-features = false } +prost = { version = "0.12", default-features = false, features = [ + "prost-derive", +] } +prost-types = { version = "0.12", default-features = false } +tonic = { version = "0.11", optional = true } bytes = { version = "1.0", default-features = false, features = ["serde"] } serde = { version = "1.0", default-features = false, features = ["derive"] } -serde_bytes = { version = "0.11", default-features = false, features = [ - "alloc", -] } subtle-encoding = { version = "0.5", default-features = false, features = [ "hex", "base64", "alloc", ] } num-traits = { version = "0.2", default-features = false } -num-derive = { version = "0.3", default-features = false } +num-derive = { version = "0.4", default-features = false } time = { version = "0.3", default-features = false, features = [ "macros", "parsing", ] } flex-error = { version = "0.4.4", default-features = false } -chrono = { version = "0.4.24", default-features = false } +chrono = { version = "0.4.35", default-features = false } derive_more = { version = "0.99.17" } + [dev-dependencies] serde_json = { version = "1.0", default-features = false, features = ["alloc"] } + [build-dependencies] tenderdash-proto-compiler = { path = "../proto-compiler" } diff --git a/proto/build.rs b/proto/build.rs index dd0eb6f1..2087ba78 100644 --- a/proto/build.rs +++ b/proto/build.rs @@ -1,12 +1,20 @@ use std::env; fn main() { - let version = env!("CARGO_PKG_VERSION"); + // default Tenderdash version to use if TENDERDASH_COMMITISH is not set + const DEFAULT_VERSION: &str = "v0.14.0-dev.4"; + + // check if TENDERDASH_COMMITISH is already set; if not, set it to the current + // version + let commitish = env::var("TENDERDASH_COMMITISH").unwrap_or_default(); + if commitish.is_empty() { + env::set_var("TENDERDASH_COMMITISH", DEFAULT_VERSION); + } - env::set_var("TENDERDASH_COMMITISH", "v".to_owned() + version); tenderdash_proto_compiler::proto_compile(); println!("cargo:rerun-if-changed=../proto-compiler/src"); println!("cargo:rerun-if-changed=Cargo.toml"); println!("cargo:rerun-if-env-changed=CARGO_PKG_VERSION"); + println!("cargo:rerun-if-env-changed=TENDERDASH_COMMITISH"); } diff --git a/proto/src/error.rs b/proto/src/error.rs index 1495ff2d..cfa6eb4f 100644 --- a/proto/src/error.rs +++ b/proto/src/error.rs @@ -1,7 +1,9 @@ //! This module defines the various errors that be raised during Protobuf //! conversions. - +#[cfg(not(feature = "std"))] use core::{convert::TryFrom, fmt::Display, num::TryFromIntError}; +#[cfg(feature = "std")] +use std::{fmt::Display, num::TryFromIntError}; use flex_error::{define_error, DisplayOnly}; use prost::{DecodeError, EncodeError}; diff --git a/proto/src/lib.rs b/proto/src/lib.rs index e5c82657..ca3ad64f 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,7 +1,7 @@ //! tenderdash-proto library gives the developer access to the Tenderdash //! proto-defined structs. -#![no_std] +#![cfg_attr(not(feature = "std"), no_std)] #![deny(warnings, trivial_casts, trivial_numeric_casts, unused_import_braces)] #![allow(clippy::large_enum_variant)] #![forbid(unsafe_code)] @@ -23,10 +23,13 @@ mod error; #[allow(warnings)] mod tenderdash; +#[cfg(not(feature = "std"))] use core::{ convert::{TryFrom, TryInto}, fmt::Display, }; +#[cfg(feature = "std")] +use std::fmt::Display; use bytes::{Buf, BufMut}; pub use error::Error; @@ -36,8 +39,9 @@ pub use tenderdash::*; pub mod serializers; use prelude::*; - pub use tenderdash::meta::ABCI_VERSION; +#[cfg(feature = "grpc")] +pub use tonic; /// Allows for easy Google Protocol Buffers encoding and decoding of domain /// types with validation. diff --git a/proto/src/prelude.rs b/proto/src/prelude.rs index aa9a20b6..74cb1a7f 100644 --- a/proto/src/prelude.rs +++ b/proto/src/prelude.rs @@ -1,5 +1,6 @@ // Re-export according to alloc::prelude::v1 because it is not yet stabilized // https://doc.rust-lang.org/src/alloc/prelude/v1.rs.html +#[allow(unused_imports)] pub use alloc::{ borrow::ToOwned, boxed::Box, diff --git a/proto/src/protobuf.rs b/proto/src/protobuf.rs index 8cececae..b21e789d 100644 --- a/proto/src/protobuf.rs +++ b/proto/src/protobuf.rs @@ -1,7 +1,12 @@ -// Google protobuf Timestamp and Duration types reimplemented because their comments are turned -// into invalid documentation texts and doctest chokes on them. See https://github.com/danburkert/prost/issues/374 -// Prost does not seem to have a way yet to remove documentations defined in protobuf files. -// These structs are defined in gogoproto v1.3.1 at https://github.com/gogo/protobuf/tree/v1.3.1/protobuf/google/protobuf +// Google protobuf Timestamp and Duration types reimplemented because their +// comments are turned into invalid documentation texts and doctest chokes on them. See https://github.com/danburkert/prost/issues/374 +// Prost does not seem to have a way yet to remove documentations defined in +// protobuf files. These structs are defined in gogoproto v1.3.1 at https://github.com/gogo/protobuf/tree/v1.3.1/protobuf/google/protobuf + +#[cfg(not(feature = "std"))] +use core::fmt; +#[cfg(feature = "std")] +use std::fmt; /// A Timestamp represents a point in time independent of any time zone or local /// calendar, encoded as a count of seconds and fractions of seconds at @@ -17,7 +22,10 @@ /// restricting to that range, we ensure that we can convert to and from [RFC /// 3339](https://www.ietf.org/rfc/rfc3339.txt) date strings. #[derive(Clone, PartialEq, ::prost::Message, ::serde::Deserialize, ::serde::Serialize)] -#[serde(from = "crate::serializers::timestamp::Rfc3339", into = "crate::serializers::timestamp::Rfc3339")] +#[serde( + from = "crate::serializers::timestamp::Rfc3339", + into = "crate::serializers::timestamp::Rfc3339" +)] pub struct Timestamp { /// Represents seconds of UTC time since Unix epoch /// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to @@ -38,7 +46,7 @@ pub struct Timestamp { /// or "month". It is related to Timestamp in that the difference between /// two Timestamp values is a Duration and it can be added or subtracted /// from a Timestamp. Range is approximately +-10,000 years. -#[derive(Clone, PartialEq, ::prost::Message, ::serde::Deserialize, ::serde::Serialize)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Duration { /// Signed seconds of the span of time. Must be from -315,576,000,000 /// to +315,576,000,000 inclusive. Note: these bounds are computed from: @@ -54,3 +62,49 @@ pub struct Duration { #[prost(int32, tag = "2")] pub nanos: i32, } + +impl serde::Serialize for Duration { + fn serialize(&self, serializer: S) -> Result + where + S: serde::ser::Serializer, + { + let total_nanos = self.seconds * 1_000_000_000 + self.nanos as i64; + serializer.serialize_i64(total_nanos) + } +} + +struct DurationVisitor; + +impl<'de> serde::de::Visitor<'de> for DurationVisitor { + type Value = Duration; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a nanosecond representation of a duration") + } + + fn visit_i128(self, value: i128) -> Result + where + E: serde::de::Error, + { + let seconds = (value / 1_000_000_000) as i64; + let nanos = (value % 1_000_000_000) as i32; + Ok(Duration { seconds, nanos }) + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + let value = value.parse::().map_err(E::custom)?; + self.visit_i128(value) + } +} + +impl<'de> serde::Deserialize<'de> for Duration { + fn deserialize(deserializer: D) -> Result + where + D: serde::de::Deserializer<'de>, + { + deserializer.deserialize_str(DurationVisitor) + } +} diff --git a/proto/src/serializers/from_str.rs b/proto/src/serializers/from_str.rs index c1762524..0544cc71 100644 --- a/proto/src/serializers/from_str.rs +++ b/proto/src/serializers/from_str.rs @@ -1,16 +1,20 @@ //! Serialize and deserialize any `T` that implements [[core::str::FromStr]] //! and [[core::fmt::Display]] from or into string. Note this can be used for //! all primitive data types. +#[cfg(not(feature = "std"))] +use core::{fmt::Display, str::FromStr}; +#[cfg(feature = "std")] +use std::{fmt::Display, str::FromStr}; + use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer}; use crate::prelude::*; - /// Deserialize string into T pub fn deserialize<'de, D, T>(deserializer: D) -> Result where D: Deserializer<'de>, - T: core::str::FromStr, - ::Err: core::fmt::Display, + T: FromStr, + ::Err: Display, { String::deserialize(deserializer)? .parse::() @@ -21,7 +25,7 @@ where pub fn serialize(value: &T, serializer: S) -> Result where S: Serializer, - T: core::fmt::Display, + T: Display, { format!("{value}").serialize(serializer) } diff --git a/proto/src/serializers/part_set_header_total.rs b/proto/src/serializers/part_set_header_total.rs index cecfc52d..17c92220 100644 --- a/proto/src/serializers/part_set_header_total.rs +++ b/proto/src/serializers/part_set_header_total.rs @@ -5,7 +5,11 @@ //! from a string-quoted integer value into an integer value without quotes in //! Tendermint Core v0.34.0. This deserializer allows backwards-compatibility by //! deserializing both ways. See also: + +#[cfg(not(feature = "std"))] use core::{convert::TryFrom, fmt::Formatter}; +#[cfg(feature = "std")] +use std::fmt::Formatter; use serde::{ de::{Error, Visitor}, diff --git a/proto/src/serializers/time_duration.rs b/proto/src/serializers/time_duration.rs index b8af3304..f30e943e 100644 --- a/proto/src/serializers/time_duration.rs +++ b/proto/src/serializers/time_duration.rs @@ -1,5 +1,8 @@ //! Serialize/deserialize core::time::Duration type from and into string: +#[cfg(not(feature = "std"))] use core::time::Duration; +#[cfg(feature = "std")] +use std::time::Duration; use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer}; diff --git a/proto/src/serializers/timestamp.rs b/proto/src/serializers/timestamp.rs index 84f1d371..41bf1f0b 100644 --- a/proto/src/serializers/timestamp.rs +++ b/proto/src/serializers/timestamp.rs @@ -1,6 +1,8 @@ //! Serialize/deserialize Timestamp type from and into string: - -use core::fmt; +#[cfg(not(feature = "std"))] +use core::fmt::{self, Debug}; +#[cfg(feature = "std")] +use std::fmt::{self, Debug}; use serde::{de::Error as _, ser::Error, Deserialize, Deserializer, Serialize, Serializer}; use time::{ @@ -29,15 +31,63 @@ impl From for Timestamp { pub trait ToMilis { /// Convert protobuf timestamp into miliseconds since epoch + + /// Note there is a resolution difference, as timestamp uses nanoseconds + /// + /// # Arguments + /// + /// * millis - time since epoch, in miliseconds + /// + /// # Panics + /// + /// Panics when timestamp doesn't fit `u64` type fn to_milis(&self) -> u64; } impl ToMilis for Timestamp { /// Convert protobuf timestamp into miliseconds since epoch fn to_milis(&self) -> u64 { - chrono::NaiveDateTime::from_timestamp_opt(self.seconds, self.nanos as u32) + chrono::DateTime::from_timestamp(self.seconds, self.nanos as u32) .unwrap() - .timestamp_millis() as u64 + .to_utc() + .timestamp_millis() + .try_into() + .expect("timestamp value out of u64 range") + } +} + +pub trait FromMilis { + /// Create protobuf timestamp from miliseconds since epoch + /// + /// Note there is a resolution difference, as timestamp uses nanoseconds + /// + /// # Arguments + /// + /// * millis - time since epoch, in miliseconds; must fit `i64` type + fn from_milis(millis: u64) -> Self; +} + +impl FromMilis for Timestamp { + /// Create protobuf timestamp from miliseconds since epoch + /// + /// Note there is a resolution difference, as timestamp uses nanoseconds + /// + /// # Panics + /// + /// Panics when `millis` don't fit `i64` type + fn from_milis(millis: u64) -> Self { + let dt = chrono::DateTime::from_timestamp_millis( + millis + .try_into() + .expect("milliseconds timestamp out of i64 range"), + ) + .expect("cannot parse timestamp") + .to_utc(); + + Self { + nanos: dt.timestamp_subsec_nanos() as i32, + seconds: dt.timestamp(), + } } } @@ -101,8 +151,8 @@ pub fn to_rfc3339_nanos(t: OffsetDateTime) -> String { /// ie. a RFC3339 date-time with left-padded subsecond digits without /// trailing zeros and no trailing dot. /// -/// [`Display`]: core::fmt::Display -/// [`Debug`]: core::fmt::Debug +/// [`Display`]: fmt::Display +/// [`Debug`]: fmt::Debug pub fn fmt_as_rfc3339_nanos(t: OffsetDateTime, f: &mut impl fmt::Write) -> fmt::Result { let t = t.to_offset(offset!(UTC)); let nanos = t.nanosecond(); @@ -207,4 +257,33 @@ mod test { assert_eq!(json, serde_json::to_string(&rfc).unwrap()); } } + + #[test] + fn timestamp_from_to() { + let time_ms = 1687848809533; + + let from = Timestamp::from_milis(time_ms); + let to = from.to_milis(); + + assert_eq!(to, time_ms); + } + + #[test] + #[should_panic] + fn timestamp_millis_out_of_range() { + let time_ms = u64::MAX - 1; + + let from = Timestamp::from_milis(time_ms); + } + + #[test] + #[should_panic] + fn timestamp_negative() { + let ts = Timestamp { + nanos: 1000, + seconds: -12, + }; + + let to = ts.to_milis(); + } } diff --git a/proto/tests/unit.rs b/proto/tests/unit.rs index f820160b..e0fdfeb2 100644 --- a/proto/tests/unit.rs +++ b/proto/tests/unit.rs @@ -1,8 +1,9 @@ +#[cfg(not(feature = "std"))] use core::convert::TryFrom; use tenderdash_proto::{ abci::ResponseException, - types::{BlockId as RawBlockId, PartSetHeader as RawPartSetHeader}, + types::{BlockId as RawBlockId, ConsensusParams, PartSetHeader as RawPartSetHeader}, Protobuf, }; @@ -133,3 +134,44 @@ pub fn test_response_exception_from() { "string" ); } + +#[test] +pub fn test_consensus_params_serde() { + let json = r#" + { + "block": { + "max_bytes": "2097152", + "max_gas": "500000000" + }, + "evidence": { + "max_age_num_blocks": "10000", + "max_age_duration": "172800000000000", + "max_bytes": "0" + }, + "validator": { + "pub_key_types": [ + "bls12381" + ] + }, + "version": { + "app_version": "1" + }, + "synchrony": { + "precision": "500000000", + "message_delay": "60000000000" + }, + "timeout": { + "propose": "40000000000", + "propose_delta": "5000000000", + "vote": "40000000000", + "vote_delta": "5000000000", + "bypass_commit_timeout": true + }, + "abci": { + "recheck_tx": true + } + } + "#; + + let _new_params: ConsensusParams = serde_json::from_str(json).unwrap(); +} diff --git a/scripts/release.sh b/scripts/release.sh new file mode 100755 index 00000000..d4df98c6 --- /dev/null +++ b/scripts/release.sh @@ -0,0 +1,90 @@ +#! /bin/bash + +PLATFORM_DIR="$(realpath "$(dirname "$0")/../../platform")" + +function help() { + cat < [-a|--abci] + +Arguments: + - the version of Tenderdash to use + - the version of this library (rs-tenderdash-abci) + +Examples: + + ./scripts/release.sh -t 0.14.0-dev.2 -a 0.14.0-dev.6 + ./scripts/release.sh -t 0.14.5 -a 0.14.12 + + +EOF +} + +# Parse arguments +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + -h | --help) + help + exit 0 + ;; + -t | --tenderdash) + shift + if [ -z "$1" ]; then + echo "Please specify the version of Tenderdash." + exit 1 + fi + td_version=$1 + shift + ;; + -a | --abci) + shift + if [ -z "$1" ]; then + echo "Please specify the version of the library." + exit 1 + fi + rs_tenderdash_abci_version=$1 + shift + ;; + *) + break + ;; + esac +done + +# Check if the versions are passed. +if [ -z "$td_version" ]; then + echo "Please specify the version of Tenderdash." + exit 1 +fi +td_version=${td_version#v} # remove 'v' if it exists + +if [ -z "$rs_tenderdash_abci_version" ]; then + echo "Please specify the version of the library." + exit 1 +fi + +rs_tenderdash_abci_version=${rs_tenderdash_abci_version#v} # remove 'v' if it exists + +set -ex +# Update the version in the Cargo.toml files. +sed -i "s/^version = .*/version = \"$rs_tenderdash_abci_version\"/" ./*/Cargo.toml +sed -i "s/^\s*const DEFAULT_VERSION: &str = \".*\";/const DEFAULT_VERSION: \&str = \"v$td_version\";/" ./proto/build.rs +cargo fmt -- ./proto/build.rs 2>/dev/null + +if [ -d "$PLATFORM_DIR" ]; then + rs_tenderdash="git = \"https:\/\/github.com\/dashpay\/rs-tenderdash-abci\", version = \"$rs_tenderdash_abci_version\"" + echo "INFO: Updating references to tenderdash-abci / tenderdash-proto in $PLATFORM_DIR" + + sed -i "s/^tenderdash-abci = { git = .*, version = [^,\}]*/tenderdash-abci = { $rs_tenderdash/" "${PLATFORM_DIR}"/packages/*/Cargo.toml + sed -i "s/^tenderdash-proto = { git = .*, version = [^,\}]*/tenderdash-proto = { $rs_tenderdash/" "${PLATFORM_DIR}"/packages/*/Cargo.toml +else + echo "WARN: Dash Platform not found in $PLATFORM_DIR, skipping" +fi +# tenderdash-proto = { git = "https://github.com/dashpay/rs-tenderdash-abci", version = "0.14.0-dev.8", features = [