5 #include "ValueStream.hpp"
11 class StreamConsumer :
public std::enable_shared_from_this<StreamConsumer<T>>
17 StreamConsumer(ValueStreamPtr<T> stream);
18 virtual ~StreamConsumer();
22 virtual void onValue(
const T& val) = 0;
27 void pauseConsumption();
28 void continueConsumption();
34 ValueStreamPtr<T> m_stream;
38 using StreamConsumerPtr = std::shared_ptr<StreamConsumer<T>>;
45 class FuncStreamConsumer :
public StreamConsumer<T>
48 FuncStreamConsumer(ValueStreamPtr<T> stream,
49 std::function<
void(
const T&)> onValue);
50 virtual void onValue(
const T& val)
override;
53 std::function<void(const T&)> m_onValue;
61 StreamConsumer<T>::StreamConsumer(ValueStreamPtr<T> stream)
69 StreamConsumer<T>::~StreamConsumer()
74 void StreamConsumer<T>::init()
80 void StreamConsumer<T>::pauseConsumption()
86 void StreamConsumer<T>::continueConsumption()
88 bool didConsume = m_consume;
95 void StreamConsumer<T>::initListener()
97 std::weak_ptr<StreamConsumer<T>> weak = this->shared_from_this();
98 auto func = [weak](T&& ev) {
99 auto me = weak.lock();
106 auto tmp = std::move(ev);
112 auto me = weak.lock();
113 if(me && me->m_consume) {
125 m_stream->next().thenValue(func);
132 template <
typename T>
133 FuncStreamConsumer<T>::FuncStreamConsumer(ValueStreamPtr<T> stream,
134 std::function<
void(
const T&)> onValue)
135 : StreamConsumer<T>(stream),
140 template <
typename T>
141 inline void FuncStreamConsumer<T>::onValue(
const T& val)