US-317: PDT - Update listen to import all fields (#261)
* US-317: PDT - Update listen to import all fields

* Fix listening block gap

* Fix miscounting blocks

* update image policy

* fix block listen

* undo image pull policy
WuBruno authored Nov 1, 2023
1 parent f8ddb9b commit d0abd60
Showing 9 changed files with 262 additions and 35 deletions.
124 changes: 124 additions & 0 deletions products/pdt/Cargo.lock

Some generated files are not rendered by default.

5 changes: 1 addition & 4 deletions products/pdt/pdt/src/multi_import.rs
@@ -24,10 +24,7 @@ pub async fn import<T: Importer, P: ZilliqaDBProject + std::marker::Sync>(
     let coords = in_coords.with_client_id(&client_id);
     println!("{:?}", coords);
     let mut batch = 0;
-    let mut last_max = match start_block {
-        Some(x) => x,
-        None => 0,
-    };
+    let mut last_max = start_block.unwrap_or_default();
    while match nr_batches {
        None => true,
        Some(val) => batch < val,
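The rewrite above is a behavior-preserving simplification: `unwrap_or_default()` yields 0 for `None` because integers default to 0. A minimal sketch (the concrete integer type is an assumption here):

// Sketch: unwrap_or_default() matches the removed match arm-for-arm.
// The i64 type is assumed for illustration.
fn main() {
    let start_block: Option<i64> = None;
    assert_eq!(start_block.unwrap_or_default(), 0); // None => 0
    assert_eq!(Some(42).unwrap_or_default(), 42); // Some(x) => x
}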
6 changes: 3 additions & 3 deletions products/pdt/pdtbq/src/meta.rs
@@ -97,11 +97,11 @@ impl Meta {
             .query(&self.table.dataset.project_id, QueryRequest::new(&query))
             .await?;
         if result.next_row() {
-            let block = result
+            let start_block = result
                 .get_i64(0)?
                 .ok_or(anyhow!("No start_blk in record"))?;
-            let number = result.get_i64(1)?.ok_or(anyhow!("No nr_blks in record"))?;
-            Ok(Some(block + number))
+            let block_count = result.get_i64(1)?.ok_or(anyhow!("No nr_blks in record"))?;
+            Ok(Some(start_block + block_count - 1)) // the range includes start_block, so subtract 1
         } else {
             Ok(None)
         }
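The off-by-one fix above matters because `start_blk + nr_blks` points one past the last imported block. A worked example with made-up numbers:

// Sketch: a record with start_blk = 100 and nr_blks = 5 covers blocks
// 100, 101, 102, 103, 104 — so the last block is 100 + 5 - 1 = 104,
// where the old code returned 105.
fn main() {
    let (start_block, block_count) = (100i64, 5i64);
    assert_eq!(start_block + block_count - 1, 104);
    // The same batch as a half-open range, for comparison:
    assert_eq!((start_block..start_block + block_count).last(), Some(104));
}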
1 change: 1 addition & 0 deletions products/pdt/pdtdb/Cargo.toml
@@ -27,3 +27,4 @@ sqlx = { version = "0.7.1", features = [
     "tls-native-tls",
 ] }
 ethers = "2.0.10"
+serde_with = "3.4.0"
1 change: 1 addition & 0 deletions products/pdt/pdtdb/src/values/mod.rs
@@ -7,5 +7,6 @@ mod transaction;
 
 pub type BQTransaction = transaction::BQTransaction;
 pub type PSQLTransaction = transaction::PSQLTransaction;
+pub type ZILTransactionBody = transaction::ZILTransactionBody;
 pub type BQMicroblock = microblock::BQMicroblock;
 pub type PSQLMicroblock = microblock::PSQLMicroblock;
49 changes: 49 additions & 0 deletions products/pdt/pdtdb/src/values/transaction.rs
@@ -8,6 +8,7 @@ use pdtlib::proto::ProtoTransactionWithReceipt;
 use primitive_types::{H160, H256};
 use psql_derive::PSQLInsertable;
 use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DisplayFromStr};
 use sqlx::FromRow;
 #[derive(Serialize, Deserialize, Clone, FromRow, PSQLInsertable, Debug, PartialEq)]
 pub struct BQTransaction {
@@ -274,6 +275,31 @@ impl BQTransaction {
             eth_transaction_type,
         })
     }
+
+    pub fn from_eth_with_zil_txn_bodies(
+        in_val: &Transaction,
+        zil_txn_body: &ZILTransactionBody,
+        zqversion: i64,
+    ) -> Result<Self> {
+        let txn_body =
+            Self::from_eth(in_val, zqversion).expect("should be compatible with eth transactions");
+
+        let from_addr_zil = utils::maybe_hex_address_from_public_key(
+            zil_txn_body.sender_pub_key.as_bytes(),
+            utils::API::Zilliqa,
+        );
+        let raw_receipt = encode_u8(zil_txn_body.receipt.as_bytes());
+        Ok(BQTransaction {
+            code: zil_txn_body.code.clone(),
+            receipt: Some(zil_txn_body.receipt.clone()),
+            raw_receipt: Some(raw_receipt),
+            sender_public_key: Some(zil_txn_body.sender_pub_key.clone()),
+            from_addr_zil,
+            signature: Some(zil_txn_body.signature.clone()),
+            ..txn_body
+        })
+    }
+
     pub fn to_json(&self) -> Result<String> {
         Ok(serde_json::to_string(self)?)
     }
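The `..txn_body` in the new constructor is Rust's struct update syntax: explicitly named fields win, and every other field is taken from the base value. A minimal sketch with a hypothetical two-field struct (not the real BQTransaction):

// Sketch of the struct update syntax used by from_eth_with_zil_txn_bodies,
// on a made-up struct.
struct Txn {
    code: Option<String>,
    signature: Option<String>,
}

fn main() {
    let base = Txn { code: None, signature: Some("sig".into()) };
    // Override `code`; `signature` is moved over from `base` unchanged.
    let enriched = Txn { code: Some("scilla".into()), ..base };
    assert_eq!(enriched.code.as_deref(), Some("scilla"));
    assert_eq!(enriched.signature.as_deref(), Some("sig"));
}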
@@ -448,6 +474,29 @@ fn decode_u8(x: String) -> Vec<u8> {
         .expect("base64-encoding should be decodeable")
 }
 
+#[serde_as]
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct ZILTransactionBody {
+    #[serde(rename = "ID")]
+    pub id: String,
+    #[serde_as(as = "DisplayFromStr")]
+    pub amount: String,
+    pub code: Option<String>, // not always present
+    pub data: Option<String>,
+    #[serde_as(as = "DisplayFromStr")]
+    pub gas_limit: i64,
+    pub gas_price: String,
+    #[serde_as(as = "DisplayFromStr")]
+    pub nonce: i64,
+    pub receipt: String,
+    pub sender_pub_key: String,
+    pub signature: String,
+    pub to_addr: String,
+    #[serde_as(as = "DisplayFromStr")]
+    pub version: i64,
+}
+
 #[test]
 fn check_involution() {
     let bq_txn = BQTransaction {
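The `serde_as` / `DisplayFromStr` annotations (from the serde_with crate added to Cargo.toml above) let numeric fields that the API serializes as JSON strings deserialize through `FromStr`. A minimal sketch with a hypothetical one-field struct:

// Sketch: DisplayFromStr turns the JSON string "42" into an i64,
// as with gas_limit, nonce, and version in ZILTransactionBody.
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Body {
    #[serde_as(as = "DisplayFromStr")]
    gas_limit: i64,
}

fn main() {
    let body: Body = serde_json::from_str(r#"{"gasLimit": "42"}"#).unwrap();
    assert_eq!(body.gas_limit, 42);
}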
2 changes: 1 addition & 1 deletion products/pdt/pdtlisten/src/importer.rs
@@ -53,7 +53,7 @@ impl BatchedImporter {
            if block.block < range.start {
                self.range = Some(block.block..range.end)
            } else if block.block > range.end {
-                self.range = Some(range.start..block.block)
+                self.range = Some(range.start..block.block + 1) // range end not inclusive
            }
        } else {
            self.range = Some(block.block..(block.block + 1))
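The `+ 1` accounts for Rust ranges being half-open: `start..end` excludes `end`, so a range tracking a newly seen block at the top must end one past it. For instance:

// Sketch: 5..8 does not contain 8, so extending the range to cover
// block 8 requires end = 8 + 1, mirroring the importer fix.
fn main() {
    let range = 5u64..8;
    assert!(!range.contains(&8));
    let extended = range.start..(8 + 1);
    assert!(extended.contains(&8));
}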
26 changes: 20 additions & 6 deletions products/pdt/pdtlisten/src/lib.rs
@@ -2,19 +2,19 @@ mod importer;
 mod listener;
 mod types;
 use anyhow::{anyhow, bail, Context, Result};
-use ethers::{prelude::*, providers::StreamExt};
+use ethers::{prelude::*, providers::StreamExt, utils::hex};
 use jsonrpsee::{core::client::ClientT, http_client::HttpClient, rpc_params};
 use pdtbq::{bq::ZilliqaBQProject, bq_utils::BigQueryDatasetLocation};
 use pdtdb::{
     utils::ProcessCoordinates,
-    values::{BQMicroblock, BQTransaction, PSQLMicroblock, PSQLTransaction},
+    values::{BQMicroblock, BQTransaction, PSQLMicroblock, PSQLTransaction, ZILTransactionBody},
     zqproj::{Inserter, ZilliqaDBProject},
 };
 use pdtpsql::psql::ZilliqaPSQLProject;
 use serde::Serialize;
 use serde_json::{from_value, to_value, Value};
 use sqlx::postgres::PgPoolOptions;
-use std::{marker::PhantomData, time::Duration};
+use std::{collections::HashMap, marker::PhantomData, time::Duration};
 use tokio::pin;
 use tokio::task::JoinSet;
 use tokio_stream::StreamExt as TokioStreamExt;
@@ -33,15 +33,29 @@ async fn get_block_info(number: U64, client: &HttpClient) -> Result<types::GetTx
 
 fn convert_block_and_txns(
     block: &Block<Transaction>,
+    zil_txn_bodies: &Vec<ZILTransactionBody>,
 ) -> Result<(BQMicroblock, Vec<BQTransaction>)> {
     let my_block = block.clone();
     let bq_block = BQMicroblock::from_eth(&my_block)?;
     let version = bq_block.header_version;
+    let zil_transactions: HashMap<&str, ZILTransactionBody> = zil_txn_bodies
+        .into_iter()
+        .map(|x| (x.id.as_str(), x.clone()))
+        .collect();
+
     let mut txn_errs = vec![];
     let bq_transactions: Vec<BQTransaction> = my_block
         .transactions
         .into_iter()
-        .map(|txn| BQTransaction::from_eth(&txn, version))
+        .map(|txn| {
+            if let Some(zil_txn_body) =
+                zil_transactions.get(hex::encode(&txn.hash.as_bytes()).as_str())
+            {
+                BQTransaction::from_eth_with_zil_txn_bodies(&txn, zil_txn_body, version)
+            } else {
+                Err(anyhow!("some transaction zil body not found"))
+            }
+        })
         .filter_map(|r| r.map_err(|e| txn_errs.push(e)).ok())
         .collect();
     if !txn_errs.is_empty() {
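The lookup above only succeeds if the ID strings on the ZIL transaction bodies use the same convention as `hex::encode`, which produces lowercase hex with no 0x prefix. A minimal sketch of the matching, using the plain hex crate (which `ethers::utils::hex` re-exports); the hash bytes and ID are made up for illustration:

// Sketch: bodies keyed by ID, matched against a hex-encoded hash.
use std::collections::HashMap;

fn main() {
    let hash: [u8; 4] = [0xde, 0xad, 0xbe, 0xef]; // stand-in for txn.hash
    let mut bodies: HashMap<&str, &str> = HashMap::new();
    bodies.insert("deadbeef", "zil txn body");
    // hex::encode yields lowercase with no 0x prefix, matching the key.
    assert_eq!(bodies.get(hex::encode(hash).as_str()), Some(&"zil txn body"));
}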
@@ -264,9 +278,9 @@ pub async fn listen_bq(
                 bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
             }
 
-            for block in blocks {
+            for (block, zil_txn_bodies) in blocks {
                 // convert our blocks and insert it into our buffer
-                convert_block_and_txns(&block)
+                convert_block_and_txns(&block, &zil_txn_bodies)
                     .and_then(|(bq_block, bq_txns)| {
                         bq_importer.insert_into_buffer(bq_block, bq_txns)
                     })