Skip to content

Commit

Permalink
add graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bonedaddy committed Aug 23, 2021
1 parent 5afc36b commit 62f8cca
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 664 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ crossbeam = "0.8.1"
crossbeam-channel = "0.5.1"
crossbeam-utils = "0.8.5"
crossbeam-queue = "0.3.2"
signal-hook = "0.3.9"
[profile.release]
lto = "fat"
codegen-units = 1
Expand Down
59 changes: 12 additions & 47 deletions src/crank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{
config::{Configuration, ParsedMarketKeys},
};
use anyhow::{anyhow, format_err, Result};
use crossbeam::sync::WaitGroup;
use crossbeam::{select, sync::WaitGroup};
use crossbeam_channel::Receiver;
use crossbeam_queue::ArrayQueue;
use log::{debug, error, info, warn};
use safe_transmute::{
Expand Down Expand Up @@ -44,14 +45,21 @@ impl Crank {
pub fn new(config: Arc<Configuration>) -> Arc<Self> {
Arc::new(Self { config })
}
pub fn start(self: &Arc<Self>) -> Result<()> {
pub fn start(self: &Arc<Self>, exit_chan: Receiver<bool>) -> Result<()> {
let rpc_client = Arc::new(RpcClient::new(self.config.http_rpc_url.clone()));
let payer = Arc::new(self.config.payer());
let dex_program = Pubkey::from_str(self.config.crank.dex_program.as_str()).unwrap();
let market_keys = self.config.crank.market_keys(&rpc_client, dex_program)?;
let slot_height_map: Arc<RwLock<HashMap<String, u64>>> =
Arc::new(RwLock::new(HashMap::new()));
loop {
select! {
recv(exit_chan) -> _msg => {
warn!("caught exit signal");
return Ok(());
},
default => {}
}
let work_loop =
|market_key: &ParsedMarketKeys| -> Result<Option<Vec<Instruction>>> {
let event_q_value_and_context = rpc_client.get_account_with_commitment(
Expand Down Expand Up @@ -173,12 +181,10 @@ impl Crank {
account_metas.push(AccountMeta::new(**pubkey, false));
}
let instructions = consume_events_ix(
&rpc_client,
&dex_program,
&payer,
account_metas,
self.config.crank.events_per_worker,
&market_key.keys.event_q,
)?;
Ok(Some(instructions))
};
Expand Down Expand Up @@ -263,7 +269,7 @@ impl Crank {
// so split it up
let instructions_chunks = instructions.chunks(instructions.len() / 2);
info!("starting chunked crank instruction processing");
for (idx, chunk) in instructions_chunks.enumerate() {
for (_idx, chunk) in instructions_chunks.enumerate() {
let res = run_loop(&chunk.to_vec());
if res.is_err() {
error!(
Expand All @@ -281,7 +287,7 @@ impl Crank {
error!("failed to send crank instructions {:#?}", res.err());
} else {
info!(
"crank ran {}. processed {} instructions for {} markets: {:#?}",
"crank ran {} processed {} instructions for {} markets: {:#?}",
res.unwrap(),
instructions.len(),
instructions_markets.len(),
Expand Down Expand Up @@ -320,12 +326,10 @@ impl Crank {
}

fn consume_events_ix(
client: &Arc<RpcClient>,
program_id: &Pubkey,
payer: &Keypair,
account_metas: Vec<AccountMeta>,
to_consume: usize,
event_q: &Pubkey,
) -> Result<Vec<Instruction>> {
let instruction_data: Vec<u8> = MarketInstruction::ConsumeEvents(to_consume as u16).pack();
let instruction = Instruction {
Expand Down Expand Up @@ -447,42 +451,3 @@ pub struct MarketPubkeys {
pub pc_vault: Box<Pubkey>,
pub vault_signer_key: Box<Pubkey>,
}

fn consume_events_once(
client: &Arc<RpcClient>,
program_id: &Pubkey,
payer: &Keypair,
account_metas: Vec<AccountMeta>,
to_consume: usize,
event_q: &Pubkey,
) -> Result<Signature> {
let _start = std::time::Instant::now();
let instruction_data: Vec<u8> = MarketInstruction::ConsumeEvents(to_consume as u16).pack();
let instruction = Instruction {
program_id: *program_id,
accounts: account_metas,
data: instruction_data,
};
let random_instruction = solana_sdk::system_instruction::transfer(
&payer.pubkey(),
&payer.pubkey(),
rand::random::<u64>() % 10000 + 1,
);
let (recent_hash, _fee_calc) = client.get_recent_blockhash()?;
let txn = Transaction::new_signed_with_payer(
&[instruction, random_instruction],
Some(&payer.pubkey()),
&[payer],
recent_hash,
);

info!("Consuming events ...");
let signature = client.send_transaction_with_config(
&txn,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)?;
Ok(signature)
}
34 changes: 31 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::sync::Arc;

use crossbeam::sync::WaitGroup;
use crossbeam_channel;
use anyhow::{anyhow, Result};
use clap::{App, Arg, SubCommand};
use log::{error, info, warn};
use signal_hook::{
consts::{SIGINT, SIGQUIT, SIGTERM},
iterator::Signals,
};
pub mod config;
pub mod crank;

Expand Down Expand Up @@ -55,8 +60,31 @@ async fn process_matches<'a>(
false,
)?);
cfg.init_log(false)?;
let crank_turner = crank::Crank::new(cfg);
crank_turner.start()?;
let mut signals =
Signals::new(vec![SIGINT, SIGTERM, SIGQUIT]).expect("failed to registers signals");
let (s, r) = crossbeam_channel::unbounded();
let wg = WaitGroup::new();
{
let wg = wg.clone();
tokio::task::spawn(async move {
let crank_turner = crank::Crank::new(cfg);
let res = crank_turner.start(r);
if res.is_err() {
error!("encountered error while turning crank {:#?}", res.err());
}
drop(wg);
});
}
for signal in signals.forever() {
warn!("encountered exit signal {}", signal);
break;
}
let err = s.send(true);
if err.is_err() {
error!("failed to send exit notif {:#?}", err.err());
return Err(anyhow!("unexpected error during shutdown, failed to send exit notifications").into());
}
wg.wait()
}
_ => return Err(anyhow!("failed to match subcommand")),
}
Expand Down
Loading

0 comments on commit 62f8cca

Please sign in to comment.