Botan  1.11.15
src/lib/filters/threaded_fork.cpp
Go to the documentation of this file.
00001 /*
00002 * Threaded Fork
00003 * (C) 2013 Joel Low
00004 *     2013 Jack Lloyd
00005 *
00006 * Botan is released under the Simplified BSD License (see license.txt)
00007 */
00008 
00009 #include <botan/basefilt.h>
00010 #include <botan/internal/semaphore.h>
00011 
00012 namespace Botan {
00013 
00014 struct Threaded_Fork_Data
00015    {
00016    /*
00017    * Semaphore for indicating that there is work to be done (or to
00018    * quit)
00019    */
00020    Semaphore m_input_ready_semaphore;
00021 
00022    /*
00023    * Ensures that all threads have completed processing data.
00024    */
00025    Semaphore m_input_complete_semaphore;
00026 
00027    /*
00028    * The work that needs to be done. This should be only when the threads
00029    * are NOT running (i.e. before notifying the work condition, after
00030    * the input_complete_semaphore is completely reset.)
00031    */
00032    const byte* m_input = nullptr;
00033 
00034    /*
00035    * The length of the work that needs to be done.
00036    */
00037    size_t m_input_length = 0;
00038    };
00039 
00040 /*
00041 * Threaded_Fork constructor
00042 */
00043 Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
00044    Fork(nullptr, static_cast<size_t>(0)),
00045    m_thread_data(new Threaded_Fork_Data)
00046    {
00047    Filter* filters[4] = { f1, f2, f3, f4 };
00048    set_next(filters, 4);
00049    }
00050 
00051 /*
00052 * Threaded_Fork constructor
00053 */
00054 Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
00055    Fork(nullptr, static_cast<size_t>(0)),
00056    m_thread_data(new Threaded_Fork_Data)
00057    {
00058    set_next(filters, count);
00059    }
00060 
00061 Threaded_Fork::~Threaded_Fork()
00062    {
00063    m_thread_data->m_input = nullptr;
00064    m_thread_data->m_input_length = 0;
00065 
00066    m_thread_data->m_input_ready_semaphore.release(m_threads.size());
00067 
00068    for(auto& thread : m_threads)
00069      thread->join();
00070    }
00071 
00072 std::string Threaded_Fork::name() const
00073    {
00074    return "Threaded Fork";
00075    }
00076 
00077 void Threaded_Fork::set_next(Filter* f[], size_t n)
00078    {
00079    Fork::set_next(f, n);
00080    n = next.size();
00081 
00082    if(n < m_threads.size())
00083       m_threads.resize(n);
00084    else
00085       {
00086       m_threads.reserve(n);
00087       for(size_t i = m_threads.size(); i != n; ++i)
00088          {
00089          m_threads.push_back(
00090             std::shared_ptr<std::thread>(
00091                new std::thread(
00092                   std::bind(&Threaded_Fork::thread_entry, this, next[i]))));
00093          }
00094       }
00095    }
00096 
00097 void Threaded_Fork::send(const byte input[], size_t length)
00098    {
00099    if(write_queue.size())
00100       thread_delegate_work(&write_queue[0], write_queue.size());
00101    thread_delegate_work(input, length);
00102 
00103    bool nothing_attached = true;
00104    for(size_t j = 0; j != total_ports(); ++j)
00105       if(next[j])
00106          nothing_attached = false;
00107 
00108    if(nothing_attached)
00109       write_queue += std::make_pair(input, length);
00110    else
00111       write_queue.clear();
00112    }
00113 
00114 void Threaded_Fork::thread_delegate_work(const byte input[], size_t length)
00115    {
00116    //Set the data to do.
00117    m_thread_data->m_input = input;
00118    m_thread_data->m_input_length = length;
00119 
00120    //Let the workers start processing.
00121    m_thread_data->m_input_ready_semaphore.release(total_ports());
00122 
00123    //Wait for all the filters to finish processing.
00124    for(size_t i = 0; i != total_ports(); ++i)
00125       m_thread_data->m_input_complete_semaphore.acquire();
00126 
00127    //Reset the thread data
00128    m_thread_data->m_input = nullptr;
00129    m_thread_data->m_input_length = 0;
00130    }
00131 
00132 void Threaded_Fork::thread_entry(Filter* filter)
00133    {
00134    while(true)
00135       {
00136       m_thread_data->m_input_ready_semaphore.acquire();
00137 
00138       if(!m_thread_data->m_input)
00139          break;
00140 
00141       filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
00142       m_thread_data->m_input_complete_semaphore.release();
00143       }
00144    }
00145 
00146 }