3 #include "BackendComponents.hpp"
4 #include "ValueStream.hpp"
6 #include <Radiant/BGThreadExecutor.hpp>
8 #include <folly/MoveWrapper.h>
// Declaration excerpt of the class template GeneratorValueStream<Generator>.
// NOTE(review): the class head and several member declarations are not part of
// this excerpt; the comments below describe only what the visible code shows.
18 template <
typename Generator>
// Compile-time guard: Generator must satisfy the IsGenerator trait.
22 static_assert(IsGenerator<Generator>::value,
23 "Need to use Generator as a type parameter "
24 "for GeneratorValueStream.");
// Value type of the stream, taken from the generator type.
25 typedef typename Generator::ValueType ValueType;
// Generators are held through shared ownership.
26 using GeneratorPtr = std::shared_ptr<Generator>;
// Returns a future for a value of the stream; overrides a base-class virtual.
39 virtual folly::Future<ValueType>
next()
override;
// Takes a callback receiving a const ValueType&.
// NOTE(review): presumably stored in m_onValue; the definition is not visible here.
52 void setCallback(std::function<
void(
const ValueType&)> f);
// (Tail of a declaration whose first line is not part of this excerpt; `key`
// defaults to an empty QByteArray.)
58 const QByteArray & key = QByteArray());
// Disables every generator that was registered under `key`.
60 void removeAddedGenerators(
const QByteArray & key);
// True when a sync predicate is installed and it accepts `v`.
66 bool isPauseValue(
const ValueType& v);
// Requests the next value from generator `i` and stores the resulting future.
67 void initFuture(
size_t i);
// Continuation hook: called with the value `ev` produced by generator `i`.
68 void futureDone(
size_t i,
const ValueType& ev);
// Generators feeding the stream. m_futures, m_isPaused, m_futuresPending and
// m_isDisabled are indexed in parallel with this vector.
72 std::vector<GeneratorPtr> m_generators;
// Executor whose raw pointer is handed to Generator::next().
73 std::shared_ptr<folly::Executor> m_executor;
// Predicate consulted by isPauseValue(); when empty, no value is a pause value.
75 std::function<bool(const ValueType&)> m_syncTest;
// Value callback. NOTE(review): its use is not visible in this excerpt.
76 std::function<void(const ValueType&)> m_onValue;
// Recursive mutex taken by the member functions that touch the state below.
80 mutable std::recursive_mutex m_mutex;
// Initialization flag; asserted to be false on a stream whose generators are
// merged into another stream.
81 std::atomic<bool> m_init;
// One future per generator; holds an "empty" exception placeholder until
// initFuture() assigns a real one.
86 std::vector<folly::Future<ValueType>> m_futures;
// m_isPaused[i]: generator i delivered a pause value that has not been released yet.
87 std::vector<bool> m_isPaused;
// m_futuresPending[i]: set by initFuture(i), cleared by futureDone(i, ...).
88 std::vector<bool> m_futuresPending;
// m_isDisabled[i]: generator i was disabled by removeAddedGenerators().
89 std::vector<bool> m_isDisabled;
// Number of queued promises that already hold a value but whose future has
// not been handed out yet.
93 size_t m_promisesWithoutFutures;
// Number of generators currently marked as paused.
96 size_t m_generatorsPending;
// NOTE(review): only asserted to be positive in the visible code; where it is
// updated is not shown — confirm its meaning.
98 size_t m_futuresWaited;
// Number of generators currently flagged in m_isDisabled.
100 size_t m_disabledGenerators;
// FIFO of promises: either waiting for a value or already fulfilled
// (see m_promisesWithoutFutures).
102 std::queue<folly::Promise<ValueType>> m_promises;
// Maps a key to the indices (into m_generators) of the generators added under it.
104 std::map<QByteArray, std::vector<size_t> > m_addedGenerators;
// Constructor excerpt (member-initializer list): the visible initializers zero
// the counters m_promisesWithoutFutures, m_generatorsPending and
// m_disabledGenerators.
// NOTE(review): the constructor signature and any further initializers (e.g.
// for m_init and m_futuresWaited) are not visible here — confirm that every
// counter and flag is initialized.
109 template <
typename Generator>
114 m_promisesWithoutFutures(0),
115 m_generatorsPending(0),
117 m_disabledGenerators(0)
// Definition whose signature is not part of this excerpt. The visible
// statements give every promise still queued in m_promises a value source:
// first the futures that are already pending, then fresh requests to the
// generators.
// NOTE(review): the statements that pop m_promises and the declaration of the
// index `i` used by the second loop are not visible — confirm against the full file.
121 template<
typename Generator>
// Special case for values that were buffered before anybody asked for them
// (the guarded statement is not visible here).
124 if(m_promisesWithoutFutures > 0)
// Pass 1: attach a queued promise to every future that is still pending.
127 for(
size_t i = 0; i < m_futures.size(); ++i) {
128 if(m_futuresPending[i] && !m_promises.empty()) {
// NOTE(review): makeMoveWrapper receives the lvalue m_promises.front();
// presumably this moves the promise out of the queue slot — TODO confirm
// against folly::MoveWrapper.
129 auto promise = folly::makeMoveWrapper(m_promises.front());
// The continuation fulfils the wrapped promise with the generator's value.
132 std::move(m_futures[i]).thenValue([promise] (
const ValueType& ev)
mutable {
133 promise->setValue(ev);
// Pass 2: serve the remaining promises directly from the generators.
141 while(!m_promises.empty()) {
142 auto promise = folly::makeMoveWrapper(m_promises.front());
// Only possible while at least one generator is not disabled; this also
// guarantees that the skip loop below terminates.
145 if (m_disabledGenerators < m_generators.size()) {
// Skip disabled generators, wrapping around at the end of the vector.
147 while (m_isDisabled[i])
148 i = (i+1) % m_generators.size();
// `gen` is a shared_ptr copy; capturing it by value makes the lambda hold a
// shared reference to the generator.
151 auto gen = m_generators[i];
152 auto future = gen->next(m_executor.get());
155 std::move(future).thenValue([promise, gen] (ValueType ev)
mutable {
156 promise->setValue(ev);
// Advance to the next generator for the next promise.
159 i = (i+1) % m_generators.size();
// Definition whose signature is not part of this excerpt: appends `generator`
// to m_generators.
// NOTE(review): no entries are added to the parallel vectors (m_futures,
// m_isPaused, m_futuresPending, m_isDisabled) in the visible line; init()
// creates one entry per generator — confirm this is only called before init().
163 template <
typename Generator>
167 m_generators.push_back(generator);
// NOTE(review): the signatures in this span are not visible. Two locks named
// `lck` cannot share one scope, so the first lock below presumably belongs to
// a preceding short definition and the rest to futureDone(size_t i,
// const ValueType& ev) — confirm against the full file.
172 template<
typename Generator>
176 std::unique_lock<std::recursive_mutex> lck(m_mutex);
// --- Body using `i` and `ev`: bookkeeping after generator `i` produced `ev`. ---
184 std::unique_lock<std::recursive_mutex> lck(m_mutex);
186 assert(m_futuresWaited > 0);
// By default the value is delivered; a pause value may postpone delivery.
189 bool fulfillPromise =
true;
// Slot i must have been pending and not paused; it is no longer pending.
191 assert(!m_isPaused[i]);
192 assert(m_futuresPending[i]);
193 m_futuresPending[i] =
false;
// A pause value marks generator i as paused. It is only delivered once every
// generator that is not disabled has been paused.
195 if(isPauseValue(ev)) {
196 ++m_generatorsPending;
197 m_isPaused[i] =
true;
198 fulfillPromise = (m_generators.size() - m_disabledGenerators) == m_generatorsPending;
// `promise` receives a waiting promise taken from the queue (if any).
// NOTE(review): where setPromise becomes true is not visible.
201 bool setPromise =
false;
202 folly::Promise<ValueType> promise;
// Nobody is waiting (or earlier values are still buffered): queue a promise
// that already holds `ev` and count it as buffered. The inner `promise`
// intentionally shadows the outer one.
206 if(m_promisesWithoutFutures > 0 || m_promises.empty()) {
207 folly::Promise<ValueType> promise;
208 promise.setValue(ev);
209 m_promises.emplace(std::move(promise));
210 ++m_promisesWithoutFutures;
// Otherwise take over the oldest waiting promise (the pop is not visible here).
212 promise = std::move(m_promises.front());
// A delivered pause value releases all generators: reset the pause bookkeeping.
218 if(isPauseValue(ev) && fulfillPromise) {
219 m_generatorsPending = 0;
220 std::fill(m_isPaused.begin(), m_isPaused.end(),
false);
// Consumers are still waiting and nothing is buffered: the loops below visit
// the generators (the loop bodies are not visible in this excerpt).
223 size_t promises = m_promises.size();
224 if(promises > 0 && m_promisesWithoutFutures == 0) {
225 if(!isPauseValue(ev)) {
231 for(
size_t i = 0; i < m_generators.size(); ++i) {
232 if (!m_isDisabled[i])
// Candidates here are generators that are neither paused, pending nor disabled.
238 for(
size_t i = 0; i < m_generators.size(); ++i) {
239 if(!m_isPaused[i] && !m_futuresPending[i] && !m_isDisabled[i]) {
// Deliver the value to the promise taken from the queue.
// NOTE(review): presumably guarded by setPromise and executed after the lock
// has been released (unique_lock is used instead of lock_guard) — neither is
// visible here; confirm.
249 promise.setValue(ev);
// Requests the next value from generator `i` and stores the resulting future
// in m_futures[i]; the attached continuation forwards the value to
// futureDone(i, value).
//
// @param i index into m_generators / m_futures; must be < m_futures.size().
252 template<
typename Generator>
253 void GeneratorValueStream<Generator>::initFuture(
size_t i)
255 assert(i < m_futures.size());
// The continuation only holds a weak reference to this stream.
// shared_from_this() requires the object to be owned by a shared_ptr already.
257 auto ptr = this->shared_from_this();
258 std::weak_ptr<GeneratorValueStream<Generator>> weak =
259 std::static_pointer_cast<GeneratorValueStream<Generator>>(ptr);
// A paused generator must not be asked for a new value.
261 assert(i >= m_isPaused.size() || !m_isPaused[i]);
// At most one outstanding request per generator.
263 assert(!m_futuresPending[i]);
264 m_futuresPending[i] =
true;
// Continuation run with the generated value.
// NOTE(review): the null check of `me` and the lambda's return statement are
// not visible in this excerpt — confirm both exist.
266 auto cb = [weak, i] (
const ValueType& ev) -> ValueType {
267 auto me = weak.lock();
269 me->futureDone(i, ev);
// Start the request on the stream's executor and keep the chained future.
275 m_futures[i] = m_generators[i]->next(m_executor.get()).thenValue(cb);
// Creates the per-generator bookkeeping: for every generator one placeholder
// future and one `false` entry in each of m_isPaused, m_futuresPending and
// m_isDisabled.
// NOTE(review): some lines of the body are not visible in this excerpt (for
// example anything that updates m_init or calls initFuture()).
278 template<
typename Generator>
279 void GeneratorValueStream<Generator>::init()
282 for(
size_t i = 0; i < m_generators.size(); ++i) {
// Placeholder future holding an "empty" exception until initFuture(i)
// replaces it.
284 m_futures.emplace_back(folly::makeFuture<ValueType>(
285 folly::exception_wrapper(std::runtime_error(
"empty"))));
286 m_isPaused.push_back(
false);
287 m_futuresPending.push_back(
false);
288 m_isDisabled.push_back(
false);
// Definition returning folly::Future<ValueType>.
// NOTE(review): the line carrying the function name is not visible; presumably
// this is GeneratorValueStream<Generator>::next() — confirm.
// Visible behavior: returns a failed future when there is no usable generator,
// hands out an already buffered value when one exists, and otherwise queues a
// new promise and returns its future.
293 template <
typename Generator>
294 folly::Future<typename GeneratorValueStream<Generator>::ValueType>
// First lock. NOTE(review): presumably scoped to an initialization step that
// is not visible; a second guard with the same name follows further down.
299 std::lock_guard<std::recursive_mutex> g(m_mutex);
// Without generators, or with all of them disabled, no value can ever arrive:
// debug builds assert on an empty generator list, release builds return a
// future that holds a "No generators" error.
308 assert(!m_generators.empty());
309 if(m_generators.empty() || m_disabledGenerators == m_generators.size())
310 return folly::makeFuture<ValueType>(
311 folly::exception_wrapper(std::runtime_error(
"No generators")));
314 std::lock_guard<std::recursive_mutex> g(m_mutex);
// A value was buffered before this call: consume the oldest fulfilled promise
// (the pop from the queue is not visible here).
316 if(m_promisesWithoutFutures > 0) {
318 --m_promisesWithoutFutures;
320 auto promise = std::move(m_promises.front());
323 return promise.getFuture();
// Nothing buffered: queue a fresh promise. Its future is obtained before the
// promise is moved into the queue.
327 folly::Promise<ValueType> promise;
328 auto result = promise.getFuture();
329 m_promises.emplace(std::move(promise));
// Visit generators that are neither paused, pending nor disabled (the loop
// body is not visible in this excerpt).
332 for(
size_t i = 0; i < m_generators.size(); ++i) {
333 if(!m_isPaused[i] && !m_futuresPending[i] && !m_isDisabled[i]) {
// Definition whose signature is not part of this excerpt: reports, under the
// stream mutex, whether m_generators is empty.
// NOTE(review): disabled generators still count as present here — confirm that
// this is intended.
343 template <
typename Generator>
346 std::lock_guard<std::recursive_mutex> g(m_mutex);
348 return m_generators.empty();
351 template <
typename Generator>
352 bool GeneratorValueStream<Generator>::isPauseValue(
const ValueType& v)
354 return m_syncTest && m_syncTest(v);
357 template <
typename Generator>
364 template <
typename Generator>
// Definition whose name is not part of this excerpt (only the trailing
// parameter `key` is visible): appends all generators of `other` to this
// stream, creates their bookkeeping entries, starts a request for each of
// them, and records the new indices for `key`.
371 template <
typename Generator>
374 const QByteArray & key)
376 std::lock_guard<std::recursive_mutex> g(m_mutex);
// The source stream must not have been initialized.
378 assert(!other->m_init);
// Index at which the first merged generator will be stored.
379 size_t offset = m_generators.size();
// Indices are only meant to be remembered for a non-empty key.
383 bool storeIds = !key.isEmpty();
384 std::vector<size_t> generators;
386 for(
size_t i = 0; i < other->m_generators.size(); ++i) {
// Share the generator and add one entry to every parallel vector, as init() does.
387 m_generators.emplace_back(other->m_generators[i]);
388 m_futures.emplace_back(folly::makeFuture<ValueType>(
389 folly::exception_wrapper(std::runtime_error(
"empty"))));
390 m_isPaused.push_back(
false);
391 m_futuresPending.push_back(
false);
392 m_isDisabled.push_back(
false);
// Request the first value of the merged generator right away.
393 initFuture(offset + i);
// NOTE(review): presumably guarded by storeIds (the guard is not visible).
396 generators.push_back(offset + i);
// NOTE(review): presumably guarded by storeIds as well. The assignment replaces
// any indices stored earlier under the same key, so those generators could no
// longer be removed by key — confirm that keys are unique.
400 m_addedGenerators[key] = generators;
// Disables every generator that was registered under `key` and forgets the key.
// In the visible lines generators are only flagged in m_isDisabled, not erased
// from the vectors.
//
// @param key key under which the generators were added; unknown keys are
//            detected by the lookup below (the guarded statement is not visible).
403 template <
typename Generator>
404 void GeneratorValueStream<Generator>::
405 removeAddedGenerators(
const QByteArray & key)
407 std::lock_guard<std::recursive_mutex> g(m_mutex);
409 auto mapIt = m_addedGenerators.find(key);
410 if (mapIt == m_addedGenerators.end())
// Walk over the stored generator indices.
// NOTE(review): the declaration of `i` is not visible; presumably the index
// that `it` refers to.
416 for(
auto it = mapIt->second.begin(); it != mapIt->second.end(); it++) {
// Each generator may be disabled only once.
418 assert(!m_isDisabled[i]);
419 m_isDisabled[i] =
true;
420 m_disabledGenerators++;
// NOTE(review): presumably only executed when generator i is paused (the guard
// is not visible); an unguarded decrement could underflow the unsigned counter.
423 m_generatorsPending--;
// Handling of a request that is still outstanding (body not visible here).
424 if (m_futuresPending[i])
// The key is consumed: a second call with the same key finds nothing.
428 m_addedGenerators.erase(mapIt);