Add support for capturing return values from worker functions #15
base: master
Conversation
This matches the behaviour of multiprocessing.Pool and ThreadPool.
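For context, a minimal sketch of the behaviour being matched: both standard-library pools collect each worker function's return value into a list ordered by input position.

```python
from multiprocessing.pool import Pool, ThreadPool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:
        # Pool.map gathers each worker's return value,
        # ordered by input position.
        assert pool.map(square, range(5)) == [0, 1, 4, 9, 16]

    with ThreadPool(4) as pool:
        # ThreadPool.map behaves identically.
        assert pool.map(square, range(5)) == [0, 1, 4, 9, 16]
```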
Codecov Report — Base: 91.34% // Head: 90.02% // Decreases project coverage by 1.32%.
Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##           master      #15      +/-   ##
==========================================
- Coverage   91.34%   90.02%   -1.32%
==========================================
  Files           5        5
  Lines         358      361       +3
==========================================
- Hits          327      325       -2
- Misses         31       36       +5
```
☔ View full report at Codecov.
Not sure what's up with the CI 🤔
Thanks for this MR! Now, I would like to be a bit cautious with this one. I can immediately see use cases for this, e.g. aggregating metadata like event counters across workers, for which so far you would need some per-worker counter:

```python
per_worker_counter = psh.alloc(1, dtype=int, per_worker=True)

def kernel(worker_id, ...):
    per_worker_counter[worker_id] += processed_events

num_events = per_worker_counter.sum()
```

With this, one can simply return the result and have it aggregated for you:

```python
def kernel(worker_id, ...):
    return processed_events

num_events = sum(psh.map(kernel, ...))
```

That being said, it opens up the possibility for a serious mispattern of pushing around the actual worker results. I suppose having …

Could you please run some numbers on the performance impact in edge situations, say very short worker functions for very long iterables?
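To make that concern concrete, here is a sketch of my own (not from the thread), assuming pasha's `alloc`/`map` API with the same kernel signature used elsewhere in this discussion, contrasting the intended shared-memory pattern with the mispattern of shipping results back through return values:

```python
import numpy as np
import pasha as psh

data = np.arange(1_000_000, dtype=np.float64)

# Intended pattern: allocate shared memory up front and have workers
# write into it; nothing needs to travel back to the parent process.
result = psh.alloc(like=data)

def kernel(worker_id, index, value):
    result[index] = value ** 2

psh.map(kernel, data)

# Potential mispattern: returning the per-element result itself means
# every value is funnelled back through the return-value channel.
def bad_kernel(worker_id, index, value):
    return value ** 2

squares = psh.map(bad_kernel, data)  # only meaningful with this MR applied
```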
Sure, with:

```python
%%time
import time

import pandas as pd
import psutil

import pasha

def foo(worker_id, index, value):
    return value

processes_times = pd.DataFrame(columns=["Input size", "Running time"])
pasha.set_default_context("processes", num_workers=psutil.cpu_count())

for i in range(1, 40):
    start = time.perf_counter()
    pasha.map(foo, range(i**4))
    running_time = time.perf_counter() - start
    processes_times.loc[i] = [i**4, running_time]
```

I get this on the …: [plot of running time vs. input size omitted]

And on the version in the …: [plot of running time vs. input size omitted]

(I was too lazy to overlay the plots, sorry 🙈)

That's going up to ~2.5 million elements, where the overhead is ~0.4 s. But if we use a larger kernel:

```python
def foo(worker_id, index, value):
    return (value, value, value, value)
```

[plot omitted] Vs the version in …: [plot omitted]

Not sure why the baseline is lower on …

I think that's acceptable, but I could make it optional and off by default?
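If collection were made opt-in as suggested, the call site could look something like the sketch below; the `return_values` flag is purely hypothetical and was not settled anywhere in this thread:

```python
# Hypothetical opt-in sketch; the actual flag name and placement
# (per-call vs. per-context) were not decided in this discussion.
psh.map(kernel, data)  # default: return values discarded, no overhead

results = psh.map(kernel, data, return_values=True)  # explicit opt-in
num_events = sum(results)
```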