WIP: Add decorators to modify map function behaviour #8

Open
wants to merge 4 commits into master

Conversation

@philsmt (Collaborator) commented Mar 29, 2021

I've developed a few common patterns when using pasha and its spiritual predecessors, which can add up to quite a bit of boilerplate at times and/or not work fully across all context types if implemented lazily.

I always wanted to somehow automate these things, or at least make them more convenient, but I was wary of making pasha's fairly clean API uglier with extra keywords and calls. Over the weekend, I had an idea for how one might get away with this as a purely optional feature. After some experiments, it actually turned into a fully featured API to hook into various parts of the mapping process via decorators.

Some examples of the decorators implemented so far:

a) Initialize/finalize a worker scope

import numpy as np
import pasha as psh

outp = psh.alloc(shape=10, dtype=np.int32)

def prepare_some_values(worker_id):
    # Runs once per worker before iteration; its locals form the worker scope.
    worker_id_plus_2 = worker_id + 2
    yield

def check_worker_ids(worker_id, scope):
    # Runs once per worker after iteration, receiving the init scope.
    assert scope['worker_id_plus_2'] == worker_id + 2

@psh.with_init(prepare_some_values)
@psh.with_finalize(check_worker_ids)
def kernel(wid, index, row):
    # Names from the init scope (worker_id, worker_id_plus_2) are visible here.
    outp[index] = worker_id_plus_2 - worker_id

psh.map(kernel, np.arange(10))
np.testing.assert_allclose(outp, 2)

b) Special case of a) for local buffers

import numpy as np
import pasha as psh

psh.set_default_context('threads')  # test only works in same process.

buf = np.full(10, -1, dtype=np.int32)
local_bufs = psh.alloc_per_worker((), dtype=object)

@psh.with_local_copies('buf')
def kernel(wid, index, row):
    # Inside the kernel, buf refers to a per-worker copy of the global buffer.
    buf[:] = wid
    np.testing.assert_allclose(buf, wid)
    local_bufs[wid] = buf

psh.map(kernel, np.arange(10))
np.testing.assert_allclose(buf, -1)

for wid, local_buf in enumerate(local_bufs):
    np.testing.assert_allclose(local_buf, wid)

c) Automate reduction

import numpy as np
import pasha as psh

inp = np.tile(np.arange(10), [10, 1])
outp = psh.alloc(like=inp, shape=10, fill=0)

@psh.with_reduction('outp')
def kernel(wid, index, row):
    # Each worker accumulates into its own copy of outp; the copies are
    # summed into the original array after the map call.
    outp[:] += row

psh.map(kernel, inp)
np.testing.assert_allclose(inp.sum(axis=0), outp)

Any thoughts?

(The MR compares against the single_alloc_api branch to keep the diff smaller.)

@takluyver (Member)

Can you show how you would achieve the same things without these decorators? Perhaps just for one of the examples to start with - I don't want to take up too much of your time.

I'd like to see the problem you're trying to solve, but my initial reaction is that you're optimising too much for short code rather than clear code. In particular:

  • Systems that assemble a sequence of steps to be executed are always uglier than writing those steps as plain lines of code. This is what we're trying to get away from in online calibration, for instance. Doing it with decorators has an extra ugly quirk, because they run bottom to top, the opposite of how we normally read code (see the short illustration after this list).
  • Modifying the global variables as seen by a function: very clever, but surely this is going to make it harder to reason about the code inside the function.
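
To illustrate the ordering quirk with plain Python (nothing pasha-specific): the decorator written closest to the function is applied first, so a stack of decorators reads bottom to top while their wrappers run top to bottom.

def announce(name):
    def decorator(func):
        def wrapper(*args, **kwargs):
            print('wrapper from', name)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@announce('top')     # applied second, wraps everything below it
@announce('bottom')  # applied first, directly around kernel
def kernel():
    pass

kernel()  # prints 'wrapper from top', then 'wrapper from bottom'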

@philsmt (Collaborator, Author) commented Mar 29, 2021

My hope is that, by keeping the extra logic outside of the actual implementations, it's more akin to syntactic sugar than to built-in features. It's supposed to be unnecessary, not something you cannot do without, but shorter to write when it fits. And honestly, one can probably be opinionated in these cases (as with @psh.with_reduction).

Another thing I like about the decorators is that these details rest with the map function's definition, not with the map call. One could define competing map functions, each with slightly different semantics, and encode those details within them rather than around the map call.

No worries btw, it's an off-time project currently anyway; all the happier if it's also useful.

Here are some recent examples, which actually came up when prototyping REMI calibration code. I won't make the effort to write runnable code, just enough to make the point:

a) Initializer
This is actually a feature that is somewhat tricky to do without proper support. I have a class written in Cython, binding a C++ library to Python. Naturally, it requires some buffers etc. to be set up and is then fed per-train data. In a pattern that will repeat in b), I cheated by creating the object in the host process, knowing that any modification after forking will lead to copy-on-write (COW). This code would break horribly with threads.

sort = sort_class([0, 1, 2, 3, 4, 5, 6])
sort.set_scale_factors(0.34, 0.34, 0.386)

def process_train(worker_id, index, train_id, data):
    ...
    sort.set_data(group)

The new pattern would allow me to properly create a custom object for each worker in an initializer, and if necessary even de-allocate it.
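
For illustration, a sketch of how this might look with the proposed with_init decorator; sort_class and group are the same hypothetical placeholders as in the snippet above, and names from the init scope are assumed to be visible in the kernel as in example a):

def init_sort(worker_id):
    # Runs once in each worker: build a private native object.
    sort = sort_class([0, 1, 2, 3, 4, 5, 6])
    sort.set_scale_factors(0.34, 0.34, 0.386)
    yield
    # Code after the yield could de-allocate the object again if necessary.

@psh.with_init(init_sort)
def process_train(worker_id, index, train_id, data):
    ...
    sort.set_data(group)  # 'sort' comes from this worker's init scope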

b) Local copies
Very similar to a) and could be done with initializers, but because it has already come up frequently (and I cheated in the same way), I want to list it explicitly. The problem here is needing some buffers during iteration (again, typically for interaction with native code). Each worker needs its own buffer, but allocating it per iteration would be expensive and defeat the point of pre-allocated buffers. Note that this one could be done "properly" with per-worker arrays, but with processes I typically went the easy way again of allocating on the heap and letting COW do the rest.

indices = np.empty((100000,), dtype=np.float64)

def process_train(worker_id, index, train_id, data):
    ...
    edge_idx = cfd_native(trace, indices, ...)
    # indices[:edge_idx] contains relevant data
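
With the proposed with_local_copies decorator, the same pattern might be written roughly like this (a sketch; cfd_native and trace remain the hypothetical placeholders from the snippet above):

indices = np.empty((100000,), dtype=np.float64)

@psh.with_local_copies('indices')
def process_train(worker_id, index, train_id, data):
    ...
    # Inside the kernel, 'indices' refers to this worker's own copy.
    edge_idx = cfd_native(trace, indices, ...)
    # indices[:edge_idx] contains relevant data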

c) Reduction
This one is debatable. I want to compare it to some of the ready-to-go high-level constructs available in cupy or pycuda, which - when they fit the problem - can express it in very few lines of code. It comes down to a reduction operation, which is commonly solved in pasha by having a per-worker reduction buffer that is reduced across afterwards. Let's say I want to get the average of a run:

avg_spectra = psh.array_per_worker((100000,), dtype=np.float64)

def average_run(worker_id, index, train_id, data):
    avg_spectra[worker_id] += data['my_data_source']

psh.map(average_run, run)
avg_spectra = avg_spectra.sum(axis=0) / len(run.train_ids)

This is a fairly common pattern for me in near-online analysis, as it can tremendously accelerate the simple operation of getting a mean or summed spectrum of a run (especially on the ONC with its SSD storage). I have written the lines above countless times, and often you do a few processing steps on the raw data before adding it.
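
For comparison, a sketch of the same averaging written with the proposed with_reduction decorator, assuming it sums the per-worker copies back into the original array as in example c); the final division still happens after the map call:

avg_spectra = psh.alloc(shape=(100000,), dtype=np.float64, fill=0)

@psh.with_reduction('avg_spectra')
def average_run(worker_id, index, train_id, data):
    # Each worker accumulates into its own copy of avg_spectra.
    avg_spectra[:] += data['my_data_source']

psh.map(average_run, run)
avg_spectra /= len(run.train_ids)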

@takluyver (Member)

I agree with you that it's nice to have the different steps together. But I don't particularly like gluing them together with decorators.

For the pre/post worker steps, what about if you had the option to supply a worker function rather than a 'kernel', so you tie the steps together inside a single function. Rewriting your first example:

import numpy as np
import pasha as psh

outp = psh.alloc(shape=10, dtype=np.int32)

def worker(functor, share, worker_id):
    worker_id_plus_2 = worker_id + 2
    for index, row in functor.iterate(share):
        outp[index] = worker_id_plus_2 - worker_id
    assert worker_id_plus_2 == worker_id + 2

psh.do_work(worker, np.arange(10))
np.testing.assert_allclose(outp, 2)

This example looks rather ridiculous now, because worker_id_plus_2 is just a local variable. But that's the point! There's nothing special going on, it's just a standard Python variable.

This is possible because you've made a rather nice, general 'functor' interface, so having a 'worker' function with a loop is hardly any more code than a 'kernel' function which pasha calls in a loop. It's also potentially more flexible, e.g. if you want to do some vectorised operations on an array, you might be able to avoid looping, or you can loop over the values twice if that's useful for some reason.

For making local copies, I'd do it explicitly:

def worker(functor, share, worker_id):
    local_buf = buf.copy()
    for index, row in functor.iterate(share):
        local_buf[:] = worker_id
        np.testing.assert_allclose(local_buf, worker_id)
        local_bufs[worker_id] = local_buf

You could have a helper function to simplify copying several values at once (a_loc, b_loc = psh.copy(a, b)), or to copy except when you're using forked processes, if that's a priority.
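
A minimal sketch of such a helper (psh.copy does not exist yet; the fork-aware variant is only hinted at in the comment):

import numpy as np

def copy(*arrays):
    # Return worker-local copies of any number of arrays. With forked
    # processes this could skip the copy and rely on copy-on-write instead.
    copies = tuple(np.copy(a) for a in arrays)
    return copies[0] if len(copies) == 1 else copies

# Usage inside a worker function:
# a_loc, b_loc = copy(a, b)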

There's a bit more going on in the reduction case. Maybe a decorator based solution is useful here - I'm still thinking about how this could be done. One thing that might make it clearer is injecting the special variables as arguments to the kernel/worker function, rather than as globals, so you can see that they're coming from the machinery calling it.

@psh.with_reduction('outp')
def kernel(wid, index, row, outp):

@philsmt (Collaborator, Author) commented Apr 6, 2021

Thanks a lot for your feedback! I understand your preference for making it explicit, and as it turns out, this is definitely something one should add, too (see below).

That being said, I'm still a fan of having some syntactic sugar for it. The inspiration for pasha was the "near-online analysis" situation during an experiment, where you're craving exactly this kind of shortcut to express common idioms. If you remember the map API proposed for EXtra-data, tight integration was the main point in xts: apply some algorithm to the data quickly rather than structure it out, and be able to replace either part quickly.
I would like to keep this scope for pasha, in the faint hope of actually being in that situation again, just with better-tested libraries at my back 😍

This is not meant to dismiss your concerns, but maybe you have another idea on how to somehow make these patterns more convenient without the need to be fully explicit?

For the pre/post worker steps, what about if you had the option to supply a worker function rather than a 'kernel', so you tie the steps together inside a single function. Rewriting your first example:

[...]

This example looks rather ridiculous now, because worker_id_plus_2 is just a local variable. But that's the point! There's nothing special going on, it's just a standard Python variable.

This is possible because you've made a rather nice, general 'functor' interface, so having a 'worker' function with a loop is hardly any more code than a 'kernel' function which pasha calls in a loop. It's also potentially more flexible, e.g. if you want to do some vectorised operations on an array, you might be able to avoid looping, or you can loop over the values twice if that's useful for some reason.

Despite the concerns voiced in the beginning, your idea of a worker function turned out to be wonderful in another portion of the code I'm working on 😄 By now, I've split the calculation into several map calls, and one of them lives entirely in Cython. With a worker function, one can eliminate even more Python overhead by moving the whole loop into Cython. And that map call iterates A LOT.

For making local copies, I'd do it explicitly:

def worker(functor, share, worker_id):
    local_buf = buf.copy()
    for index, row in functor.iterate(share):
        local_buf[:] = worker_id
        np.testing.assert_allclose(local_buf, worker_id)
        local_bufs[worker_id] = local_buf

You could have a helper function to simplify copying several values at once (a_loc, b_loc = psh.copy(a, b)), or to copy except when you're using forked processes, if that's a priority.

I feel similarly to the above: it's quite a lot of change for something as minor as "give me a local copy, please". It's essentially what's happening behind the scenes, but I'm wondering if one can make a more expressive interface for it.

There's a bit more going on in the reduction case. Maybe a decorator based solution is useful here - I'm still thinking about how this could be done. One thing that might make it clearer is injecting the special variables as arguments to the kernel/worker function, rather than as globals, so you can see that they're coming from the machinery calling it.

@psh.with_reduction('outp')
def kernel(wid, index, row, outp):

That was actually my first version (for all three features), until I figured out the generator trick. I'm not sure I like it anymore though, so this might be a more stable, if less fancy, way to do it.
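
For reference, one way such a generator trick might work (a guess at the mechanism, not necessarily what this PR implements): run the init generator up to its yield, read the locals of its suspended frame, and temporarily patch them into the kernel's globals while iterating.

def run_with_init_scope(init, kernel, worker_id, work):
    gen = init(worker_id)
    next(gen)  # run the init generator up to its first yield

    # The suspended frame still holds the generator's locals.
    scope = dict(gen.gi_frame.f_locals)

    # Temporarily expose them as globals of the kernel.
    overwritten = {k: kernel.__globals__[k]
                   for k in scope if k in kernel.__globals__}
    kernel.__globals__.update(scope)

    try:
        for index, row in work:
            kernel(worker_id, index, row)
    finally:
        for k in scope:
            kernel.__globals__.pop(k, None)
        kernel.__globals__.update(overwritten)

        try:
            next(gen)  # resume past the yield to run any clean-up code
        except StopIteration:
            pass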

@philsmt philsmt force-pushed the single_alloc_api branch from e729be7 to 5b57124 Compare April 8, 2021 14:31
Base automatically changed from single_alloc_api to master April 16, 2021 17:33