Botan
1.11.15
|
00001 /* 00002 * Threaded Fork 00003 * (C) 2013 Joel Low 00004 * 2013 Jack Lloyd 00005 * 00006 * Botan is released under the Simplified BSD License (see license.txt) 00007 */ 00008 00009 #include <botan/basefilt.h> 00010 #include <botan/internal/semaphore.h> 00011 00012 namespace Botan { 00013 00014 struct Threaded_Fork_Data 00015 { 00016 /* 00017 * Semaphore for indicating that there is work to be done (or to 00018 * quit) 00019 */ 00020 Semaphore m_input_ready_semaphore; 00021 00022 /* 00023 * Ensures that all threads have completed processing data. 00024 */ 00025 Semaphore m_input_complete_semaphore; 00026 00027 /* 00028 * The work that needs to be done. This should be only when the threads 00029 * are NOT running (i.e. before notifying the work condition, after 00030 * the input_complete_semaphore is completely reset.) 00031 */ 00032 const byte* m_input = nullptr; 00033 00034 /* 00035 * The length of the work that needs to be done. 00036 */ 00037 size_t m_input_length = 0; 00038 }; 00039 00040 /* 00041 * Threaded_Fork constructor 00042 */ 00043 Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) : 00044 Fork(nullptr, static_cast<size_t>(0)), 00045 m_thread_data(new Threaded_Fork_Data) 00046 { 00047 Filter* filters[4] = { f1, f2, f3, f4 }; 00048 set_next(filters, 4); 00049 } 00050 00051 /* 00052 * Threaded_Fork constructor 00053 */ 00054 Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) : 00055 Fork(nullptr, static_cast<size_t>(0)), 00056 m_thread_data(new Threaded_Fork_Data) 00057 { 00058 set_next(filters, count); 00059 } 00060 00061 Threaded_Fork::~Threaded_Fork() 00062 { 00063 m_thread_data->m_input = nullptr; 00064 m_thread_data->m_input_length = 0; 00065 00066 m_thread_data->m_input_ready_semaphore.release(m_threads.size()); 00067 00068 for(auto& thread : m_threads) 00069 thread->join(); 00070 } 00071 00072 std::string Threaded_Fork::name() const 00073 { 00074 return "Threaded Fork"; 00075 } 00076 00077 void Threaded_Fork::set_next(Filter* f[], size_t n) 00078 { 00079 Fork::set_next(f, n); 00080 n = next.size(); 00081 00082 if(n < m_threads.size()) 00083 m_threads.resize(n); 00084 else 00085 { 00086 m_threads.reserve(n); 00087 for(size_t i = m_threads.size(); i != n; ++i) 00088 { 00089 m_threads.push_back( 00090 std::shared_ptr<std::thread>( 00091 new std::thread( 00092 std::bind(&Threaded_Fork::thread_entry, this, next[i])))); 00093 } 00094 } 00095 } 00096 00097 void Threaded_Fork::send(const byte input[], size_t length) 00098 { 00099 if(write_queue.size()) 00100 thread_delegate_work(&write_queue[0], write_queue.size()); 00101 thread_delegate_work(input, length); 00102 00103 bool nothing_attached = true; 00104 for(size_t j = 0; j != total_ports(); ++j) 00105 if(next[j]) 00106 nothing_attached = false; 00107 00108 if(nothing_attached) 00109 write_queue += std::make_pair(input, length); 00110 else 00111 write_queue.clear(); 00112 } 00113 00114 void Threaded_Fork::thread_delegate_work(const byte input[], size_t length) 00115 { 00116 //Set the data to do. 00117 m_thread_data->m_input = input; 00118 m_thread_data->m_input_length = length; 00119 00120 //Let the workers start processing. 00121 m_thread_data->m_input_ready_semaphore.release(total_ports()); 00122 00123 //Wait for all the filters to finish processing. 00124 for(size_t i = 0; i != total_ports(); ++i) 00125 m_thread_data->m_input_complete_semaphore.acquire(); 00126 00127 //Reset the thread data 00128 m_thread_data->m_input = nullptr; 00129 m_thread_data->m_input_length = 0; 00130 } 00131 00132 void Threaded_Fork::thread_entry(Filter* filter) 00133 { 00134 while(true) 00135 { 00136 m_thread_data->m_input_ready_semaphore.acquire(); 00137 00138 if(!m_thread_data->m_input) 00139 break; 00140 00141 filter->write(m_thread_data->m_input, m_thread_data->m_input_length); 00142 m_thread_data->m_input_complete_semaphore.release(); 00143 } 00144 } 00145 00146 }