5 #include "BackendComponents.hpp"
8 #include <folly/MoveWrapper.h>
// Pure virtual (`= 0`) const accessor returning a QString; every concrete
// subclass has to supply its own implementation.
// NOTE(review): presumably the name of the protocol this generator serves --
// confirm against the implementations.
26 virtual QString protocol()
const = 0;
// Alias for a std::shared_ptr that holds a GraphGenerator.
40 typedef std::shared_ptr<GraphGenerator> GraphGeneratorPtr;
// Pure virtual (`= 0`) const accessor returning a QString; concrete
// subclasses must override it.
// NOTE(review): presumably identifies the protocol handled by the generator --
// confirm against the implementations.
52 virtual QString protocol()
const = 0;
// Virtual initialization hook that receives a graph edge by const reference
// and reports a bool. Declaration only; the definition is not in this excerpt.
// NOTE(review): presumably switches the generator into edge mode (see
// isEdgeGenerator()) and returns whether initialization succeeded -- confirm
// against the definition.
66 virtual bool init(
const GraphEdge& e);
// Const query returning a bool. Declaration only.
// NOTE(review): presumably reports the m_isEdgeGenerator flag -- the
// definition is not visible here, confirm.
76 bool isEdgeGenerator()
const;
// Const accessor that returns a NodeId by const reference. Declaration only.
// NOTE(review): presumably the source endpoint of the edge and only
// meaningful when isEdgeGenerator() is true -- confirm.
77 const NodeId& sourceNode()
const;
// Const accessor that returns a NodeId by const reference. Declaration only.
// NOTE(review): presumably the target endpoint of the edge and only
// meaningful when isEdgeGenerator() is true -- confirm.
78 const NodeId& targetNode()
const;
// Const accessor that returns an EdgeId by const reference. Declaration only.
// NOTE(review): presumably the id of the edge handed to init() -- confirm
// against the definition.
79 const EdgeId& edgeId()
const;
// Boolean state flag; it has no in-class initializer on this line.
// NOTE(review): presumably the value returned by isEdgeGenerator() -- confirm.
83 bool m_isEdgeGenerator;
// Alias for a std::shared_ptr that holds a PropertyGenerator.
85 typedef std::shared_ptr<PropertyGenerator> PropertyGeneratorPtr;
92 template <
typename EventGenerator>
// Short name for the ValueType nested in the EventGenerator template
// parameter (`typename` is required because it is a dependent type).
96 typedef typename EventGenerator::ValueType Event;
// Overrides protocol(); always returns a default-constructed QString.
100 virtual QString protocol()
const override {
return QString(); }
102 virtual folly::Future<typename EventGenerator::ValueType>
103 next(folly::Executor* executor =
nullptr)
override
105 int sent = m_eventsSent++;
108 return folly::via(executor, [] {
return Event(); });
110 return folly::makeFuture(Event());
114 m_promise.setValue(Event());
116 m_promise = folly::Promise<typename EventGenerator::ValueType>();
117 return m_promise.getFuture();
// Promise for one value of the generator's ValueType. In the visible part of
// next() it is completed with a default-constructed Event, replaced by a
// fresh Promise, and the future of that fresh Promise is returned.
123 folly::Promise<typename EventGenerator::ValueType> m_promise;
// Pure virtual callback without parameters or return value.
// NOTE(review): presumably invoked when something the implementer observes
// has changed -- confirm with the callers.
134 virtual void changeObserved() = 0;
// Virtual, compiler-generated destructor, so implementations can be
// destroyed safely through a Notifiable pointer or reference.
135 virtual ~Notifiable() =
default;
154 template <
typename GeneratorType>
// Compile-time guard: instantiation fails with the message below when
// IsGenerator<GeneratorType>::value is false.
158 static_assert(IsGenerator<GeneratorType>::value,
159 "Need to use Generator or its subclass as a type "
160 "parameter for SynchronousGenerator.");
// Re-exports the ValueType nested in the GeneratorType template parameter
// (`typename` is required because it is a dependent type).
161 typedef typename GeneratorType::ValueType ValueType;
// Overrides next(): returns a folly::Future that carries one ValueType.
// `executor` is optional and defaults to nullptr. Declaration only.
// NOTE(review): presumably defined by the out-of-class template further down
// in this file -- the line naming that definition is not visible here.
163 virtual folly::Future<ValueType>
next(folly::Executor* executor =
nullptr)
override;
// Pure virtual const predicate. The visible promise-fulfilling code tests it
// (together with a non-empty m_promises) before it calls nextEvent().
167 virtual bool hasEventsInCache()
const = 0;
// Pure virtual; returns one ValueType by value. The visible callers pass the
// result on to a queued promise via setValue().
// NOTE(review): presumably also removes that event from the cache -- confirm
// against the implementations.
168 virtual ValueType nextEvent() = 0;
// Pure virtual hook without parameters or return value.
// NOTE(review): presumably refills the cache that hasEventsInCache() and
// nextEvent() read from; its call sites are not visible in this excerpt.
169 virtual void updateCache() = 0;
// Builds an entry from a promise and an executor pointer: `p` is moved into
// the `promise` member, `exec` is stored as passed in the `executor` member.
177 Promised(folly::Promise<ValueType>&& p, folly::Executor* exec)
178 : promise(std::move(p)), executor(exec) {}
// Compiler-generated member-wise move constructor.
// NOTE(review): folly::Promise is presumably move-only, which would make
// Promised move-only too -- confirm against the folly version in use.
179 Promised(Promised&& other) =
default;
// The promise waiting for a value; the fulfillment code visible in this file
// completes it with setValue().
180 folly::Promise<ValueType> promise;
// Raw executor pointer stored with the promise, nullptr by default. The
// visible fulfillment loop passes it to folly::via() when it completes the
// promise from a lambda.
181 folly::Executor* executor =
nullptr;
// FIFO of not-yet-fulfilled promises: the visible code appends with emplace()
// and takes entries from front() when the cache has events.
183 std::queue<Promised> m_promises;
// Starts out true. The visible code clears it when the cache has no events
// while the flag is set, and sets it back to true elsewhere while holding
// m_mutex.
// NOTE(review): presumably ensures only one cache update is requested at a
// time -- the statements between those assignments are not visible here.
185 bool m_pendingCacheUpdate =
true;
190 template <
typename GeneratorType>
191 folly::Future<typename GeneratorType::ValueType>
195 auto ptr = this->shared_from_this();
197 std::shared_ptr<Me> me = std::dynamic_pointer_cast<Me>(ptr);
201 return folly::via(executor, [] {
return ValueType(); });
203 return folly::makeFuture<ValueType>(ValueType());
206 std::weak_ptr<Me> weak = me;
209 std::shared_ptr<Me> me = weak.lock();
213 std::unique_lock<std::mutex> lck(me->m_mutex);
215 if(!me->hasEventsInCache() && me->m_pendingCacheUpdate) {
216 me->m_pendingCacheUpdate =
false;
220 if(me->hasEventsInCache() && !me->m_promises.empty()) {
221 auto promised = std::move(me->m_promises.front());
222 me->m_promises.pop();
226 auto val = me->nextEvent();
228 promised.promise.setValue(val);
233 folly::Future<ValueType> fut(folly::exception_wrapper(std::runtime_error(
"empty")));
235 std::lock_guard<std::mutex> g(m_mutex);
236 folly::Promise<ValueType> promise;
237 fut = promise.getFuture();
238 Promised p(std::move(promise), executor);
239 m_promises.emplace(std::move(p));
250 template <
typename GeneratorType>
253 std::vector<Promised> promisesToFulfill;
254 std::vector<ValueType> valuesForPromises;
257 std::unique_lock<std::mutex> lck(m_mutex);
258 m_pendingCacheUpdate =
true;
265 if(!hasEventsInCache() && !m_promises.empty()) {
268 while(hasEventsInCache() && !m_promises.empty()) {
269 auto promised = std::move(m_promises.front());
271 promisesToFulfill.emplace_back(std::move(promised));
272 valuesForPromises.emplace_back(std::move(nextEvent()));
277 for(
size_t i = 0; i < promisesToFulfill.size(); ++i) {
278 Promised p = std::move(promisesToFulfill[i]);
279 ValueType val = std::move(valuesForPromises[i]);
281 p.promise.setValue(std::move(val));
283 folly::MoveWrapper<folly::Promise<ValueType>> promise = folly::makeMoveWrapper(std::move(p.promise));
284 folly::MoveWrapper<ValueType> v = folly::makeMoveWrapper(std::move(val));
285 folly::via(p.executor, [promise, v] ()
mutable {
286 auto& unwrapped = *promise;
287 unwrapped.setValue(v.move());