fastcgi++
manager.hpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 
00022 #ifndef MANAGER_HPP
00023 #define MANAGER_HPP
00024 
00025 #include <map>
00026 #include <string>
00027 #include <queue>
00028 #include <algorithm>
00029 #include <cstring>
00030 
00031 #include <boost/bind.hpp>
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/shared_mutex.hpp>
00035 
00036 #include <signal.h>
00037 
00038 #include <fastcgi++/protocol.hpp>
00039 #include <fastcgi++/transceiver.hpp>
00040 
00042 namespace Fastcgipp
00043 {
00045 
00055    class ManagerPar
00056    {
00057    public:
00059 
00070       ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_, bool doSetupSignals);
00071 
00072       ~ManagerPar() { instance=0; }
00073 
00075 
00084       void stop();
00085 
00087 
00095       void terminate();
00096       
00098 
00105       void setupSignals();
00106 
00108       size_t getMessagesSize() const { return messages.size(); }
00109 
00110    protected:
00112       Transceiver transceiver;
00113 
00115 
00119       class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
00121 
00124       Tasks tasks;
00125 
00127       std::queue<Message> messages;
00128 
00130 
00138       void localHandler(Protocol::FullId id);
00139 
00141       bool asleep;
00143       boost::mutex sleepMutex;
00144 
00146 
00149       bool stopBool;
00151       boost::mutex stopMutex;
00153 
00156       bool terminateBool;
00158       boost::mutex terminateMutex;
00159 
00160    private:
00162       static void signalHandler(int signum);
00164       static ManagerPar* instance;
00165    };
00166    
00168 
00178    template<class T> class Manager: public ManagerPar
00179    {
00180    public:
00182 
00192       Manager(int fd=0, bool doSetupSignals=true): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2), doSetupSignals) {}
00193 
00195 
00202       void handler();
00203 
00205 
00220       void push(Protocol::FullId id, Message message);
00221 
00223       size_t getRequestsSize() const { return requests.size(); }
00224    private:
00226 
00230       class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
00232 
00236       Requests requests;
00237    };
00238 }
00239 
00240 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
00241 {
00242    using namespace std;
00243    using namespace Protocol;
00244    using namespace boost;
00245 
00246    if(id.fcgiId)
00247    {
00248       shared_lock<shared_mutex> reqReadLock(requests);
00249       typename Requests::iterator it(requests.find(id));
00250       if(it!=requests.end())
00251       {
00252          lock_guard<mutex> mesLock(it->second->messages);
00253          it->second->messages.push(message);
00254          lock_guard<mutex> tasksLock(tasks);
00255          tasks.push(id);
00256       }
00257       else if(!message.type)
00258       {
00259          Header& header=*(Header*)message.data.get();
00260          if(header.getType()==BEGIN_REQUEST)
00261          {
00262             BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
00263 
00264             reqReadLock.unlock();
00265             unique_lock<shared_mutex> reqWriteLock(requests);
00266 
00267             boost::shared_ptr<T>& request = requests[id];
00268             request.reset(new T);
00269             request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
00270          }
00271          else
00272             return;
00273       }
00274    }
00275    else
00276    {
00277       messages.push(message);
00278       tasks.push(id);
00279    }
00280 
00281    lock_guard<mutex> sleepLock(sleepMutex);
00282    if(asleep)
00283       transceiver.wake();
00284 }
00285 
00286 template<class T> void Fastcgipp::Manager<T>::handler()
00287 {
00288    using namespace std;
00289    using namespace boost;
00290 
00291    while(1)
00292    {{
00293       {
00294          lock_guard<mutex> stopLock(stopMutex);
00295          if(stopBool)
00296          {
00297             stopBool=false;
00298             return;
00299          }
00300       }
00301       
00302       bool sleep=transceiver.handler();
00303 
00304       {
00305          lock_guard<mutex> terminateLock(terminateMutex);
00306          if(terminateBool)
00307          {
00308             shared_lock<shared_mutex> requestsLock(requests);
00309             if(requests.empty() && sleep)
00310             {
00311                terminateBool=false;
00312                return;
00313             }
00314          }
00315       }
00316 
00317       unique_lock<mutex> tasksLock(tasks);
00318       unique_lock<mutex> sleepLock(sleepMutex);
00319 
00320       if(tasks.empty())
00321       {
00322          tasksLock.unlock();
00323 
00324          asleep=true;
00325          sleepLock.unlock();
00326 
00327          if(sleep) transceiver.sleep();
00328 
00329          sleepLock.lock();
00330          asleep=false;
00331          sleepLock.unlock();
00332 
00333          continue;
00334       }
00335 
00336       sleepLock.unlock();
00337 
00338       Protocol::FullId id=tasks.front();
00339       tasks.pop();
00340       tasksLock.unlock();
00341 
00342       if(id.fcgiId==0)
00343          localHandler(id);
00344       else
00345       {
00346          shared_lock<shared_mutex> reqReadLock(requests);
00347          typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
00348          if(it!=requests.end() && it->second->handler())
00349          {
00350             reqReadLock.unlock();
00351             unique_lock<shared_mutex> reqWriteLock(requests);
00352 
00353             requests.erase(it);
00354          }
00355       }
00356    }}
00357 }
00358 
00359 #endif