3 #include "ValueStream.hpp"
5 #include "BackendComponents.hpp"
7 #include <boost/intrusive/set.hpp>
35 virtual folly::Future<T> next()
override;
37 std::shared_ptr<ValueStreamSplitter<T>> splitter();
40 std::shared_ptr<ValueStreamSplitter<T>> m_source;
44 using ValueStreamChannelPtr = std::shared_ptr<ValueStreamChannel<T>>;
61 virtual folly::Future<CallbackLock<T>> next()
override;
62 folly::Future<T> nextUnwrapped();
68 std::shared_ptr<ValueStreamSplitter<T>> splitter();
71 std::shared_ptr<ValueStreamSplitter<T>> m_source;
75 using CloneableValueStreamChannelPtr = std::shared_ptr<CloneableValueStreamChannel<T>>;
84 std::shared_ptr<CloneableValueStreamChannel<T>>
90 return getSplitter(stream)->openCloneableChannel().channel();
101 template <
typename T>
105 : nextCachedValue(0), channel(ch), onTheWay(0), isCloneable(cloneable)
108 bool operator<(
const ChannelInfo& ci)
const {
return channel < ci.channel; }
109 bool operator>(
const ChannelInfo& ci)
const {
return channel > ci.channel; }
110 bool operator==(
const ChannelInfo& ci)
const {
return channel == ci.channel; }
112 size_t nextCachedValue;
114 std::queue<folly::Promise<CallbackLock<T>>> promises;
125 std::condition_variable condition;
127 const bool isCloneable;
139 template <
typename T>
148 folly::Future<ReturnType> next(
int channel);
161 void closeChannel(
int channel);
162 bool hasChannel(
int channel)
const;
169 std::shared_ptr<ValueStream<T>>
stream();
171 size_t channelsOpen()
const;
174 virtual folly::Future<T> next()
override;
179 int newChannelImpl(
int cloneTarget,
bool clone,
bool isCloneable,
180 std::unique_lock<std::recursive_mutex>& splitterLock,
181 std::unique_lock<std::mutex>& channelLock);
183 void clearUsedCache();
184 int findFreeChannel()
const;
185 void initChannel(
int channel,
bool isCloneable);
186 void launchNewQuery();
187 void receivedValue(
const T& val, std::unique_lock<std::recursive_mutex>&& channelLock);
190 boost::intrusive::set<ChannelInfo<T>> m_channels;
192 std::shared_ptr<ValueStream<T>> m_stream;
193 size_t m_futuresWaited;
195 size_t m_valueCounter;
196 size_t m_valuesHandled;
197 std::condition_variable m_valuesInOrderCondition;
198 std::mutex m_valuesHandledMutex;
200 std::deque<T> m_cachedValues;
203 mutable std::recursive_mutex m_channelsMutex;
209 template <
typename T>
210 std::shared_ptr<ValueStreamSplitter<T>>
getSplitter(ValueStreamPtr<T> stream)
215 return ptr->splitter();
217 return ptr->splitter();
219 return std::make_shared<ValueStreamSplitter<T>>(stream);
239 template <
typename Channel>
254 ChannelGuard(std::shared_ptr<Channel> channel, std::unique_lock<std::recursive_mutex>&& splitterLock,
255 std::unique_lock<std::mutex>&& channelLock);
257 ChannelGuard(ChannelGuard&& b) =
default;
258 ChannelGuard& operator=(ChannelGuard&& b);
260 ChannelGuard(
const ChannelGuard&) =
delete;
261 ChannelGuard& operator=(
const ChannelGuard&) =
delete;
263 std::shared_ptr<Channel> channel()
const;
266 std::shared_ptr<Channel> m_channel;
268 std::unique_lock<std::recursive_mutex> m_splitterLock;
270 std::unique_lock<std::mutex> m_channelLock;
280 template <
typename T>
286 CallbackLock(T&& val, std::mutex& channelMutex,
287 std::condition_variable& cv,
size_t& onTheWay,
bool useSync);
289 CallbackLock(CallbackLock&& b);
291 CallbackLock& operator=(CallbackLock&& b) =
delete;
292 CallbackLock(
const CallbackLock&) =
delete;
293 CallbackLock& operator=(
const CallbackLock&) =
delete;
297 const T& value()
const;
302 std::mutex& m_channelMutex;
303 std::condition_variable& m_condition;
314 template <
typename Stream>
315 ChannelGuard<Stream>::ChannelGuard()
320 template <
typename Stream>
321 ChannelGuard<Stream>::ChannelGuard(std::shared_ptr<Stream> channel,
322 std::unique_lock<std::recursive_mutex>&& splitterLock,
323 std::unique_lock<std::mutex>&& channelLock)
324 : m_channel(channel),
325 m_splitterLock(std::move(splitterLock)),
326 m_channelLock(std::move(channelLock))
330 template <
typename Stream>
331 ChannelGuard<Stream>& ChannelGuard<Stream>::operator=(ChannelGuard<Stream> &&b)
333 m_channel = std::move(b.m_channel);
334 m_splitterLock = std::move(b.m_splitterLock);
335 m_channelLock = std::move(b.m_channelLock);
339 template <
typename Stream>
340 std::shared_ptr<Stream> ChannelGuard<Stream>::channel()
const
351 template <
typename T>
352 CallbackLock<T>::CallbackLock(T&& val, std::mutex& channelMutex,
353 std::condition_variable& cv,
size_t& onTheWay,
355 : m_channelMutex(channelMutex),
357 m_onTheWay(onTheWay),
358 m_value(std::move(val)),
363 template <
typename T>
364 CallbackLock<T>::CallbackLock(CallbackLock &&b)
365 : m_channelMutex(b.m_channelMutex),
366 m_condition(b.m_condition),
367 m_onTheWay(b.m_onTheWay),
368 m_value(std::move(b.m_value)),
369 m_useGuard(b.m_useGuard)
371 b.m_useGuard =
false;
374 template <
typename T>
378 std::unique_lock<std::mutex> lck(m_channelMutex);
379 assert(m_onTheWay > 0);
381 bool signal = m_onTheWay == 0;
386 m_condition.notify_one();
390 template <
typename T>
402 template <
typename T>
403 ValueStreamChannel<T>::ValueStreamChannel(
int channel, std::shared_ptr<ValueStreamSplitter<T>> source)
410 template <
typename T>
411 ValueStreamChannel<T>::~ValueStreamChannel()
413 m_source->closeChannel(m_channel);
416 template <
typename T>
417 folly::Future<T> ValueStreamChannel<T>::next()
419 auto unwrap = [] (
const CallbackLock<T>& cbl) -> T {
422 return m_source->next(m_channel).thenValue(unwrap);
425 template <
typename T>
426 std::shared_ptr<ValueStreamSplitter<T>> ValueStreamChannel<T>::splitter()
437 template <
typename T>
438 CloneableValueStreamChannel<T>::
439 CloneableValueStreamChannel(
int channel, std::shared_ptr<ValueStreamSplitter<T>> source)
446 template <
typename T>
447 CloneableValueStreamChannel<T>::~CloneableValueStreamChannel()
449 m_source->closeChannel(m_channel);
452 template <
typename T>
453 folly::Future<CallbackLock<T>> CloneableValueStreamChannel<T>::next()
455 return m_source->next(m_channel);
458 template <
typename T>
459 std::shared_ptr<ValueStreamSplitter<T>>
460 CloneableValueStreamChannel<T>::splitter()
465 template <
typename T>
466 folly::Future<T> CloneableValueStreamChannel<T>::nextUnwrapped()
468 auto unwrap = [] (
const CallbackLock<T>& cbl) -> T {
471 return m_source->next(m_channel).thenValue(unwrap);
474 template <
typename T>
477 return m_source->cloneChannel(m_channel);
480 template <
typename T>
483 return m_source->cloneChannel2(m_channel);
492 template <
typename T>
496 return channel < ci.channel;
500 return ci.channel < channel;
505 template <
typename T>
510 if(ci->isCloneable) {
514 if(ci->onTheWay > 0) {
515 ci->condition.wait(g, [ci] {
return ci->onTheWay == 0; });
517 assert(ci->onTheWay == 0);
530 template <
typename T>
531 ValueStreamSplitter<T>::ValueStreamSplitter(ValueStreamPtr<T> stream)
539 template <
typename T>
540 ValueStreamSplitter<T>::~ValueStreamSplitter()
542 std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
544 m_channels.clear_and_dispose(ChannelInfoDisposer<T>());
547 template <
typename T>
548 folly::Future<CallbackLock<T>>
551 folly::Future<ReturnType> future = folly::makeFuture<ReturnType>(
552 folly::exception_wrapper(std::runtime_error(
"empty")));
554 bool newQuery =
false;
555 std::unique_lock<std::recursive_mutex> channelsLock(m_channelsMutex);
558 assert(cit != m_channels.end());
561 std::unique_lock<std::mutex> lck(ci.channelMutex);
563 if(ci.nextCachedValue < m_cachedValues.size()) {
564 T val = m_cachedValues[ci.nextCachedValue++];
566 channelsLock.unlock();
568 assert(ci.promises.empty());
573 ci.condition, ci.onTheWay, ci.isCloneable);
574 return folly::makeFuture(std::move(cbl));
577 assert(ci.promises.size() <= m_futuresWaited);
578 if(m_futuresWaited == ci.promises.size()) {
583 folly::Promise<ReturnType> promise;
584 future = promise.getFuture();
585 ci.promises.emplace(std::move(promise));
596 template <
typename T>
599 std::unique_lock<std::recursive_mutex> lock(m_channelsMutex);
600 std::unique_lock<std::mutex> dummy;
604 template <
typename T>
607 return newNonCloneableChannel(0,
false);
610 template <
typename T>
614 return newCloneableChannel(0,
false);
617 template <
typename T>
620 return newNonCloneableChannel(target,
false);
623 template <
typename T>
627 return newCloneableChannel(target,
true);
630 template <
typename T>
631 void ValueStreamSplitter<T>::closeChannel(
int channel)
633 std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
634 auto it = m_channels.find(channel, ChannelInfoCmp<T>());
635 assert(it != m_channels.end());
636 m_channels.erase_and_dispose(it, ChannelInfoDisposer<T>());
639 template <
typename T>
640 bool ValueStreamSplitter<T>::hasChannel(
int channel)
const
642 std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
643 return m_channels.find(channel, ChannelInfoCmp<T>()) == m_channels.end();
646 template <
typename T>
649 assert(m_channels.empty());
653 template <
typename T>
656 std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
657 return m_channels.size();
660 template <
typename T>
661 folly::Future<T> ValueStreamSplitter<T>::next()
663 return next(0).thenValue([] (
const ReturnType& p) -> T {
return p.value(); });
666 template <
typename T>
667 ChannelGuard<ValueStreamChannel<T>>
668 ValueStreamSplitter<T>::newNonCloneableChannel(
int cloneTarget,
bool clone)
670 std::unique_lock<std::recursive_mutex> splitterLock;
671 std::unique_lock<std::mutex> channelLock;
672 int channel = newChannelImpl(cloneTarget, clone,
false, splitterLock, channelLock);
674 auto me = std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
675 auto stream = std::make_shared<ValueStreamChannel<T>>(channel, me);
677 return ChannelGuard<ValueStreamChannel<T>>(stream, std::move(splitterLock), std::move(channelLock));
680 template <
typename T>
681 ChannelGuard<CloneableValueStreamChannel<T>>
682 ValueStreamSplitter<T>::newCloneableChannel(
int cloneTarget,
bool clone)
684 std::unique_lock<std::recursive_mutex> splitterLock;
685 std::unique_lock<std::mutex> channelLock;
686 int channel = newChannelImpl(cloneTarget, clone,
true, splitterLock, channelLock);
688 auto me = std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
689 auto stream = std::make_shared<CloneableValueStreamChannel<T>>(channel, me);
691 return ChannelGuard<CloneableValueStreamChannel<T>>(stream, std::move(splitterLock), std::move(channelLock));
694 template <
typename T>
695 int ValueStreamSplitter<T>::
696 newChannelImpl(
int target,
bool clone,
bool isCloneable,
697 std::unique_lock<std::recursive_mutex>& splitterLock,
698 std::unique_lock<std::mutex>& channelLock)
700 splitterLock = std::unique_lock<std::recursive_mutex>(m_channelsMutex);
702 int channel = findFreeChannel();
704 initChannel(channel, isCloneable);
706 auto itt = m_channels.find(channel, ChannelInfoCmp<T>());
707 assert(itt != m_channels.end());
708 ChannelInfo<T>& newChannel = *itt;
711 assert(m_channels.find(target, ChannelInfoCmp<T>()) != m_channels.end());
712 auto it = m_channels.find(target, ChannelInfoCmp<T>());
713 ChannelInfo<T>& ci = *it;
715 assert(ci.isCloneable);
716 channelLock = std::unique_lock<std::mutex>(ci.channelMutex);
718 size_t& onTheWay = ci.onTheWay;
719 ci.condition.wait(channelLock, [&onTheWay] {
return onTheWay == 0; });
721 newChannel.nextCachedValue = ci.nextCachedValue;
723 assert(ci.promises.size() <= m_futuresWaited);
725 newChannel.nextCachedValue = m_cachedValues.size();
730 template <
typename T>
731 void ValueStreamSplitter<T>::clearUsedCache()
734 for(
auto it = m_channels.begin(); it != m_channels.end(); ++it) {
735 it->channelMutex.lock();
738 size_t minNextValue = m_cachedValues.size();
740 for(
auto it = m_channels.begin(); it != m_channels.end(); ++it) {
741 assert(it->nextCachedValue <= m_cachedValues.size());
742 minNextValue = std::min(it->nextCachedValue, minNextValue);
745 auto updateIndices = [&] {
746 for(
auto it = m_channels.begin(); it != m_channels.end(); ++it) {
747 assert(it->nextCachedValue >= minNextValue);
748 it->nextCachedValue -= minNextValue;
753 if(minNextValue > 0 && minNextValue == m_cachedValues.size()) {
754 m_cachedValues.clear();
756 }
else if(minNextValue > 0) {
757 assert(minNextValue < m_cachedValues.size());
758 auto eraseIt = m_cachedValues.begin();
759 std::advance(eraseIt, minNextValue);
760 m_cachedValues.erase(m_cachedValues.begin(), eraseIt);
764 for(
auto it = m_channels.begin(); it != m_channels.end(); ++it) {
765 it->channelMutex.unlock();
769 template <
typename T>
770 int ValueStreamSplitter<T>::findFreeChannel()
const
777 auto it = m_channels.begin();
778 if(it == m_channels.end()) {
780 }
else if (it->channel > 0) {
781 return it->channel - 1;
783 auto itt = m_channels.rbegin();
784 assert(itt != m_channels.rend());
785 return itt->channel + 1;
789 template <
typename T>
790 void ValueStreamSplitter<T>::initChannel(
int channel,
bool isCloneable)
792 typename boost::intrusive::set<ChannelInfo<T>>::insert_commit_data insert_data;
793 auto ins_pair = m_channels.insert_check(channel, ChannelInfoCmp<T>(), insert_data);
795 assert(ins_pair.second);
796 m_channels.insert_commit(*
new ChannelInfo<T>(channel, isCloneable), insert_data);
799 template <
typename T>
800 void ValueStreamSplitter<T>::launchNewQuery()
802 std::weak_ptr<ValueStreamSplitter<T>> weak =
803 std::static_pointer_cast<ValueStreamSplitter<T>>(this->shared_from_this());
805 folly::Future<T> fut = folly::makeFuture<T>(
806 folly::exception_wrapper(std::runtime_error(
"Empty future")));
810 std::lock_guard<std::recursive_mutex> g(m_channelsMutex);
812 count = m_valueCounter;
814 fut = m_stream->next();
817 std::move(fut).thenValue([weak, count](
const T& val) {
818 auto me = weak.lock();
823 std::unique_lock<std::mutex> valueMutex(me->m_valuesHandledMutex);
824 me->m_valuesInOrderCondition.wait(valueMutex, [count, me] {
825 return count == me->m_valuesHandled;
828 std::unique_lock<std::recursive_mutex> channelLock(me->m_channelsMutex);
830 me->receivedValue(val, std::move(channelLock));
832 me->m_valuesInOrderCondition.notify_all();
836 template <
typename T>
837 void ValueStreamSplitter<T>::receivedValue(
const T& val, std::unique_lock<std::recursive_mutex>&& channelLock)
841 assert(m_futuresWaited > 0);
844 m_cachedValues.push_back(val);
846 std::set<int> channels;
847 for(
auto it = m_channels.begin(); it != m_channels.end(); ++it) {
848 channels.insert(it->channel);
854 for(
auto it = channels.begin(); it != channels.end(); ++it) {
856 auto cit = m_channels.find(*it, ChannelInfoCmp<T>());
857 if(cit == m_channels.end())
860 ChannelInfo<T>& ci = *cit;
862 std::unique_lock<std::mutex> lck(ci.channelMutex);
864 assert(ci.promises.size() <= m_futuresWaited + 1);
866 if(!ci.promises.empty()) {
867 size_t next = ci.nextCachedValue++;
868 assert(ci.nextCachedValue == m_cachedValues.size() );
872 auto promise = std::move(ci.promises.front());
875 T value = m_cachedValues[next];
876 auto& mutex = ci.channelMutex;
877 auto& cond = ci.condition;
878 auto& onTheWay = ci.onTheWay;
879 bool cloneable = ci.isCloneable;
883 auto cblock = CallbackLock<T>(std::move(value), mutex, cond, onTheWay, cloneable);
886 promise.setValue(std::move(cblock));
889 std::lock_guard<std::mutex> g(m_valuesHandledMutex);
891 channelLock.unlock();