Skip to content

Commit

Permalink
Fixes around service discovery, engine and consumer. (#327)
Browse files Browse the repository at this point in the history
* chore: update dependencies
* fix(service-discovery): implements fast path on lookup for partitioned topic
* fix(consumer): do not poll refresh topics if consumer isn't using a regex for topic discovery
* fix(engine): handle underflow on batch size when receiving a batch message
* ci: enable tests to run on newer pulsar version

Signed-off-by: Florentin Dubois <[email protected]>
Co-authored-by: @Wonshtrum <[email protected]>
Co-authored-by: @KannarFr <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2025
1 parent 715411c commit ba58c9c
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
pulsar-version: [ 2.10.4, 2.11.2, 3.0.4, 3.1.3 ]
pulsar-version: [ 2.10.6, 2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1 ]
steps:
- name: Start Pulsar Standalone Container
run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true apachepulsar/pulsar:${{ matrix.pulsar-version }} bin/pulsar standalone
Expand Down
53 changes: 25 additions & 28 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,50 @@ keywords = ["pulsar", "api", "client"]

[dependencies]
async-channel = "^2.3.1"
async-trait = "^0.1.81"
async-std = { version = "^1.12.0", features = ["attributes", "unstable"], optional = true }
async-trait = "^0.1.83"
async-std = { version = "^1.13.0", features = ["attributes", "unstable"], optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
asynchronous-codec = { version = "^0.7.0", optional = true }
bytes = "^1.6.1"
bit-vec = "^0.8.0"
chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] }
bytes = "^1.9.0"
chrono = { version = "^0.4.39", default-features = false, features = ["clock", "std"] }
crc = "^3.2.1"
data-url = { version = "^0.3.1", optional = true }
flate2 = { version = "^1.0.30", optional = true }
futures = "^0.3.30"
futures-io = "^0.3.30"
futures-timer = "^3.0.3"
flate2 = { version = "^1.0.35", optional = true }
futures = "^0.3.31"
futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
log = "^0.4.22"
lz4 = { version = "^1.26.0", optional = true }
lz4 = { version = "^1.28.0", optional = true }
native-tls = { version = "^0.2.12", optional = true }
nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
openidconnect = { version = "^3.5.0", optional = true }
oauth2 = { version = "^4.4.1", optional = true }
oauth2 = { version = "^4.4.2", optional = true }
pem = "^3.0.4"
prost = "^0.13.1"
prost-derive = "^0.13.1"
prost = "^0.13.4"
prost-derive = "^0.13.4"
rand = "^0.8.5"
regex = "^1.10.5"
rustls = { version = "^0.23.12", optional = true }
regex = "^1.11.1"
rustls = { version = "^0.23.20", optional = true }
snap = { version = "^1.1.1", optional = true }
serde = { version = "^1.0.204", features = ["derive"], optional = true }
serde_json = { version = "^1.0.121", optional = true }
tokio = { version = "^1.39.2", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.11", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.26.0", optional = true }
serde = { version = "^1.0.216", features = ["derive"], optional = true }
serde_json = { version = "^1.0.133", optional = true }
tokio = { version = "^1.42.0", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.13", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.26.1", optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
tracing = { version = "^0.1.40", optional = true }
url = "^2.5.2"
uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] }
webpki-roots = { version = "^0.26.3", optional = true }
tracing = { version = "^0.1.41", optional = true }
url = "^2.5.4"
uuid = { version = "^1.11.0", features = ["v4", "fast-rng"] }
webpki-roots = { version = "^0.26.7", optional = true }
zstd = { version = "^0.13.2", optional = true }

[dev-dependencies]
env_logger = "^0.11.5"
serde = { version = "^1.0.204", features = ["derive"] }
serde_json = "^1.0.121"
tokio = { version = "^1.39.2", features = ["macros", "rt-multi-thread"] }
serde = { version = "^1.0.216", features = ["derive"] }
serde_json = "^1.0.133"
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "^0.13.1"
prost-build = "^0.13.4"
protobuf-src = { version = "^2.1.0", optional = true }

[features]
Expand Down
6 changes: 3 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl SerializeMessage for () {
}
}

impl<'a> SerializeMessage for &'a [u8] {
impl SerializeMessage for &[u8] {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
Expand Down Expand Up @@ -102,7 +102,7 @@ impl SerializeMessage for String {
}
}

impl<'a> SerializeMessage for &'a String {
impl SerializeMessage for &String {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Expand All @@ -113,7 +113,7 @@ impl<'a> SerializeMessage for &'a String {
}
}

impl<'a> SerializeMessage for &'a str {
impl SerializeMessage for &str {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Expand Down
13 changes: 12 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,21 @@ impl<Exe: Executor> ConnectionSender<Exe> {
self.tx.send(messages::ping()).await?,
) {
(Ok(_), ()) => {
debug!("set timeout to {:?} for ping-pong", self.operation_timeout);
let delay_f = self.executor.delay(self.operation_timeout);
pin_mut!(response);
pin_mut!(delay_f);

match select(response, delay_f).await {
Either::Left((res, _)) => res
.map_err(|oneshot::Canceled| {
error!("connection-sender: send ping, we have been canceled");
self.error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |_| trace!("received pong from {}", self.connection_id)),
Either::Right(_) => {
error!("connection-sender: send ping, we did not received pong inside the timed out");
self.error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout when sending ping to the Pulsar server",
Expand Down Expand Up @@ -652,6 +655,10 @@ impl<Exe: Executor> ConnectionSender<Exe> {
response
.await
.map_err(|oneshot::Canceled| {
error!(
"response has been canceled (key = {:?}), we are disconnected",
k
);
error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
Expand All @@ -670,12 +677,16 @@ impl<Exe: Executor> ConnectionSender<Exe> {
let connection_id = self.connection_id;
let error = self.error.clone();
let delay_f = self.executor.delay(self.operation_timeout);
trace!(
"Create timeout futures with operation timeout at {:?}",
self.operation_timeout
);
let fut = async move {
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
debug!("Received response: {:?}", res);
res
}
Either::Right(_) => {
Expand Down
6 changes: 4 additions & 2 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,15 @@ impl<Exe: Executor> ConnectionManager<Exe> {
// in a mutex, and a case appears where the Arc is cloned
// somewhere at the same time, that just means the manager
// will create a new connection the next time it is asked
let strong_count = Arc::strong_count(conn);
trace!(
"checking connection {}, is valid? {}, strong_count {}",
conn.id(),
conn.is_valid(),
Arc::strong_count(conn)
strong_count
);
conn.is_valid() && Arc::strong_count(conn) > 1

conn.is_valid() && strong_count > 1
}
});
}
Expand Down
8 changes: 3 additions & 5 deletions src/consumer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
// Checks the builder for inconsistencies
// returns a config and a list of topics with associated brokers
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn validate<T: DeserializeMessage>(
self,
) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
async fn validate(self) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
let ConsumerBuilder {
pulsar,
topics,
Expand Down Expand Up @@ -272,7 +270,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn build<T: DeserializeMessage>(self) -> Result<Consumer<T, Exe>, Error> {
// would this clone() consume too much memory?
let (config, joined_topics) = self.clone().validate::<T>().await?;
let (config, joined_topics) = self.clone().validate().await?;

let consumers = try_join_all(joined_topics.into_iter().map(|(topic, addr)| {
TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone())
Expand Down Expand Up @@ -322,7 +320,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn into_reader<T: DeserializeMessage>(self) -> Result<Reader<T, Exe>, Error> {
// would this clone() consume too much memory?
let (mut config, mut joined_topics) = self.clone().validate::<T>().await?;
let (mut config, mut joined_topics) = self.clone().validate().await?;

// Internally, the reader interface is implemented as a consumer using an exclusive,
// non-durable subscription
Expand Down
78 changes: 64 additions & 14 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ConsumerEngine<Exe: Executor> {
event_rx: mpsc::UnboundedReceiver<EngineEvent<Exe>>,
event_tx: UnboundedSender<EngineEvent<Exe>>,
batch_size: u32,
remaining_messages: u32,
remaining_messages: i64,
unacked_message_redelivery_delay: Option<Duration>,
unacked_messages: HashMap<MessageIdData, Instant>,
dead_letter_policy: Option<DeadLetterPolicy>,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
event_rx,
event_tx,
batch_size,
remaining_messages: batch_size,
remaining_messages: batch_size as i64,
unacked_message_redelivery_delay,
unacked_messages: HashMap::new(),
dead_letter_policy,
Expand All @@ -108,10 +108,12 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}
}
}

let send_end_res = event_tx.send(mapper(None)).await;
if let Err(err) = send_end_res {
log::error!("Error sending end event to channel - {err}");
}

log::warn!("rx terminated");
}))
}
Expand Down Expand Up @@ -146,37 +148,78 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
})?;
}

if self.remaining_messages < self.batch_size.div_ceil(2) {
// In the casual workflow, we use the `batch_size` as a maximum number that we could
// send using a send flow command message to ask the broker to send us
// messages, which is why we have a subtraction below (`batch_size` -
// `remaining_messages`).
//
// In the special case of batch messages (which is defined by clients), the number of
// messages could be greater than the given batch size and the remaining
// messages goes negative, which is why we use an `i64` as we want to keep
// track of the number of negative messages as the next send flow will be
// the batch_size - minus remaining messages which allow us to retrieve the casual
// workflow of flow command messages.
//
// Here is the example of it works for a batch_size at 1000 and the error case:
//
// ```
// Message (1) -> (batch_size = 1000, remaing_messages = 999, no flow message trigger)
// ... 499 messages later ...
// Message (1) -> (batch_size = 1000, remaing_messages = 499, flow message trigger, ask broker to send => batch_size - remaining_messages = 501)
// ... 200 messages later ...
// BatchMessage (1024) -> (batch_size = 1000, remaining_messages = 4294967096, no flow message trigger) [underflow on remaining messages - without the patch]
// BatchMessage (1024) -> (batch_size = 1000, remaining_messages = -1124, flow message trigger, ask broker to send => batch_size - remaining_messages = 2124) [no underflow on remaining messages - with the patch]
// ```
if self.remaining_messages < (self.batch_size.div_ceil(2) as i64) {
match self
.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)
.send_flow(
self.id,
(self.batch_size as i64 - self.remaining_messages) as u32,
)
.await
{
Ok(()) => {}
Err(ConnectionError::Disconnected) => {
self.reconnect().await?;
self.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)
.await?;
debug!("consumer engine: consumer connection disconnected, trying to reconnect");
continue;
}
// we don't need to handle the SlowDown error, since send_flow waits on the
// channel to be not full
Err(e) => return Err(e.into()),
Err(e) => {
error!("consumer engine: we got a unrecoverable connection error, {e}");
return Err(e.into());
}
}
self.remaining_messages = self.batch_size;

self.remaining_messages = self.batch_size as i64;
}

match Self::timeout(self.event_rx.next(), Duration::from_secs(1)).await {
Err(_timeout) => {}
Err(_) => {
// If you are reading this comment, you may have an issue where you have
// received a batched message that is greater that the batch
// size and then break the way that we send flow command message.
//
// In that case, you could increase your batch size or patch this driver by
// adding the following line, if you are sure that you have
// at least 1 incoming message per second.
//
// ```rust
// self.remaining_messages = 0;
// ```
debug!("consumer engine: timeout (1s)");
}
Ok(Some(EngineEvent::Message(msg))) => {
debug!("consumer engine: received message, {:?}", msg);
let out = self.handle_message_opt(msg).await;
if let Some(res) = out {
return res;
}
}
Ok(Some(EngineEvent::EngineMessage(msg))) => {
debug!("consumer engine: received engine message");
let continue_loop = self.handle_ack_opt(msg).await;
if !continue_loop {
return Ok(());
Expand All @@ -203,8 +246,14 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
self.remaining_messages -= message
.payload
.as_ref()
.and_then(|payload| payload.metadata.num_messages_in_batch)
.unwrap_or(1i32) as u32;
.and_then(|payload| {
debug!(
"Consumer: received message payload, num_messages_in_batch = {:?}",
payload.metadata.num_messages_in_batch
);
payload.metadata.num_messages_in_batch
})
.unwrap_or(1) as i64;

match self.process_message(message).await {
// Continue
Expand Down Expand Up @@ -571,6 +620,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
)
.await?;

self.remaining_messages = self.batch_size as i64;
self.messages_rx = Some(messages);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ mod tests {
msg: u32,
}

impl<'a> SerializeMessage for &'a TestData {
impl SerializeMessage for &TestData {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
Ok(producer::Message {
Expand Down
8 changes: 5 additions & 3 deletions src/consumer/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,11 @@ impl<T: 'static + DeserializeMessage, Exe: Executor> Stream for MultiTopicConsum
}
}

if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
if self.topic_regex.is_some() {
if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
}
}

let mut topics_to_remove = Vec::new();
Expand Down
5 changes: 4 additions & 1 deletion src/consumer/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,10 @@ impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
self.messages_received += 1;
Poll::Ready(Some(Ok(self.create_message(id, payload))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Err(e))) => {
error!("we are using in the single-consumer and we got an error, {e}");
Poll::Ready(Some(Err(e)))
}
}
}
}
Loading

0 comments on commit ba58c9c

Please sign in to comment.