Skip to content

Commit

Permalink
Addressed the comment of the op state being RAII
Browse files Browse the repository at this point in the history
  • Loading branch information
georgikoyrushki95 committed Jan 19, 2024
1 parent c9d9bc7 commit e124f34
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
47 changes: 29 additions & 18 deletions include/unifex/filter_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct _next_receiver<StreamSender, FilterFunc, Receiver>::type {
op.next_.construct_with([&] {
return unifex::connect(next(op.stream_), next_receiver_t{op});
});
op.nextEngaged_ = true;
unifex::start(op.next_.get());
} else {
unifex::set_value(
Expand Down Expand Up @@ -117,27 +118,37 @@ struct _op<StreamSender, FilterFunc, Receiver>::type {

template <typename StreamSender2, typename FilterFunc2, typename Receiver2>
explicit type(
StreamSender2&& stream, FilterFunc2&& filter, Receiver2&& receiver)
StreamSender2&& stream,
FilterFunc2&& filter,
Receiver2&&
receiver) noexcept(std::
is_nothrow_constructible_v<
StreamSender,
StreamSender2>&&
std::is_nothrow_constructible_v<
FilterFunc,
FilterFunc2>&&
std::is_nothrow_constructible_v<
Receiver,
Receiver2>&&
is_nothrow_connectable_v<
next_sender_t<StreamSender>,
Receiver&>)
: stream_(std::forward<StreamSender2>(stream))
, filter_(std::forward<FilterFunc2>(filter))
, receiver_(std::forward<Receiver2>(receiver)) {}

// Question to @ispeters: This didn't make a lot of sense to me, but now that
// we don't have a union, do we want to remove this?
~type() {} // Due to the union member, this is load-bearing. DO NOT DELETE.
, receiver_(std::forward<Receiver2>(receiver)) {
next_.construct_with(
[&] { return unifex::connect(next(stream_), next_receiver_t{*this}); });
nextEngaged_ = true;
}

void start() noexcept {
UNIFEX_TRY {
next_.construct_with([&] {
return unifex::connect(next(stream_), next_receiver_t{*this});
});
nextEngaged_ = true;
unifex::start(next_.get());
}
UNIFEX_CATCH(...) {
unifex::set_error(std::move(receiver_), std::current_exception());
~type() {
if (nextEngaged_) {
next_.destruct();
}
}

void start() noexcept { unifex::start(next_.get()); }
};

template <typename StreamSender, typename FilterFunc>
Expand Down Expand Up @@ -201,8 +212,8 @@ struct _filter_stream {

template <typename StreamSender, typename FilterFunc>
struct _filter_stream<StreamSender, FilterFunc>::type {
StreamSender stream_;
FilterFunc filter_;
UNIFEX_NO_UNIQUE_ADDRESS StreamSender stream_;
UNIFEX_NO_UNIQUE_ADDRESS FilterFunc filter_;

friend auto tag_invoke(tag_t<next>, type& s) noexcept(
std::is_nothrow_constructible_v<
Expand Down
1 change: 0 additions & 1 deletion test/filter_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <unifex/range_stream.hpp>
#include <unifex/reduce_stream.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/task.hpp>
#include <unifex/then.hpp>
#include <unifex/trampoline_scheduler.hpp>
#include <unifex/transform_stream.hpp>
Expand Down

0 comments on commit e124f34

Please sign in to comment.