Botan  1.11.15
src/lib/filters/pipe.cpp
Go to the documentation of this file.
00001 /*
00002 * Pipe
00003 * (C) 1999-2007 Jack Lloyd
00004 *
00005 * Botan is released under the Simplified BSD License (see license.txt)
00006 */
00007 
00008 #include <botan/pipe.h>
00009 #include <botan/internal/out_buf.h>
00010 #include <botan/secqueue.h>
00011 #include <botan/parsing.h>
00012 
00013 namespace Botan {
00014 
00015 namespace {
00016 
00017 /*
00018 * A Filter that does nothing
00019 */
00020 class Null_Filter : public Filter
00021    {
00022    public:
00023       void write(const byte input[], size_t length)
00024          { send(input, length); }
00025 
00026       std::string name() const { return "Null"; }
00027    };
00028 
00029 }
00030 
00031 /*
00032 * Pipe Constructor
00033 */
00034 Pipe::Pipe(Filter* f1, Filter* f2, Filter* f3, Filter* f4)
00035    {
00036    init();
00037    append(f1);
00038    append(f2);
00039    append(f3);
00040    append(f4);
00041    }
00042 
00043 /*
00044 * Pipe Constructor
00045 */
00046 Pipe::Pipe(std::initializer_list<Filter*> args)
00047    {
00048    init();
00049 
00050    for(auto i = args.begin(); i != args.end(); ++i)
00051       append(*i);
00052    }
00053 
00054 /*
00055 * Pipe Destructor
00056 */
00057 Pipe::~Pipe()
00058    {
00059    destruct(pipe);
00060    delete outputs;
00061    }
00062 
00063 /*
00064 * Initialize the Pipe
00065 */
00066 void Pipe::init()
00067    {
00068    outputs = new Output_Buffers;
00069    pipe = nullptr;
00070    default_read = 0;
00071    inside_msg = false;
00072    }
00073 
00074 /*
00075 * Reset the Pipe
00076 */
00077 void Pipe::reset()
00078    {
00079    destruct(pipe);
00080    pipe = nullptr;
00081    inside_msg = false;
00082    }
00083 
00084 /*
00085 * Destroy the Pipe
00086 */
00087 void Pipe::destruct(Filter* to_kill)
00088    {
00089    if(!to_kill || dynamic_cast<SecureQueue*>(to_kill))
00090       return;
00091    for(size_t j = 0; j != to_kill->total_ports(); ++j)
00092       destruct(to_kill->next[j]);
00093    delete to_kill;
00094    }
00095 
00096 /*
00097 * Test if the Pipe has any data in it
00098 */
00099 bool Pipe::end_of_data() const
00100    {
00101    return (remaining() == 0);
00102    }
00103 
00104 /*
00105 * Set the default read message
00106 */
00107 void Pipe::set_default_msg(message_id msg)
00108    {
00109    if(msg >= message_count())
00110       throw Invalid_Argument("Pipe::set_default_msg: msg number is too high");
00111    default_read = msg;
00112    }
00113 
00114 /*
00115 * Process a full message at once
00116 */
00117 void Pipe::process_msg(const byte input[], size_t length)
00118    {
00119    start_msg();
00120    write(input, length);
00121    end_msg();
00122    }
00123 
00124 /*
00125 * Process a full message at once
00126 */
00127 void Pipe::process_msg(const secure_vector<byte>& input)
00128    {
00129    process_msg(input.data(), input.size());
00130    }
00131 
00132 void Pipe::process_msg(const std::vector<byte>& input)
00133    {
00134    process_msg(input.data(), input.size());
00135    }
00136 
00137 /*
00138 * Process a full message at once
00139 */
00140 void Pipe::process_msg(const std::string& input)
00141    {
00142    process_msg(reinterpret_cast<const byte*>(input.data()), input.length());
00143    }
00144 
00145 /*
00146 * Process a full message at once
00147 */
00148 void Pipe::process_msg(DataSource& input)
00149    {
00150    start_msg();
00151    write(input);
00152    end_msg();
00153    }
00154 
00155 /*
00156 * Start a new message
00157 */
00158 void Pipe::start_msg()
00159    {
00160    if(inside_msg)
00161       throw Invalid_State("Pipe::start_msg: Message was already started");
00162    if(pipe == nullptr)
00163       pipe = new Null_Filter;
00164    find_endpoints(pipe);
00165    pipe->new_msg();
00166    inside_msg = true;
00167    }
00168 
00169 /*
00170 * End the current message
00171 */
00172 void Pipe::end_msg()
00173    {
00174    if(!inside_msg)
00175       throw Invalid_State("Pipe::end_msg: Message was already ended");
00176    pipe->finish_msg();
00177    clear_endpoints(pipe);
00178    if(dynamic_cast<Null_Filter*>(pipe))
00179       {
00180       delete pipe;
00181       pipe = nullptr;
00182       }
00183    inside_msg = false;
00184 
00185    outputs->retire();
00186    }
00187 
00188 /*
00189 * Find the endpoints of the Pipe
00190 */
00191 void Pipe::find_endpoints(Filter* f)
00192    {
00193    for(size_t j = 0; j != f->total_ports(); ++j)
00194       if(f->next[j] && !dynamic_cast<SecureQueue*>(f->next[j]))
00195          find_endpoints(f->next[j]);
00196       else
00197          {
00198          SecureQueue* q = new SecureQueue;
00199          f->next[j] = q;
00200          outputs->add(q);
00201          }
00202    }
00203 
00204 /*
00205 * Remove the SecureQueues attached to the Filter
00206 */
00207 void Pipe::clear_endpoints(Filter* f)
00208    {
00209    if(!f) return;
00210    for(size_t j = 0; j != f->total_ports(); ++j)
00211       {
00212       if(f->next[j] && dynamic_cast<SecureQueue*>(f->next[j]))
00213          f->next[j] = nullptr;
00214       clear_endpoints(f->next[j]);
00215       }
00216    }
00217 
00218 /*
00219 * Append a Filter to the Pipe
00220 */
00221 void Pipe::append(Filter* filter)
00222    {
00223    if(inside_msg)
00224       throw Invalid_State("Cannot append to a Pipe while it is processing");
00225    if(!filter)
00226       return;
00227    if(dynamic_cast<SecureQueue*>(filter))
00228       throw Invalid_Argument("Pipe::append: SecureQueue cannot be used");
00229    if(filter->owned)
00230       throw Invalid_Argument("Filters cannot be shared among multiple Pipes");
00231 
00232    filter->owned = true;
00233 
00234    if(!pipe) pipe = filter;
00235    else      pipe->attach(filter);
00236    }
00237 
00238 /*
00239 * Prepend a Filter to the Pipe
00240 */
00241 void Pipe::prepend(Filter* filter)
00242    {
00243    if(inside_msg)
00244       throw Invalid_State("Cannot prepend to a Pipe while it is processing");
00245    if(!filter)
00246       return;
00247    if(dynamic_cast<SecureQueue*>(filter))
00248       throw Invalid_Argument("Pipe::prepend: SecureQueue cannot be used");
00249    if(filter->owned)
00250       throw Invalid_Argument("Filters cannot be shared among multiple Pipes");
00251 
00252    filter->owned = true;
00253 
00254    if(pipe) filter->attach(pipe);
00255    pipe = filter;
00256    }
00257 
00258 /*
00259 * Pop a Filter off the Pipe
00260 */
00261 void Pipe::pop()
00262    {
00263    if(inside_msg)
00264       throw Invalid_State("Cannot pop off a Pipe while it is processing");
00265 
00266    if(!pipe)
00267       return;
00268 
00269    if(pipe->total_ports() > 1)
00270       throw Invalid_State("Cannot pop off a Filter with multiple ports");
00271 
00272    Filter* f = pipe;
00273    size_t owns = f->owns();
00274    pipe = pipe->next[0];
00275    delete f;
00276 
00277    while(owns--)
00278       {
00279       f = pipe;
00280       pipe = pipe->next[0];
00281       delete f;
00282       }
00283    }
00284 
00285 /*
00286 * Return the number of messages in this Pipe
00287 */
00288 Pipe::message_id Pipe::message_count() const
00289    {
00290    return outputs->message_count();
00291    }
00292 
00293 /*
00294 * Static Member Variables
00295 */
00296 const Pipe::message_id Pipe::LAST_MESSAGE =
00297    static_cast<Pipe::message_id>(-2);
00298 
00299 const Pipe::message_id Pipe::DEFAULT_MESSAGE =
00300    static_cast<Pipe::message_id>(-1);
00301 
00302 }