Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter_stream #594

Merged
merged 10 commits into from
Jan 22, 2024
Merged

Conversation

georgikoyrushki95
Copy link
Contributor

@georgikoyrushki95 georgikoyrushki95 commented Jan 4, 2024

Description

I did not see a filter_stream operation for streams, which is quite convenient when wanting to skip certain asynchronous values. An illustrative example below (something I encounter quite often in my work):

unifex::task<size_t> getFooSum(/* some params here */)
{
    auto request = /* build request from params */

    auto streamOfMultipartResponses = someRPCClient->sendRequest(request);

    auto fooValueSum = std::move(streamOfMultipartResponses)
         // heartbeat responses keep the connection alive, but they carry no useful data
         | unifex::filter_stream([](auto resp){ resp.isHeartbeatResponse(); })
         // actual data is in fooResponse
         | unifex::reduce_stream(0, [](auto resp){ resp.fooResponse().fooValue(); });
    co_return co_await fooValueSum;
}

Most of the code for the filter_stream is adapter from reduce_stream. The difference is the next_receiver. reduce_stream collapses everything into a single value within the operation_state:

template <typename... Values>
void set_value(Values... values) && noexcept {
  // stuff happening before
  UNIFEX_TRY {
    // collapse (or reduce) all values into the op state & request the next value in the stream
    op.state_ =
        std::invoke(op.reducer_, std::move(op.state_), (Values &&) values...);
    unifex::activate_union_member_with(op.next_, [&] {
      return unifex::connect(next(op.stream_), next_receiver_t{op});
    });
    unifex::start(op.next_.get());
  }
  // stuff happening after
}

On the other hand, the filter_stream, either returns the current sender (if the predicate matches) or requests the next one:

template <typename... Values>
void set_value(Values... values) && noexcept {
  // stuff happening before
  UNIFEX_TRY {
    const bool doFilter = !std::invoke(op.filter_, (Values &&) values...);
    // if predicate matches, just return the sender
    if (doFilter) {
      unifex::activate_union_member_with(op.next_, [&] {
        return unifex::connect(next(op.stream_), next_receiver_t{op});
      });
      unifex::start(op.next_.get());
    } else {
      unifex::set_value(std::move(op.receiver_), std::move(values)...);
    }
  }
  // stuff happening after
}

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jan 4, 2024
Copy link
Contributor

@ispeters ispeters left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great idea, thanks for the contribution!

I started to review the code, but I eventually realized that I think you've confused yourself by modelling filter_stream off reduce_stream so it's probably more useful to give high-level feedback first.

filter_stream maps a Stream to a Stream, but reduce_stream maps a Stream to a Sender, which means they have some significant structural differences. The filter_stream Stream should have:

  • next(), which returns an adapted next-sender, and
  • cleanup(), which should return the underlying Stream's cleanup-sender unmodified (you've got this part right already).

The next-sender should connect the underlying Stream's next-sender to an "internal" next-receiver with the following:

  • set_error and set_done that are pass-through (error and end-of-stream don't need filtering), and
  • set_value that implements the core of the algorithm by choosing between forwarding the underlying Stream's value pack or skipping the current value pack by connecting and starting the next next-sender from the underlying Stream.
    • Any errors that happen during set_value should be propagated out of the next-sender, not forwarded to a cleanup-sender—this is different from reduce_stream.

Some things I'd love to see in a test suite to confirm we've got things right:

  • a throwing predicate that actually throws
  • an underlying Stream whose next-sender throws from connect
  • connecting the next-sender to a Receiver whose set_value throws an exception

I'd also like to understand what happens if you filter out many values in a row when the underlying next-sender is synchronous (e.g. the predicate is return false;). I suspect you'll get stack exhaustion (from the next-operation's start() invoking the next-receiver's set_value(), leading to a recursive call to the next-operation's start()) and I wonder if there's any way to build a trampoline that supports unlimited filtration of long sequences.

Comment on lines 308 to 309
using value_types = typename next_sender_t<
StreamSender>::template value_types<Variant, Tuple>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use sender_value_types_t in case the sender's traits are provided by specialization of sender_traits<> instead of as declared members within the sender.

Suggested change
using value_types = typename next_sender_t<
StreamSender>::template value_types<Variant, Tuple>;
using value_types = sender_value_types_t<next_sender_t<
StreamSender>, Variant, Tuple>;

Comment on lines 312 to 313
using error_types =
typename next_sender_t<StreamSender>::template error_types<Variant>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a song-and-dance you need to do to ensure this set includes std::exception_ptr because the next-sender may not declare it, but you definitely produce std::exception_ptrs when you catch-and-forward exceptions. You also need to use sender_error_types_t for reasons similar to why you need sender_value_types_t, above.

Suggested change
using error_types =
typename next_sender_t<StreamSender>::template error_types<Variant>;
using error_types = typename concat_type_lists_unique_t<
sender_error_types_t<next_sender_t<StreamSender>, type_list>,
type_list<std::exception_ptr>>::template apply<Variant>;

constexpr auto operator()(FilterFunc&& filterFunc) const
noexcept(is_nothrow_callable_v<tag_t<bind_back>, _fn, FilterFunc>)
-> bind_back_result_t<_fn, FilterFunc> {
return bind_back(*this, (FilterFunc &&) filterFunc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return bind_back(*this, (FilterFunc &&) filterFunc);
return bind_back(*this, std::forward<FilterFunc>(filterFunc));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my understanding, isn't (FilterFunc &&) filterFunc the same as std::forward<FilterFunc>(filterFunc)? The reference collapsing will end up producing the same reference type, based on my understanding.

I have seen in the code a lot of usages of the (T&&) t style, when forwarding references are involved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the same, yes, and the existing uses of (T&&) t instead of std::forward<T>(t) are there because of concerns about compile times (the C-style cast doesn't require a function template instantiation but the std::forward version does). We decided recently, though (maybe 4 months ago?), that the difference in compile time didn't merit the loss in clarity and safety so we're planning to slowly update the code as we visit files, and to follow the new style in new code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed these and other places, basically:

  1. T&& where T&& is a fwd reference -> changed to std::forward.
  2. T&& where T&& is an rvalue reference -> changed to std::move (there are a few of those, too).

Comment on lines 365 to 371

struct _fn {
// TODO add a type requirement that filterFunc's return is boolean
template <typename StreamSender, typename FilterFunc>
auto operator()(StreamSender&& stream, FilterFunc&& filterFunc) const {
return typename _filter_stream<StreamSender, FilterFunc>::type{
(StreamSender &&) stream, (FilterFunc &&) filterFunc};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that passing either stream or filterFunc as an lvalue will result in StreamSender or FilterFunc, respectively, being deduced as an lvalue reference, which will result in the returned filter_stream<...>::type having members of reference type, which is probably a bad idea. Instead, _filter_stream should be instantiated with cv-ref-unqualified types, and then the members move- or copy-constructed from these arguments, as appropriate.

Also, let's make this conditionally noexcept.

Suggested change
struct _fn {
// TODO add a type requirement that filterFunc's return is boolean
template <typename StreamSender, typename FilterFunc>
auto operator()(StreamSender&& stream, FilterFunc&& filterFunc) const {
return typename _filter_stream<StreamSender, FilterFunc>::type{
(StreamSender &&) stream, (FilterFunc &&) filterFunc};
template <typename StreamSender, typename FilterFunc>
using filter_stream =
typename _filter_stream<remove_cvref_t<StreamSender>, remove_cvref_t<FilterFunc>>::type;
struct _fn {
// TODO add a type requirement that filterFunc's return is boolean
template <typename StreamSender, typename FilterFunc>
auto operator()(StreamSender&& stream, FilterFunc&& filterFunc) const
noexcept(std::is_nothrow_constructible_v<filter_stream<StreamSender, FilterFunc>, StreamSender, FilterFunc>)
-> filter_stream<StreamSender, FilterFunc>, StreamSender, FilterFunc> {
return filter_stream<StreamSender, FilterFunc>, StreamSender, FilterFunc>{
std::forward<StreamSender>(stream), std::forward<FilterFunc>(filterFunc)};

@ispeters
Copy link
Contributor

ispeters commented Jan 4, 2024

I thought of some more test cases that'd be good to have:

  • an underlying Stream of something move-only (maybe unique_ptr<int>, and the filter removes nulls, or something)
  • an underlying Stream of references

@georgikoyrushki95
Copy link
Contributor Author

georgikoyrushki95 commented Jan 5, 2024

I started to review the code, but I eventually realized that I think you've confused yourself by modelling filter_stream off reduce_stream so it's probably more useful to give high-level feedback first.

@ispeters Thanks so much for the feedback & outlining how the filter stream should behave. A few questions to make sure I get it right, all related to the next_receiver of the filter_stream:

  1. set_value(...) : if an exception happens (e.g. during the call of the filter function, next sender throwing in connect, etc.), we don't handle it at all, right: just let it pass through? That's my interpretation of what you said + how then(), transform_stream(), etc., seem to behave. However, set_value would not be noexcept then?

  2. set_error and set_done just call & propagate the input to set_error and set_done of the underlying receiver (not next_receiver), but the one our clients connect with?

@georgikoyrushki95
Copy link
Contributor Author

I'd also like to understand what happens if you filter out many values in a row when the underlying next-sender is synchronous (e.g. the predicate is return false;)...

Is this not what reduce_stream potentially suffers from as well? Over there it's not even conditional, but the recursion takes place for the whole range.

@georgikoyrushki95
Copy link
Contributor Author

Some things I'd love to see in a test suite to confirm we've got things right: ...

So in all those cases you listed, there's an exception being thrown within next_receiver::set_value(...). Assuming we don't handle it & let it propagate, what should we verify in our tests? I tried doing this for transform, where I throw an error in the function & the code just crashed:

TEST(filter_stream, TransformThrows) {
  range_stream{1, 11} |
  transform_stream([](int val) {
    if (val % 2 == 0) {
      throw;  // when this happens, there's an uncaught exception & the process terminates
    }
    return val * 2;
  }) |
  for_each([](int el){
    std::cout << "el=" << el << std::endl;
  }) | sync_wait();
}

Wondering if above is correct? Don't we just want to catch it & set_error: in that case the for_each (which in fact is backed-up by a reduce_stream) can handle it & recover appropriately.

@ispeters
Copy link
Contributor

ispeters commented Jan 5, 2024

  1. set_value(...) : if an exception happens (e.g. during the call of the filter function, next sender throwing in connect, etc.), we don't handle it at all, right: just let it pass through? That's my interpretation of what you said + how then(), transform_stream(), etc., seem to behave. However, set_value would not be noexcept then?

I would expect a UNIFEX_TRY {} UNIFEX_CATCH(...) {} in set_value; in the catch block, I'd expect set_error(std::move(outerReceiver), std::current_exception()). Since all exceptions are thus handled, the set_value function itself can be noexcept.

  1. set_error and set_done just call & propagate the input to set_error and set_done of the underlying receiver (not next_receiver), but the one our clients connect with?

Correct.

I'd also like to understand what happens if you filter out many values in a row when the underlying next-sender is synchronous (e.g. the predicate is return false;)...

Is this not what reduce_stream potentially suffers from as well? Over there it's not even conditional, but the recursion takes place for the whole range.

Yes, reduce_stream does potentially suffer from the same problem. If we're lucky, the code triggers a tail-call optimization and the compiler saves us from stack exhaustion; if we're unlucky, that doesn't happen. I don't know which case we're in—I and my team inherited maintainership of this repo and the Streams code is still the least well understood—so I'd just like to understand so we can document it. If we're in the unlucky case, I think it'd be worth merging anyway, and then spending some time thinking about whether a trampoline can be built to remove the limitation as a later improvement.

Some things I'd love to see in a test suite to confirm we've got things right: ...

So in all those cases you listed, there's an exception being thrown within next_receiver::set_value(...). Assuming we don't handle it & let it propagate, what should we verify in our tests?

Maybe something like this?

TEST(filter_stream, FilterThrows) {
  auto st = range_stream{1, 11} | filter_stream([](int) -> bool { throw 42; });

  EXPECT_THROW(sync_wait(next(st)), int);
}

I tried doing this for transform, where I throw an error in the function & the code just crashed:

TEST(filter_stream, TransformThrows) {
  range_stream{1, 11} |
  transform_stream([](int val) {
    if (val % 2 == 0) {
      throw;  // when this happens, there's an uncaught exception & the process terminates
    }
    return val * 2;
  }) |
  for_each([](int el){
    std::cout << "el=" << el << std::endl;
  }) | sync_wait();
}

Wondering if above is correct?

If that code is exactly what crashed, then I'd expect your problem to be that throw; with no active exception leads to undefined behaviour. If you actually threw something (say, throw 42;), then I'd expect the transform_stream's next-sender to complete with set_error with a std::exception_ptr, and that the exception should bubble along the error channel until the terminal sync_wait converts it back to a thrown exception.

Don't we just want to catch it & set_error: in that case the for_each (which in fact is backed-up by a reduce_stream) can handle it & recover appropriately.

Yes, I think we want that.

@georgikoyrushki95
Copy link
Contributor Author

@ispeters I believe we're ready for another round of reviews.

Appears your concern about stack exhaustion can be addressed with via_stream(trampoline_scheduler{}), so I added a test for this.

The only thing I haven't done (to my knowledge) is:

an underlying Stream of references

Not entirely sure how that'd look.

Copy link
Contributor

@ispeters ispeters left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in reviewing this; it's looking good, but I think there are a few more changes needed.

return unifex::next(underlyingStream_);
}

auto cleanup() { unifex::cleanup(underlyingStream_); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this should return the result of cleanup(); I suppose it doesn't much matter since you're never invoking it.

Suggested change
auto cleanup() { unifex::cleanup(underlyingStream_); }
auto cleanup() { return unifex::cleanup(underlyingStream_); }

Comment on lines 212 to 216
friend auto tag_invoke(tag_t<next>, type& s) {
return _filter::sender<StreamSender, FilterFunc>{s.stream_, s.filter_};
}

friend auto tag_invoke(tag_t<cleanup>, type& s) { return cleanup(s.stream_); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make these conditionally noexcept.

Suggested change
friend auto tag_invoke(tag_t<next>, type& s) {
return _filter::sender<StreamSender, FilterFunc>{s.stream_, s.filter_};
}
friend auto tag_invoke(tag_t<cleanup>, type& s) { return cleanup(s.stream_); }
friend auto tag_invoke(tag_t<next>, type& s)
noexcept(std::is_nothrow_constructible_v<_filter::sender<StreamSender, FilterFunc>, StreamSender&, FilterFunc&> {
return _filter::sender<StreamSender, FilterFunc>{s.stream_, s.filter_};
}
friend auto tag_invoke(tag_t<cleanup>, type& s) noexcept(noexcept(cleanup(s.stream_)) {
return cleanup(s.stream_);
}

Comment on lines 209 to 210
StreamSender stream_;
FilterFunc filter_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these can both be [[no_unique_address]].

Suggested change
StreamSender stream_;
FilterFunc filter_;
UNIFEX_NO_UNIQUE_ADDRESS StreamSender stream_;
UNIFEX_NO_UNIQUE_ADDRESS FilterFunc filter_;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is C++20 btw, will C++17 targets build with it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The macro expands to [[no_unique_address]] when the compiler supports it and to the empty string when it's not supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, I misread what the current code is. I see what you mean now.

sender_error_types_t<next_sender_t<StreamSender>, type_list>,
type_list<std::exception_ptr>>::template apply<Variant>;

static constexpr bool sends_done = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be the same as next_sender_t<StreamSender>, no?

Suggested change
static constexpr bool sends_done = false;
static constexpr bool sends_done = sender_traits<next_sender_t<StreamSender>>::sends_done;

Comment on lines 125 to 147
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {}

// Question to @ispeters: This didn't make a lot of sense to me, but now that
// we don't have a union, do we want to remove this?
~type() {} // Due to the union member, this is load-bearing. DO NOT DELETE.

void start() noexcept {
UNIFEX_TRY {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
unifex::start(next_.get());
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should connect the underlying stream's next-sender into next_ in the operation state's constructor; that way, this type will have the invariant that next_ always contains a fully-constructed value, which means this type's destructor can unconditionally destruct next_, which will, I think, simplify some lifetime concerns in the next-receiver.

Suggested change
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {}
// Question to @ispeters: This didn't make a lot of sense to me, but now that
// we don't have a union, do we want to remove this?
~type() {} // Due to the union member, this is load-bearing. DO NOT DELETE.
void start() noexcept {
UNIFEX_TRY {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
unifex::start(next_.get());
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
};
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
noexcept(
std::is_nothrow_constructible_v<StreamSender, StreamSender2>
&& std::is_nothrow_constructible_v<FilterFunc, FilterFunc2>
&& std::is_nothrow_constructible_v<Receiver, Receiver2>
&& is_nothrow_connectable_v<next_sender_t<StreamSender>, Receiver&>)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
}
~type() {
next_.destruct();
}
void start() noexcept {
unifex::start(next_.get());
}
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After commenting on the next-receiver about relying on this supposed invariant, I realize it's not maintained when connect can throw so we'll need a bool to keep track of whether next_ has a value.

Suggested change
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {}
// Question to @ispeters: This didn't make a lot of sense to me, but now that
// we don't have a union, do we want to remove this?
~type() {} // Due to the union member, this is load-bearing. DO NOT DELETE.
void start() noexcept {
UNIFEX_TRY {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
unifex::start(next_.get());
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
};
bool nextEngaged_{false};
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
noexcept(
std::is_nothrow_constructible_v<StreamSender, StreamSender2>
&& std::is_nothrow_constructible_v<FilterFunc, FilterFunc2>
&& std::is_nothrow_constructible_v<Receiver, Receiver2>
&& is_nothrow_connectable_v<next_sender_t<StreamSender>, Receiver&>)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
nextEngaged_ = true;
}
~type() {
if (nextEngaged_) {
next_.destruct();
}
}
void start() noexcept {
unifex::start(next_.get());
}
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I'm not seeing is when we connect to the next receiver & that potentially throws (assuming also the constructor is noexcept(false)), why don't we set_error(...) on the receiver?

Also with your version, it is not the case that start is noexcept.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I'm not seeing is when we connect to the next receiver & that potentially throws (assuming also the constructor is noexcept(false)), why don't we set_error(...) on the receiver?

The receiver contract says that, once you've invoked start() on an operation state, exactly one of the receiver's completion methods will be invoked exactly once (with an exception for when set_value throws an exception). If the operation state's constructor throws an exception (i.e. if connect throws) then you haven't invoked start(), yet, so it's wrong to invoke any completion methods on the receiver.

Also with your version, it is not the case that start is noexcept.

Why? unifex::start(auto& op) is required to be noexcept so unifex::start(next_.get()) ought to be noexcept.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining :) I wasn't familiar with those, makes sense now.

#endif

template <typename... Values>
void set_value(Values... values) && noexcept {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should take its arguments as forwarding references so that the value category and constness can be preserved and thus match the declared types in the sender's value_types alias.

Suggested change
void set_value(Values... values) && noexcept {
void set_value(Values&&... values) && noexcept {

Comment on lines 76 to 92
op.next_.destruct();
UNIFEX_TRY {
const bool doFilter =
!std::invoke(op.filter_, std::forward<Values>(values)...);

if (doFilter) {
op.next_.construct_with([&] {
return unifex::connect(next(op.stream_), next_receiver_t{op});
});
unifex::start(op.next_.get());
} else {
unifex::set_value(std::move(op.receiver_), std::move(values)...);
}
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(op.receiver_), std::current_exception());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we rely on the operation state's destructor to destroy next_, the lifetime of values... as forwarding references can be made safe (i.e. we can forward the values into set_value even if they happen to refer to values stored somewhere in next_).

Unfortunately, I've realized while typing this out that, because the call to connect might throw, we need a bool to keep track of whether next_ contains a value to be destructed in op's destructor. :(

Suggested change
op.next_.destruct();
UNIFEX_TRY {
const bool doFilter =
!std::invoke(op.filter_, std::forward<Values>(values)...);
if (doFilter) {
op.next_.construct_with([&] {
return unifex::connect(next(op.stream_), next_receiver_t{op});
});
unifex::start(op.next_.get());
} else {
unifex::set_value(std::move(op.receiver_), std::move(values)...);
}
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(op.receiver_), std::current_exception());
}
UNIFEX_TRY {
const bool doFilter =
!std::invoke(op.filter_, std::as_const(values)...);
if (doFilter) {
op.next_.destruct();
op.nextEngaged_ = false;
op.next_.construct_with([&] {
return unifex::connect(next(op.stream_), next_receiver_t{op});
});
op.nextEngaged_ = true;
unifex::start(op.next_.get());
} else {
unifex::set_value(std::move(op.receiver_), std::forward<Values>(values)...);
}
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(op.receiver_), std::current_exception());
}

Comment on lines 95 to 110
void set_done() && noexcept {
auto& op = op_;
op.next_.destruct();
unifex::set_done(std::move(op.receiver_));
}

void set_error(std::exception_ptr ex) && noexcept {
auto& op = op_;
op.next_.destruct();
unifex::set_error(std::move(op.receiver_), std::move(ex));
}

template <typename Error>
void set_error(Error&& e) && noexcept {
std::move(*this).set_error(make_exception_ptr(std::forward<Error>(e)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can all be simplified by relying on op's destructor to destroy next_.

Suggested change
void set_done() && noexcept {
auto& op = op_;
op.next_.destruct();
unifex::set_done(std::move(op.receiver_));
}
void set_error(std::exception_ptr ex) && noexcept {
auto& op = op_;
op.next_.destruct();
unifex::set_error(std::move(op.receiver_), std::move(ex));
}
template <typename Error>
void set_error(Error&& e) && noexcept {
std::move(*this).set_error(make_exception_ptr(std::forward<Error>(e)));
}
void set_done() && noexcept {
unifex::set_done(std::move(op_.receiver_));
}
template <typename Error>
void set_error(Error&& e) && noexcept {
unifex::set_error(std::move(op.receiver_), std::forward<Error>(e));
}

@ispeters
Copy link
Contributor

For a stream of references, I think you could construct a stream something like the below:

std::array<int, 5> ints{1, 2, 3, 4, 5};

auto evens = range_stream{0, 4} | transform_stream([&](int index) -> int& {
  return ints[index];
}) | filter_stream([](int value) { return value % 2 == 0; });

I don't know how well streams of references are supported so I'm not sure what kind of test case you could write with the above stream.

@georgikoyrushki95
Copy link
Contributor Author

For a stream of references, I think you could construct a stream something like the below:

std::array<int, 5> ints{1, 2, 3, 4, 5};

auto evens = range_stream{0, 4} | transform_stream([&](int index) -> int& {
  return ints[index];
}) | filter_stream([](int value) { return value % 2 == 0; });

I don't know how well streams of references are supported so I'm not sure what kind of test case you could write with the above stream.

I see your point! You can take a look at TEST(filter_stream, StreamOfReferences): a test where after a filter, we ensure the filtered values refer to the original source.

@georgikoyrushki95
Copy link
Contributor Author

To my best knowledge, the comments are addressed.

@georgikoyrushki95
Copy link
Contributor Author

Is there a way in my own time I can trigger this build? So I can know, if it's failing or not & fix it while I am making changes.

Copy link
Contributor

@ispeters ispeters left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Just a couple more changes and we should be able to merge this.

template <typename... Values>
void set_value(Values&&... values) && noexcept {
auto& op = op_;
op.next_.destruct();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll have to delete this—it'll lead to double-deletes (either on line 81 when this value is filtered out, or in the op-state destructor) and it will sometimes lead to use-after-free when values... are references to objects stored inside next_.

Suggested change
op.next_.destruct();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn nice catch!

Comment on lines 108 to 138
StreamSender& stream_;
FilterFunc filter_;
Receiver receiver_;

using next_receiver_t = next_receiver<StreamSender, FilterFunc, Receiver>;
using next_op =
manual_lifetime<next_operation_t<StreamSender, next_receiver_t>>;

next_op next_;
bool nextEngaged_{false};

template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream,
FilterFunc2&& filter,
Receiver2&&
receiver) noexcept(std::
is_nothrow_constructible_v<
StreamSender,
StreamSender2>&&
std::is_nothrow_constructible_v<
FilterFunc,
FilterFunc2>&&
std::is_nothrow_constructible_v<
Receiver,
Receiver2>&&
is_nothrow_connectable_v<
next_sender_t<StreamSender>,
Receiver&>)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurs to me that we've over-complicated this constructor. Sorry for the review noise.

filter_ can be a reference back out to the value that sits in the stream that owns it, just like stream_ is. And there's no need to "construct" references from perfectly-forwarded arguments.

Suggested change
StreamSender& stream_;
FilterFunc filter_;
Receiver receiver_;
using next_receiver_t = next_receiver<StreamSender, FilterFunc, Receiver>;
using next_op =
manual_lifetime<next_operation_t<StreamSender, next_receiver_t>>;
next_op next_;
bool nextEngaged_{false};
template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream,
FilterFunc2&& filter,
Receiver2&&
receiver) noexcept(std::
is_nothrow_constructible_v<
StreamSender,
StreamSender2>&&
std::is_nothrow_constructible_v<
FilterFunc,
FilterFunc2>&&
std::is_nothrow_constructible_v<
Receiver,
Receiver2>&&
is_nothrow_connectable_v<
next_sender_t<StreamSender>,
Receiver&>)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
StreamSender& stream_;
FilterFunc& filter_;
Receiver receiver_;
using next_receiver_t = next_receiver<StreamSender, FilterFunc, Receiver>;
using next_op =
manual_lifetime<next_operation_t<StreamSender, next_receiver_t>>;
next_op next_;
bool nextEngaged_{false};
template <typename Receiver2>
explicit type(
StreamSender& stream,
FilterFunc& filter,
Receiver2&&
receiver) noexcept(std::is_nothrow_constructible_v<
Receiver,
Receiver2>&&
is_nothrow_connectable_v<
next_sender_t<StreamSender>,
Receiver&>)
: stream_(stream)
, filter_(filter)

@ispeters
Copy link
Contributor

Is there a way in my own time I can trigger this build? So I can know, if it's failing or not & fix it while I am making changes.

Once we merge this PR you'll no longer be a "first-time contributor" and GitHub will run the CI on your subsequent PRs without waiting for us to click a button. Other than that, you'll need a local repro, I guess? I'm not sure; my usual workflow is to start with an internal diff that runs CI against our internal builds, and then I publish a PR—this usually leads to a fairly clean iteration cycle.

Copy link
Contributor

@ispeters ispeters left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks!

@ispeters ispeters merged commit daae513 into facebookexperimental:main Jan 22, 2024
47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants