MT Showcase SDK
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
StreamConsumer.hpp
1 #pragma once
2 
3 #include <memory>
4 
5 #include "ValueStream.hpp"
6 
7 namespace Showcase
8 {
9 
10  template <typename T>
11  class StreamConsumer : public std::enable_shared_from_this<StreamConsumer<T>>
12  {
13  public:
17  StreamConsumer(ValueStreamPtr<T> stream);
18  virtual ~StreamConsumer();
19 
20  void init();
21 
22  virtual void onValue(const T& val) = 0;
23 
27  void pauseConsumption();
28  void continueConsumption();
29 
30  protected:
31  void initListener();
32 
33  private:
34  ValueStreamPtr<T> m_stream;
35  bool m_consume;
36  };
37  template <typename T>
38  using StreamConsumerPtr = std::shared_ptr<StreamConsumer<T>>;
39 
40  // ------------------------------------------------------------------
41  // ------------------------------------------------------------------
42  // ------------------------------------------------------------------
43 
44  template <typename T>
45  class FuncStreamConsumer : public StreamConsumer<T>
46  {
47  public:
48  FuncStreamConsumer(ValueStreamPtr<T> stream,
49  std::function<void(const T&)> onValue);
50  virtual void onValue(const T& val) override;
51 
52  private:
53  std::function<void(const T&)> m_onValue;
54  };
55 
56  // ------------------------------------------------------------------
57  // ------------------------------------------------------------------
58  // ------------------------------------------------------------------
59 
60  template <typename T>
61  StreamConsumer<T>::StreamConsumer(ValueStreamPtr<T> stream)
62  : m_stream(stream),
63  m_consume(true)
64  {
65  assert(stream);
66  }
67 
68  template <typename T>
69  StreamConsumer<T>::~StreamConsumer()
70  {
71  }
72 
73  template <typename T>
74  void StreamConsumer<T>::init()
75  {
76  initListener();
77  }
78 
79  template <typename T>
80  void StreamConsumer<T>::pauseConsumption()
81  {
82  m_consume = false;
83  }
84 
85  template <typename T>
86  void StreamConsumer<T>::continueConsumption()
87  {
88  bool didConsume = m_consume;
89  m_consume = true;
90  if(!didConsume)
91  initListener();
92  }
93 
94  template <typename T>
95  void StreamConsumer<T>::initListener()
96  {
97  std::weak_ptr<StreamConsumer<T>> weak = this->shared_from_this();
98  auto func = [weak](T&& ev) {
99  auto me = weak.lock();
100  if(me) {
106  auto tmp = std::move(ev);
107  me->onValue(tmp);
108  }
109 
111  auto f = [weak] {
112  auto me = weak.lock();
113  if(me && me->m_consume) {
114  me->initListener();
115  }
116  };
117 
118  f();
119  };
120 
124  //m_stream->next().via(m_executor).then(func);
125  m_stream->next().thenValue(func);
126  }
127 
128  // ------------------------------------------------------------------
129  // ------------------------------------------------------------------
130  // ------------------------------------------------------------------
131 
132  template <typename T>
133  FuncStreamConsumer<T>::FuncStreamConsumer(ValueStreamPtr<T> stream,
134  std::function<void(const T&)> onValue)
135  : StreamConsumer<T>(stream),
136  m_onValue(onValue)
137  {
138  }
139 
140  template <typename T>
141  inline void FuncStreamConsumer<T>::onValue(const T& val)
142  {
143  assert(m_onValue);
144  m_onValue(val);
145  }
146 
147 }