Skip to content

Commit

Permalink
iox-#5 Add multi threaded subscriber teset
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Jun 30, 2022
1 parent 60f8de1 commit 75ea0dc
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion src/tests/basic_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::SubscriberBuilder;

use anyhow::{anyhow, Result};

use std::thread;

#[repr(C)]
#[derive(Default)]
struct Counter {
Expand All @@ -19,7 +21,7 @@ struct Counter {
unsafe impl ShmSend for Counter {}

#[test]
fn basic_pub_sub() -> Result<()> {
fn single_threaded_subscriber() -> Result<()> {
let _roudi = RouDiEnvironment::new();

Runtime::init("basic_pub_sub");
Expand All @@ -46,5 +48,48 @@ fn basic_pub_sub() -> Result<()> {
_ => return Err(anyhow!("Could not read sample")),
}

publisher.stop_offer();
subscriber.unsubscribe(sample_receiver);

Ok(())
}

#[test]
fn multi_threaded_subscriber() -> Result<()> {
let _roudi = RouDiEnvironment::new();

Runtime::init("basic_pub_sub");

let (subscriber, sample_receive_token) =
SubscriberBuilder::<Counter>::new("Test", "BasicPubSub", "Counter")
.queue_capacity(5)
.create_mt()?;

let publisher = PublisherBuilder::<Counter>::new("Test", "BasicPubSub", "Counter").create()?;

let mut sample = publisher.loan()?;

const SEND_COUNTER: u32 = 42;
sample.counter = SEND_COUNTER;
publisher.publish(sample);

let sample_receiver = subscriber.get_sample_receiver(sample_receive_token);

let th = thread::spawn(move || {
assert!(sample_receiver.has_data());

match sample_receiver.take() {
Some(sample) => assert_eq!(sample.counter, SEND_COUNTER),
_ => assert!(false, "no sample received"),
}

sample_receiver
});

let sample_receiver = th.join().map_err(|_| anyhow!("could not join threads"))?;

publisher.stop_offer();
subscriber.unsubscribe(sample_receiver);

Ok(())
}

0 comments on commit 75ea0dc

Please sign in to comment.