-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmanager.h
121 lines (99 loc) · 3.33 KB
/
manager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* @file manager.h
* @author Krzysztof Trzepla
* @copyright (C) 2016 ACK CYFRONET AGH
* @copyright This software is released under the MIT license cited in
* 'LICENSE.txt'
*/
#ifndef ONECLIENT_EVENTS_MANAGER_H
#define ONECLIENT_EVENTS_MANAGER_H
#include "events/declarations.h"
#include "router.h"

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace one {
// Forward declaration: this header only stores a reference to Scheduler
// (see Manager::m_scheduler), so the complete type is not needed here.
class Scheduler;
namespace messages {
// Forward declaration: Configuration is only taken by const reference in
// Manager::subscribe(const messages::Configuration &).
class Configuration;
} // namespace messages
namespace client {
// Forward declaration: Context is only used through std::shared_ptr in the
// Manager constructor signature declared in this header.
class Context;
namespace events {
/**
 * @c Manager is responsible for creating and managing event streams based on
 * subscriptions and forwarding events to the associated event streams.
 */
class Manager {
    friend class Router;

public:
    /**
     * Constructor. Declared @c explicit so that a @c std::shared_ptr<Context>
     * is never implicitly converted into a @c Manager.
     * @param context A @c Context instance.
     */
    explicit Manager(std::shared_ptr<Context> context);

    virtual ~Manager() = default;

    /**
     * Forwards event to the associated event stream if present.
     * @param event An event to be forwarded.
     */
    void emit(EventPtr<> event);

    /**
     * A convenience overload function that creates and emits an event.
     * @tparam T Type of the event to be constructed.
     * @param args Arguments required to construct an event.
     */
    template <class T, class... Args> void emit(Args &&... args);

    /**
     * Adds subscription and creates associated event stream if not present.
     * @param subscription A subscription to be added.
     * @return A subscription ID, that can be used to cancel subscription.
     */
    virtual std::int64_t subscribe(const Subscription &subscription);

    /**
     * Adds subscriptions based on the remote configuration.
     * @param configuration Remote configuration.
     */
    void subscribe(const messages::Configuration &configuration);

    /**
     * Cancels subscription given by the subscription ID.
     * @param subscriptionId ID of subscription which should be removed.
     * @return True if subscription was found and cancelled, otherwise false.
     */
    virtual bool unsubscribe(std::int64_t subscriptionId);

    /**
     * Checks whether subscription given by the subscription ID exists.
     * @param subscriptionId ID of subscription whose existence should be
     * checked.
     * @return True if subscription exists, otherwise false.
     */
    bool existsSubscription(std::int64_t subscriptionId);

    /**
     * Requests handling of events aggregated in all streams.
     */
    void flush();

    /**
     * Requests handling of events aggregated in the stream.
     * @param streamKey A key that identifies a stream that should be flushed.
     */
    virtual void flush(StreamKey streamKey);

private:
    /**
     * Overload that takes an explicit subscription ID.
     * NOTE(review): presumably the shared implementation behind the public
     * @c subscribe overloads - confirm against the definition.
     * @param subscriptionId ID under which the subscription is handled.
     * @param subscription A subscription to be added.
     * @return A subscription ID.
     */
    std::int64_t subscribe(
        std::int64_t subscriptionId, const Subscription &subscription);

    // NOTE: non-static data members are constructed in declaration order;
    // keep the order below unchanged unless the constructor is reviewed too.

    // Atomic counter starting at 0; presumably the source of newly generated
    // subscription IDs - confirm against the definition of subscribe().
    std::atomic<std::int64_t> m_nextSubscriptionId{0};
    Streams m_streams;
    // Held by reference, so the Scheduler must outlive this Manager.
    Scheduler &m_scheduler;
    SequencerManager m_sequencerManager;
    SequencerStreamPtr m_sequencerStream;
    Router m_router;
    // Concurrent map from subscription ID to the handle of that subscription.
    tbb::concurrent_hash_map<std::int64_t, SubscriptionHandlePtr> m_handles;

    // Accessor aliases for m_handles (mutable and read-only element access).
    using HandleAcc = typename decltype(m_handles)::accessor;
    using HandleConstAcc = typename decltype(m_handles)::const_accessor;
};
template <class T, class... Args> void Manager::emit(Args &&... args)
{
emit(std::make_unique<T>(std::forward<Args>(args)...));
}
} // namespace events
} // namespace client
} // namespace one
#endif // ONECLIENT_EVENTS_MANAGER_H