User story
I came across an interesting problem while reading multiple files via a subclass of _ParallelRead: with the current design, _ParallelRead loads all files to be processed into RAM, decides which tasks to run first and only then starts the parallel reading.
I have a folder on a cloud storage with over 1.3 million files that needs to be processed. It takes ages before mara even starts reading (and when there is an invalid file, everything starts over again...).
In addition, the calculation looks inefficient to me when one loads a lot of files of very different sizes (= different processing times).
Here is what I came up with
I redesigned the base class ParallelTask to support generic Worker nodes instead of tasks. This is an optional mode which is activated with self.use_workers = True.
The Worker nodes get their commands at pipeline runtime (in contrast to the Task node, which requires its commands to be defined upfront).
An additional method feed_workers in class ParallelTask can be overridden. This method runs in a separate process during pipeline execution and yields the commands which are then passed to the workers. You can either yield a single command or a list of commands. A yielded list is passed as a whole to a single worker (this is necessary because in some cases several commands have to be executed in order for a single file). A minimal usage sketch follows below.
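To make the intended usage concrete, here is a minimal sketch of how a subclass might enable the worker mode and override feed_workers. Only ParallelTask, use_workers and feed_workers come from the proposal; the import paths, ReadCloudFolder, list_remote_files and the placeholder commands are assumptions made up for illustration and are not taken from the PR.

```python
import pathlib

from mara_pipelines.pipelines import ParallelTask     # assumed import path
from mara_pipelines.commands.bash import RunBash      # assumed import path


def list_remote_files(path: str):
    """Hypothetical lazy file listing.
    Stand-in: iterates a local directory; in practice this would page
    through the cloud storage API without materializing the full list."""
    for p in pathlib.Path(path).iterdir():
        if p.is_file():
            yield p.name


class ReadCloudFolder(ParallelTask):
    def __init__(self, id: str, description: str, path: str):
        super().__init__(id=id, description=description)
        self.path = path
        self.use_workers = True  # opt in to the proposed generic Worker mode

    def feed_workers(self):
        # Runs in a separate process during pipeline execution and yields
        # commands as files are discovered, instead of building one huge
        # task list upfront.
        for file_name in list_remote_files(self.path):
            # A yielded list goes to a single worker so that the commands
            # run in order for this one file:
            yield [RunBash(command=f'download-file "{file_name}"'),   # placeholder commands
                   RunBash(command=f'process-file "{file_name}"')]
```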
Since the workers now get their files / commands at runtime from a message queue, I expect this logic to work better when many files need to be processed.
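For readers unfamiliar with the pattern, the following is a generic, self-contained illustration of such a producer/consumer hand-off with a multiprocessing queue and sentinel values. It is not the code from the PR, just a sketch of the idea; all names in it are made up.

```python
import multiprocessing

_SENTINEL = None


def run_command(command):
    """Hypothetical stand-in for executing one command (or command list)."""
    print('executing', command)


def feeder(queue: multiprocessing.Queue, yielded_commands, number_of_workers: int):
    # The producer side: put whatever feed_workers yields into the queue,
    # then one sentinel per worker so each worker drains the queue and stops.
    for command in yielded_commands:
        queue.put(command)
    for _ in range(number_of_workers):
        queue.put(_SENTINEL)


def worker(queue: multiprocessing.Queue):
    # The consumer side: process commands until the sentinel arrives.
    while True:
        command = queue.get()
        if command is _SENTINEL:
            break
        run_command(command)


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(queue,)) for _ in range(4)]
    for p in workers:
        p.start()
    feeder(queue, (f'process file {i}' for i in range(10)), number_of_workers=len(workers))
    for p in workers:
        p.join()
```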
This new design does not work for all _ParallelRead execution options, so it is only used when possible.
PR: #74
Some points to note
When the feed_workers function throws an exception, all commands already in the queue will still be processed by the worker tasks. There is no mechanism to tell the worker nodes to stop their work; they stop once all open commands in the queue have been picked up.
When a worker node fails, the other workers and the feed worker process continue their work until the queue is empty.
This implementation only works with ReadMode.ALL.