From 932966071a68b07f2275a80e4d2397050b50dcf1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 31 Mar 2020 16:31:05 +0200 Subject: [PATCH 1/4] Switch to the new protocol --- client/network/src/behaviour.rs | 4 +- client/network/src/config.rs | 2 +- client/network/src/on_demand_layer.rs | 116 ++++++-- client/network/src/protocol.rs | 269 +++-------------- .../src/protocol/light_client_handler.rs | 274 +++++++++++++----- client/network/src/service.rs | 67 +++-- 6 files changed, 393 insertions(+), 339 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index e7aca1975cd0d..b753270c28776 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -133,7 +133,6 @@ impl Behaviour { } /// Issue a light client request. - #[allow(unused)] pub fn light_client_request(&mut self, r: light_client_handler::Request) -> Result<(), light_client_handler::Error> { self.light_client_handler.request(r) } @@ -175,6 +174,9 @@ Behaviour { let ev = Event::NotificationsReceived { remote, messages }; self.events.push(BehaviourOut::Event(ev)); }, + CustomMessageOutcome::PeerNewBest(peer_id, number) => { + self.light_client_handler.update_best_block(&peer_id, number); + } CustomMessageOutcome::None => {} } } diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 3f73d761ce879..b8031654df942 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -20,7 +20,7 @@ //! See the documentation of [`Params`]. pub use crate::chain::{Client, FinalityProofProvider}; -pub use crate::on_demand_layer::OnDemand; +pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; pub use crate::service::{TransactionPool, EmptyTransactionPool}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; diff --git a/client/network/src/on_demand_layer.rs b/client/network/src/on_demand_layer.rs index d672ed0b7f569..822901e67737a 100644 --- a/client/network/src/on_demand_layer.rs +++ b/client/network/src/on_demand_layer.rs @@ -16,16 +16,17 @@ //! On-demand requests service. -use crate::protocol::light_dispatch::RequestData; -use std::{collections::HashMap, pin::Pin, sync::Arc, task::Context, task::Poll}; -use futures::{prelude::*, channel::mpsc, channel::oneshot}; +use crate::protocol::light_client_handler; + +use futures::{channel::mpsc, channel::oneshot, prelude::*}; use parking_lot::Mutex; -use sp_blockchain::Error as ClientError; use sc_client_api::{ - Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, - RemoteChangesRequest, RemoteReadChildRequest, RemoteBodyRequest, + FetchChecker, Fetcher, RemoteBodyRequest, RemoteCallRequest, RemoteChangesRequest, + RemoteHeaderRequest, RemoteReadChildRequest, RemoteReadRequest, StorageProof, ChangesProof, }; +use sp_blockchain::Error as ClientError; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use std::{collections::HashMap, pin::Pin, sync::Arc, task::Context, task::Poll}; /// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform /// network requests for some state. @@ -41,13 +42,72 @@ pub struct OnDemand { /// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method /// from the `OnDemand`. However there exists no popular implementation of MPMC channels in /// asynchronous Rust at the moment - requests_queue: Mutex>>>, + requests_queue: Mutex>>>, /// Sending side of `requests_queue`. - requests_send: mpsc::UnboundedSender>, + requests_send: mpsc::UnboundedSender>, +} + +/// Dummy implementation of `FetchChecker` that always assumes that responses are bad. +/// +/// Considering that it is the responsibility of the client to build the fetcher, it can use this +/// implementation if it knows that it will never perform any request. +#[derive(Default, Clone)] +pub struct AlwaysBadChecker; + +impl FetchChecker for AlwaysBadChecker { + fn check_header_proof( + &self, + _request: &RemoteHeaderRequest, + _remote_header: Option, + _remote_proof: StorageProof, + ) -> Result { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_read_proof( + &self, + _request: &RemoteReadRequest, + _remote_proof: StorageProof, + ) -> Result,Option>>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_read_child_proof( + &self, + _request: &RemoteReadChildRequest, + _remote_proof: StorageProof, + ) -> Result, Option>>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_execution_proof( + &self, + _request: &RemoteCallRequest, + _remote_proof: StorageProof, + ) -> Result, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_changes_proof( + &self, + _request: &RemoteChangesRequest, + _remote_proof: ChangesProof + ) -> Result, u32)>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_body_proof( + &self, + _request: &RemoteBodyRequest, + _body: Vec + ) -> Result, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } } -impl OnDemand where +impl OnDemand +where B::Header: HeaderT, { /// Creates new on-demand service. @@ -74,12 +134,15 @@ impl OnDemand where /// /// If this function returns `None`, that means that the receiver has already been extracted in /// the past, and therefore that something already handles the requests. - pub(crate) fn extract_receiver(&self) -> Option>> { + pub(crate) fn extract_receiver( + &self, + ) -> Option>> { self.requests_queue.lock().take() } } -impl Fetcher for OnDemand where +impl Fetcher for OnDemand +where B: BlockT, B::Header: HeaderT, { @@ -91,40 +154,55 @@ impl Fetcher for OnDemand where fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteHeader(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::Header { request, sender }); RemoteResponse { receiver } } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteRead(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::Read { request, sender }); RemoteResponse { receiver } } fn remote_read_child( &self, - request: RemoteReadChildRequest + request: RemoteReadChildRequest, ) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteReadChild(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::ReadChild { request, sender }); RemoteResponse { receiver } } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteCall(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::Call { request, sender }); RemoteResponse { receiver } } - fn remote_changes(&self, request: RemoteChangesRequest) -> Self::RemoteChangesResult { + fn remote_changes( + &self, + request: RemoteChangesRequest, + ) -> Self::RemoteChangesResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteChanges(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::Changes { request, sender }); RemoteResponse { receiver } } fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { let (sender, receiver) = oneshot::channel(); - let _ = self.requests_send.unbounded_send(RequestData::RemoteBody(request, sender)); + let _ = self + .requests_send + .unbounded_send(light_client_handler::Request::Body { request, sender }); RemoteResponse { receiver } } } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 55bc40a95047e..e5c6b2328e2b3 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -38,21 +38,20 @@ use sp_runtime::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub }; use sp_arithmetic::traits::SaturatedConversion; -use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; +use message::{BlockAnnounce, Message}; use message::generic::{Message as GenericMessage, ConsensusMessage}; -use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64}; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::fmt::Write; use std::{cmp, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; -use sc_client_api::{FetchChecker, ChangesProof, StorageProof}; +use sc_client_api::{ChangesProof, StorageProof}; use crate::error; use util::LruHashSet; use wasm_timer::Instant; @@ -74,7 +73,6 @@ pub mod block_requests; pub mod message; pub mod event; pub mod light_client_handler; -pub mod light_dispatch; pub mod sync; pub use block_requests::BlockRequests; @@ -201,9 +199,9 @@ pub struct Protocol { tick_timeout: Pin + Send>>, /// Interval at which we call `propagate_extrinsics`. propagate_timeout: Pin + Send>>, + /// Pending list of messages to return from `poll` as a priority. + pending_messages: VecDeque>, config: ProtocolConfig, - /// Handler for light client requests. - light_dispatch: LightDispatch, genesis_hash: B::Hash, sync: ChainSync, context_data: ContextData, @@ -276,132 +274,6 @@ pub struct PeerInfo { pub best_number: ::Number, } -struct LightDispatchIn<'a> { - behaviour: &'a mut GenericProto, - peerset: sc_peerset::PeersetHandle, -} - -impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a> { - fn report_peer(&mut self, who: &PeerId, reputation: sc_peerset::ReputationChange) { - self.peerset.report_peer(who.clone(), reputation) - } - - fn disconnect_peer(&mut self, who: &PeerId) { - self.behaviour.disconnect_peer(who) - } - - fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { - let message: Message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { - id, - block, - }); - - self.behaviour.send_packet(who, message.encode()) - } - - fn send_read_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - keys: Vec>, - ) { - let message: Message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { - id, - block, - keys, - }); - - self.behaviour.send_packet(who, message.encode()) - } - - fn send_read_child_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - storage_key: Vec, - child_info: Vec, - child_type: u32, - keys: Vec>, - ) { - let message: Message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { - id, - block, - storage_key, - child_info, - child_type, - keys, - }); - - self.behaviour.send_packet(who, message.encode()) - } - - fn send_call_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - method: String, - data: Vec - ) { - let message: Message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { - id, - block, - method, - data, - }); - - self.behaviour.send_packet(who, message.encode()) - } - - fn send_changes_request( - &mut self, - who: &PeerId, - id: RequestId, - first: ::Hash, - last: ::Hash, - min: ::Hash, - max: ::Hash, - storage_key: Option>, - key: Vec, - ) { - let message: Message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { - id, - first, - last, - min, - max, - storage_key, - key, - }); - - self.behaviour.send_packet(who, message.encode()) - } - - fn send_body_request( - &mut self, - who: &PeerId, - id: RequestId, - fields: BlockAttributes, - from: FromBlock<::Hash, <::Header as HeaderT>::Number>, - to: Option<::Hash>, - direction: Direction, - max: Option - ) { - let message: Message = message::generic::Message::BlockRequest(message::BlockRequest:: { - id, - fields, - from, - to, - direction, - max, - }); - - self.behaviour.send_packet(who, message.encode()) - } -} - /// Data necessary to create a context. struct ContextData { // All connected peers @@ -444,7 +316,6 @@ impl Protocol { pub fn new( config: ProtocolConfig, chain: Arc>, - checker: Arc>, transaction_pool: Arc>, finality_proof_provider: Option>>, finality_proof_request_builder: Option>, @@ -500,13 +371,13 @@ impl Protocol { let protocol = Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), + pending_messages: VecDeque::new(), config, context_data: ContextData { peers: HashMap::new(), stats: HashMap::new(), chain, }, - light_dispatch: LightDispatch::new(checker), genesis_hash: info.genesis_hash, sync, handshaking_peers: HashMap::new(), @@ -609,20 +480,6 @@ impl Protocol { self.sync.num_sync_requests() } - /// Starts a new data demand request. - /// - /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_light_client_request(&mut self, rq: RequestData) { - self.light_dispatch.add_request(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, rq); - } - - fn is_light_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { - self.light_dispatch.is_light_response(&who, response_id) - } - fn handle_response( &mut self, who: PeerId, @@ -682,15 +539,10 @@ impl Protocol { GenericMessage::Status(s) => return self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { - // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. - if self.is_light_response(&who, r.id) { - self.on_remote_body_response(who, r); - } else { - if let Some(request) = self.handle_response(who.clone(), &r) { - let outcome = self.on_block_response(who.clone(), request, r); - self.update_peer_info(&who); - return outcome - } + if let Some(request) = self.handle_response(who.clone(), &r) { + let outcome = self.on_block_response(who.clone(), request, r); + self.update_peer_info(&who); + return outcome } }, GenericMessage::BlockAnnounce(announce) => { @@ -805,10 +657,6 @@ impl Protocol { }; if let Some(_peer_data) = removed { self.sync.peer_disconnected(peer.clone()); - self.light_dispatch.on_disconnect(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, &peer); // Notify all the notification protocols as closed. CustomMessageOutcome::NotificationStreamClosed { @@ -989,10 +837,6 @@ impl Protocol { /// > **Note**: This method normally doesn't have to be called except for testing purposes. pub fn tick(&mut self) { self.maintain_peers(); - self.light_dispatch.maintain_peers(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }); self.report_metrics() } @@ -1140,10 +984,7 @@ impl Protocol { }; let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); - self.light_dispatch.on_connect(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who.clone(), status.roles, status.best_number); + self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number)); if info.roles.is_full() { match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) { Ok(None) => (), @@ -1408,13 +1249,11 @@ impl Protocol { announce: BlockAnnounce, ) -> CustomMessageOutcome { let hash = announce.header.hash(); + let number = *announce.header.number(); + if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { peer.known_blocks.insert(hash.clone()); } - self.light_dispatch.update_best_number(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who.clone(), *announce.header.number()); let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) { message::BlockState::Best => true, @@ -1429,7 +1268,11 @@ impl Protocol { // 1) we're on light client; // AND // 2) parent block is already imported and not pruned. - return CustomMessageOutcome::None + if is_their_best { + return CustomMessageOutcome::PeerNewBest(who, number); + } else { + return CustomMessageOutcome::None; + } } sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import. } @@ -1454,15 +1297,28 @@ impl Protocol { }, ); match blocks_to_import { - Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), + Ok(sync::OnBlockData::Import(origin, blocks)) => { + if is_their_best { + self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number)); + } + CustomMessageOutcome::BlockImport(origin, blocks) + }, Ok(sync::OnBlockData::Request(peer, req)) => { self.send_request(&peer, GenericMessage::BlockRequest(req)); - CustomMessageOutcome::None + if is_their_best { + CustomMessageOutcome::PeerNewBest(who, number) + } else { + CustomMessageOutcome::None + } } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None + if is_their_best { + CustomMessageOutcome::PeerNewBest(who, number) + } else { + CustomMessageOutcome::None + } } } } @@ -1592,14 +1448,9 @@ impl Protocol { fn on_remote_call_response( &mut self, - who: PeerId, - response: message::RemoteCallResponse + _: PeerId, + _: message::RemoteCallResponse ) { - trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.light_dispatch.on_remote_call_response(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who, response); } fn on_remote_read_request( @@ -1718,14 +1569,9 @@ impl Protocol { fn on_remote_read_response( &mut self, - who: PeerId, - response: message::RemoteReadResponse + _: PeerId, + _: message::RemoteReadResponse ) { - trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.light_dispatch.on_remote_read_response(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who, response); } fn on_remote_header_request( @@ -1760,14 +1606,9 @@ impl Protocol { fn on_remote_header_response( &mut self, - who: PeerId, - response: message::RemoteHeaderResponse, + _: PeerId, + _: message::RemoteHeaderResponse, ) { - trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.light_dispatch.on_remote_header_response(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who, response); } fn on_remote_changes_request( @@ -1833,18 +1674,9 @@ impl Protocol { fn on_remote_changes_response( &mut self, - who: PeerId, - response: message::RemoteChangesResponse, B::Hash>, + _: PeerId, + _: message::RemoteChangesResponse, B::Hash>, ) { - trace!(target: "sync", "Remote changes proof response {} from {} (max={})", - response.id, - who, - response.max - ); - self.light_dispatch.on_remote_changes_response(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, who, response); } fn on_finality_proof_request( @@ -1898,17 +1730,6 @@ impl Protocol { } } - fn on_remote_body_response( - &mut self, - peer: PeerId, - response: message::BlockResponse - ) { - self.light_dispatch.on_remote_body_response(LightDispatchIn { - behaviour: &mut self.behaviour, - peerset: self.peerset_handle.clone(), - }, peer, response); - } - fn format_stats(&self) -> String { let mut out = String::new(); for (id, stats) in &self.context_data.stats { @@ -1980,6 +1801,8 @@ pub enum CustomMessageOutcome { NotificationStreamClosed { remote: PeerId, protocols: Vec }, /// Messages have been received on one or more notifications protocols. NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> }, + /// Peer has a reported a new head of chain. + PeerNewBest(PeerId, NumberFor), None, } @@ -2060,6 +1883,10 @@ impl NetworkBehaviour for Protocol { Self::OutEvent > > { + if let Some(message) = self.pending_messages.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)); + } + while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) { self.tick(); } diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index c96c5d0818573..d4b0c01c8c41f 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -29,7 +29,7 @@ use codec::{self, Encode, Decode}; use crate::{ chain::Client, config::ProtocolId, - protocol::{api, light_dispatch::TIMEOUT_REPUTATION_CHANGE} + protocol::{api, message::BlockAttributes} }; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; use libp2p::{ @@ -74,38 +74,52 @@ use std::{ use void::Void; use wasm_timer::Instant; +/// Reputation change for a peer when a request timed out. +pub(crate) const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); + /// Configuration options for `LightClientHandler` behaviour. #[derive(Debug, Clone)] pub struct Config { - max_data_size: usize, + max_request_size: usize, + max_response_size: usize, max_pending_requests: usize, inactivity_timeout: Duration, request_timeout: Duration, - protocol: Bytes, + light_protocol: Bytes, + block_protocol: Bytes, } impl Config { /// Create a fresh configuration with the following options: /// - /// - max. data size = 1 MiB + /// - max. request size = 1 MiB + /// - max. response size = 16 MiB /// - max. pending requests = 128 /// - inactivity timeout = 15s /// - request timeout = 15s pub fn new(id: &ProtocolId) -> Self { let mut c = Config { - max_data_size: 1024 * 1024, + max_request_size: 1 * 1024 * 1024, + max_response_size: 16 * 1024 * 1024, max_pending_requests: 128, inactivity_timeout: Duration::from_secs(15), request_timeout: Duration::from_secs(15), - protocol: Bytes::new(), + light_protocol: Bytes::new(), + block_protocol: Bytes::new(), }; c.set_protocol(id); c } - /// Limit the max. length of incoming request bytes. - pub fn set_max_data_size(&mut self, v: usize) -> &mut Self { - self.max_data_size = v; + /// Limit the max. length in bytes of a request. + pub fn set_max_request_size(&mut self, v: usize) -> &mut Self { + self.max_request_size = v; + self + } + + /// Limit the max. length in bytes of a response. + pub fn set_max_response_size(&mut self, v: usize) -> &mut Self { + self.max_response_size = v; self } @@ -129,11 +143,18 @@ impl Config { /// Set protocol to use for upgrade negotiation. pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self { - let mut v = Vec::new(); - v.extend_from_slice(b"/"); - v.extend_from_slice(id.as_bytes()); - v.extend_from_slice(b"/light/2"); - self.protocol = v.into(); + let mut vl = Vec::new(); + vl.extend_from_slice(b"/"); + vl.extend_from_slice(id.as_bytes()); + vl.extend_from_slice(b"/light/2"); + self.light_protocol = vl.into(); + + let mut vb = Vec::new(); + vb.extend_from_slice(b"/"); + vb.extend_from_slice(id.as_bytes()); + vb.extend_from_slice(b"/sync/2"); + self.block_protocol = vb.into(); + self } } @@ -167,6 +188,10 @@ pub enum Error { // used because we currently only support a subset of those. #[derive(Debug)] pub enum Request { + Body { + request: fetcher::RemoteBodyRequest, + sender: oneshot::Sender, ClientError>> + }, Header { request: fetcher::RemoteHeaderRequest, sender: oneshot::Sender> @@ -199,7 +224,8 @@ enum Reply { VecU8(Vec), VecNumberU32(Vec<(::Number, u32)>), MapVecU8OptVecU8(HashMap, Option>>), - Header(B::Header) + Header(B::Header), + Extrinsics(Vec), } /// Augments a light client request with metadata. @@ -282,6 +308,7 @@ where /// means to determine it ourselves. pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor) { if let Some(info) = self.peers.get_mut(peer) { + log::trace!("new best block for {:?}: {:?}", peer, num); info.best_block = Some(num) } } @@ -351,10 +378,23 @@ where ( &mut self , peer: &PeerId , request: &Request - , response: api::v1::light::Response + , response: Response ) -> Result, Error> { log::trace!("response from {}", peer); + match response { + Response::Light(r) => self.on_response_light(peer, request, r), + Response::Block(r) => self.on_response_block(peer, request, r), + } + } + + fn on_response_light + ( &mut self + , peer: &PeerId + , request: &Request + , response: api::v1::light::Response + ) -> Result, Error> + { use api::v1::light::response::Response; match response.response { Some(Response::RemoteCallResponse(response)) => @@ -420,6 +460,32 @@ where } } + fn on_response_block + ( &mut self + , peer: &PeerId + , request: &Request + , response: api::v1::BlockResponse + ) -> Result, Error> + { + let request = if let Request::Body { request , .. } = &request { + request + } else { + return Err(Error::UnexpectedResponse); + }; + + let body: Vec<_> = match response.blocks.into_iter().next() { + Some(b) => b.body, + None => return Err(Error::UnexpectedResponse), + }; + + let body = body.into_iter() + .map(|mut extrinsic| B::Extrinsic::decode(&mut &extrinsic[..])) + .collect::>()?; + + let body = self.checker.check_body_proof(&request, body)?; + Ok(Reply::Extrinsics(body)) + } + fn on_remote_call_request ( &mut self , peer: &PeerId @@ -654,8 +720,8 @@ where fn new_handler(&mut self) -> Self::ProtocolsHandler { let p = InboundProtocol { - max_data_size: self.config.max_data_size, - protocol: self.config.protocol.clone(), + max_request_size: self.config.max_request_size, + protocol: self.config.light_protocol.clone(), }; OneShotHandler::new(SubstreamProtocol::new(p), self.config.inactivity_timeout) } @@ -830,30 +896,40 @@ where } }; if let Some(peer) = available_peer { - let rq = serialize_request(&request.request); - let mut buf = Vec::with_capacity(rq.encoded_len()); - if let Err(e) = rq.encode(&mut buf) { - log::debug!("failed to serialize request: {}", e); - send_reply(Err(ClientError::RemoteFetchFailed), request.request) - } else { - let id = self.next_request_id(); - log::trace!("sending request {} to peer {}", id, peer); - let protocol = OutboundProtocol { - request: buf, - request_id: id, - max_data_size: self.config.max_data_size, - protocol: self.config.protocol.clone(), - }; - self.peers.get_mut(&peer).map(|info| info.status = PeerStatus::BusyWith(id)); - let rw = RequestWrapper { - timestamp: request.timestamp, - retries: request.retries, - request: request.request, - peer: peer.clone(), - }; - self.outstanding.insert(id, rw); - return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id: peer, event: protocol }) - } + let buf = match serialize_request(&request.request) { + Ok(b) => b, + Err(e) => { + log::debug!("failed to serialize request: {}", e); + send_reply(Err(ClientError::RemoteFetchFailed), request.request); + continue; + } + }; + + let id = self.next_request_id(); + log::trace!("sending request {} to peer {}", id, peer); + let protocol = OutboundProtocol { + request: buf, + request_id: id, + expected: match request.request { + Request::Body { .. } => ExpectedResponseTy::Block, + _ => ExpectedResponseTy::Light, + }, + max_response_size: self.config.max_response_size, + protocol: match request.request { + Request::Body { .. } => self.config.block_protocol.clone(), + _ => self.config.light_protocol.clone(), + }, + }; + self.peers.get_mut(&peer).map(|info| info.status = PeerStatus::BusyWith(id)); + let rw = RequestWrapper { + timestamp: request.timestamp, + retries: request.retries, + request: request.request, + peer: peer.clone(), + }; + self.outstanding.insert(id, rw); + return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id: peer, event: protocol }) + } else { self.pending_requests.push_front(request); log::debug!("no peer available to send request to"); @@ -894,6 +970,7 @@ where fn required_block(request: &Request) -> NumberFor { match request { + Request::Body { request, .. } => *request.header.number(), Request::Header { request, .. } => request.block, Request::Read { request, .. } => *request.header.number(), Request::ReadChild { request, .. } => *request.header.number(), @@ -904,6 +981,7 @@ fn required_block(request: &Request) -> NumberFor { fn retries(request: &Request) -> usize { let rc = match request { + Request::Body { request, .. } => request.retry_count, Request::Header { request, .. } => request.retry_count, Request::Read { request, .. } => request.retry_count, Request::ReadChild { request, .. } => request.retry_count, @@ -913,8 +991,20 @@ fn retries(request: &Request) -> usize { rc.unwrap_or(0) } -fn serialize_request(request: &Request) -> api::v1::light::Request { +fn serialize_request(request: &Request) -> Result, prost::EncodeError> { let request = match request { + Request::Body { request, .. } => { + let rq = api::v1::BlockRequest { + fields: u32::from(BlockAttributes::BODY.bits()), + from_block: Some(api::v1::block_request::FromBlock::Hash(request.header.hash().encode())), + to_block: Vec::new(), + direction: api::v1::Direction::Ascending as i32, + max_blocks: 1, + }; + let mut buf = Vec::with_capacity(rq.encoded_len()); + rq.encode(&mut buf)?; + return Ok(buf); + } Request::Header { request, .. } => { let r = api::v1::light::RemoteHeaderRequest { block: request.block.encode() }; api::v1::light::request::Request::RemoteHeaderRequest(r) @@ -957,7 +1047,10 @@ fn serialize_request(request: &Request) -> api::v1::light::Request } }; - api::v1::light::Request { request: Some(request) } + let rq = api::v1::light::Request { request: Some(request) }; + let mut buf = Vec::with_capacity(rq.encoded_len()); + rq.encode(&mut buf)?; + Ok(buf) } fn send_reply(result: Result, ClientError>, request: Request) { @@ -965,6 +1058,11 @@ fn send_reply(result: Result, ClientError>, request: Request< let _ = sender.send(item); // It is okay if the other end already hung up. } match request { + Request::Body { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::Extrinsics(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for body request: {:?}, {:?}", reply, request), + } Request::Header { request, sender } => match result { Err(e) => send(Err(e), sender), Ok(Reply::Header(x)) => send(Ok(x), sender), @@ -999,7 +1097,16 @@ pub enum Event { /// Incoming request from remote and substream to use for the response. Request(api::v1::light::Request, T), /// Incoming response from remote. - Response(u64, api::v1::light::Response), + Response(u64, Response), +} + +/// Incoming response from remote. +#[derive(Debug, Clone)] +pub enum Response { + /// Incoming light response from remote. + Light(api::v1::light::Response), + /// Incoming block response from remote. + Block(api::v1::BlockResponse), } /// Substream upgrade protocol. @@ -1008,31 +1115,31 @@ pub enum Event { #[derive(Debug, Clone)] pub struct InboundProtocol { /// The max. request length in bytes. - max_data_size: usize, + max_request_size: usize, /// The protocol to use for upgrade negotiation. protocol: Bytes, } impl UpgradeInfo for InboundProtocol { - type Info = Bytes; - type InfoIter = iter::Once; + type Info = Bytes; + type InfoIter = iter::Once; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } } impl InboundUpgrade for InboundProtocol where T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = Event; - type Error = ReadOneError; - type Future = BoxFuture<'static, Result>; + type Output = Event; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { let future = async move { - let vec = read_one(&mut s, self.max_data_size).await?; + let vec = read_one(&mut s, self.max_request_size).await?; match api::v1::light::Request::decode(&vec[..]) { Ok(r) => Ok(Event::Request(r, s)), Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) @@ -1051,38 +1158,59 @@ pub struct OutboundProtocol { request: Vec, /// Local identifier for the request. Used to associate it with a response. request_id: u64, - /// The max. request length in bytes. - max_data_size: usize, + /// Kind of response expected for this request. + expected: ExpectedResponseTy, + /// The max. response length in bytes. + max_response_size: usize, /// The protocol to use for upgrade negotiation. protocol: Bytes, } +/// Type of response expected from the remote for this request. +#[derive(Debug, Clone)] +enum ExpectedResponseTy { + Light, + Block, +} + impl UpgradeInfo for OutboundProtocol { - type Info = Bytes; - type InfoIter = iter::Once; + type Info = Bytes; + type InfoIter = iter::Once; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } } impl OutboundUpgrade for OutboundProtocol where T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = Event; - type Error = ReadOneError; - type Future = BoxFuture<'static, Result>; + type Output = Event; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { let future = async move { write_one(&mut s, &self.request).await?; - let vec = read_one(&mut s, self.max_data_size).await?; - api::v1::light::Response::decode(&vec[..]) - .map(|r| Event::Response(self.request_id, r)) - .map_err(|e| { - ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) - }) + let vec = read_one(&mut s, self.max_response_size).await?; + + match self.expected { + ExpectedResponseTy::Light => { + api::v1::light::Response::decode(&vec[..]) + .map(|r| Event::Response(self.request_id, Response::Light(r))) + .map_err(|e| { + ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) + }) + }, + ExpectedResponseTy::Block => { + api::v1::BlockResponse::decode(&vec[..]) + .map(|r| Event::Response(self.request_id, Response::Block(r))) + .map_err(|e| { + ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) + }) + } + } }; future.boxed() } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 2c93d70e268bc..4625015c1ea49 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -25,31 +25,42 @@ //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! which is then processed by [`NetworkWorker::poll`]. -use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path, str}; -use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; -use std::pin::Pin; -use std::task::Poll; - -use sp_consensus::import_queue::{ImportQueue, Link}; -use sp_consensus::import_queue::{BlockImportResult, BlockImportError}; +use crate::{ + behaviour::{Behaviour, BehaviourOut}, + config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, TransportConfig}, + error::Error, + network_state::{ + NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, + }, + on_demand_layer::AlwaysBadChecker, + protocol::{self, event::Event, light_client_handler, sync::SyncState, PeerInfo, Protocol}, + transport, ReputationChange, +}; use futures::{prelude::*, channel::mpsc}; -use log::{warn, error, info, trace}; -use libp2p::{PeerId, Multiaddr, kad::record}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p::{kad::record, Multiaddr, PeerId}; +use log::{error, info, trace, warn}; use parking_lot::Mutex; +use prometheus_endpoint::{ + register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64, +}; use sc_peerset::PeersetHandle; -use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; -use prometheus_endpoint::{Registry, Counter, CounterVec, Gauge, GaugeVec, Opts, U64, register, PrometheusError}; - -use crate::{behaviour::{Behaviour, BehaviourOut}, config::{parse_str_addr, parse_addr}}; -use crate::{transport, config::NonReservedPeerMode, ReputationChange}; -use crate::config::{Params, TransportConfig}; -use crate::error::Error; -use crate::network_state::{NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer}; -use crate::protocol::{self, Protocol, PeerInfo}; -use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}}; -use crate::protocol::sync::SyncState; - +use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; +use sp_runtime::{ + traits::{Block as BlockT, NumberFor}, + ConsensusEngineId, +}; +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, + fs, io, + marker::PhantomData, + path::Path, + pin::Pin, + str, + sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc}, + task::Poll, +}; /// Minimum Requirements for a Hash within Networking pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} @@ -240,7 +251,6 @@ impl NetworkWorker { max_parallel_downloads: params.network_config.max_parallel_downloads, }, params.chain.clone(), - checker.clone(), params.transaction_pool, params.finality_proof_provider.clone(), params.finality_proof_request_builder, @@ -773,7 +783,7 @@ pub struct NetworkWorker { /// Messages from the `NetworkService` and that must be processed. from_worker: mpsc::UnboundedReceiver>, /// Receiver for queries from the light client that must be processed. - light_client_rqs: Option>>, + light_client_rqs: Option>>, /// Senders for events that happen on the network. event_streams: Vec>, /// Prometheus network metrics. @@ -789,6 +799,7 @@ struct Metrics { import_queue_finality_proofs_submitted: Counter, import_queue_justifications_submitted: Counter, is_major_syncing: Gauge, + issued_light_requests: Counter, kbuckets_num_nodes: Gauge, network_per_sec_bytes: GaugeVec, notifications_total: CounterVec, @@ -822,6 +833,10 @@ impl Metrics { is_major_syncing: register(Gauge::new( "sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.", )?, registry)?, + issued_light_requests: register(Counter::new( + "issued_light_requests", + "Number of light client requests that our node has issued.", + )?, registry)?, kbuckets_num_nodes: register(Gauge::new( "sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets" )?, registry)?, @@ -897,7 +912,11 @@ impl Future for NetworkWorker { // Check for new incoming light client requests. if let Some(light_client_rqs) = this.light_client_rqs.as_mut() { while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) { - this.network_service.user_protocol_mut().add_light_client_request(rq); + // This can error if there are too many queued requests already. + let _ = this.network_service.light_client_request(rq); + if let Some(metrics) = this.metrics.as_ref() { + metrics.issued_light_requests.inc(); + } } } From d093c4971efcf8e983d97892fe659213da76c515 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 31 Mar 2020 16:32:35 +0200 Subject: [PATCH 2/4] Oops, forgot to remove light_dispatch.rs --- client/network/src/protocol/light_dispatch.rs | 1313 ----------------- 1 file changed, 1313 deletions(-) delete mode 100644 client/network/src/protocol/light_dispatch.rs diff --git a/client/network/src/protocol/light_dispatch.rs b/client/network/src/protocol/light_dispatch.rs deleted file mode 100644 index 39e90881fb0c6..0000000000000 --- a/client/network/src/protocol/light_dispatch.rs +++ /dev/null @@ -1,1313 +0,0 @@ -// Copyright 2017-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Light client requests service. -//! -//! Handles requests for data coming from our local light client and that must be answered by -//! nodes on the network. - -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use std::time::Duration; -use wasm_timer::Instant; -use log::{trace, info}; -use futures::channel::oneshot::{Sender as OneShotSender}; -use linked_hash_map::{Entry, LinkedHashMap}; -use sp_blockchain::Error as ClientError; -use sc_client_api::{FetchChecker, RemoteHeaderRequest, - RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof, - RemoteReadChildRequest, RemoteBodyRequest, StorageProof}; -use crate::protocol::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; -use libp2p::PeerId; -use crate::config::Roles; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use sc_peerset::ReputationChange; - -/// Remote request timeout. -const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); -/// Default request retry count. -const RETRY_COUNT: usize = 1; -/// Reputation change for a peer when a request timed out. -pub(crate) const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); - -/// Trait used by the `LightDispatch` service to communicate messages back to the network. -pub trait LightDispatchNetwork { - /// Adjusts the reputation of the given peer. - fn report_peer(&mut self, who: &PeerId, reputation_change: ReputationChange); - - /// Disconnect from the given peer. Used in case of misbehaviour. - fn disconnect_peer(&mut self, who: &PeerId); - - /// Send to `who` a request for a header. - fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number); - - /// Send to `who` a read request. - fn send_read_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - keys: Vec>, - ); - - /// Send to `who` a child read request. - fn send_read_child_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - storage_key: Vec, - child_info: Vec, - child_type: u32, - keys: Vec>, - ); - - /// Send to `who` a call request. - fn send_call_request( - &mut self, - who: &PeerId, - id: RequestId, - block: ::Hash, - method: String, - data: Vec - ); - - /// Send to `who` a changes request. - fn send_changes_request( - &mut self, - who: &PeerId, - id: RequestId, - first: ::Hash, - last: ::Hash, - min: ::Hash, - max: ::Hash, - storage_key: Option>, - key: Vec, - ); - - /// Send to `who` a body request. - fn send_body_request( - &mut self, - who: &PeerId, - id: RequestId, - fields: BlockAttributes, - from: FromBlock<::Hash, <::Header as HeaderT>::Number>, - to: Option, - direction: Direction, - max: Option - ); -} - -/// Light client requests service. Dispatches requests to appropriate peers. -pub struct LightDispatch { - /// Verifies that responses are correct. Passed at initialization. - checker: Arc>, - /// Numeric ID to assign to the next outgoing request. Used to assign responses to their - /// corresponding request. - next_request_id: u64, - /// Requests that we have yet to send out on the network. - pending_requests: VecDeque>, - /// List of nodes to which we have sent a request and that are yet to answer. - active_peers: LinkedHashMap>, - /// List of nodes that we know of that aren't doing anything and that are available for new - /// requests. - idle_peers: VecDeque, - /// Best known block for each node in `active_peers` and `idle_peers`. - best_blocks: HashMap>, -} - -struct Request { - id: u64, - /// When the request got created or sent out to the network. - timestamp: Instant, - /// Number of remaining attempts to fulfill this request. If it reaches 0, we interrupt the - /// attempt. - retry_count: usize, - data: RequestData, -} - -/// One request for data made by the `Client`. -/// -/// Contains a `Sender` where to send the result. -pub(crate) enum RequestData { - RemoteBody(RemoteBodyRequest, OneShotSender, ClientError>>), - RemoteHeader(RemoteHeaderRequest, OneShotSender>), - RemoteRead( - RemoteReadRequest, - OneShotSender, Option>>, ClientError>>, - ), - RemoteReadChild( - RemoteReadChildRequest, - OneShotSender, Option>>, ClientError>> - ), - RemoteCall(RemoteCallRequest, OneShotSender, ClientError>>), - RemoteChanges( - RemoteChangesRequest, - OneShotSender, u32)>, ClientError>> - ), -} - -enum Accept { - Ok, - CheckFailed(ClientError, RequestData), - Unexpected(RequestData), -} - -/// Dummy implementation of `FetchChecker` that always assumes that responses are bad. -/// -/// Considering that it is the responsibility of the client to build the fetcher, it can use this -/// implementation if it knows that it will never perform any request. -#[derive(Default, Clone)] -pub struct AlwaysBadChecker; - -impl FetchChecker for AlwaysBadChecker { - fn check_header_proof( - &self, - _request: &RemoteHeaderRequest, - _remote_header: Option, - _remote_proof: StorageProof, - ) -> Result { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } - - fn check_read_proof( - &self, - _request: &RemoteReadRequest, - _remote_proof: StorageProof, - ) -> Result,Option>>, ClientError> { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } - - fn check_read_child_proof( - &self, - _request: &RemoteReadChildRequest, - _remote_proof: StorageProof, - ) -> Result, Option>>, ClientError> { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } - - fn check_execution_proof( - &self, - _request: &RemoteCallRequest, - _remote_proof: StorageProof, - ) -> Result, ClientError> { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } - - fn check_changes_proof( - &self, - _request: &RemoteChangesRequest, - _remote_proof: ChangesProof - ) -> Result, u32)>, ClientError> { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } - - fn check_body_proof( - &self, - _request: &RemoteBodyRequest, - _body: Vec - ) -> Result, ClientError> { - Err(ClientError::Msg("AlwaysBadChecker".into())) - } -} - -impl LightDispatch where - B::Header: HeaderT, -{ - /// Creates new light client requests processor. - pub fn new(checker: Arc>) -> Self { - LightDispatch { - checker, - next_request_id: 0, - pending_requests: VecDeque::new(), - active_peers: LinkedHashMap::new(), - idle_peers: VecDeque::new(), - best_blocks: HashMap::new(), - } - } - - /// Inserts a new request in the list of requests to execute. - pub(crate) fn add_request(&mut self, network: impl LightDispatchNetwork, data: RequestData) { - self.insert(RETRY_COUNT, data); - self.dispatch(network); - } - - /// Inserts a new request in the list of requests to execute. - fn insert(&mut self, retry_count: usize, data: RequestData) { - let request_id = self.next_request_id; - self.next_request_id += 1; - - self.pending_requests.push_back(Request { - id: request_id, - timestamp: Instant::now(), - retry_count, - data, - }); - } - - /// Try to accept response from given peer. - fn accept_response( - &mut self, - rtype: &str, - mut network: impl LightDispatchNetwork, - peer: PeerId, - request_id: u64, - try_accept: impl FnOnce(Request, &Arc>) -> Accept - ) { - let request = match self.remove(peer.clone(), request_id) { - Some(request) => request, - None => { - info!("💔 Invalid remote {} response from peer {}", rtype, peer); - network.report_peer(&peer, ReputationChange::new_fatal("Invalid remote response")); - network.disconnect_peer(&peer); - self.remove_peer(&peer); - return; - }, - }; - - let retry_count = request.retry_count; - let (retry_count, retry_request_data) = match try_accept(request, &self.checker) { - Accept::Ok => (retry_count, None), - Accept::CheckFailed(error, retry_request_data) => { - info!("💔 Failed to check remote {} response from peer {}: {}", rtype, peer, error); - network.report_peer(&peer, ReputationChange::new_fatal("Failed remote response check")); - network.disconnect_peer(&peer); - self.remove_peer(&peer); - - if retry_count > 0 { - (retry_count - 1, Some(retry_request_data)) - } else { - trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype); - retry_request_data.fail(ClientError::RemoteFetchFailed.into()); - (0, None) - } - }, - Accept::Unexpected(retry_request_data) => { - info!("💔 Unexpected response to remote {} from peer", rtype); - network.report_peer(&peer, ReputationChange::new_fatal("Unexpected remote response")); - network.disconnect_peer(&peer); - self.remove_peer(&peer); - - (retry_count, Some(retry_request_data)) - }, - }; - - if let Some(request_data) = retry_request_data { - self.insert(retry_count, request_data); - } - - self.dispatch(network); - } - - /// Call this when we connect to a node on the network. - pub fn on_connect( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - role: Roles, - best_number: NumberFor - ) { - if !role.is_full() { - return; - } - - self.idle_peers.push_back(peer.clone()); - self.best_blocks.insert(peer, best_number); - - self.dispatch(network); - } - - /// Sets the best seen block for the given node. - pub fn update_best_number(&mut self, network: impl LightDispatchNetwork, peer: PeerId, best_number: NumberFor) { - self.best_blocks.insert(peer, best_number); - self.dispatch(network); - } - - /// Call this when we disconnect from a node. - pub fn on_disconnect(&mut self, network: impl LightDispatchNetwork, peer: &PeerId) { - self.remove_peer(peer); - self.dispatch(network); - } - - /// Must be called periodically in order to perform maintenance. - pub fn maintain_peers(&mut self, mut network: impl LightDispatchNetwork) { - let now = Instant::now(); - - loop { - match self.active_peers.front() { - Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (), - _ => break, - } - - let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above"); - self.pending_requests.push_front(request); - network.report_peer(&bad_peer, ReputationChange::new(TIMEOUT_REPUTATION_CHANGE, "Light request timeout")); - network.disconnect_peer(&bad_peer); - } - - self.dispatch(network); - } - - /// Handles a remote header response message from on the network. - pub fn on_remote_header_response( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - response: message::RemoteHeaderResponse - ) { - self.accept_response("header", network, peer, response.id, |request, checker| match request.data { - RequestData::RemoteHeader(request, sender) => match checker.check_header_proof( - &request, - response.header, - response.proof - ) { - Ok(response) => { - // we do not bother if receiver has been dropped already - let _ = sender.send(Ok(response)); - Accept::Ok - }, - Err(error) => Accept::CheckFailed(error, RequestData::RemoteHeader(request, sender)), - }, - data => Accept::Unexpected(data), - }) - } - - /// Handles a remote read response message from on the network. - pub fn on_remote_read_response( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - response: message::RemoteReadResponse - ) { - self.accept_response("read", network, peer, response.id, |request, checker| match request.data { - RequestData::RemoteRead(request, sender) => { - match checker.check_read_proof(&request, response.proof) { - Ok(response) => { - // we do not bother if receiver has been dropped already - let _ = sender.send(Ok(response)); - Accept::Ok - }, - Err(error) => Accept::CheckFailed( - error, - RequestData::RemoteRead(request, sender) - ), - }}, - RequestData::RemoteReadChild(request, sender) => { - match checker.check_read_child_proof(&request, response.proof) { - Ok(response) => { - // we do not bother if receiver has been dropped already - let _ = sender.send(Ok(response)); - Accept::Ok - }, - Err(error) => Accept::CheckFailed( - error, - RequestData::RemoteReadChild(request, sender) - ), - }}, - data => Accept::Unexpected(data), - }) - } - - /// Handles a remote call response message from on the network. - pub fn on_remote_call_response( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - response: message::RemoteCallResponse - ) { - self.accept_response("call", network, peer, response.id, |request, checker| match request.data { - RequestData::RemoteCall(request, sender) => match checker.check_execution_proof(&request, response.proof) { - Ok(response) => { - // we do not bother if receiver has been dropped already - let _ = sender.send(Ok(response)); - Accept::Ok - }, - Err(error) => Accept::CheckFailed(error, RequestData::RemoteCall(request, sender)), - }, - data => Accept::Unexpected(data), - }) - } - - /// Handles a remote changes response message from on the network. - pub fn on_remote_changes_response( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - response: message::RemoteChangesResponse, B::Hash> - ) { - self.accept_response("changes", network, peer, response.id, |request, checker| match request.data { - RequestData::RemoteChanges(request, sender) => match checker.check_changes_proof( - &request, ChangesProof { - max_block: response.max, - proof: response.proof, - roots: response.roots.into_iter().collect(), - roots_proof: response.roots_proof, - }) { - Ok(response) => { - // we do not bother if receiver has been dropped already - let _ = sender.send(Ok(response)); - Accept::Ok - }, - Err(error) => Accept::CheckFailed(error, RequestData::RemoteChanges(request, sender)), - }, - data => Accept::Unexpected(data), - }) - } - - /// Handles a remote body response message from on the network. - pub fn on_remote_body_response( - &mut self, - network: impl LightDispatchNetwork, - peer: PeerId, - response: message::BlockResponse - ) { - self.accept_response("body", network, peer, response.id, |request, checker| match request.data { - RequestData::RemoteBody(request, sender) => { - let mut bodies: Vec<_> = response - .blocks - .into_iter() - .filter_map(|b| b.body) - .collect(); - - // Number of bodies are hardcoded to 1 for valid `RemoteBodyResponses` - if bodies.len() != 1 { - return Accept::CheckFailed( - "RemoteBodyResponse: invalid number of blocks".into(), - RequestData::RemoteBody(request, sender), - ) - } - let body = bodies.remove(0); - - match checker.check_body_proof(&request, body) { - Ok(body) => { - let _ = sender.send(Ok(body)); - Accept::Ok - } - Err(error) => Accept::CheckFailed(error, RequestData::RemoteBody(request, sender)), - } - } - other => Accept::Unexpected(other), - }) - } - - pub fn is_light_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool { - self.active_peers.get(peer).map_or(false, |r| r.id == request_id) - } - - fn remove(&mut self, peer: PeerId, id: u64) -> Option> { - match self.active_peers.entry(peer.clone()) { - Entry::Occupied(entry) => match entry.get().id == id { - true => { - self.idle_peers.push_back(peer); - Some(entry.remove()) - }, - false => None, - }, - Entry::Vacant(_) => None, - } - } - - /// Removes a peer from the list of known peers. - /// - /// Puts back the active request that this node was performing into `pending_requests`. - fn remove_peer(&mut self, peer: &PeerId) { - self.best_blocks.remove(peer); - - if let Some(request) = self.active_peers.remove(peer) { - self.pending_requests.push_front(request); - return; - } - - if let Some(idle_index) = self.idle_peers.iter().position(|i| i == peer) { - self.idle_peers.swap_remove_back(idle_index); - } - } - - /// Dispatches pending requests. - fn dispatch(&mut self, mut network: impl LightDispatchNetwork) { - let mut last_peer = self.idle_peers.back().cloned(); - let mut unhandled_requests = VecDeque::new(); - - loop { - let peer = match self.idle_peers.pop_front() { - Some(peer) => peer, - None => break, - }; - - // check if request can (optimistically) be processed by the peer - let can_be_processed_by_peer = { - let request = match self.pending_requests.front() { - Some(r) => r, - None => { - self.idle_peers.push_front(peer); - break; - }, - }; - let peer_best_block = self.best_blocks.get(&peer) - .expect("entries are inserted into best_blocks when peer is connected; - entries are removed from best_blocks when peer is disconnected; - peer is in idle_peers and thus connected; qed"); - request.required_block() <= *peer_best_block - }; - - if !can_be_processed_by_peer { - // return peer to the back of the queue - self.idle_peers.push_back(peer.clone()); - - // we have enumerated all peers and no one can handle request - if Some(peer) == last_peer { - let request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); - unhandled_requests.push_back(request); - last_peer = self.idle_peers.back().cloned(); - } - - continue; - } - - last_peer = self.idle_peers.back().cloned(); - - let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); - request.timestamp = Instant::now(); - trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - request.send_to(&mut network, &peer); - self.active_peers.insert(peer, request); - } - - self.pending_requests.append(&mut unhandled_requests); - } -} - -impl Request { - /// Returns the block that the remote needs to have in order to be able to fulfill - /// this request. - fn required_block(&self) -> NumberFor { - match self.data { - RequestData::RemoteHeader(ref data, _) => data.block, - RequestData::RemoteRead(ref data, _) => *data.header.number(), - RequestData::RemoteReadChild(ref data, _) => *data.header.number(), - RequestData::RemoteCall(ref data, _) => *data.header.number(), - RequestData::RemoteChanges(ref data, _) => data.max_block.0, - RequestData::RemoteBody(ref data, _) => *data.header.number(), - } - } - - fn send_to(&self, out: &mut impl LightDispatchNetwork, peer: &PeerId) { - match self.data { - RequestData::RemoteHeader(ref data, _) => - out.send_header_request( - peer, - self.id, - data.block, - ), - RequestData::RemoteRead(ref data, _) => - out.send_read_request( - peer, - self.id, - data.block, - data.keys.clone(), - ), - RequestData::RemoteReadChild(ref data, _) => - out.send_read_child_request( - peer, - self.id, - data.block, - data.storage_key.clone(), - data.child_info.clone(), - data.child_type, - data.keys.clone(), - ), - RequestData::RemoteCall(ref data, _) => - out.send_call_request( - peer, - self.id, - data.block, - data.method.clone(), - data.call_data.clone(), - ), - RequestData::RemoteChanges(ref data, _) => - out.send_changes_request( - peer, - self.id, - data.first_block.1.clone(), - data.last_block.1.clone(), - data.tries_roots.1.clone(), - data.max_block.1.clone(), - data.storage_key.clone(), - data.key.clone(), - ), - RequestData::RemoteBody(ref data, _) => - out.send_body_request( - peer, - self.id, - message::BlockAttributes::BODY, - message::FromBlock::Hash(data.header.hash()), - None, - message::Direction::Ascending, - Some(1) - ), - } - } -} - -impl RequestData { - fn fail(self, error: ClientError) { - // don't care if anyone is listening - match self { - RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); }, - RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); }, - RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); }, - RequestData::RemoteReadChild(_, sender) => { let _ = sender.send(Err(error)); }, - RequestData::RemoteChanges(_, sender) => { let _ = sender.send(Err(error)); }, - RequestData::RemoteBody(_, sender) => { let _ = sender.send(Err(error)); }, - } - } -} - -#[cfg(test)] -pub mod tests { - use std::collections::{HashMap, HashSet}; - use std::sync::Arc; - use std::time::Instant; - use futures::channel::oneshot; - use sp_core::storage::ChildInfo; - use sp_runtime::traits::{Block as BlockT, NumberFor, Header as HeaderT}; - use sp_blockchain::{Error as ClientError, Result as ClientResult}; - use sc_client_api::{FetchChecker, RemoteHeaderRequest, - ChangesProof, RemoteCallRequest, RemoteReadRequest, - RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest}; - use crate::config::Roles; - use crate::protocol::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; - use libp2p::PeerId; - use super::{REQUEST_TIMEOUT, LightDispatch, LightDispatchNetwork, RequestData, StorageProof}; - use sp_test_primitives::{Block, Header}; - - pub(crate) struct DummyFetchChecker { - pub(crate) ok: bool, - _mark: std::marker::PhantomData - } - - impl DummyFetchChecker { - pub(crate) fn new(ok: bool) -> Self { - DummyFetchChecker { ok, _mark: std::marker::PhantomData } - } - } - - impl FetchChecker for DummyFetchChecker { - fn check_header_proof( - &self, - _request: &RemoteHeaderRequest, - header: Option, - _remote_proof: StorageProof, - ) -> ClientResult { - match self.ok { - true if header.is_some() => Ok(header.unwrap()), - _ => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_read_proof( - &self, - request: &RemoteReadRequest, - _: StorageProof, - ) -> ClientResult, Option>>> { - match self.ok { - true => Ok(request.keys - .iter() - .cloned() - .map(|k| (k, Some(vec![42]))) - .collect() - ), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_read_child_proof( - &self, - request: &RemoteReadChildRequest, - _: StorageProof, - ) -> ClientResult, Option>>> { - match self.ok { - true => Ok(request.keys - .iter() - .cloned() - .map(|k| (k, Some(vec![42]))) - .collect() - ), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_execution_proof( - &self, - _: &RemoteCallRequest, - _: StorageProof, - ) -> ClientResult> { - match self.ok { - true => Ok(vec![42]), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_changes_proof( - &self, - _: &RemoteChangesRequest, - _: ChangesProof - ) -> ClientResult, u32)>> { - match self.ok { - true => Ok(vec![(100.into(), 2)]), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_body_proof( - &self, - _: &RemoteBodyRequest, - body: Vec - ) -> ClientResult> { - match self.ok { - true => Ok(body), - false => Err(ClientError::Backend("Test error".into())), - } - } - } - - fn dummy(ok: bool) -> LightDispatch { - LightDispatch::new(Arc::new(DummyFetchChecker::new(ok))) - } - - fn total_peers(light_dispatch: &LightDispatch) -> usize { - light_dispatch.idle_peers.len() + light_dispatch.active_peers.len() - } - - fn receive_call_response( - network_interface: impl LightDispatchNetwork, - light_dispatch: &mut LightDispatch, - peer: PeerId, - id: message::RequestId - ) { - light_dispatch.on_remote_call_response(network_interface, peer, message::RemoteCallResponse { - id: id, - proof: StorageProof::empty(), - }); - } - - pub(crate) fn dummy_header() -> Header { - Header { - parent_hash: Default::default(), - number: 0, - state_root: Default::default(), - extrinsics_root: Default::default(), - digest: Default::default(), - } - } - - #[derive(Default)] - struct DummyNetwork { - disconnected_peers: HashSet, - } - - impl<'a, B: BlockT> LightDispatchNetwork for &'a mut DummyNetwork { - fn report_peer(&mut self, _: &PeerId, _: crate::ReputationChange) {} - fn disconnect_peer(&mut self, who: &PeerId) { - self.disconnected_peers.insert(who.clone()); - } - fn send_header_request(&mut self, _: &PeerId, _: RequestId, _: <::Header as HeaderT>::Number) {} - fn send_read_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: Vec>) {} - fn send_read_child_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: Vec, - _: Vec, _: u32, _: Vec>) {} - fn send_call_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: String, _: Vec) {} - fn send_changes_request(&mut self, _: &PeerId, _: RequestId, _: ::Hash, _: ::Hash, - _: ::Hash, _: ::Hash, _: Option>, _: Vec) {} - fn send_body_request(&mut self, _: &PeerId, _: RequestId, _: BlockAttributes, _: FromBlock<::Hash, - <::Header as HeaderT>::Number>, _: Option, _: Direction, _: Option) {} - } - - fn assert_disconnected_peer(dummy: &DummyNetwork) { - assert_eq!(dummy.disconnected_peers.len(), 1); - } - - #[test] - fn knows_about_peers_roles() { - let mut network_interface = DummyNetwork::default(); - let mut light_dispatch = dummy(true); - let peer0 = PeerId::random(); - let peer1 = PeerId::random(); - let peer2 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000); - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000); - light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000); - assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert_eq!(light_dispatch.best_blocks.get(&peer1), Some(&2000)); - assert_eq!(light_dispatch.best_blocks.get(&peer2), Some(&3000)); - } - - #[test] - fn disconnects_from_idle_peer() { - let peer0 = PeerId::random(); - - let mut network_interface = DummyNetwork::default(); - let mut light_dispatch = dummy(true); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100); - assert_eq!(1, total_peers(&light_dispatch)); - assert!(!light_dispatch.best_blocks.is_empty()); - - light_dispatch.on_disconnect(&mut network_interface, &peer0); - assert_eq!(0, total_peers(&light_dispatch)); - assert!(light_dispatch.best_blocks.is_empty()); - } - - #[test] - fn disconnects_from_timeouted_peer() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - let peer1 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000); - assert_eq!(vec![peer0.clone(), peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert!(light_dispatch.active_peers.is_empty()); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: None, - }, oneshot::channel().0)); - assert_eq!(vec![peer1.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert_eq!(vec![peer0.clone()], light_dispatch.active_peers.keys().cloned().collect::>()); - - light_dispatch.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; - light_dispatch.maintain_peers(&mut network_interface); - assert!(light_dispatch.idle_peers.is_empty()); - assert_eq!(vec![peer1.clone()], light_dispatch.active_peers.keys().cloned().collect::>()); - assert_disconnected_peer(&network_interface); - } - - #[test] - fn disconnects_from_peer_on_response_with_wrong_id() { - let mut light_dispatch = dummy(true); - let peer0 = PeerId::random(); - let mut network_interface = DummyNetwork::default(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: None, - }, oneshot::channel().0)); - receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 1); - assert_disconnected_peer(&network_interface); - assert_eq!(light_dispatch.pending_requests.len(), 1); - } - - #[test] - fn disconnects_from_peer_on_incorrect_response() { - let mut light_dispatch = dummy(false); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }, oneshot::channel().0)); - - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0); - assert_disconnected_peer(&network_interface); - assert_eq!(light_dispatch.pending_requests.len(), 1); - } - - #[test] - fn disconnects_from_peer_on_unexpected_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - receive_call_response(&mut network_interface, &mut light_dispatch, peer0, 0); - assert_disconnected_peer(&network_interface); - } - - #[test] - fn disconnects_from_peer_on_wrong_response_type() { - let mut light_dispatch = dummy(false); - let peer0 = PeerId::random(); - let mut network_interface = DummyNetwork::default(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }, oneshot::channel().0)); - - light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { - id: 0, - proof: StorageProof::empty(), - }); - assert_disconnected_peer(&network_interface); - assert_eq!(light_dispatch.pending_requests.len(), 1); - } - - #[test] - fn receives_remote_failure_after_retry_count_failures() { - let retry_count = 2; - let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::>(); - let mut light_dispatch = dummy(false); - let mut network_interface = DummyNetwork::default(); - for i in 0..retry_count+1 { - light_dispatch.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000); - } - - let (tx, mut response) = oneshot::channel(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(retry_count) - }, tx)); - - for i in 0..retry_count { - assert!(response.try_recv().unwrap().is_none()); - receive_call_response(&mut network_interface, &mut light_dispatch, peer_ids[i].clone(), i as u64); - } - - assert!(response.try_recv().unwrap().unwrap().is_err()); - } - - #[test] - fn receives_remote_call_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - let (tx, response) = oneshot::channel(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: None, - }, tx)); - - receive_call_response(&mut network_interface, &mut light_dispatch, peer0.clone(), 0); - assert_eq!(futures::executor::block_on(response).unwrap().unwrap(), vec![42]); - } - - #[test] - fn receives_remote_read_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - let (tx, response) = oneshot::channel(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest { - header: dummy_header(), - block: Default::default(), - keys: vec![b":key".to_vec()], - retry_count: None, - }, tx)); - - light_dispatch.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { - id: 0, - proof: StorageProof::empty(), - }); - assert_eq!( - futures::executor::block_on(response).unwrap().unwrap().remove(b":key".as_ref()).unwrap(), - Some(vec![42]) - ); - } - - #[test] - fn receives_remote_read_child_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - let (tx, response) = oneshot::channel(); - let child_info = ChildInfo::new_default(b"unique_id_1"); - let (child_info, child_type) = child_info.info(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest { - header: dummy_header(), - block: Default::default(), - storage_key: b":child_storage:sub".to_vec(), - child_info: child_info.to_vec(), - child_type, - keys: vec![b":key".to_vec()], - retry_count: None, - }, tx)); - - light_dispatch.on_remote_read_response(&mut network_interface, - peer0.clone(), message::RemoteReadResponse { - id: 0, - proof: StorageProof::empty(), - }); - assert_eq!(futures::executor::block_on(response).unwrap().unwrap().remove(b":key".as_ref()).unwrap(), Some(vec![42])); - } - - #[test] - fn receives_remote_header_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - let (tx, response) = oneshot::channel(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 1, - retry_count: None, - }, tx)); - - light_dispatch.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse { - id: 0, - header: Some(Header { - parent_hash: Default::default(), - number: 1, - state_root: Default::default(), - extrinsics_root: Default::default(), - digest: Default::default(), - }), - proof: StorageProof::empty(), - }); - assert_eq!( - futures::executor::block_on(response).unwrap().unwrap().hash(), - "6443a0b46e0412e626363028115a9f2cf963eeed526b8b33e5316f08b50d0dc3".parse().unwrap(), - ); - } - - #[test] - fn receives_remote_changes_response() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer0 = PeerId::random(); - light_dispatch.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - - let (tx, response) = oneshot::channel(); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest { - changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange { - zero: (0, Default::default()), - end: None, - config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)), - }], - first_block: (1, Default::default()), - last_block: (100, Default::default()), - max_block: (100, Default::default()), - tries_roots: (1, Default::default(), vec![]), - key: vec![], - storage_key: None, - retry_count: None, - }, tx)); - - light_dispatch.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse { - id: 0, - max: 1000, - proof: vec![vec![2]], - roots: vec![], - roots_proof: StorageProof::empty(), - }); - assert_eq!(futures::executor::block_on(response).unwrap().unwrap(), vec![(100, 2)]); - } - - #[test] - fn does_not_sends_request_to_peer_who_has_no_required_block() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer1 = PeerId::random(); - let peer2 = PeerId::random(); - - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 200, - retry_count: None, - }, oneshot::channel().0)); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 250, - retry_count: None, - }, oneshot::channel().0)); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 250, - retry_count: None, - }, oneshot::channel().0)); - - light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150); - - assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert_eq!(light_dispatch.pending_requests.len(), 3); - - light_dispatch.update_best_number(&mut network_interface, peer1.clone(), 250); - - assert_eq!(vec![peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert_eq!(light_dispatch.pending_requests.len(), 2); - - light_dispatch.update_best_number(&mut network_interface, peer2.clone(), 250); - - assert!(!light_dispatch.idle_peers.iter().any(|_| true)); - assert_eq!(light_dispatch.pending_requests.len(), 1); - - light_dispatch.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse { - id: 0, - header: Some(dummy_header()), - proof: StorageProof::empty(), - }); - - assert!(!light_dispatch.idle_peers.iter().any(|_| true)); - assert_eq!(light_dispatch.pending_requests.len(), 0); - } - - #[test] - fn does_not_loop_forever_after_dispatching_request_to_last_peer() { - // this test is a regression for a bug where the dispatch function would - // loop forever after dispatching a request to the last peer, since the - // last peer was not updated - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer1 = PeerId::random(); - let peer2 = PeerId::random(); - let peer3 = PeerId::random(); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 250, - retry_count: None, - }, oneshot::channel().0)); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 250, - retry_count: None, - }, oneshot::channel().0)); - - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200); - light_dispatch.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200); - light_dispatch.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250); - - assert_eq!(vec![peer1.clone(), peer2.clone()], light_dispatch.idle_peers.iter().cloned().collect::>()); - assert_eq!(light_dispatch.pending_requests.len(), 1); - } - - #[test] - fn tries_to_send_all_pending_requests() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer1 = PeerId::random(); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 300, - retry_count: None, - }, oneshot::channel().0)); - light_dispatch.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { - cht_root: Default::default(), - block: 250, - retry_count: None, - }, oneshot::channel().0)); - - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - - assert!(light_dispatch.idle_peers.iter().cloned().collect::>().is_empty()); - assert_eq!(light_dispatch.pending_requests.len(), 1); - } - - #[test] - fn remote_body_with_one_block_body_should_succeed() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer1 = PeerId::random(); - - let header = dummy_header(); - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { - header: header.clone(), - retry_count: None, - }, oneshot::channel().0)); - - assert!(light_dispatch.pending_requests.is_empty()); - assert_eq!(light_dispatch.active_peers.len(), 1); - - let block = message::BlockData:: { - hash: sp_core::H256::random(), - header: None, - body: Some(Vec::new()), - message_queue: None, - receipt: None, - justification: None, - }; - - let response = message::generic::BlockResponse { - id: 0, - blocks: vec![block], - }; - - light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response); - - assert!(light_dispatch.active_peers.is_empty()); - assert_eq!(light_dispatch.idle_peers.len(), 1); - } - - #[test] - fn remote_body_with_three_bodies_should_fail() { - let mut light_dispatch = dummy(true); - let mut network_interface = DummyNetwork::default(); - let peer1 = PeerId::random(); - - let header = dummy_header(); - light_dispatch.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - - light_dispatch.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { - header: header.clone(), - retry_count: None, - }, oneshot::channel().0)); - - assert!(light_dispatch.pending_requests.is_empty()); - assert_eq!(light_dispatch.active_peers.len(), 1); - - let response = { - let blocks: Vec<_> = (0..3).map(|_| message::BlockData:: { - hash: sp_core::H256::random(), - header: None, - body: Some(Vec::new()), - message_queue: None, - receipt: None, - justification: None, - }).collect(); - - message::generic::BlockResponse { - id: 0, - blocks, - } - }; - - light_dispatch.on_remote_body_response(&mut network_interface, peer1.clone(), response); - assert!(light_dispatch.active_peers.is_empty()); - assert!(light_dispatch.idle_peers.is_empty(), "peer should be disconnected after bad response"); - } -} From 27eae1106857410f80db59c45144e5b3edb41e55 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 31 Mar 2020 16:51:32 +0200 Subject: [PATCH 3/4] Fix tests --- client/network/src/protocol.rs | 2 - .../src/protocol/light_client_handler.rs | 119 ++++++++++++++++-- 2 files changed, 107 insertions(+), 14 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index e5c6b2328e2b3..7a86c865424bf 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -2047,7 +2047,6 @@ impl Drop for Protocol { #[cfg(test)] mod tests { use crate::PeerId; - use crate::protocol::light_dispatch::AlwaysBadChecker; use crate::config::{EmptyTransactionPool, Roles}; use super::{CustomMessageOutcome, Protocol, ProtocolConfig}; @@ -2066,7 +2065,6 @@ mod tests { max_parallel_downloads: 10, }, client.clone(), - Arc::new(AlwaysBadChecker), Arc::new(EmptyTransactionPool), None, None, diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index d4b0c01c8c41f..3c39be124c461 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -1236,7 +1236,7 @@ mod tests { use crate::{ chain::Client, config::ProtocolId, - protocol::{api, light_dispatch::tests::{DummyFetchChecker, dummy_header}} + protocol::api, }; use futures::{channel::oneshot, prelude::*}; use libp2p::{ @@ -1258,15 +1258,15 @@ mod tests { use sp_blockchain::{Error as ClientError}; use sp_core::storage::ChildInfo; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, io, iter::{self, FromIterator}, pin::Pin, sync::Arc, task::{Context, Poll} }; - use sp_runtime::{generic::Header, traits::BlakeTwo256}; - use super::{Event, LightClientHandler, Request, OutboundProtocol, PeerStatus}; + use sp_runtime::{generic::Header, traits::{BlakeTwo256, Block as BlockT, NumberFor}}; + use super::{Event, LightClientHandler, Request, Response, OutboundProtocol, PeerStatus}; use void::Void; const CHILD_INFO: ChildInfo<'static> = ChildInfo::new_default(b"foobarbaz"); @@ -1281,7 +1281,7 @@ mod tests { fn make_swarm(ok: bool, ps: sc_peerset::PeersetHandle, cf: super::Config) -> Swarm { let client = Arc::new(substrate_test_runtime_client::new()); - let checker = Arc::new(DummyFetchChecker::new(ok)); + let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData }); let id_key = identity::Keypair::generate_ed25519(); let dh_key = Keypair::::new().into_authentic(&id_key).unwrap(); let local_peer = id_key.public().into_peer_id(); @@ -1295,10 +1295,104 @@ mod tests { Swarm::new(transport, LightClientHandler::new(cf, client, checker, ps), local_peer) } + struct DummyFetchChecker { + ok: bool, + _mark: std::marker::PhantomData + } + + impl fetcher::FetchChecker for DummyFetchChecker { + fn check_header_proof( + &self, + _request: &fetcher::RemoteHeaderRequest, + header: Option, + _remote_proof: fetcher::StorageProof, + ) -> Result { + match self.ok { + true if header.is_some() => Ok(header.unwrap()), + _ => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_read_proof( + &self, + request: &fetcher::RemoteReadRequest, + _: fetcher::StorageProof, + ) -> Result, Option>>, ClientError> { + match self.ok { + true => Ok(request.keys + .iter() + .cloned() + .map(|k| (k, Some(vec![42]))) + .collect() + ), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_read_child_proof( + &self, + request: &fetcher::RemoteReadChildRequest, + _: fetcher::StorageProof, + ) -> Result, Option>>, ClientError> { + match self.ok { + true => Ok(request.keys + .iter() + .cloned() + .map(|k| (k, Some(vec![42]))) + .collect() + ), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_execution_proof( + &self, + _: &fetcher::RemoteCallRequest, + _: fetcher::StorageProof, + ) -> Result, ClientError> { + match self.ok { + true => Ok(vec![42]), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_changes_proof( + &self, + _: &fetcher::RemoteChangesRequest, + _: fetcher::ChangesProof + ) -> Result, u32)>, ClientError> { + match self.ok { + true => Ok(vec![(100.into(), 2)]), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_body_proof( + &self, + _: &fetcher::RemoteBodyRequest, + body: Vec + ) -> Result, ClientError> { + match self.ok { + true => Ok(body), + false => Err(ClientError::Backend("Test error".into())), + } + } + } + fn make_config() -> super::Config { super::Config::new(&ProtocolId::from(&b"foo"[..])) } + fn dummy_header() -> sp_test_primitives::Header { + sp_test_primitives::Header { + parent_hash: Default::default(), + number: 0, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + } + } + struct EmptyPollParams(PeerId); impl PollParameters for EmptyPollParams { @@ -1341,7 +1435,7 @@ mod tests { ) -> LightClientHandler { let client = Arc::new(substrate_test_runtime_client::new()); - let checker = Arc::new(DummyFetchChecker::new(ok)); + let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData }); LightClientHandler::new(cf, client, checker, ps) } @@ -1468,7 +1562,7 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); + behaviour.inject_node_event(peer.clone(), Event::Response(request_id, Response::Light(response))); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1497,7 +1591,7 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(2347895932, response)); + behaviour.inject_node_event(peer.clone(), Event::Response(2347895932, Response::Light(response))); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); @@ -1539,7 +1633,7 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); + behaviour.inject_node_event(peer.clone(), Event::Response(request_id, Response::Light(response))); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1591,7 +1685,7 @@ mod tests { response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) } }; - behaviour.inject_node_event(responding_peer, Event::Response(request_id, response.clone())); + behaviour.inject_node_event(responding_peer, Event::Response(request_id, Response::Light(response.clone()))); assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. })); assert_matches!(chan.1.try_recv(), Ok(None)) } @@ -1604,7 +1698,7 @@ mod tests { response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - behaviour.inject_node_event(responding_peer, Event::Response(request_id, response)); + behaviour.inject_node_event(responding_peer, Event::Response(request_id, Response::Light(response))); assert_matches!(poll(&mut behaviour), Poll::Pending); assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) } @@ -1618,6 +1712,7 @@ mod tests { assert_eq!(1, behaviour.peers.len()); let response = match request { + Request::Body { .. } => unimplemented!(), Request::Header{..} => { let r = api::v1::light::RemoteHeaderResponse { header: dummy_header().encode(), @@ -1667,7 +1762,7 @@ mod tests { assert_eq!(1, behaviour.outstanding.len()); assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); - behaviour.inject_node_event(peer.clone(), Event::Response(1, response)); + behaviour.inject_node_event(peer.clone(), Event::Response(1, Response::Light(response))); poll(&mut behaviour); From 32e0c8d3d1d3d73219e3f6475d75cc396fa31606 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 1 Apr 2020 10:29:49 +0200 Subject: [PATCH 4/4] Address review --- client/network/src/protocol.rs | 44 +++++++--------------------------- client/network/src/service.rs | 4 +++- 2 files changed, 11 insertions(+), 37 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 7a86c865424bf..92e06f611ec2f 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -553,20 +553,20 @@ impl Protocol { GenericMessage::Transactions(m) => self.on_extrinsics(who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), - GenericMessage::RemoteCallResponse(response) => - self.on_remote_call_response(who, response), + GenericMessage::RemoteCallResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"), GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(who, request), - GenericMessage::RemoteReadResponse(response) => - self.on_remote_read_response(who, response), + GenericMessage::RemoteReadResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"), GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(who, request), - GenericMessage::RemoteHeaderResponse(response) => - self.on_remote_header_response(who, response), + GenericMessage::RemoteHeaderResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"), GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request), - GenericMessage::RemoteChangesResponse(response) => - self.on_remote_changes_response(who, response), + GenericMessage::RemoteChangesResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"), GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(who, request), GenericMessage::FinalityProofResponse(response) => @@ -1446,13 +1446,6 @@ impl Protocol { self.sync.on_finality_proof_import(request_block, finalization_result) } - fn on_remote_call_response( - &mut self, - _: PeerId, - _: message::RemoteCallResponse - ) { - } - fn on_remote_read_request( &mut self, who: PeerId, @@ -1567,13 +1560,6 @@ impl Protocol { ); } - fn on_remote_read_response( - &mut self, - _: PeerId, - _: message::RemoteReadResponse - ) { - } - fn on_remote_header_request( &mut self, who: PeerId, @@ -1604,13 +1590,6 @@ impl Protocol { ); } - fn on_remote_header_response( - &mut self, - _: PeerId, - _: message::RemoteHeaderResponse, - ) { - } - fn on_remote_changes_request( &mut self, who: PeerId, @@ -1672,13 +1651,6 @@ impl Protocol { ); } - fn on_remote_changes_response( - &mut self, - _: PeerId, - _: message::RemoteChangesResponse, B::Hash>, - ) { - } - fn on_finality_proof_request( &mut self, who: PeerId, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 4625015c1ea49..22d4eb051952b 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -913,7 +913,9 @@ impl Future for NetworkWorker { if let Some(light_client_rqs) = this.light_client_rqs.as_mut() { while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) { // This can error if there are too many queued requests already. - let _ = this.network_service.light_client_request(rq); + if this.network_service.light_client_request(rq).is_err() { + log::warn!("Couldn't start light client request: too many pending requests"); + } if let Some(metrics) = this.metrics.as_ref() { metrics.issued_light_requests.inc(); }