SourceXtractorPlusPlus 1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17/*
18 * MultithreadedMeasurement.cpp
19 *
20 * Created on: May 23, 2018
21 * Author: mschefer
22 */
23
24#include <chrono>
25#include <ElementsKernel/Logging.h>
26#include <csignal>
27
30
31using namespace SourceXtractor;
32
34
35
41
45
47 m_input_done = true;
48 m_thread_pool->block();
49 m_output_thread->join();
50 logger.debug() << "All worker threads done!";
51}
52
54 // Wait until all worker threads are done
55 m_thread_pool->block();
56
57 // Wait until the output queue is empty
58 while (true) {
59 {
61 if (m_output_queue.empty()) {
62 break;
63 }
64 else if (m_thread_pool->checkForException(false)) {
65 logger.fatal() << "An exception was thrown from a worker thread";
66 m_thread_pool->checkForException(true);
67 }
68 else if (m_thread_pool->activeThreads() == 0) {
69 throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
70 }
71 }
73 }
74}
75
77 // Force computation of SourceID here, where the order is still deterministic
78 for (auto& source : *source_group) {
79 source.getProperty<SourceID>();
80 }
81
82 // Put the new SourceGroup into the input queue
83 auto order_number = m_group_counter;
84 auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
85 // Trigger measurements
86 for (auto& source : *source_group) {
87 m_source_to_row(source);
88 }
89 // Pass to the output thread
90 {
92 m_output_queue.emplace_back(order_number, std::move(source_group));
93 }
94 m_new_output.notify_one();
95 };
96 auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
97 (*lambda)();
98 };
99 m_thread_pool->submit(lambda_copyable);
101}
102
104 logger.debug() << "Starting output thread";
105 try {
106 measurement->outputThreadLoop();
107 }
108 catch (const Elements::Exception& e) {
109 logger.fatal() << "Output thread got an exception!";
110 logger.fatal() << e.what();
111 if (!measurement->m_abort_raised.exchange(true)) {
112 logger.fatal() << "Aborting the execution";
113 ::raise(SIGTERM);
114 }
115 }
116 logger.debug() << "Stopping output thread";
117}
118
120 while (m_thread_pool->activeThreads() > 0) {
122
123 // Wait for something in the output queue
124 if (m_output_queue.empty()) {
125 m_new_output.wait_for(output_lock, std::chrono::milliseconds(100));
126 }
127
128 // Process the output queue
129 while (!m_output_queue.empty()) {
130 sendSource(std::move(m_output_queue.front().second));
131 m_output_queue.pop_front();
132 }
133
134 if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
135 m_output_queue.empty()) {
136 break;
137 }
138 }
139}
140
static Elements::Logging logger
static Logging getLogger(const std::string &name="")
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
void receiveProcessSignal(const ProcessSourcesEvent &event) override
static void outputThreadStatic(MultithreadedMeasurement *measurement)
void sendProcessSignal(const ProcessSourcesEvent &event) const
void sendSource(std::unique_ptr< SourceGroupInterface > source) const
T make_shared(T... args)
T move(T... args)
static Elements::Logging logger
std::unique_ptr< T > make_unique(Args &&... args)
T raise(T... args)
T sleep_for(T... args)
Event received by SourceGrouping to request the processing of some of its stored Sources.