MT Showcase SDK
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
ValueStreamChannels.hpp
1 #pragma once
2 
3 #include "ValueStream.hpp"
4 
5 #include "BackendComponents.hpp"
6 
7 #include <boost/intrusive/set.hpp>
8 #include <deque>
9 #include <queue>
10 
11 namespace Showcase
12 {
13 
14  template <typename T> class ChannelGuard;
15  template <typename T> class CallbackLock;
16 
17  template <typename T> class ValueStreamSplitter;
18 
19 
20  // ------------------------------------------------------------------
21  // ------------------------------------------------------------------
22  // ------------------------------------------------------------------
23 
24 
26 
28  template <typename T>
29  class ValueStreamChannel : public ValueStream<T>
30  {
31  public:
32  ValueStreamChannel(int channel, std::shared_ptr<ValueStreamSplitter<T>> source);
33  virtual ~ValueStreamChannel();
34 
35  virtual folly::Future<T> next() override;
36 
37  std::shared_ptr<ValueStreamSplitter<T>> splitter();
38 
39  private:
40  std::shared_ptr<ValueStreamSplitter<T>> m_source;
41  const int m_channel;
42  };
43  template <typename T>
44  using ValueStreamChannelPtr = std::shared_ptr<ValueStreamChannel<T>>;
45 
46 
47  // ------------------------------------------------------------------
48  // ------------------------------------------------------------------
49  // ------------------------------------------------------------------
50 
51 
54  template <typename T>
55  class CloneableValueStreamChannel : public ValueStream<CallbackLock<T>>
56  {
57  public:
58  CloneableValueStreamChannel(int channel, std::shared_ptr<ValueStreamSplitter<T>> source);
59  virtual ~CloneableValueStreamChannel();
60 
61  virtual folly::Future<CallbackLock<T>> next() override;
62  folly::Future<T> nextUnwrapped();
63 
66  ChannelGuard<CloneableValueStreamChannel<T>> clone2(); // a bit crappy name....
67 
68  std::shared_ptr<ValueStreamSplitter<T>> splitter();
69 
70  private:
71  std::shared_ptr<ValueStreamSplitter<T>> m_source;
72  const int m_channel;
73  };
74  template <typename T>
75  using CloneableValueStreamChannelPtr = std::shared_ptr<CloneableValueStreamChannel<T>>;
76 
77 
83  template <typename T>
84  std::shared_ptr<CloneableValueStreamChannel<T>>
85  getCloneableChannel(ValueStreamPtr<T> stream)
86  {
87  if(auto ch = std::dynamic_pointer_cast<CloneableValueStreamChannel<T>>(stream)) {
88  return ch;
89  } else {
90  return getSplitter(stream)->openCloneableChannel().channel();
91  }
92  }
93 
94 
95  // ------------------------------------------------------------------
96  // ------------------------------------------------------------------
97  // ------------------------------------------------------------------
98 
99 
101  template <typename T>
102  struct ChannelInfo : public boost::intrusive::set_base_hook<>
103  {
104  ChannelInfo(int ch, bool cloneable)
105  : nextCachedValue(0), channel(ch), onTheWay(0), isCloneable(cloneable)
106  {}
107 
108  bool operator<(const ChannelInfo& ci) const { return channel < ci.channel; }
109  bool operator>(const ChannelInfo& ci) const { return channel > ci.channel; }
110  bool operator==(const ChannelInfo& ci) const { return channel == ci.channel; }
111 
112  size_t nextCachedValue;
113  int channel;
114  std::queue<folly::Promise<CallbackLock<T>>> promises;
115 
117 
124  std::mutex channelMutex;
125  std::condition_variable condition;
126  size_t onTheWay;
127  const bool isCloneable;
128  };
129 
130 
131  // ------------------------------------------------------------------
132  // ------------------------------------------------------------------
133  // ------------------------------------------------------------------
134 
135 
139  template <typename T>
140  class ValueStreamSplitter : public ValueStream<T>
141  {
142  public:
143  using ReturnType = CallbackLock<T>;
144 
145  ValueStreamSplitter(std::shared_ptr<ValueStream<T>> stream);
146  virtual ~ValueStreamSplitter();
147 
148  folly::Future<ReturnType> next(int channel);
149 
150  ChannelGuard<ValueStreamChannel<T>> pauseChannels();
151 
155  ChannelGuard<CloneableValueStreamChannel<T>> openCloneableChannel();
156 
159  ChannelGuard<CloneableValueStreamChannel<T>> cloneChannel2(int target);
160 
161  void closeChannel(int channel);
162  bool hasChannel(int channel) const;
163 
169  std::shared_ptr<ValueStream<T>> stream();
170 
171  size_t channelsOpen() const;
172 
173  private:
174  virtual folly::Future<T> next() override;
175 
176  ChannelGuard<ValueStreamChannel<T>> newNonCloneableChannel(int cloneTarget, bool clone);
177  ChannelGuard<CloneableValueStreamChannel<T>> newCloneableChannel(int cloneTarget, bool clone);
178 
179  int newChannelImpl(int cloneTarget, bool clone, bool isCloneable,
180  std::unique_lock<std::recursive_mutex>& splitterLock,
181  std::unique_lock<std::mutex>& channelLock);
182 
183  void clearUsedCache();
184  int findFreeChannel() const;
185  void initChannel(int channel, bool isCloneable);
186  void launchNewQuery();
187  void receivedValue(const T& val, std::unique_lock<std::recursive_mutex>&& channelLock);
188 
190  boost::intrusive::set<ChannelInfo<T>> m_channels;
191 
192  std::shared_ptr<ValueStream<T>> m_stream;
193  size_t m_futuresWaited;
194  // Used for keeping the values in the same order they are fed from source
195  size_t m_valueCounter;
196  size_t m_valuesHandled;
197  std::condition_variable m_valuesInOrderCondition;
198  std::mutex m_valuesHandledMutex;
199 
200  std::deque<T> m_cachedValues;
201 
203  mutable std::recursive_mutex m_channelsMutex;
204  };
205 
209  template <typename T>
210  std::shared_ptr<ValueStreamSplitter<T>> getSplitter(ValueStreamPtr<T> stream)
211  {
212  if(auto ptr = std::dynamic_pointer_cast<ValueStreamSplitter<T>>(stream))
213  return ptr;
214  else if(auto ptr = std::dynamic_pointer_cast<ValueStreamChannel<T>>(stream))
215  return ptr->splitter();
216  else if(auto ptr = std::dynamic_pointer_cast<CloneableValueStreamChannel<T>>(stream))
217  return ptr->splitter();
218  else
219  return std::make_shared<ValueStreamSplitter<T>>(stream);
220  }
221 
222 
223  // ------------------------------------------------------------------
224  // ------------------------------------------------------------------
225  // ------------------------------------------------------------------
226 
227 
228 
/// Move-only bundle of a channel pointer and the two locks handed out
/// together with it.
///
/// The locks are released when the guard is destroyed or move-assigned over.
template <typename Channel>
class ChannelGuard
{
public:
  /* Please give me static_if or some sane way to do sanity checks!
  static_assert(HasValueType<Channel>::value && (
    std::is_same<Channel, ValueStreamChannel<typename Channel::ValueType>>::value
    || (HasValueType<typename Channel::ValueType>::value &&
        std::is_same<typename Channel::ValueType, CallbackLock<typename Channel::ValueType::ValueType>>::value &&
        std::is_same<Channel, CloneableValueStreamChannel<typename Channel::ValueType::ValueType>>::value)
    ),
    "Template parameter is not ValueStreamChannel.");
  */

  /// Empty guard: null channel, no lock owned.
  ChannelGuard();

  /// Takes over both locks and stores @p channel.
  ChannelGuard(std::shared_ptr<Channel> channel,
               std::unique_lock<std::recursive_mutex>&& splitterLock,
               std::unique_lock<std::mutex>&& channelLock);

  ChannelGuard(ChannelGuard&& b) = default;
  ChannelGuard& operator=(ChannelGuard&& b);

  // Guards own locks, so they cannot be copied.
  ChannelGuard(const ChannelGuard&) = delete;
  ChannelGuard& operator=(const ChannelGuard&) = delete;

  /// The guarded channel; may be null.
  std::shared_ptr<Channel> channel() const;

private:
  std::shared_ptr<Channel> m_channel;
  /// Lock on the splitter's recursive channels mutex (may be empty).
  std::unique_lock<std::recursive_mutex> m_splitterLock;
  /// Lock on a single channel's mutex (may be empty).
  std::unique_lock<std::mutex> m_channelLock;
};
272 
273 
274  // ------------------------------------------------------------------
275  // ------------------------------------------------------------------
276  // ------------------------------------------------------------------
277 
278 
/// Move-only wrapper around one delivered value.
///
/// When created with its guard enabled, the destructor decrements the
/// referenced "on the way" counter under the channel mutex and notifies the
/// condition variable once the counter reaches zero.
template <typename T>
class CallbackLock
{
public:
  using ValueType = T;

  /// @param val           value to carry (moved in)
  /// @param channelMutex  mutex protecting @p onTheWay
  /// @param cv            condition variable notified when @p onTheWay hits zero
  /// @param onTheWay      counter decremented on destruction
  /// @param useSync       enables the destructor bookkeeping
  CallbackLock(T&& val, std::mutex& channelMutex,
               std::condition_variable& cv, size_t& onTheWay, bool useSync);

  /// Moves the value and disables the guard of the moved-from object.
  CallbackLock(CallbackLock&& b);

  // Holds references, so it is neither assignable nor copyable.
  CallbackLock& operator=(CallbackLock&& b) = delete;
  CallbackLock(const CallbackLock&) = delete;
  CallbackLock& operator=(const CallbackLock&) = delete;

  ~CallbackLock();

  /// The carried value.
  const T& value() const;

private:
  std::mutex& m_channelMutex;
  std::condition_variable& m_condition;
  size_t& m_onTheWay;
  T m_value;
  /// True while this object is responsible for the destructor bookkeeping.
  bool m_useGuard;
};
308 
309 
310  // ------------------------------------------------------------------
311  // ------------------------------------------------------------------
312  // ------------------------------------------------------------------
313 
314  template <typename Stream>
315  ChannelGuard<Stream>::ChannelGuard()
316  : m_channel(nullptr)
317  {
318  }
319 
320  template <typename Stream>
321  ChannelGuard<Stream>::ChannelGuard(std::shared_ptr<Stream> channel,
322  std::unique_lock<std::recursive_mutex>&& splitterLock,
323  std::unique_lock<std::mutex>&& channelLock)
324  : m_channel(channel),
325  m_splitterLock(std::move(splitterLock)),
326  m_channelLock(std::move(channelLock))
327  {
328  }
329 
330  template <typename Stream>
331  ChannelGuard<Stream>& ChannelGuard<Stream>::operator=(ChannelGuard<Stream> &&b)
332  {
333  m_channel = std::move(b.m_channel);
334  m_splitterLock = std::move(b.m_splitterLock);
335  m_channelLock = std::move(b.m_channelLock);
336  return *this;
337  }
338 
339  template <typename Stream>
340  std::shared_ptr<Stream> ChannelGuard<Stream>::channel() const
341  {
342  return m_channel;
343  }
344 
345 
346  // ------------------------------------------------------------------
347  // ------------------------------------------------------------------
348  // ------------------------------------------------------------------
349 
350 
351  template <typename T>
352  CallbackLock<T>::CallbackLock(T&& val, std::mutex& channelMutex,
353  std::condition_variable& cv, size_t& onTheWay,
354  bool useSync)
355  : m_channelMutex(channelMutex),
356  m_condition(cv),
357  m_onTheWay(onTheWay),
358  m_value(std::move(val)),
359  m_useGuard(useSync)
360  {
361  }
362 
363  template <typename T>
364  CallbackLock<T>::CallbackLock(CallbackLock &&b)
365  : m_channelMutex(b.m_channelMutex),
366  m_condition(b.m_condition),
367  m_onTheWay(b.m_onTheWay),
368  m_value(std::move(b.m_value)),
369  m_useGuard(b.m_useGuard)
370  {
371  b.m_useGuard = false;
372  }
373 
374  template <typename T>
376  {
377  if(m_useGuard) {
378  std::unique_lock<std::mutex> lck(m_channelMutex);
379  assert(m_onTheWay > 0);
380  --m_onTheWay;
381  bool signal = m_onTheWay == 0;
384  lck.unlock();
385  if(signal)
386  m_condition.notify_one();
387  }
388  }
389 
390  template <typename T>
391  const T& CallbackLock<T>::value() const
392  {
393  return m_value;
394  }
395 
396 
397  // ------------------------------------------------------------------
398  // ------------------------------------------------------------------
399  // ------------------------------------------------------------------
400 
401 
402  template <typename T>
403  ValueStreamChannel<T>::ValueStreamChannel(int channel, std::shared_ptr<ValueStreamSplitter<T>> source)
404  : m_source(source),
405  m_channel(channel)
406  {
407  assert(m_source);
408  }
409 
410  template <typename T>
411  ValueStreamChannel<T>::~ValueStreamChannel()
412  {
413  m_source->closeChannel(m_channel);
414  }
415 
416  template <typename T>
417  folly::Future<T> ValueStreamChannel<T>::next()
418  {
419  auto unwrap = [] (const CallbackLock<T>& cbl) -> T {
420  return cbl.value();
421  };
422  return m_source->next(m_channel).thenValue(unwrap);
423  }
424 
425  template <typename T>
426  std::shared_ptr<ValueStreamSplitter<T>> ValueStreamChannel<T>::splitter()
427  {
428  return m_source;
429  }
430 
431 
432  // ------------------------------------------------------------------
433  // ------------------------------------------------------------------
434  // ------------------------------------------------------------------
435 
436 
437  template <typename T>
438  CloneableValueStreamChannel<T>::
439  CloneableValueStreamChannel(int channel, std::shared_ptr<ValueStreamSplitter<T>> source)
440  : m_source(source),
441  m_channel(channel)
442  {
443  assert(m_source);
444  }
445 
446  template <typename T>
447  CloneableValueStreamChannel<T>::~CloneableValueStreamChannel()
448  {
449  m_source->closeChannel(m_channel);
450  }
451 
452  template <typename T>
453  folly::Future<CallbackLock<T>> CloneableValueStreamChannel<T>::next()
454  {
455  return m_source->next(m_channel);
456  }
457 
458  template <typename T>
459  std::shared_ptr<ValueStreamSplitter<T>>
460  CloneableValueStreamChannel<T>::splitter()
461  {
462  return m_source;
463  }
464 
465  template <typename T>
466  folly::Future<T> CloneableValueStreamChannel<T>::nextUnwrapped()
467  {
468  auto unwrap = [] (const CallbackLock<T>& cbl) -> T {
469  return cbl.value();
470  };
471  return m_source->next(m_channel).thenValue(unwrap);
472  }
473 
// NOTE(review): the declarator line of this member definition (original line
// 475) is missing from this listing, and no matching declaration is visible
// in the class, so name and return type cannot be confirmed from here.
// The body forwards this channel's id to cloneChannel() on the splitter and
// returns whatever that call produces.
474  template <typename T>
476  {
477  return m_source->cloneChannel(m_channel);
478  }
479 
480  template <typename T>
482  {
483  return m_source->cloneChannel2(m_channel);
484  }
485 
486  // ------------------------------------------------------------------
487  // ------------------------------------------------------------------
488  // ------------------------------------------------------------------
489 
490 
492  template <typename T>
494  {
495  bool operator()(int channel, const ChannelInfo<T>& ci) const {
496  return channel < ci.channel;
497  }
498 
499  bool operator()(const ChannelInfo<T>& ci, int channel) const {
500  return ci.channel < channel;
501  }
502  };
503 
505  template <typename T>
507  {
509  {
510  if(ci->isCloneable) {
513  std::unique_lock<std::mutex> g(ci->channelMutex);
514  if(ci->onTheWay > 0) {
515  ci->condition.wait(g, [ci] { return ci->onTheWay == 0; });
517  assert(ci->onTheWay == 0);
518  }
519  }
520  delete ci;
521  }
522  };
523 
524 
525  // ------------------------------------------------------------------
526  // ------------------------------------------------------------------
527  // ------------------------------------------------------------------
528 
529 
530  template <typename T>
531  ValueStreamSplitter<T>::ValueStreamSplitter(ValueStreamPtr<T> stream)
532  : m_stream(stream),
533  m_futuresWaited(0),
534  m_valueCounter(0),
535  m_valuesHandled(0)
536  {
537  }
538 
539  template <typename T>
540  ValueStreamSplitter<T>::~ValueStreamSplitter()
541  {
542  std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
543  m_stream.reset();
544  m_channels.clear_and_dispose(ChannelInfoDisposer<T>());
545  }
546 
547  template <typename T>
548  folly::Future<CallbackLock<T>>
550  {
551  folly::Future<ReturnType> future = folly::makeFuture<ReturnType>(
552  folly::exception_wrapper(std::runtime_error("empty")));
553 
554  bool newQuery = false;
555  std::unique_lock<std::recursive_mutex> channelsLock(m_channelsMutex);
556  {
557  auto cit = m_channels.find(channel, ChannelInfoCmp<T>());
558  assert(cit != m_channels.end());
559  ChannelInfo<T>& ci = *cit;
560 
561  std::unique_lock<std::mutex> lck(ci.channelMutex);
562 
563  if(ci.nextCachedValue < m_cachedValues.size()) {
564  T val = m_cachedValues[ci.nextCachedValue++];
565 
566  channelsLock.unlock();
567 
568  assert(ci.promises.empty());
569 
570  ++ci.onTheWay;
571 
572  CallbackLock<T> cbl(std::move(val), ci.channelMutex,
573  ci.condition, ci.onTheWay, ci.isCloneable);
574  return folly::makeFuture(std::move(cbl));
575  } else {
577  assert(ci.promises.size() <= m_futuresWaited);
578  if(m_futuresWaited == ci.promises.size()) {
579  newQuery = true;
580  ++m_futuresWaited;
581  }
582 
583  folly::Promise<ReturnType> promise;
584  future = promise.getFuture();
585  ci.promises.emplace(std::move(promise));
586  }
587  }
588 
589  if(newQuery) {
590  launchNewQuery();
591  }
592 
593  return future;
594  }
595 
596  template <typename T>
598  {
599  std::unique_lock<std::recursive_mutex> lock(m_channelsMutex);
600  std::unique_lock<std::mutex> dummy;
601  return ChannelGuard<ValueStreamChannel<T>>(nullptr, std::move(lock), std::move(dummy));
602  }
603 
// NOTE(review): the declarator line(s) of this member definition (original
// lines 605-606) are missing from this listing and no matching declaration is
// visible in the class, so the member's name cannot be confirmed from here.
// The body creates a new non-cloneable channel without cloning (clone ==
// false, so the target argument 0 is not used by newChannelImpl).
604  template <typename T>
606  {
607  return newNonCloneableChannel(0, false);
608  }
609 
610  template <typename T>
613  {
614  return newCloneableChannel(0, false);
615  }
616 
// NOTE(review): the declarator line of this member definition (original line
// 618) is missing from this listing and no matching declaration is visible in
// the class, so name and signature cannot be confirmed from here.
// NOTE(review): `clone` is passed as false, so newChannelImpl never reads
// `target` and the new channel starts at the end of the cache instead of at
// the target's position. The cloneable counterpart cloneChannel2 passes true
// - confirm whether false is intended here.
617  template <typename T>
619  {
620  return newNonCloneableChannel(target, false);
621  }
622 
623  template <typename T>
626  {
627  return newCloneableChannel(target, true);
628  }
629 
630  template <typename T>
631  void ValueStreamSplitter<T>::closeChannel(int channel)
632  {
633  std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
634  auto it = m_channels.find(channel, ChannelInfoCmp<T>());
635  assert(it != m_channels.end());
636  m_channels.erase_and_dispose(it, ChannelInfoDisposer<T>());
637  }
638 
639  template <typename T>
640  bool ValueStreamSplitter<T>::hasChannel(int channel) const
641  {
642  std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
643  return m_channels.find(channel, ChannelInfoCmp<T>()) == m_channels.end();
644  }
645 
646  template <typename T>
647  std::shared_ptr<ValueStream<T>> ValueStreamSplitter<T>::stream()
648  {
649  assert(m_channels.empty());
650  return m_stream;
651  }
652 
653  template <typename T>
655  {
656  std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
657  return m_channels.size();
658  }
659 
660  template <typename T>
661  folly::Future<T> ValueStreamSplitter<T>::next()
662  {
663  return next(0).thenValue([] (const ReturnType& p) -> T { return p.value(); });
664  }
665 
666  template <typename T>
667  ChannelGuard<ValueStreamChannel<T>>
668  ValueStreamSplitter<T>::newNonCloneableChannel(int cloneTarget, bool clone)
669  {
670  std::unique_lock<std::recursive_mutex> splitterLock;
671  std::unique_lock<std::mutex> channelLock;
672  int channel = newChannelImpl(cloneTarget, clone, false, splitterLock, channelLock);
673 
674  auto me = std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
675  auto stream = std::make_shared<ValueStreamChannel<T>>(channel, me);
676 
677  return ChannelGuard<ValueStreamChannel<T>>(stream, std::move(splitterLock), std::move(channelLock));
678  }
679 
680  template <typename T>
681  ChannelGuard<CloneableValueStreamChannel<T>>
682  ValueStreamSplitter<T>::newCloneableChannel(int cloneTarget, bool clone)
683  {
684  std::unique_lock<std::recursive_mutex> splitterLock;
685  std::unique_lock<std::mutex> channelLock;
686  int channel = newChannelImpl(cloneTarget, clone, true, splitterLock, channelLock);
687 
688  auto me = std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
689  auto stream = std::make_shared<CloneableValueStreamChannel<T>>(channel, me);
690 
691  return ChannelGuard<CloneableValueStreamChannel<T>>(stream, std::move(splitterLock), std::move(channelLock));
692  }
693 
694  template <typename T>
695  int ValueStreamSplitter<T>::
696  newChannelImpl(int target, bool clone, bool isCloneable,
697  std::unique_lock<std::recursive_mutex>& splitterLock,
698  std::unique_lock<std::mutex>& channelLock)
699  {
700  splitterLock = std::unique_lock<std::recursive_mutex>(m_channelsMutex);
701 
702  int channel = findFreeChannel();
703 
704  initChannel(channel, isCloneable);
705 
706  auto itt = m_channels.find(channel, ChannelInfoCmp<T>());
707  assert(itt != m_channels.end());
708  ChannelInfo<T>& newChannel = *itt;
709 
710  if(clone) {
711  assert(m_channels.find(target, ChannelInfoCmp<T>()) != m_channels.end());
712  auto it = m_channels.find(target, ChannelInfoCmp<T>());
713  ChannelInfo<T>& ci = *it;
714 
715  assert(ci.isCloneable);
716  channelLock = std::unique_lock<std::mutex>(ci.channelMutex);
717 
718  size_t& onTheWay = ci.onTheWay;
719  ci.condition.wait(channelLock, [&onTheWay] { return onTheWay == 0; });
720 
721  newChannel.nextCachedValue = ci.nextCachedValue;
722 
723  assert(ci.promises.size() <= m_futuresWaited);
724  } else {
725  newChannel.nextCachedValue = m_cachedValues.size();
726  }
727  return channel;
728  }
729 
730  template <typename T>
731  void ValueStreamSplitter<T>::clearUsedCache()
732  {
734  for(auto it = m_channels.begin(); it != m_channels.end(); ++it) {
735  it->channelMutex.lock();
736  }
737 
738  size_t minNextValue = m_cachedValues.size();
740  for(auto it = m_channels.begin(); it != m_channels.end(); ++it) {
741  assert(it->nextCachedValue <= m_cachedValues.size());
742  minNextValue = std::min(it->nextCachedValue, minNextValue);
743  }
744 
745  auto updateIndices = [&] {
746  for(auto it = m_channels.begin(); it != m_channels.end(); ++it) {
747  assert(it->nextCachedValue >= minNextValue);
748  it->nextCachedValue -= minNextValue;
749  }
750  };
751 
752 
753  if(minNextValue > 0 && minNextValue == m_cachedValues.size()) {
754  m_cachedValues.clear();
755  updateIndices();
756  } else if(minNextValue > 0) {
757  assert(minNextValue < m_cachedValues.size());
758  auto eraseIt = m_cachedValues.begin();
759  std::advance(eraseIt, minNextValue);
760  m_cachedValues.erase(m_cachedValues.begin(), eraseIt);
761  updateIndices();
762  }
763 
764  for(auto it = m_channels.begin(); it != m_channels.end(); ++it) {
765  it->channelMutex.unlock();
766  }
767  }
768 
769  template <typename T>
770  int ValueStreamSplitter<T>::findFreeChannel() const
771  {
773 
777  auto it = m_channels.begin();
778  if(it == m_channels.end()) {
779  return 0;
780  } else if (it->channel > 0) {
781  return it->channel - 1;
782  } else {
783  auto itt = m_channels.rbegin();
784  assert(itt != m_channels.rend());
785  return itt->channel + 1;
786  }
787  }
788 
789  template <typename T>
790  void ValueStreamSplitter<T>::initChannel(int channel, bool isCloneable)
791  {
792  typename boost::intrusive::set<ChannelInfo<T>>::insert_commit_data insert_data;
793  auto ins_pair = m_channels.insert_check(channel, ChannelInfoCmp<T>(), insert_data);
794  (void)ins_pair;
795  assert(ins_pair.second);
796  m_channels.insert_commit(*new ChannelInfo<T>(channel, isCloneable), insert_data);
797  }
798 
799  template <typename T>
800  void ValueStreamSplitter<T>::launchNewQuery()
801  {
802  std::weak_ptr<ValueStreamSplitter<T>> weak =
803  std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
804 
805  folly::Future<T> fut = folly::makeFuture<T>(
806  folly::exception_wrapper(std::runtime_error("Empty future")));
807 
808  size_t count;
809  {
810  std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
811 
812  count = m_valueCounter;
813  ++m_valueCounter;
814  fut = m_stream->next();
815  }
816 
817  std::move(fut).thenValue([weak, count](const T& val) {
818  auto me = weak.lock();
819  if(!me)
820  return;
821 
822  {
823  std::unique_lock<std::mutex> valueMutex(me->m_valuesHandledMutex);
824  me->m_valuesInOrderCondition.wait(valueMutex, [count, me] {
825  return count == me->m_valuesHandled;
826  });
827 
828  std::unique_lock<std::recursive_mutex> channelLock(me->m_channelsMutex);
829  valueMutex.unlock();
830  me->receivedValue(val, std::move(channelLock));
831  }
832  me->m_valuesInOrderCondition.notify_all();
833  });
834  }
835 
836  template <typename T>
837  void ValueStreamSplitter<T>::receivedValue(const T& val, std::unique_lock<std::recursive_mutex>&& channelLock)
838  {
839  clearUsedCache();
840 
841  assert(m_futuresWaited > 0);
842  --m_futuresWaited;
843 
844  m_cachedValues.push_back(val);
845 
846  std::set<int> channels;
847  for(auto it = m_channels.begin(); it != m_channels.end(); ++it) {
848  channels.insert(it->channel);
849  }
850 
853 
854  for(auto it = channels.begin(); it != channels.end(); ++it) {
855 
856  auto cit = m_channels.find(*it, ChannelInfoCmp<T>());
857  if(cit == m_channels.end())
858  continue;
859 
860  ChannelInfo<T>& ci = *cit;
861 
862  std::unique_lock<std::mutex> lck(ci.channelMutex);
863 
864  assert(ci.promises.size() <= m_futuresWaited + 1);
865 
866  if(!ci.promises.empty()) {
867  size_t next = ci.nextCachedValue++;
868  assert(ci.nextCachedValue == m_cachedValues.size() );
869 
870  ++ci.onTheWay;
871 
872  auto promise = std::move(ci.promises.front());
873  ci.promises.pop();
874 
875  T value = m_cachedValues[next];
876  auto& mutex = ci.channelMutex;
877  auto& cond = ci.condition;
878  auto& onTheWay = ci.onTheWay;
879  bool cloneable = ci.isCloneable;
880 
881  lck.unlock();
882 
883  auto cblock = CallbackLock<T>(std::move(value), mutex, cond, onTheWay, cloneable);
884 
885  // promise may end up making calls to next via callback chain
886  promise.setValue(std::move(cblock));
887  }
888  }
889  std::lock_guard<std::mutex> g(m_valuesHandledMutex);
890  ++m_valuesHandled;
891  channelLock.unlock();
892  }
893 
894 }