SourceXtractorPlusPlus
1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SEImplementation
src
lib
Prefetcher
Prefetcher.cpp
Go to the documentation of this file.
1
17
18
#include <ElementsKernel/Logging.h>
19
#include "AlexandriaKernel/memory_tools.h"
20
#include "
SEImplementation/Prefetcher/Prefetcher.h
"
21
22
// File-local logger; every message written by this translation unit goes through the
// logger obtained under the name "Prefetcher".
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Prefetcher"
);
23
24
25
namespace
SourceXtractor
{
26
30
/**
 * Scoped guard that inverts the usual lock-guard behaviour: it *unlocks* the given
 * lock on construction and *re-locks* it on destruction.
 *
 * It is meant to temporarily release an already-held lock for the duration of a scope
 * (for instance while calling code that must not run with the lock held), and to be
 * sure the lock is re-acquired when the scope is left, including via an exception.
 *
 * @tparam Lock
 *  Any type exposing `lock()` and `unlock()` (e.g. `std::unique_lock<std::mutex>`).
 *  The lock must be held by the caller when the guard is constructed, and must outlive
 *  the guard, since only a reference is stored.
 */
template <typename Lock>
struct ReverseLock {
  /// Releases `lock`; it will be acquired again when this guard is destroyed.
  explicit ReverseLock(Lock& lock) : m_lock(lock) {
    m_lock.unlock();
  }

  // The guard only stores a reference: a copy would call lock() a second time on the
  // very same object when destroyed, so copying is forbidden.
  ReverseLock(const ReverseLock&) = delete;
  ReverseLock& operator=(const ReverseLock&) = delete;

  /// Re-acquires the lock released by the constructor.
  ~ReverseLock() {
    m_lock.lock();
  }

private:
  Lock& m_lock;
};
43
44
/**
 * Build a prefetcher on top of an existing pool of worker threads.
 *
 * @param thread_pool
 *  Pool to which the prefetching tasks are submitted.
 * @param max_queue_size
 *  Initial count of the semaphore that receiveSource() acquires for each incoming
 *  source and that the output loop releases for each source sent downstream.
 */
Prefetcher::Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size)
    : m_thread_pool(thread_pool)
    , m_stop(false)
    , m_semaphore(max_queue_size) {
  // The orchestration thread is started from the constructor body, i.e. once every
  // member has been initialized, because outputLoop() uses them right away.
  m_output_thread = Euclid::make_unique<std::thread>([this]() { outputLoop(); });
}
48
49
/**
 * Stops and joins the orchestration thread if that has not been done yet.
 */
Prefetcher::~Prefetcher() {
  const bool still_running = m_output_thread->joinable();
  if (still_running) {
    wait();
  }
}
53
54
void
Prefetcher::receiveSource
(
std::unique_ptr<SourceInterface>
message) {
55
m_semaphore
.acquire();
56
57
intptr_t
source_addr =
reinterpret_cast<
intptr_t
>
(message.
get
());
58
{
59
std::lock_guard<std::mutex>
queue_lock(
m_queue_mutex
);
60
m_received
.emplace_back(
EventType::SOURCE
, source_addr);
61
}
62
63
// Pre-fetch in separate threads
64
auto
lambda = [
this
, source_addr, message =
std::move
(message)]()
mutable
{
65
for
(
auto
& prop :
m_prefetch_set
) {
66
message->getProperty(prop);
67
}
68
{
69
std::lock_guard<std::mutex>
lock
(
m_queue_mutex
);
70
m_finished_sources
.emplace(source_addr,
std::move
(message));
71
}
72
m_new_output
.notify_one();
73
};
74
auto
lambda_copyable = [lambda =
std::make_shared<decltype(lambda)>
(
std::move
(lambda))](){
75
(*lambda)();
76
};
77
m_thread_pool
->submit(lambda_copyable);
78
}
79
80
void
Prefetcher::requestProperty
(
const
PropertyId
& property_id) {
81
m_prefetch_set
.emplace(property_id);
82
logger
.debug() <<
"Requesting prefetch of "
<< property_id.
getString
();
83
}
84
85
void
Prefetcher::outputLoop
() {
86
logger
.debug() <<
"Starting prefetcher output loop"
;
87
88
while
(
m_thread_pool
->activeThreads() > 0) {
89
std::unique_lock<std::mutex>
output_lock(
m_queue_mutex
);
90
91
// Wait for something new
92
m_new_output
.wait_for(output_lock,
std::chrono::milliseconds
(1000));
93
94
// Process the output queue
95
// This is, release sources when the front of the received has been processed
96
while
(!
m_received
.empty()) {
97
auto
next
=
m_received
.front();
98
// If the front is a ProcessSourceEvent, everything received before is done,
99
// so pass downstream
100
if
(
next
.m_event_type ==
EventType::PROCESS_SOURCE
) {
101
auto
event
=
m_event_queue
.front();
102
m_event_queue
.pop_front();
103
logger
.debug() <<
"ProcessSourceEvent released"
;
104
{
105
ReverseLock
<
decltype
(output_lock)> release_lock(output_lock);
106
sendProcessSignal
(event);
107
}
108
m_received
.pop_front();
109
continue
;
110
}
111
// Find if the matching source is done
112
auto
processed =
m_finished_sources
.find(
next
.m_source_addr);
113
// If not, we can't keep going, so exit here
114
if
(processed ==
m_finished_sources
.end()) {
115
logger
.debug() <<
"Next source "
<<
next
.m_source_addr <<
" not done yet"
;
116
break
;
117
}
118
// If it is, send it downstream
119
logger
.debug() <<
"Source "
<<
next
.m_source_addr <<
" sent downstream"
;
120
{
121
ReverseLock
<
decltype
(output_lock)> release_lock(output_lock);
122
sendSource
(
std::move
(processed->second));
123
}
124
m_finished_sources
.erase(processed);
125
m_received
.pop_front();
126
m_semaphore
.release();
127
}
128
129
if
(
m_stop
&&
m_received
.empty()) {
130
break
;
131
}
132
}
133
logger
.debug() <<
"Stopping prefetcher output loop"
;
134
}
135
136
void
Prefetcher::receiveProcessSignal
(
const
ProcessSourcesEvent
& message) {
137
{
138
std::lock_guard<std::mutex>
output_lock(
m_queue_mutex
);
139
m_received
.emplace_back(
EventType::PROCESS_SOURCE
);
140
m_event_queue
.emplace_back(message);
141
}
142
m_new_output
.notify_one();
143
logger
.debug() <<
"ProcessSourceEvent received"
;
144
}
145
146
void
Prefetcher::wait
() {
147
m_stop
=
true
;
148
m_output_thread
->join();
149
}
150
151
void
Prefetcher::synchronize
() {
152
// Wait until the output queue is empty
153
while
(
true
) {
154
{
155
std::unique_lock<std::mutex>
output_lock(
m_queue_mutex
);
156
if
(
m_received
.empty()) {
157
break
;
158
}
159
else
if
(
m_thread_pool
->checkForException(
false
)) {
160
logger
.fatal() <<
"An exception was thrown from a worker thread"
;
161
m_thread_pool
->checkForException(
true
);
162
}
163
else
if
(
m_thread_pool
->activeThreads() == 0) {
164
throw
Elements::Exception
() <<
"No active threads and the queue is not empty! Please, report this as a bug"
;
165
}
166
}
167
std::this_thread::sleep_for
(
std::chrono::milliseconds
(100));
168
}
169
}
170
171
}
// end of namespace SourceXtractor
logger
static Elements::Logging logger
Prefetcher.h
Elements::Exception
Elements::Logging
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
SourceXtractor::PipelineEmitter< SourceInterface >::sendProcessSignal
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition
PipelineStage.h:92
SourceXtractor::PipelineEmitter< SourceInterface >::sendSource
void sendSource(std::unique_ptr< SourceInterface > source) const
Definition
PipelineStage.h:85
SourceXtractor::Prefetcher::requestProperty
void requestProperty(const PropertyId &property_id)
Definition
Prefetcher.cpp:80
SourceXtractor::Prefetcher::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition
Prefetcher.cpp:136
SourceXtractor::Prefetcher::m_event_queue
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourceEvent, order preserved.
Definition
Prefetcher.h:119
SourceXtractor::Prefetcher::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition
Prefetcher.h:109
SourceXtractor::Prefetcher::m_queue_mutex
std::mutex m_queue_mutex
Definition
Prefetcher.h:123
SourceXtractor::Prefetcher::outputLoop
void outputLoop()
Definition
Prefetcher.cpp:85
SourceXtractor::Prefetcher::Prefetcher
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition
Prefetcher.cpp:44
SourceXtractor::Prefetcher::m_new_output
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition
Prefetcher.h:115
SourceXtractor::Prefetcher::m_received
std::deque< EventType > m_received
Queue of type of received events. Used to pass downstream events respecting the received order.
Definition
Prefetcher.h:121
SourceXtractor::Prefetcher::wait
void wait()
Definition
Prefetcher.cpp:146
SourceXtractor::Prefetcher::m_semaphore
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition
Prefetcher.h:129
SourceXtractor::Prefetcher::m_prefetch_set
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition
Prefetcher.h:111
SourceXtractor::Prefetcher::m_finished_sources
std::map< intptr_t, std::unique_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition
Prefetcher.h:117
SourceXtractor::Prefetcher::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition
Prefetcher.h:113
SourceXtractor::Prefetcher::receiveSource
void receiveSource(std::unique_ptr< SourceInterface > source) override
Definition
Prefetcher.cpp:54
SourceXtractor::Prefetcher::synchronize
void synchronize()
Definition
Prefetcher.cpp:151
SourceXtractor::Prefetcher::~Prefetcher
virtual ~Prefetcher()
Definition
Prefetcher.cpp:49
SourceXtractor::Prefetcher::m_stop
std::atomic_bool m_stop
Termination condition for the output loop.
Definition
Prefetcher.h:126
SourceXtractor::PropertyId
Identifier used to set and retrieve properties.
Definition
PropertyId.h:40
SourceXtractor::PropertyId::getString
std::string getString() const
Definition
PropertyId.cpp:36
std::chrono::milliseconds
std::unique_ptr::get
T get(T... args)
std::intptr_t
std::lock_guard
std::lock
T lock(T... args)
std::make_shared
T make_shared(T... args)
std::move
T move(T... args)
Euclid::Configuration::logger
static Elements::Logging logger
Euclid::make_unique
std::unique_ptr< T > make_unique(Args &&... args)
SourceXtractor
Definition
Aperture.h:30
std::next
T next(T... args)
std::shared_ptr
std::this_thread::sleep_for
T sleep_for(T... args)
SourceXtractor::Prefetcher::EventType::PROCESS_SOURCE
@ PROCESS_SOURCE
Definition
Prefetcher.h:100
SourceXtractor::Prefetcher::EventType::SOURCE
@ SOURCE
Definition
Prefetcher.h:100
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the Sources stored.
Definition
PipelineStage.h:33
SourceXtractor::ReverseLock
Definition
Prefetcher.cpp:31
SourceXtractor::ReverseLock::~ReverseLock
~ReverseLock()
Definition
Prefetcher.cpp:36
SourceXtractor::ReverseLock::m_lock
Lock & m_lock
Definition
Prefetcher.cpp:41
SourceXtractor::ReverseLock::ReverseLock
ReverseLock(Lock &lock)
Definition
Prefetcher.cpp:32
std::unique_lock
std::unique_ptr
Generated by
1.14.0