SourceXtractorPlusPlus
1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SEImplementation
SEImplementation
Measurement
MultithreadedMeasurement.h
Go to the documentation of this file.
1
17
/*
18
* MultithreadedMeasurement.h
19
*
20
* Created on: May 17, 2018
21
* Author: mschefer
22
*/
23
24
#ifndef _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
25
#define _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
26
27
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

#include "AlexandriaKernel/ThreadPool.h"
#include "AlexandriaKernel/Semaphore.h"
#include "SEFramework/Pipeline/Measurement.h"
35
36
namespace
SourceXtractor
{
37
38
class
MultithreadedMeasurement
:
public
Measurement
{
39
public
:
40
41
using
SourceToRowConverter
=
std::function
<
Euclid::Table::Row
(
const
SourceInterface
&)>;
42
MultithreadedMeasurement
(
SourceToRowConverter
source_to_row,
const
std::shared_ptr<Euclid::ThreadPool>
& thread_pool,
43
unsigned
max_queue_size)
44
:
m_source_to_row
(source_to_row),
45
m_thread_pool
(thread_pool),
46
m_group_counter
(0),
47
m_input_done
(false),
m_abort_raised
(false),
m_semaphore
(max_queue_size) {}
48
49
~MultithreadedMeasurement
()
override
;
50
51
void
receiveSource
(
std::unique_ptr<SourceGroupInterface>
source_group)
override
;
52
void
receiveProcessSignal
(
const
ProcessSourcesEvent
& event)
override
;
53
54
void
startThreads
()
override
;
55
void
stopThreads
()
override
;
56
void
synchronizeThreads
()
override
;
57
58
private
:
59
static
void
outputThreadStatic
(
MultithreadedMeasurement
* measurement);
60
void
outputThreadLoop
();
61
62
SourceToRowConverter
m_source_to_row
;
63
std::shared_ptr<Euclid::ThreadPool>
m_thread_pool
;
64
std::unique_ptr<std::thread>
m_output_thread
;
65
66
int
m_group_counter
;
67
std::atomic_bool
m_input_done
,
m_abort_raised
;
68
69
std::condition_variable
m_new_output
;
70
std::list<std::pair<int, std::unique_ptr<SourceGroupInterface>
>>
m_output_queue
;
71
std::mutex
m_output_queue_mutex
;
72
Euclid::Semaphore
m_semaphore
;
73
};
74
75
}
76
77
#endif
/* _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_ */
Measurement.h
Euclid::Semaphore
Euclid::Table::Row
SourceXtractor::Measurement
Definition
Measurement.h:34
SourceXtractor::MultithreadedMeasurement
Definition
MultithreadedMeasurement.h:38
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
~MultithreadedMeasurement() override
Definition
MultithreadedMeasurement.cpp:36
SourceXtractor::MultithreadedMeasurement::SourceToRowConverter
std::function< Euclid::Table::Row(const SourceInterface &)> SourceToRowConverter
Definition
MultithreadedMeasurement.h:41
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition
MultithreadedMeasurement.h:71
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition
MultithreadedMeasurement.cpp:119
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::synchronizeThreads
void synchronizeThreads() override
Definition
MultithreadedMeasurement.cpp:53
SourceXtractor::MultithreadedMeasurement::receiveSource
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
Definition
MultithreadedMeasurement.cpp:76
SourceXtractor::MultithreadedMeasurement::MultithreadedMeasurement
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition
MultithreadedMeasurement.h:42
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_semaphore
Euclid::Semaphore m_semaphore
Definition
MultithreadedMeasurement.h:72
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
Definition
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::stopThreads
void stopThreads() override
Definition
MultithreadedMeasurement.cpp:46
SourceXtractor::MultithreadedMeasurement::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition
MultithreadedMeasurement.cpp:141
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition
MultithreadedMeasurement.h:62
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition
MultithreadedMeasurement.cpp:103
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition
MultithreadedMeasurement.cpp:42
SourceXtractor::SourceInterface
The SourceInterface is an abstract "source" that has properties attached to it.
Definition
SourceInterface.h:46
std::condition_variable
std::function
std::list
std::mutex
SourceXtractor
Definition
Aperture.h:30
std::shared_ptr
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the Sources stored.
Definition
PipelineStage.h:33
std::unique_ptr
Generated by
1.14.0