Skip to content

Commit

Permalink
Format the first quarter of win32/windows_thread_pool.hpp
Browse files Browse the repository at this point in the history
  • Loading branch information
ispeters committed Oct 31, 2023
1 parent 4d9b3c1 commit 48ae13a
Showing 1 changed file with 148 additions and 140 deletions.
288 changes: 148 additions & 140 deletions include/unifex/win32/windows_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,201 +15,209 @@
*/
#pragma once

#include <unifex/exception.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/exception.hpp>

#include <unifex/win32/filetime_clock.hpp>

#include <windows.h>
#include <threadpoolapiset.h>
#include <windows.h>

#include <utility>
#include <atomic>
#include <cstdio>
#include <exception>
#include <new>
#include <system_error>
#include <atomic>
#include <cstdio>
#include <utility>

#include <unifex/detail/prologue.hpp>

namespace unifex {
namespace win32 {

class windows_thread_pool {
class scheduler;
class schedule_sender;
class schedule_op_base;
template<typename Receiver>
struct _schedule_op {
class type;
};
template<typename Receiver>
using schedule_op = typename _schedule_op<Receiver>::type;

template<typename StopToken>
struct _cancellable_schedule_op_base {
class type;
};
template<typename StopToken>
using cancellable_schedule_op_base = typename _cancellable_schedule_op_base<StopToken>::type;

template<typename Receiver>
struct _cancellable_schedule_op {
class type;
};
template <typename Receiver>
using cancellable_schedule_op = typename _cancellable_schedule_op<Receiver>::type;

template<typename StopToken>
struct _time_schedule_op_base {
class type;
};
template<typename StopToken>
using time_schedule_op_base = typename _time_schedule_op_base<StopToken>::type;

template<typename Receiver>
struct _time_schedule_op {
class type;
};
template<typename Receiver>
using time_schedule_op = typename _time_schedule_op<Receiver>::type;

template<typename Receiver>
struct _schedule_at_op {
class type;
};
template<typename Receiver>
using schedule_at_op = typename _schedule_at_op<Receiver>::type;

template<typename Duration, typename Receiver>
struct _schedule_after_op {
class type;
};
template<typename Duration, typename Receiver>
using schedule_after_op = typename _schedule_after_op<Duration, Receiver>::type;

class schedule_at_sender;

template<typename Duration>
struct _schedule_after_sender {
class type;
};
template<typename Duration>
using schedule_after_sender = typename _schedule_after_sender<Duration>::type;

using clock_type = filetime_clock;
class scheduler;
class schedule_sender;
class schedule_op_base;
template <typename Receiver>
struct _schedule_op {
class type;
};
template <typename Receiver>
using schedule_op = typename _schedule_op<Receiver>::type;

template <typename StopToken>
struct _cancellable_schedule_op_base {
class type;
};
template <typename StopToken>
using cancellable_schedule_op_base =
typename _cancellable_schedule_op_base<StopToken>::type;

template <typename Receiver>
struct _cancellable_schedule_op {
class type;
};
template <typename Receiver>
using cancellable_schedule_op =
typename _cancellable_schedule_op<Receiver>::type;

template <typename StopToken>
struct _time_schedule_op_base {
class type;
};
template <typename StopToken>
using time_schedule_op_base =
typename _time_schedule_op_base<StopToken>::type;

template <typename Receiver>
struct _time_schedule_op {
class type;
};
template <typename Receiver>
using time_schedule_op = typename _time_schedule_op<Receiver>::type;

template <typename Receiver>
struct _schedule_at_op {
class type;
};
template <typename Receiver>
using schedule_at_op = typename _schedule_at_op<Receiver>::type;

template <typename Duration, typename Receiver>
struct _schedule_after_op {
class type;
};
template <typename Duration, typename Receiver>
using schedule_after_op =
typename _schedule_after_op<Duration, Receiver>::type;

class schedule_at_sender;

template <typename Duration>
struct _schedule_after_sender {
class type;
};
template <typename Duration>
using schedule_after_sender = typename _schedule_after_sender<Duration>::type;

using clock_type = filetime_clock;

public:
// Initialise to use the process' default thread-pool.
windows_thread_pool() noexcept;

// Initialise to use the process' default thread-pool.
windows_thread_pool() noexcept;
// Construct to an independend thread-pool with a dynamic number of
// threads that varies between a min and a max number of threads.
explicit windows_thread_pool(
std::uint32_t minThreadCount, std::uint32_t maxThreadCount);

// Construct to an independend thread-pool with a dynamic number of
// threads that varies between a min and a max number of threads.
explicit windows_thread_pool(std::uint32_t minThreadCount, std::uint32_t maxThreadCount);
~windows_thread_pool();

~windows_thread_pool();

scheduler get_scheduler() noexcept;
scheduler get_scheduler() noexcept;

private:
PTP_POOL threadPool_;
PTP_POOL threadPool_;
};

/////////////////////////
// Non-cancellable schedule() operation

class windows_thread_pool::schedule_op_base {
public:
schedule_op_base(schedule_op_base&&) = delete;
schedule_op_base& operator=(schedule_op_base&&) = delete;
schedule_op_base(schedule_op_base&&) = delete;
schedule_op_base& operator=(schedule_op_base&&) = delete;

~schedule_op_base();
~schedule_op_base();

void start() & noexcept;
void start() & noexcept;

protected:
schedule_op_base(windows_thread_pool& pool, PTP_WORK_CALLBACK workCallback);
schedule_op_base(windows_thread_pool& pool, PTP_WORK_CALLBACK workCallback);

private:
TP_CALLBACK_ENVIRON environ_;
PTP_WORK work_;
TP_CALLBACK_ENVIRON environ_;
PTP_WORK work_;
};

template<typename Receiver>
class windows_thread_pool::_schedule_op<Receiver>::type final : public windows_thread_pool::schedule_op_base {
template <typename Receiver>
class windows_thread_pool::_schedule_op<Receiver>::type final
: public windows_thread_pool::schedule_op_base {
public:
template<typename Receiver2>
explicit type(windows_thread_pool& pool, Receiver2&& r)
template <typename Receiver2>
explicit type(windows_thread_pool& pool, Receiver2&& r)
: schedule_op_base(pool, &work_callback)
, receiver_((Receiver2&&)r)
{}
, receiver_((Receiver2 &&) r) {}

private:
static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE, void* workContext, PTP_WORK) noexcept {
auto& op = *static_cast<type*>(workContext);
if constexpr (is_nothrow_callable_v<decltype(unifex::set_value), Receiver>) {
unifex::set_value(std::move(op.receiver_));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(op.receiver_));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(op.receiver_), std::current_exception());
}
}
}

Receiver receiver_;
static void CALLBACK
work_callback(PTP_CALLBACK_INSTANCE, void* workContext, PTP_WORK) noexcept {
auto& op = *static_cast<type*>(workContext);
if constexpr (is_nothrow_callable_v<
decltype(unifex::set_value),
Receiver>) {
unifex::set_value(std::move(op.receiver_));
} else {
UNIFEX_TRY { unifex::set_value(std::move(op.receiver_)); }
UNIFEX_CATCH(...) {
unifex::set_error(std::move(op.receiver_), std::current_exception());
}
}
}

Receiver receiver_;
};

///////////////////////////
// Cancellable schedule() operation

template<typename StopToken>
template <typename StopToken>
class windows_thread_pool::_cancellable_schedule_op_base<StopToken>::type {
public:
type(type&&) = delete;
type& operator=(type&&) = delete;
type(type&&) = delete;
type& operator=(type&&) = delete;

~type() {
::CloseThreadpoolWork(work_);
::DestroyThreadpoolEnvironment(&environ_);
delete state_;
}
~type() {
::CloseThreadpoolWork(work_);
::DestroyThreadpoolEnvironment(&environ_);
delete state_;
}

protected:
explicit type(windows_thread_pool& pool, bool isStopPossible) {
::InitializeThreadpoolEnvironment(&environ_);
::SetThreadpoolCallbackPool(&environ_, pool.threadPool_);

work_ = ::CreateThreadpoolWork(
isStopPossible ? &stoppable_work_callback : &unstoppable_work_callback,
static_cast<void*>(this),
&environ_);
if (work_ == nullptr) {
DWORD errorCode = ::GetLastError();
::DestroyThreadpoolEnvironment(&environ_);
throw_(
std::system_error{static_cast<int>(errorCode), std::system_category(), "CreateThreadpoolWork()"});
}

if (isStopPossible) {
state_ = new (std::nothrow) std::atomic<std::uint32_t>(not_started);
if (state_ == nullptr) {
::CloseThreadpoolWork(work_);
::DestroyThreadpoolEnvironment(&environ_);
throw_(std::bad_alloc{});
}
} else {
state_ = nullptr;
}
explicit type(windows_thread_pool& pool, bool isStopPossible) {
::InitializeThreadpoolEnvironment(&environ_);
::SetThreadpoolCallbackPool(&environ_, pool.threadPool_);

work_ = ::CreateThreadpoolWork(
isStopPossible ? &stoppable_work_callback : &unstoppable_work_callback,
static_cast<void*>(this),
&environ_);
if (work_ == nullptr) {
DWORD errorCode = ::GetLastError();
::DestroyThreadpoolEnvironment(&environ_);
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"CreateThreadpoolWork()"});
}

if (isStopPossible) {
state_ = new (std::nothrow) std::atomic<std::uint32_t>(not_started);
if (state_ == nullptr) {
::CloseThreadpoolWork(work_);
::DestroyThreadpoolEnvironment(&environ_);
throw_(std::bad_alloc{});
}
} else {
state_ = nullptr;
}

}
// clang-format off
void start_impl(const StopToken& stopToken) & noexcept {
if (state_ != nullptr) {
// Short-circuit all of this if stopToken.stop_requested() is already true.
Expand Down

0 comments on commit 48ae13a

Please sign in to comment.