MT Showcase SDK
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
GeneratorValueStream.hpp
1 #pragma once
2 
3 #include "BackendComponents.hpp"
4 #include "ValueStream.hpp"
5 
6 #include <Radiant/BGThreadExecutor.hpp>
7 
8 #include <folly/MoveWrapper.h>
9 
10 #include <queue>
11 #include <vector>
12 
13 namespace Showcase
14 {
15 
18  template <typename Generator>
19  class GeneratorValueStream : public ValueStream<typename Generator::ValueType>
20  {
21  public:
22  static_assert(IsGenerator<Generator>::value,
23  "Need to use Generator as a type parameter "
24  "for GeneratorValueStream.");
25  typedef typename Generator::ValueType ValueType;
26  using GeneratorPtr = std::shared_ptr<Generator>;
27 
29  GeneratorValueStream(std::shared_ptr<folly::Executor> generatorExecutor);
30 
37  virtual ~GeneratorValueStream();
38 
39  virtual folly::Future<ValueType> next() override;
40 
48  void setSynchronizationTest(std::function<bool(const ValueType&)> f);
49 
52  void setCallback(std::function<void(const ValueType&)> f);
53 
55  void addGenerator(GeneratorPtr generator);
56 
57  void addStream(std::shared_ptr<GeneratorValueStream<Generator>> other,
58  const QByteArray & key = QByteArray());
59 
60  void removeAddedGenerators(const QByteArray & key);
61 
62  bool isEmpty() const;
63 
64  private:
65  void init();
66  bool isPauseValue(const ValueType& v);
67  void initFuture(size_t i);
68  void futureDone(size_t i, const ValueType& ev);
69 
72  std::vector<GeneratorPtr> m_generators;
73  std::shared_ptr<folly::Executor> m_executor;
74 
75  std::function<bool(const ValueType&)> m_syncTest;
76  std::function<void(const ValueType&)> m_onValue;
77 
78  // Mutex needs to be recursive, Otherwise setting callback in
79  // initFuture can deadlock if future is fulfilled immediately
80  mutable std::recursive_mutex m_mutex;
81  std::atomic<bool> m_init;
82 
86  std::vector<folly::Future<ValueType>> m_futures;
87  std::vector<bool> m_isPaused;
88  std::vector<bool> m_futuresPending;
89  std::vector<bool> m_isDisabled;
90 
93  size_t m_promisesWithoutFutures;
94 
96  size_t m_generatorsPending;
97 
98  size_t m_futuresWaited;
99 
100  size_t m_disabledGenerators;
101 
102  std::queue<folly::Promise<ValueType>> m_promises;
103 
104  std::map<QByteArray, std::vector<size_t> > m_addedGenerators;
105  };
106 
107  // --------------------------------------------------------------------------
108 
109  template <typename Generator>
110  GeneratorValueStream<Generator>::GeneratorValueStream(std::shared_ptr<folly::Executor> exec)
111  : m_executor(exec),
112  m_syncTest(nullptr),
113  m_init(false),
114  m_promisesWithoutFutures(0),
115  m_generatorsPending(0),
116  m_futuresWaited(0),
117  m_disabledGenerators(0)
118  {
119  }
120 
121  template<typename Generator>
123  {
124  if(m_promisesWithoutFutures > 0)
125  return;
126 
127  for(size_t i = 0; i < m_futures.size(); ++i) {
128  if(m_futuresPending[i] && !m_promises.empty()) {
129  auto promise = folly::makeMoveWrapper(m_promises.front());
130  m_promises.pop();
131 
132  std::move(m_futures[i]).thenValue([promise] (const ValueType& ev) mutable {
133  promise->setValue(ev);
134  });
135  }
136  }
137 
140  size_t i = 0;
141  while(!m_promises.empty()) {
142  auto promise = folly::makeMoveWrapper(m_promises.front());
143  m_promises.pop();
144 
145  if (m_disabledGenerators < m_generators.size()) {
147  while (m_isDisabled[i])
148  i = (i+1) % m_generators.size();
149  }
150 
151  auto gen = m_generators[i];
152  auto future = gen->next(m_executor.get());
153 
155  std::move(future).thenValue([promise, gen] (ValueType ev) mutable {
156  promise->setValue(ev);
157  });
158 
159  i = (i+1) % m_generators.size();
160  }
161  }
162 
163  template <typename Generator>
165  {
166  assert(!m_init);
167  m_generators.push_back(generator);
168  }
169 
172  template<typename Generator>
173  void GeneratorValueStream<Generator>::futureDone(size_t i, const ValueType &ev)
174  {
175  {
176  std::unique_lock<std::recursive_mutex> lck(m_mutex);
177  if (m_isDisabled[i])
178  return;
179  }
180 
181  if(m_onValue)
182  m_onValue(ev);
183 
184  std::unique_lock<std::recursive_mutex> lck(m_mutex);
185 
186  assert(m_futuresWaited > 0);
187  --m_futuresWaited;
188 
189  bool fulfillPromise = true;
190 
191  assert(!m_isPaused[i]);
192  assert(m_futuresPending[i]);
193  m_futuresPending[i] = false;
194 
195  if(isPauseValue(ev)) {
196  ++m_generatorsPending;
197  m_isPaused[i] = true;
198  fulfillPromise = (m_generators.size() - m_disabledGenerators) == m_generatorsPending;
199  }
200 
201  bool setPromise = false;
202  folly::Promise<ValueType> promise;
203 
204  if(fulfillPromise) {
205 
206  if(m_promisesWithoutFutures > 0 || m_promises.empty()) {
207  folly::Promise<ValueType> promise;
208  promise.setValue(ev);
209  m_promises.emplace(std::move(promise));
210  ++m_promisesWithoutFutures;
211  } else {
212  promise = std::move(m_promises.front());
213  m_promises.pop();
214  setPromise = true;
215  }
216  }
217 
218  if(isPauseValue(ev) && fulfillPromise) {
219  m_generatorsPending = 0;
220  std::fill(m_isPaused.begin(), m_isPaused.end(), false);
221  }
222 
223  size_t promises = m_promises.size();
224  if(promises > 0 && m_promisesWithoutFutures == 0) {
225  if(!isPauseValue(ev)) {
226  initFuture(i);
227  } else {
228  if(fulfillPromise) {
231  for(size_t i = 0; i < m_generators.size(); ++i) {
232  if (!m_isDisabled[i])
233  initFuture(i);
234  }
235  } else {
238  for(size_t i = 0; i < m_generators.size(); ++i) {
239  if(!m_isPaused[i] && !m_futuresPending[i] && !m_isDisabled[i]) {
240  initFuture(i);
241  }
242  }
243  }
244  }
245  }
246  lck.unlock();
247 
248  if(setPromise)
249  promise.setValue(ev);
250  }
251 
252  template<typename Generator>
253  void GeneratorValueStream<Generator>::initFuture(size_t i)
254  {
255  assert(i < m_futures.size());
256 
257  auto ptr = this->shared_from_this();
258  std::weak_ptr<GeneratorValueStream<Generator>> weak =
259  std::static_pointer_cast<GeneratorValueStream<Generator>>(ptr);
260 
261  assert(i >= m_isPaused.size() || !m_isPaused[i]);
262 
263  assert(!m_futuresPending[i]);
264  m_futuresPending[i] = true;
265 
266  auto cb = [weak, i] (const ValueType& ev) -> ValueType {
267  auto me = weak.lock();
268  if(me) {
269  me->futureDone(i, ev);
270  }
271  return ev;
272  };
273  ++m_futuresWaited;
274 
275  m_futures[i] = m_generators[i]->next(m_executor.get()).thenValue(cb);
276  }
277 
278  template<typename Generator>
279  void GeneratorValueStream<Generator>::init()
280  {
281  // Is guaranteed to be executed when mutex is locked
282  for(size_t i = 0; i < m_generators.size(); ++i) {
284  m_futures.emplace_back(folly::makeFuture<ValueType>(
285  folly::exception_wrapper(std::runtime_error("empty"))));
286  m_isPaused.push_back(false);
287  m_futuresPending.push_back(false);
288  m_isDisabled.push_back(false);
289  initFuture(i);
290  }
291  }
292 
293  template <typename Generator>
294  folly::Future<typename GeneratorValueStream<Generator>::ValueType>
296  {
298  if(!m_init) {
299  std::lock_guard<std::recursive_mutex> g(m_mutex);
300 
301  if(!m_init) {
302  init();
303  m_init = true;
304  }
305  }
306 
308  assert(!m_generators.empty());
309  if(m_generators.empty() || m_disabledGenerators == m_generators.size())
310  return folly::makeFuture<ValueType>(
311  folly::exception_wrapper(std::runtime_error("No generators")));
312 
313  {
314  std::lock_guard<std::recursive_mutex> g(m_mutex);
315 
316  if(m_promisesWithoutFutures > 0) {
318  --m_promisesWithoutFutures;
319 
320  auto promise = std::move(m_promises.front());
321  m_promises.pop();
322 
323  return promise.getFuture();
324  } else {
326 
327  folly::Promise<ValueType> promise;
328  auto result = promise.getFuture();
329  m_promises.emplace(std::move(promise));
330 
332  for(size_t i = 0; i < m_generators.size(); ++i) {
333  if(!m_isPaused[i] && !m_futuresPending[i] && !m_isDisabled[i]) {
334  initFuture(i);
335  }
336  }
337 
338  return result;
339  }
340  }
341  }
342 
343  template <typename Generator>
345  {
346  std::lock_guard<std::recursive_mutex> g(m_mutex);
347 
348  return m_generators.empty();
349  }
350 
351  template <typename Generator>
352  bool GeneratorValueStream<Generator>::isPauseValue(const ValueType& v)
353  {
354  return m_syncTest && m_syncTest(v);
355  }
356 
357  template <typename Generator>
359  setSynchronizationTest(std::function<bool(const ValueType&)> f)
360  {
361  m_syncTest = f;
362  }
363 
364  template <typename Generator>
366  setCallback(std::function<void(const ValueType&)> f)
367  {
368  m_onValue = f;
369  }
370 
371  template <typename Generator>
373  addStream(std::shared_ptr<GeneratorValueStream<Generator>> other,
374  const QByteArray & key)
375  {
376  std::lock_guard<std::recursive_mutex> g(m_mutex);
377 
378  assert(!other->m_init);
379  size_t offset = m_generators.size();
380 
381  // The stream can have a key associated with it, which can be used later to remove
382  // generators that were added from that stream
383  bool storeIds = !key.isEmpty();
384  std::vector<size_t> generators;
385 
386  for(size_t i = 0; i < other->m_generators.size(); ++i) {
387  m_generators.emplace_back(other->m_generators[i]);
388  m_futures.emplace_back(folly::makeFuture<ValueType>(
389  folly::exception_wrapper(std::runtime_error("empty"))));
390  m_isPaused.push_back(false);
391  m_futuresPending.push_back(false);
392  m_isDisabled.push_back(false);
393  initFuture(offset + i);
394 
395  if (storeIds)
396  generators.push_back(offset + i);
397  }
398 
399  if (storeIds)
400  m_addedGenerators[key] = generators;
401  }
402 
403  template <typename Generator>
404  void GeneratorValueStream<Generator>::
405  removeAddedGenerators(const QByteArray & key)
406  {
407  std::lock_guard<std::recursive_mutex> g(m_mutex);
408 
409  auto mapIt = m_addedGenerators.find(key);
410  if (mapIt == m_addedGenerators.end())
411  return;
412 
413  // Since everything in this class relies on generator positions in vector, don't
414  // actually remove the generators, just disable them
415  // @todo could do a clean up and remap when we're not waiting for any values
416  for(auto it = mapIt->second.begin(); it != mapIt->second.end(); it++) {
417  auto i = (*it);
418  assert(!m_isDisabled[i]);
419  m_isDisabled[i] = true;
420  m_disabledGenerators++;
421 
422  if (m_isPaused[i])
423  m_generatorsPending--;
424  if (m_futuresPending[i])
425  m_futuresWaited--;
426  }
427 
428  m_addedGenerators.erase(mapIt);
429  }
430 
431 
432 }