Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hash with a big number of files] Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement #43

Open
EgorBu opened this issue Apr 11, 2018 · 5 comments

Comments

@EgorBu
Copy link

EgorBu commented Apr 11, 2018

Hi,
spark cassandra connector / scylla fails when you attempt to make hash step with a big number of files.
I tried 1M files - always fails, 300k files - unstable, some experiments can be completed, but after a several trials it fails with error. Before each new run of hash step I used reset_db but memory is not released from scylla (I'm not ure if it's correct behaviour of DB).

Error log ``` 18/04/10 23:04:51 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@3ec8da3b com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_QUORUM (1 replica were required but only 0 acknowled ged the write) at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100) at com.datastax.driver.core.Responses$Error.asException(Responses.java:122) at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:506) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:934) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:397) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:302) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:748) Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_QUORUM (1 replica were required but only 0 acknowledged the write) at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59) at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:289) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:269) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ... 18 more ```
@vmarkovtsev
Copy link
Collaborator

Having a write timeout is totally logical on huge batches (though says something about the connector/protocol/server quality without splitting and heartbeat). Can you please test on Cassandra instead of Scylla?

@EgorBu
Copy link
Author

EgorBu commented Apr 11, 2018

I will try, but it's still strange. Because several attempts are successful, then it starts to fail. + It was possible to make hash step on 300k files after increasing the default parallelism from 300 to 1200

@r0mainK
Copy link
Contributor

r0mainK commented Apr 25, 2018

Had similar problem writing a large number of bag batches to scylla, see error log below, seems the solution is simply to increase spark.default.parallelism as Egor did (in my case increasing value from 360 to 720 was enough). I think we could simply add a troubleshooting section in the doc to warn about this, with the solution to this bug.

WARN TaskSetManager: Lost task 233.0 in stage 18.0 (TID 3855, 10.2.7.95, executor 13): java.io.IOException: Failed to write statements to apollo.bags.
	at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
	at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
	at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
	at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
	at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
	at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

@r0mainK
Copy link
Contributor

r0mainK commented Apr 26, 2018

So since yesterday I saw that I actually was excluding a lot of files, and when rerunning I had to change the number of partitions to a much higher value (13680 partitions) which caused the error below (nothing fatal but still) and slowed down execution. So while I still think we should inform somewhere that the number of partitions has to be increased when encountering this error, I think that we should repartition the data before writing to DB (in bags and hashes commands) to a value proportional to spark.default.parallelism. Tests could be done to optimize the coefficient (at first guess value should be between 5~10). That way we should get best of both worlds

ERROR Utils: Uncaught exception in thread heartbeat-receiver-event-loop-thread
java.lang.NullPointerException
	at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:469)
	at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:444)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8$$anonfun$9.apply(TaskSchedulerImpl.scala:458)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8$$anonfun$9.apply(TaskSchedulerImpl.scala:458)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8.apply(TaskSchedulerImpl.scala:458)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8.apply(TaskSchedulerImpl.scala:457)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:457)
	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@r0mainK
Copy link
Contributor

r0mainK commented May 10, 2018

I modified a bit the yaml files provided here by IBM: https://github.com/IBM/Scalable-Cassandra-deployment-on-Kubernetes to check if using Cassandra also gets rid of the problem on K8, if thats the case I think we ought to close this issue and advise people to avoid Scylla

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants