[BUG] Seeing some backlog pending on Pulsar UI even if Spark consumer has consumed all data. #176

Open
akshay-habbu opened this issue Apr 3, 2024 · 9 comments

akshay-habbu commented Apr 3, 2024

[ Disclaimer - I am fairly new to Pulsar, so I might not understand all the Pulsar details, but I have been using Spark for a while now. ]
I am using an Apache Spark consumer to consume data from Pulsar on AWS EMR, with the StreamNative pulsar-spark connector.
My version stack looks like this:
Spark version - 3.4.1
Pulsar version - 2.10.0.7
StreamNative connector - pulsar-spark-connector_2.12-3.4.0.3.jar

I have created a new Pulsar topic and started a fresh Spark consumer on that topic. The consumer is able to connect to the topic and consume messages correctly; the only issue I have is with the backlog numbers displayed on the Pulsar admin UI.
[Screenshot: pending backlog for the subscription shown on the Pulsar admin UI]

To Reproduce
Steps to reproduce the behavior:
Create a Spark consumer using the following code:

import scala.collection.mutable

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("pulsar_streaming_test_app")
  .enableHiveSupport()
  .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

val optionsMap: mutable.Map[String, String] = mutable.Map[String, String]()
optionsMap.put("service.url", "pulsar://pulsar-service.url:6650")
optionsMap.put("admin.url", "pulsar://pulsar-admin.url:8080")
optionsMap.put("pulsar.producer.batchingEnabled", "false")
optionsMap.put("topic", "topic-name")
optionsMap.put("predefinedSubscription", "existing-subscription-name")
optionsMap.put("subscriptionType", "Exclusive/Shared")
optionsMap.put("startingOffsets", "latest")

val data = spark.readStream.format("pulsar").options(optionsMap).load()

data.writeStream
  .format("parquet")
  .option("checkpointLocation", "checkpoint/path")
  .option("path", "output/path")
  .start()
  .awaitTermination()

Also, there is a minor side problem: it seems that Spark does not create a new subscription on its own, and the job keeps failing with

Caused by: org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"Subscription does not exist","reqId":1663032428812969942, "remote":"pulsar-broker-21/172.31.203.70:6650", "local":"/ip:46010"}

The only way I can make it work is by creating a subscription manually on the Pulsar side and using the predefinedSubscription option in Spark to latch on to that subscription.
I tried passing pulsar.reader.subscriptionName, pulsar.consumer.subscriptionName, and subscriptionName while running the job, but it failed with the same error.
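
For reference, here is a minimal sketch of that manual workaround using the Pulsar Java admin client (written in Scala; the admin URL, topic, and subscription names below are placeholders, not my real setup):

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.MessageId

// Create the durable subscription up front so the Spark job can attach to it
// via the predefinedSubscription option.
val admin = PulsarAdmin.builder()
  .serviceHttpUrl("http://pulsar-admin.url:8080") // Pulsar admin REST endpoint
  .build()
try {
  admin.topics().createSubscription(
    "persistent://tenant/namespace/topic-name",
    "existing-subscription-name",
    MessageId.latest) // start the cursor at the latest position
} finally {
  admin.close()
}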

Any help would be much appreciated.

nlu90 commented Apr 12, 2024

@akshay-habbu

  1. Where do you access the UI? It doesn't appear to be the StreamNative Cloud Console.

  2. For the Subscription does not exist issue, it may be caused by a Pulsar-side configuration. Specifically, the following one in broker.conf:

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

akshay-habbu commented:

@nlu90

Thanks for responding.

  1. The UI is not the StreamNative Cloud Console; it's the Pulsar admin UI that ships with Pulsar by default. The same backlog is observed in the Pulsar metrics as well:
"spark-consumer" : {
      "msgRateOut" : 3872.5419823435427,
      "msgThroughputOut" : 9793786.857122486,
      "bytesOutCounter" : 2836729487,
      "msgOutCounter" : 1079519,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 249731,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 249731,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 3872.5419823435427,
        "msgThroughputOut" : 9793786.857122486,
        "bytesOutCounter" : 62749236,
        "msgOutCounter" : 25000,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 0,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "subscriptionProperties" : { },
      "durable" : true,
      "replicated" : false
    },
  2. Yes, the issue was a Pulsar configuration that was preventing us from creating a new subscription; changing the Pulsar config for that namespace helped. Thanks.

nlu90 commented Apr 16, 2024

@akshay-habbu Just FYI: during Spark job execution, the connector spawns a new consumer/reader to consume messages from the last committed position. That's why you may observe some backlog.

Do you see your job proceeding and the backlog changing after each micro-batch?
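
If it helps, you can watch the subscription's msgBacklog between micro-batches with the admin client; a minimal sketch in Scala (the admin URL, topic, and subscription names are placeholders):

import org.apache.pulsar.client.admin.PulsarAdmin

// Poll the subscription stats and check whether msgBacklog moves after each micro-batch.
val admin = PulsarAdmin.builder()
  .serviceHttpUrl("http://pulsar-admin.url:8080")
  .build()
try {
  val stats = admin.topics().getStats("persistent://tenant/namespace/topic-name")
  val sub = stats.getSubscriptions.get("existing-subscription-name")
  if (sub != null) {
    println(s"msgBacklog=${sub.getMsgBacklog}, unackedMessages=${sub.getUnackedMessages}")
  }
} finally {
  admin.close()
}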

akshay-habbu commented Apr 16, 2024

@nlu90
Yes, the consumer job is progressing just fine; it is able to process data and write to the output stream.
I have tested the same at scale, and I have seen Spark spawning new temporary readers: when the readers appear, the backlog temporarily drops from ~100k to ~5k, and as soon as the reader goes away the backlog jumps back to ~100k.
I believe the consumer is keeping up with the topic and running at the latest offset, but the backlog shows a higher number for some reason.
Do you all see similar backlogs? Or is there some config that seems to be missing on my end?

nlu90 commented Apr 16, 2024

@akshay-habbu We haven't heard any reports of this issue from other users so far.

One possibility is that these backlogged subscriptions are not the ones being actively used, and are probably left-over subscriptions from a previous round of testing.

akshay-habbu commented:

I have tried with multiple subscription names and a different topic; the same behaviour is observed.

sbandaru commented:

@akshay-habbu Hello, were you ever able to figure out how to reduce the backlog? I am seeing exactly the same issue on my end. Also, in your experience, does using "predefinedSubscription" vs auto creation have any impact on the backlog?

murugancmi commented:

I am also facing the same issue: after the pipeline completes, there are still messages in the backlog.

mjraa commented Jan 9, 2025

I have been able to reproduce at least a similar behaviour, with Pulsar version 4.0.1 and the latest Databricks runtime. After reading all the messages, the backlog displays the total number of messages.

Correct me if I am wrong, but this connector seems to be using the reader interface instead of the consumer one (relevant PR). According to the reader interface Pulsar docs (here and here), it is up to the client to manage the cursor and there is no need to acknowledge the messages, which would explain why the backlog displays the total number of messages to be consumed, since Spark manages this with checkpoints and would not acknowledge messages.

However, I have also noticed the subscription mode is set to durable (also visible in @akshay-habbu's comment in the isDurable field). In fact, after shutting down the connection the subscription still exists. And the docs state that:

Internally, the reader interface is implemented as a consumer using an exclusive, non-durable subscription to the topic with a randomly-allocated name.

So if this connector is using the reader interface, how is a durable subscription being created? I am not a Scala expert, but it seems that this connector references the Pulsar Java client to create the reader, so could this indicate an issue in that dependency?
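
For comparison, a plain reader created directly with the Java client (sketched here in Scala; the service URL and topic are placeholders) never acknowledges anything, and per the docs quoted above it should be backed by an exclusive, non-durable subscription:

import org.apache.pulsar.client.api.{MessageId, PulsarClient}

val client = PulsarClient.builder()
  .serviceUrl("pulsar://pulsar-service.url:6650")
  .build()
val reader = client.newReader()
  .topic("persistent://tenant/namespace/topic-name")
  .startMessageId(MessageId.earliest) // the client, not the broker, tracks the read position
  .create()
try {
  while (reader.hasMessageAvailable) {
    val msg = reader.readNext() // no acknowledgement is ever sent
    println(new String(msg.getData))
  }
} finally {
  reader.close()
  client.close()
}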

Furthermore, I tested creating a reader with the native Python client. A non-durable subscription is created and the backlog is reduced to zero after reading all messages (!) while the connection exists.

So I wonder if this is related: if a durable subscription is being created, perhaps Pulsar is expecting the client to acknowledge the messages, which is not happening.

But to end with the main question: why is a durable subscription created, if the connector uses the reader interface?
