PStreams
pstream.h
Go to the documentation of this file.
00001 /*
00002 PStreams - POSIX Process I/O for C++
00003 Copyright (C) 2001-2014 Jonathan Wakely
00004 
00005 This file is part of PStreams.
00006 
00007 PStreams is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU Lesser General Public License as published by
00009 the Free Software Foundation; either version 3 of the License, or
00010 (at your option) any later version.
00011 
00012 PStreams is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU Lesser General Public License for more details.
00016 
00017 You should have received a copy of the GNU Lesser General Public License
00018 along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00030 #ifndef REDI_PSTREAM_H_SEEN
00031 #define REDI_PSTREAM_H_SEEN
00032 
00033 #include <ios>
00034 #include <streambuf>
00035 #include <istream>
00036 #include <ostream>
00037 #include <string>
00038 #include <vector>
00039 #include <algorithm>    // for min()
00040 #include <cerrno>       // for errno
00041 #include <cstddef>      // for size_t, NULL
00042 #include <cstdlib>      // for exit()
00043 #include <sys/types.h>  // for pid_t
00044 #include <sys/wait.h>   // for waitpid()
00045 #include <sys/ioctl.h>  // for ioctl() and FIONREAD
00046 #if defined(__sun)
00047 # include <sys/filio.h> // for FIONREAD on Solaris 2.5
00048 #endif
00049 #include <unistd.h>     // for pipe() fork() exec() and filedes functions
00050 #include <signal.h>     // for kill()
00051 #include <fcntl.h>      // for fcntl()
00052 #if REDI_EVISCERATE_PSTREAMS
00053 # include <stdio.h>     // for FILE, fdopen()
00054 #endif
00055 
00056 
00058 #define PSTREAMS_VERSION 0x0081   // 0.8.1
00059 
00073 namespace redi
00074 {
/// Base class holding the typedefs and mode flags shared by the pstreams classes.
00076   struct pstreams
00077   {
          /// Bitmask type for the open mode; an alias for std::ios_base::openmode.
00079     typedef std::ios_base::openmode           pmode;
00080 
          /// Sequence of argument strings for the program to execute.
00082     typedef std::vector<std::string>          argv_type;
00083 
          /// Type used for file descriptors.
00085     typedef int                               fd_type;
00086 
00087     static const pmode pstdin  = std::ios_base::out; // request a pipe to the child's stdin
00088     static const pmode pstdout = std::ios_base::in;  // request a pipe from the child's stdout
00089     static const pmode pstderr = std::ios_base::app; // request a pipe from the child's stderr
00090 
          /// Request that the child be moved to a new process group; fork()
          /// honours it with setpgid(0, 0) when _POSIX_JOB_CONTROL is defined.
00092     static const pmode newpg   = std::ios_base::trunc;
00093 
00094   protected:
00095     enum { bufsz = 32 };  // number of characters allocated for each I/O buffer
00096     enum { pbsz  = 2 };   // offset into a read buffer at which the get area starts (presumably putback space)
00097   };
00098 
/// Stream buffer that starts a child process and reads from / writes to the
/// child's standard streams through pipes.
///
/// Only the members whose definitions appear in this part of the file are
/// described as fact; notes on the others are marked as assumptions.
00100   template <typename CharT, typename Traits = std::char_traits<CharT> >
00101     class basic_pstreambuf
00102     : public std::basic_streambuf<CharT, Traits>
00103     , public pstreams
00104     {
00105     public:
00106       // Type definitions for dependent types
00107       typedef CharT                             char_type;
00108       typedef Traits                            traits_type;
00109       typedef typename traits_type::int_type    int_type;
00110       typedef typename traits_type::off_type    off_type;
00111       typedef typename traits_type::pos_type    pos_type;
            /// Alternative name for pstreams::fd_type.
00113       typedef fd_type                           fd_t;
00114 
            /// Construct a buffer with no child process attached.
00116       basic_pstreambuf();
00117 
            /// Construct and immediately call open(command, mode).
00119       basic_pstreambuf(const std::string& command, pmode mode);
00120 
            /// Construct and immediately call open(file, argv, mode).
00122       basic_pstreambuf( const std::string& file,
00123                         const argv_type& argv,
00124                         pmode mode );
00125 
            /// Destructor; calls close().
00127       ~basic_pstreambuf();
00128 
            /// Run @p command through "/bin/sh -c"; returns this on success, NULL otherwise.
00130       basic_pstreambuf*
00131       open(const std::string& command, pmode mode);
00132 
            /// Run @p file with arguments @p argv via execvp(); returns this on success, NULL otherwise.
00134       basic_pstreambuf*
00135       open(const std::string& file, const argv_type& argv, pmode mode);
00136 
            /// Flush, free the buffers, close the pipes and wait for the child.
            /// Returns this if the buffer was open on entry, NULL otherwise.
00138       basic_pstreambuf*
00139       close();
00140 
            /// Send @p signal to the child with ::kill(); returns this on success, NULL otherwise.
00142       basic_pstreambuf*
00143       kill(int signal = SIGTERM);
00144 
            /// Send @p signal to the child's process group; returns this on success, NULL otherwise.
00146       basic_pstreambuf*
00147       killpg(int signal = SIGTERM);
00148 
            /// Called by the redi::peof manipulator. Defined outside this view;
            /// presumably signals end-of-file on the child's stdin - TODO confirm.
00150       void
00151       peof();
00152 
            /// Select the read source; the stream classes pass false from out()
            /// and true from err(). Defined outside this view, so the meaning of
            /// the return value is not documented here.
00154       bool
00155       read_err(bool readerr = true);
00156 
            /// Used throughout as the "a child process is attached" test.
            /// Defined outside this view.
00158       bool
00159       is_open() const;
00160 
            /// Defined outside this view; presumably reports whether the child
            /// has exited - TODO confirm.
00162       bool
00163       exited();
00164 
00165 #if REDI_EVISCERATE_PSTREAMS
00166 
            /// Only declared when REDI_EVISCERATE_PSTREAMS is set. Defined
            /// outside this view; presumably exposes the pipes as FILE* - TODO confirm.
00167       std::size_t
00168       fopen(FILE*& in, FILE*& out, FILE*& err);
00169 #endif
00170 
            /// Defined outside this view; presumably returns status_ - TODO confirm.
00172       int
00173       status() const;
00174 
            /// Defined outside this view; close() compares its result with EINTR
            /// after wait() stores errno, so presumably returns error_.
00176       int
00177       error() const;
00178 
00179     protected:
            // The following have the names and signatures of the
            // std::basic_streambuf virtual functions; their definitions are
            // outside this view.
00181       int_type
00182       overflow(int_type c);
00183 
00185       int_type
00186       underflow();
00187 
00189       int_type
00190       pbackfail(int_type c = traits_type::eof());
00191 
00193       int
00194       sync();
00195 
00197       std::streamsize
00198       xsputn(const char_type* s, std::streamsize n);
00199 
            /// Defined outside this view; presumably writes to the child's stdin pipe - TODO confirm.
00201       std::streamsize
00202       write(const char_type* s, std::streamsize n);
00203 
            /// Defined outside this view; presumably reads from the active read pipe - TODO confirm.
00205       std::streamsize
00206       read(char_type* s, std::streamsize n);
00207 
00209       std::streamsize
00210       showmanyc();
00211 
00212     protected:
            /// Identifies a read source; the values index rpipe_ and rbuffer_.
00214       enum buf_read_src { rsrc_out = 0, rsrc_err = 1 };
00215 
            /// Create the pipes requested by @p mode and fork the child.
00217       pid_t
00218       fork(pmode mode);
00219 
            /// waitpid() on the child: 1 if it exited, 0 if still running
            /// (nohang only), -1 on error or when no child is attached.
00221       int
00222       wait(bool nohang = false);
00223 
            /// Defined outside this view; presumably returns wpipe_ - TODO confirm.
00225       fd_type&
00226       wpipe();
00227 
            /// Defined outside this view; presumably the pipe of the active read source - TODO confirm.
00229       fd_type&
00230       rpipe();
00231 
            /// Defined outside this view; presumably rpipe_[which] - TODO confirm.
00233       fd_type&
00234       rpipe(buf_read_src which);
00235 
            /// Allocate the buffers selected by @p mode and install put/get areas.
00236       void
00237       create_buffers(pmode mode);
00238 
            /// Free the buffers selected by @p mode and clear their put/get areas.
00239       void
00240       destroy_buffers(pmode mode);
00241 
            /// Defined outside this view.
00243       bool
00244       empty_buffer();
00245 
            /// Defined outside this view.
00246       bool
00247       fill_buffer(bool non_blocking = false);
00248 
            /// Defined outside this view.
00250       char_type*
00251       rbuffer();
00252 
            /// Make the given source the active one, swapping the get-area
            /// pointers with the saved ones; returns the active source.
00253       buf_read_src
00254       switch_read_buffer(buf_read_src);
00255 
00256     private:
            // Copy operations are declared private; no definition is visible in
            // this part of the file.
00257       basic_pstreambuf(const basic_pstreambuf&);
00258       basic_pstreambuf& operator=(const basic_pstreambuf&);
00259 
            /// Reset the read pipes, read buffers and saved get-area pointers.
00260       void
00261       init_rbuffers();
00262 
00263       pid_t         ppid_;        // pid of process
00264       fd_type       wpipe_;       // pipe used to write to process' stdin
00265       fd_type       rpipe_[2];    // two pipes to read from, stdout and stderr
00266       char_type*    wbuffer_;     // write buffer, allocated by create_buffers()
00267       char_type*    rbuffer_[2];  // read buffers, indexed by buf_read_src
00268       char_type*    rbufstate_[3]; // eback/gptr/egptr values swapped in and out by switch_read_buffer()
00270       buf_read_src  rsrc_;        // read source whose buffer is currently the get area
00271       int           status_;      // hold exit status of child process
00272       int           error_;       // hold errno if fork() or exec() fails
00273     };
00274 
/// Abstract base (pure virtual destructor) shared by the process stream
/// classes; it owns the basic_pstreambuf and the command string.
///
/// None of the member definitions are in this part of the file, so the notes
/// below on behaviour are assumptions to be confirmed against them.
00276   template <typename CharT, typename Traits = std::char_traits<CharT> >
00277     class pstream_common
00278     : virtual public std::basic_ios<CharT, Traits>
00279     , virtual public pstreams
00280     {
00281     protected:
00282       typedef basic_pstreambuf<CharT, Traits>       streambuf_type;
00283 
00284       typedef pstreams::pmode                       pmode;
00285       typedef pstreams::argv_type                   argv_type;
00286 
            /// Default constructor.
00288       pstream_common();
00289 
            /// Constructor taking a shell command and a mode
            /// (presumably opens the process - TODO confirm).
00291       pstream_common(const std::string& command, pmode mode);
00292 
            /// Constructor taking a program, its argument vector and a mode
            /// (presumably opens the process - TODO confirm).
00294       pstream_common(const std::string& file, const argv_type& argv, pmode mode);
00295 
            /// Pure virtual destructor: the class cannot be instantiated directly.
00297       virtual
00298       ~pstream_common() = 0;
00299 
            /// Called by the derived classes' open(command, mode).
00301       void
00302       do_open(const std::string& command, pmode mode);
00303 
            /// Called by the derived classes' open(file, argv, mode).
00305       void
00306       do_open(const std::string& file, const argv_type& argv, pmode mode);
00307 
00308     public:
            /// Presumably closes the stream buffer - TODO confirm.
00310       void
00311       close();
00312 
            /// Presumably reports whether the stream buffer is open - TODO confirm.
00314       bool
00315       is_open() const;
00316 
            /// Presumably returns command_ - TODO confirm.
00318       const std::string&
00319       command() const;
00320 
            /// Presumably returns a pointer to buf_ - TODO confirm.
00322       streambuf_type*
00323       rdbuf() const;
00324 
00325 #if REDI_EVISCERATE_PSTREAMS
00326 
            /// Only declared when REDI_EVISCERATE_PSTREAMS is set.
00327       std::size_t
00328       fopen(FILE*& in, FILE*& out, FILE*& err);
00329 #endif
00330 
00331     protected:
00332       std::string       command_; // command string (what command() is presumably returning)
00333       streambuf_type    buf_;     // the process stream buffer used by the derived streams
00334     };
00335 
00336 
/// Input stream that reads from a child process.
///
/// Every mode passed to a constructor or to open() is filtered through
/// readable(), so at least one of pstdout / pstderr is always requested.
00347   template <typename CharT, typename Traits = std::char_traits<CharT> >
00348     class basic_ipstream
00349     : public std::basic_istream<CharT, Traits>
00350     , public pstream_common<CharT, Traits>
00351     , virtual public pstreams
00352     {
00353       typedef std::basic_istream<CharT, Traits>     istream_type;
00354       typedef pstream_common<CharT, Traits>         pbase_type;
00355 
00356       using pbase_type::buf_;  // declare name in this scope
00357 
00358       // Ensure a basic_ipstream will read from at least one pipe
            // (pstdout is added when neither pstdout nor pstderr is set).
            // This is static because the constructors call it from their
            // mem-initializer lists, before the pbase_type base has been
            // initialised; calling a non-static member function at that point
            // is undefined behaviour. It only uses static members of pstreams.
00359       static pmode readable(pmode mode)
00360       {
00361         if (!(mode & (pstdout|pstderr)))
00362           mode |= pstdout;
00363         return mode;
00364       }
00365 
00366     public:
            /// Type used to specify how to connect to the process.
00368       typedef typename pbase_type::pmode            pmode;
00369 
            /// Type used to hold the arguments for a command.
00371       typedef typename pbase_type::argv_type        argv_type;
00372 
            /// Default constructor: creates a stream with no process.
00374       basic_ipstream()
00375       : istream_type(NULL), pbase_type()
00376       { }
00377 
            /// Construct from a shell command and a mode (default pstdout).
00388       explicit
00389       basic_ipstream(const std::string& command, pmode mode = pstdout)
00390       : istream_type(NULL), pbase_type(command, readable(mode))
00391       { }
00392 
            /// Construct from a program name, its argument vector and a mode.
00404       basic_ipstream( const std::string& file,
00405                       const argv_type& argv,
00406                       pmode mode = pstdout )
00407       : istream_type(NULL), pbase_type(file, argv, readable(mode))
00408       { }
00409 
            /// Construct from an argument vector, using argv.at(0) as the program
            /// name; std::vector::at throws std::out_of_range if argv is empty.
00420       explicit
00421       basic_ipstream(const argv_type& argv, pmode mode = pstdout)
00422       : istream_type(NULL), pbase_type(argv.at(0), argv, readable(mode))
00423       { }
00424 
00425 #if __cplusplus >= 201103L
            /// C++11 only: build an argv_type from the list and delegate to the
            /// argument-vector constructor.
00426       template<typename T>
00427         explicit
00428         basic_ipstream(std::initializer_list<T> args, pmode mode = pstdout)
00429         : basic_ipstream(argv_type(args.begin(), args.end()), mode)
00430         { }
00431 #endif
00432 
            /// Destructor; has no work of its own.
00438       ~basic_ipstream()
00439       { }
00440 
            /// Forward a shell command to do_open() with a readable mode.
00450       void
00451       open(const std::string& command, pmode mode = pstdout)
00452       {
00453         this->do_open(command, readable(mode));
00454       }
00455 
            /// Forward a program and argument vector to do_open() with a readable mode.
00466       void
00467       open( const std::string& file,
00468             const argv_type& argv,
00469             pmode mode = pstdout )
00470       {
00471         this->do_open(file, argv, readable(mode));
00472       }
00473 
            /// Call buf_.read_err(false) (select the child's stdout) and return *this.
00478       basic_ipstream&
00479       out()
00480       {
00481         this->buf_.read_err(false);
00482         return *this;
00483       }
00484 
            /// Call buf_.read_err(true) (select the child's stderr) and return *this.
00489       basic_ipstream&
00490       err()
00491       {
00492         this->buf_.read_err(true);
00493         return *this;
00494       }
00495     };
00496 
00497 
/// Output stream that writes to a child process.
///
/// Every constructor and open() overload ORs pstdin into the mode, so a pipe
/// to the child's stdin is always requested.
00507   template <typename CharT, typename Traits = std::char_traits<CharT> >
00508     class basic_opstream
00509     : public std::basic_ostream<CharT, Traits>
00510     , public pstream_common<CharT, Traits>
00511     , virtual public pstreams
00512     {
00513       typedef std::basic_ostream<CharT, Traits>     ostream_type;
00514       typedef pstream_common<CharT, Traits>         pbase_type;
00515 
00516       using pbase_type::buf_;  // declare name in this scope
00517 
00518     public:
            /// Type used to specify how to connect to the process.
00520       typedef typename pbase_type::pmode            pmode;
00521 
            /// Type used to hold the arguments for a command.
00523       typedef typename pbase_type::argv_type        argv_type;
00524 
            /// Default constructor: creates a stream with no process.
00526       basic_opstream()
00527       : ostream_type(NULL), pbase_type()
00528       { }
00529 
            /// Construct from a shell command and a mode (pstdin is always added).
00540       explicit
00541       basic_opstream(const std::string& command, pmode mode = pstdin)
00542       : ostream_type(NULL), pbase_type(command, mode|pstdin)
00543       { }
00544 
            /// Construct from a program name, its argument vector and a mode.
00556       basic_opstream( const std::string& file,
00557                       const argv_type& argv,
00558                       pmode mode = pstdin )
00559       : ostream_type(NULL), pbase_type(file, argv, mode|pstdin)
00560       { }
00561 
            /// Construct from an argument vector, using argv.at(0) as the program
            /// name; std::vector::at throws std::out_of_range if argv is empty.
00572       explicit
00573       basic_opstream(const argv_type& argv, pmode mode = pstdin)
00574       : ostream_type(NULL), pbase_type(argv.at(0), argv, mode|pstdin)
00575       { }
00576 
00577 #if __cplusplus >= 201103L
00578 
            /// C++11 only: build an argv_type from the list and delegate to the
            /// argument-vector constructor.
00585       template<typename T>
00586         explicit
00587         basic_opstream(std::initializer_list<T> args, pmode mode = pstdin)
00588         : basic_opstream(argv_type(args.begin(), args.end()), mode)
00589         { }
00590 #endif
00591 
            /// Destructor; has no work of its own.
00597       ~basic_opstream() { }
00598 
            /// Forward a shell command to do_open() with pstdin added to the mode.
00608       void
00609       open(const std::string& command, pmode mode = pstdin)
00610       {
00611         this->do_open(command, mode|pstdin);
00612       }
00613 
            /// Forward a program and argument vector to do_open() with pstdin added.
00624       void
00625       open( const std::string& file,
00626             const argv_type& argv,
00627             pmode mode = pstdin)
00628       {
00629         this->do_open(file, argv, mode|pstdin);
00630       }
00631     };
00632 
00633 
/// Bidirectional stream connected to a child process.
///
/// Unlike basic_ipstream and basic_opstream, the mode is passed on exactly as
/// given; the default is pstdout|pstdin.
00647   template <typename CharT, typename Traits = std::char_traits<CharT> >
00648     class basic_pstream
00649     : public std::basic_iostream<CharT, Traits>
00650     , public pstream_common<CharT, Traits>
00651     , virtual public pstreams
00652     {
00653       typedef std::basic_iostream<CharT, Traits>    iostream_type;
00654       typedef pstream_common<CharT, Traits>         pbase_type;
00655 
00656       using pbase_type::buf_;  // declare name in this scope
00657 
00658     public:
            /// Type used to specify how to connect to the process.
00660       typedef typename pbase_type::pmode            pmode;
00661 
            /// Type used to hold the arguments for a command.
00663       typedef typename pbase_type::argv_type        argv_type;
00664 
            /// Default constructor: creates a stream with no process.
00666       basic_pstream()
00667       : iostream_type(NULL), pbase_type()
00668       { }
00669 
            /// Construct from a shell command and a mode.
00680       explicit
00681       basic_pstream(const std::string& command, pmode mode = pstdout|pstdin)
00682       : iostream_type(NULL), pbase_type(command, mode)
00683       { }
00684 
            /// Construct from a program name, its argument vector and a mode.
00696       basic_pstream( const std::string& file,
00697                      const argv_type& argv,
00698                      pmode mode = pstdout|pstdin )
00699       : iostream_type(NULL), pbase_type(file, argv, mode)
00700       { }
00701 
            /// Construct from an argument vector, using argv.at(0) as the program
            /// name; std::vector::at throws std::out_of_range if argv is empty.
00712       explicit
00713       basic_pstream(const argv_type& argv, pmode mode = pstdout|pstdin)
00714       : iostream_type(NULL), pbase_type(argv.at(0), argv, mode)
00715       { }
00716 
00717 #if __cplusplus >= 201103L
00718 
            /// C++11 only: build an argv_type from the list and delegate to the
            /// argument-vector constructor.
00725       template<typename T>
00726         explicit
00727         basic_pstream(std::initializer_list<T> l, pmode mode = pstdout|pstdin)
00728         : basic_pstream(argv_type(l.begin(), l.end()), mode)
00729         { }
00730 #endif
00731 
            /// Destructor; has no work of its own.
00737       ~basic_pstream() { }
00738 
            /// Forward a shell command and mode to do_open().
00748       void
00749       open(const std::string& command, pmode mode = pstdout|pstdin)
00750       {
00751         this->do_open(command, mode);
00752       }
00753 
            /// Forward a program, argument vector and mode to do_open().
00764       void
00765       open( const std::string& file,
00766             const argv_type& argv,
00767             pmode mode = pstdout|pstdin )
00768       {
00769         this->do_open(file, argv, mode);
00770       }
00771 
            /// Call buf_.read_err(false) (select the child's stdout) and return *this.
00776       basic_pstream&
00777       out()
00778       {
00779         this->buf_.read_err(false);
00780         return *this;
00781       }
00782 
            /// Call buf_.read_err(true) (select the child's stderr) and return *this.
00787       basic_pstream&
00788       err()
00789       {
00790         this->buf_.read_err(true);
00791         return *this;
00792       }
00793     };
00794 
00795 
/// Restricted bidirectional process stream.
///
/// The class is publicly a std::basic_ostream only; std::basic_istream and
/// pstream_common are private bases. Reading is therefore only possible
/// through the istream reference returned by out() or err().
00817   template <typename CharT, typename Traits = std::char_traits<CharT> >
00818     class basic_rpstream
00819     : public std::basic_ostream<CharT, Traits>
00820     , private std::basic_istream<CharT, Traits>
00821     , private pstream_common<CharT, Traits>
00822     , virtual public pstreams
00823     {
00824       typedef std::basic_ostream<CharT, Traits>     ostream_type;
00825       typedef std::basic_istream<CharT, Traits>     istream_type;
00826       typedef pstream_common<CharT, Traits>         pbase_type;
00827 
00828       using pbase_type::buf_;  // declare name in this scope
00829 
00830     public:
            /// Type used to specify how to connect to the process.
00832       typedef typename pbase_type::pmode            pmode;
00833 
            /// Type used to hold the arguments for a command.
00835       typedef typename pbase_type::argv_type        argv_type;
00836 
            /// Default constructor: creates a stream with no process.
00838       basic_rpstream()
00839       : ostream_type(NULL), istream_type(NULL), pbase_type()
00840       { }
00841 
            /// Construct from a shell command and a mode.
00852       explicit
00853       basic_rpstream(const std::string& command, pmode mode = pstdout|pstdin)
00854       : ostream_type(NULL) , istream_type(NULL) , pbase_type(command, mode)
00855       { }
00856 
            /// Construct from a program name, its argument vector and a mode.
00868       basic_rpstream( const std::string& file,
00869                       const argv_type& argv,
00870                       pmode mode = pstdout|pstdin )
00871       : ostream_type(NULL), istream_type(NULL), pbase_type(file, argv, mode)
00872       { }
00873 
            /// Construct from an argument vector, using argv.at(0) as the program
            /// name; std::vector::at throws std::out_of_range if argv is empty.
00884       explicit
00885       basic_rpstream(const argv_type& argv, pmode mode = pstdout|pstdin)
00886       : ostream_type(NULL), istream_type(NULL),
00887         pbase_type(argv.at(0), argv, mode)
00888       { }
00889 
00890 #if __cplusplus >= 201103L
00891 
            /// C++11 only: build an argv_type from the list and delegate to the
            /// argument-vector constructor.
00898       template<typename T>
00899         explicit
00900         basic_rpstream(std::initializer_list<T> l, pmode mode = pstdout|pstdin)
00901         : basic_rpstream(argv_type(l.begin(), l.end()), mode)
00902         { }
00903 #endif
00904 
            /// Destructor; has no work of its own.
00906       ~basic_rpstream() { }
00907 
            /// Forward a shell command and mode to do_open().
00917       void
00918       open(const std::string& command, pmode mode = pstdout|pstdin)
00919       {
00920         this->do_open(command, mode);
00921       }
00922 
            /// Forward a program, argument vector and mode to do_open().
00933       void
00934       open( const std::string& file,
00935             const argv_type& argv,
00936             pmode mode = pstdout|pstdin )
00937       {
00938         this->do_open(file, argv, mode);
00939       }
00940 
            /// Call buf_.read_err(false) (select the child's stdout) and return
            /// this object as a reference to its private istream base.
00946       istream_type&
00947       out()
00948       {
00949         this->buf_.read_err(false);
00950         return *this;
00951       }
00952 
            /// Call buf_.read_err(true) (select the child's stderr) and return
            /// this object as a reference to its private istream base.
00958       istream_type&
00959       err()
00960       {
00961         this->buf_.read_err(true);
00962         return *this;
00963       }
00964     };
00965 
00966 
// Convenience names for the char specialisations of the class templates.
00968   typedef basic_pstreambuf<char> pstreambuf;
00970   typedef basic_ipstream<char> ipstream;
00972   typedef basic_opstream<char> opstream;
00974   typedef basic_pstream<char> pstream;
00976   typedef basic_rpstream<char> rpstream;
00977 
00978 
/// Stream manipulator: if the stream's buffer is a basic_pstreambuf<C,T>,
/// call its peof() member; otherwise do nothing.
///
/// @param s  The output stream being manipulated.
/// @return   @p s, so the manipulator can be chained.
00991   template <typename C, typename T>
00992     inline std::basic_ostream<C,T>&
00993     peof(std::basic_ostream<C,T>& s)
00994     {
00995       typedef basic_pstreambuf<C,T> pstreambuf;
            // dynamic_cast yields NULL for any other kind of stream buffer.
00996       if (pstreambuf* p = dynamic_cast<pstreambuf*>(s.rdbuf()))
00997         p->peof();
00998       return s;
00999     }
01000 
01001 
01002   /*
01003    * member definitions for pstreambuf
01004    */
01005 
01006 
/// Default constructor: no process, no write pipe, no write buffer, the read
/// source set to stdout, and the read pipes/buffers reset by init_rbuffers().
01013   template <typename C, typename T>
01014     inline
01015     basic_pstreambuf<C,T>::basic_pstreambuf()
01016     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
01017     , wpipe_(-1)
01018     , wbuffer_(NULL)
01019     , rsrc_(rsrc_out)
01020     , status_(-1)
01021     , error_(0)
01022     {
01023       init_rbuffers();
01024     }
01025 
/// Initialise as the default constructor does, then call open(command, mode).
/// The result of open() is not checked here.
///
/// @param command  Command line handed to open().
/// @param mode     Mode flags handed to open().
01034   template <typename C, typename T>
01035     inline
01036     basic_pstreambuf<C,T>::basic_pstreambuf(const std::string& command, pmode mode)
01037     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
01038     , wpipe_(-1)
01039     , wbuffer_(NULL)
01040     , rsrc_(rsrc_out)
01041     , status_(-1)
01042     , error_(0)
01043     {
01044       init_rbuffers();
01045       open(command, mode);
01046     }
01047 
/// Initialise as the default constructor does, then call
/// open(file, argv, mode). The result of open() is not checked here.
///
/// @param file  Program name handed to open().
/// @param argv  Argument vector handed to open().
/// @param mode  Mode flags handed to open().
01057   template <typename C, typename T>
01058     inline
01059     basic_pstreambuf<C,T>::basic_pstreambuf( const std::string& file,
01060                                              const argv_type& argv,
01061                                              pmode mode )
01062     : ppid_(-1)   // initialise to -1 to indicate no process run yet.
01063     , wpipe_(-1)
01064     , wbuffer_(NULL)
01065     , rsrc_(rsrc_out)
01066     , status_(-1)
01067     , error_(0)
01068     {
01069       init_rbuffers();
01070       open(file, argv, mode);
01071     }
01072 
/// Destructor: calls close(), which frees the buffers, closes the pipes and
/// waits for the child process.
01077   template <typename C, typename T>
01078     inline
01079     basic_pstreambuf<C,T>::~basic_pstreambuf()
01080     {
01081       close();
01082     }
01083 
/// Start a child process that runs @p command via `/bin/sh -c`.
///
/// Does nothing if the buffer is already open. In the parent, success means
/// only that fork() succeeded: a failed exec is not reported here, the child
/// simply exits with errno as its exit code.
///
/// @param command  Command line passed to the shell.
/// @param mode     Flags selecting which standard streams get a pipe.
/// @return         this if the child was forked, NULL otherwise.
01111   template <typename C, typename T>
01112     basic_pstreambuf<C,T>*
01113     basic_pstreambuf<C,T>::open(const std::string& command, pmode mode)
01114     {
01115       const char * shell_path = "/bin/sh";
            // The #if 0 branch below is disabled; the #else branch is the one compiled.
01116 #if 0
01117       const std::string argv[] = { "sh", "-c", command };
01118       return this->open(shell_path, argv_type(argv, argv+3), mode);
01119 #else
01120       basic_pstreambuf<C,T>* ret = NULL;
01121 
01122       if (!is_open())
01123       {
01124         switch(fork(mode))
01125         {
01126         case 0 :
01127           // this is the new process, exec command
01128           ::execl(shell_path, "sh", "-c", command.c_str(), (char*)NULL);
01129 
01130           // can only reach this point if exec() failed
01131 
01132           // parent can get exit code from waitpid()
01133           ::_exit(errno);
01134           // using std::exit() would make static dtors run twice
01135 
01136         case -1 :
01137           // couldn't fork, error already handled in pstreambuf::fork()
01138           break;
01139 
01140         default :
01141           // this is the parent process
01142           // activate buffers
01143           create_buffers(mode);
01144           ret = this;
01145         }
01146       }
01147       return ret;
01148 #endif
01149     }
01150 
/// Close a file descriptor and mark it as closed.
///
/// Nothing happens if @p fd is negative. @p fd is set to -1 only when
/// ::close() reports success; on failure it keeps its value.
///
/// @param fd  Descriptor to close; updated in place.
01159   inline void
01160   close_fd(pstreams::fd_type& fd)
01161   {
01162     if (fd >= 0 && ::close(fd) == 0)
01163       fd = -1;
01164   }
01165 
/// Call close_fd() on every element of an array of file descriptors.
///
/// @tparam N   Number of elements, deduced from the array argument.
/// @param fds  Array of descriptors; each element is updated by close_fd().
01176   template <int N>
01177     inline void
01178     close_fd_array(pstreams::fd_type (&fds)[N])
01179     {
01180       for (std::size_t i = 0; i < N; ++i)
01181         close_fd(fds[i]);
01182     }
01183 
/// Start a child process that runs @p file with arguments @p argv via execvp().
///
/// Does nothing if the buffer is already open. An extra close-on-exec pipe
/// (ck_exec) lets the parent learn whether exec succeeded: a successful exec
/// closes the child's ends, so the parent reads end-of-file; a failed exec
/// makes the child write its errno into the pipe before exiting.
///
/// @param file  Program to execute.
/// @param argv  Arguments for the program; copied into a NULL-terminated
///              char* array in the child.
/// @param mode  Flags selecting which standard streams get a pipe.
/// @return      this if the child was forked and exec succeeded, NULL
///              otherwise (error_ then holds the relevant errno value).
01213   template <typename C, typename T>
01214     basic_pstreambuf<C,T>*
01215     basic_pstreambuf<C,T>::open( const std::string& file,
01216                                  const argv_type& argv,
01217                                  pmode mode )
01218     {
01219       basic_pstreambuf<C,T>* ret = NULL;
01220 
01221       if (!is_open())
01222       {
01223         // constants for read/write ends of pipe
01224         enum { RD, WR };
01225 
01226         // open another pipe and set close-on-exec
01227         fd_type ck_exec[] = { -1, -1 };
01228         if (-1 == ::pipe(ck_exec)
01229             || -1 == ::fcntl(ck_exec[RD], F_SETFD, FD_CLOEXEC)
01230             || -1 == ::fcntl(ck_exec[WR], F_SETFD, FD_CLOEXEC))
01231         {
01232           error_ = errno;
                // closes whichever ends were opened before the failure
01233           close_fd_array(ck_exec);
01234         }
01235         else
01236         {
01237           switch(fork(mode))
01238           {
01239           case 0 :
01240             // this is the new process, exec command
01241             {
                    // Build the char* argument array for execvp(). It is never
                    // freed: the child either execs or calls _exit().
01242               char** arg_v = new char*[argv.size()+1];
01243               for (std::size_t i = 0; i < argv.size(); ++i)
01244               {
01245                 const std::string& src = argv[i];
01246                 char*& dest = arg_v[i];
01247                 dest = new char[src.size()+1];
                      // copy() returns the number of characters copied, which
                      // is where the terminating NUL goes
01248                 dest[ src.copy(dest, src.size()) ] = '\0';
01249               }
01250               arg_v[argv.size()] = NULL;
01251 
01252               ::execvp(file.c_str(), arg_v);
01253 
01254               // can only reach this point if exec() failed
01255 
01256               // parent can get error code from ck_exec pipe
01257               error_ = errno;
01258 
                    // retry the write if it is interrupted by a signal
01259               while (::write(ck_exec[WR], &error_, sizeof(error_)) == -1
01260                   && errno == EINTR)
01261               { }
01262 
01263               ::close(ck_exec[WR]);
01264               ::close(ck_exec[RD]);
01265 
01266               ::_exit(error_);
01267               // using std::exit() would make static dtors run twice
01268             }
01269 
01270           case -1 :
01271             // couldn't fork, error already handled in pstreambuf::fork()
01272             close_fd_array(ck_exec);
01273             break;
01274 
01275           default :
01276             // this is the parent process
01277 
01278             // check child called exec() successfully
                  // close our copy of the write end so the read below sees
                  // end-of-file once the child's copy is closed by exec
01279             ::close(ck_exec[WR]);
01280             switch (::read(ck_exec[RD], &error_, sizeof(error_)))
01281             {
01282             case 0:
01283               // activate buffers
01284               create_buffers(mode);
01285               ret = this;
01286               break;
01287             case -1:
                    // NOTE(review): the read is not retried on EINTR and the
                    // child is not waited for on this path; ppid_ stays as set
                    // by fork().
01288               error_ = errno;
01289               break;
01290             default:
01291               // error_ contains error code from child
01292               // call wait() to clean up and set ppid_ to 0
01293               this->wait();
01294               break;
01295             }
01296             ::close(ck_exec[RD]);
01297           }
01298         }
01299       }
01300       return ret;
01301     }
01302 
/// Create the pipes requested by @p mode and fork a child process.
///
/// In the child, each created pipe is attached to the matching standard
/// stream. In the parent, the child's pid is stored in ppid_ and the parent's
/// end of each pipe is stored in wpipe_ / rpipe_.
///
/// If error_ is already non-zero on entry, no pipe is created and no process
/// is forked. If pipe() or ::fork() fails, error_ is set to errno and any
/// pipes already created are closed.
///
/// @param mode  Flags selecting which pipes to create (pstdin, pstdout,
///              pstderr) and whether the child gets a new process group (newpg).
/// @return      0 in the child, the child's pid in the parent, -1 on failure.
01319   template <typename C, typename T>
01320     pid_t
01321     basic_pstreambuf<C,T>::fork(pmode mode)
01322     {
01323       pid_t pid = -1;
01324 
01325       // Three pairs of file descriptors, for pipes connected to the
01326       // process' stdin, stdout and stderr
01327       // (stored in a single array so close_fd_array() can close all at once)
01328       fd_type fd[] = { -1, -1, -1, -1, -1, -1 };
01329       fd_type* const pin = fd;
01330       fd_type* const pout = fd+2;
01331       fd_type* const perr = fd+4;
01332 
01333       // constants for read/write ends of pipe
01334       enum { RD, WR };
01335 
01336       // N.B.
01337       // For the pstreambuf pin is an output stream and
01338       // pout and perr are input streams.
01339 
            // Each pipe is created only if requested and no earlier step failed.
01340       if (!error_ && mode&pstdin && ::pipe(pin))
01341         error_ = errno;
01342 
01343       if (!error_ && mode&pstdout && ::pipe(pout))
01344         error_ = errno;
01345 
01346       if (!error_ && mode&pstderr && ::pipe(perr))
01347         error_ = errno;
01348 
01349       if (!error_)
01350       {
01351         pid = ::fork();
01352         switch (pid)
01353         {
01354           case 0 :
01355           {
01356             // this is the new process
01357 
01358             // for each open pipe close one end and redirect the
01359             // respective standard stream to the other end
01360 
                  // a pair that was never created still holds -1
01361             if (*pin >= 0)
01362             {
01363               ::close(pin[WR]);
01364               ::dup2(pin[RD], STDIN_FILENO);
01365               ::close(pin[RD]);
01366             }
01367             if (*pout >= 0)
01368             {
01369               ::close(pout[RD]);
01370               ::dup2(pout[WR], STDOUT_FILENO);
01371               ::close(pout[WR]);
01372             }
01373             if (*perr >= 0)
01374             {
01375               ::close(perr[RD]);
01376               ::dup2(perr[WR], STDERR_FILENO);
01377               ::close(perr[WR]);
01378             }
01379 
01380 #ifdef _POSIX_JOB_CONTROL
01381             if (mode&newpg)
01382               ::setpgid(0, 0); // Change to a new process group
01383 #endif
01384 
01385             break;
01386           }
01387           case -1 :
01388           {
01389             // couldn't fork for some reason
01390             error_ = errno;
01391             // close any open pipes
01392             close_fd_array(fd);
01393             break;
01394           }
01395           default :
01396           {
01397             // this is the parent process, store process' pid
01398             ppid_ = pid;
01399 
01400             // store one end of open pipes and close other end
01401             if (*pin >= 0)
01402             {
01403               wpipe_ = pin[WR];
01404               ::close(pin[RD]);
01405             }
01406             if (*pout >= 0)
01407             {
01408               rpipe_[rsrc_out] = pout[RD];
01409               ::close(pout[WR]);
01410             }
01411             if (*perr >= 0)
01412             {
01413               rpipe_[rsrc_err] = perr[RD];
01414               ::close(perr[WR]);
01415             }
01416           }
01417         }
01418       }
01419       else
01420       {
01421         // close any pipes we opened before failure
01422         close_fd_array(fd);
01423       }
01424       return pid;
01425     }
01426 
/// Flush output, release the buffers, close all pipes and wait for the child.
///
/// The cleanup is performed unconditionally, even when no process is open.
/// The wait is repeated for as long as it fails with EINTR; error_ is reset
/// to 0 before every attempt.
///
/// @return  this if is_open() was true on entry, NULL otherwise.
01436   template <typename C, typename T>
01437     basic_pstreambuf<C,T>*
01438     basic_pstreambuf<C,T>::close()
01439     {
            // sampled first: the calls below may reap the child
01440       const bool running = is_open();
01441 
01442       sync(); // this might call wait() and reap the child process
01443 
01444       // rather than trying to work out whether or not we need to clean up
01445       // just do it anyway, all cleanup functions are safe to call twice.
01446 
01447       destroy_buffers(pstdin|pstdout|pstderr);
01448 
01449       // close pipes before wait() so child gets EOF/SIGPIPE
01450       close_fd(wpipe_);
01451       close_fd_array(rpipe_);
01452 
01453       do
01454       {
01455         error_ = 0;
01456       } while (wait() == -1 && error() == EINTR);
01457 
01458       return running ? this : NULL;
01459     }
01460 
/// Put the read-side members into their "nothing open" state: both read
/// pipes to -1, both read buffers and the three saved get-area pointers to
/// NULL. Called from the constructors.
01464   template <typename C, typename T>
01465     inline void
01466     basic_pstreambuf<C,T>::init_rbuffers()
01467     {
01468       rpipe_[rsrc_out] = rpipe_[rsrc_err] = -1;
01469       rbuffer_[rsrc_out] = rbuffer_[rsrc_err] = NULL;
01470       rbufstate_[0] = rbufstate_[1] = rbufstate_[2] = NULL;
01471     }
01472 
/// Allocate the I/O buffers selected by @p mode and install them.
///
/// Each buffer holds bufsz characters and replaces any buffer previously held
/// in the same slot. Read buffers start with an empty get area positioned
/// pbsz characters into the buffer. When both pstdout and pstderr are
/// requested, stdout becomes the active read source and the stderr buffer is
/// only allocated.
///
/// @param mode  Any combination of pstdin, pstdout and pstderr.
01473   template <typename C, typename T>
01474     void
01475     basic_pstreambuf<C,T>::create_buffers(pmode mode)
01476     {
01477       if (mode & pstdin)
01478       {
01479         delete[] wbuffer_;
01480         wbuffer_ = new char_type[bufsz];
              // the whole buffer becomes the put area
01481         this->setp(wbuffer_, wbuffer_ + bufsz);
01482       }
01483       if (mode & pstdout)
01484       {
01485         delete[] rbuffer_[rsrc_out];
01486         rbuffer_[rsrc_out] = new char_type[bufsz];
01487         rsrc_ = rsrc_out;
              // eback == gptr == egptr: the get area starts out empty
01488         this->setg(rbuffer_[rsrc_out] + pbsz, rbuffer_[rsrc_out] + pbsz,
01489             rbuffer_[rsrc_out] + pbsz);
01490       }
01491       if (mode & pstderr)
01492       {
01493         delete[] rbuffer_[rsrc_err];
01494         rbuffer_[rsrc_err] = new char_type[bufsz];
              // stderr becomes the active source only if stdout is not being read
01495         if (!(mode & pstdout))
01496         {
01497           rsrc_ = rsrc_err;
01498           this->setg(rbuffer_[rsrc_err] + pbsz, rbuffer_[rsrc_err] + pbsz,
01499               rbuffer_[rsrc_err] + pbsz);
01500         }
01501       }
01502     }
01503 
/// Free the I/O buffers selected by @p mode.
///
/// The put area is cleared before the write buffer is freed. The get area is
/// cleared only when the read buffer being freed belongs to the active read
/// source. Each freed pointer is reset to NULL, so the function can be called
/// again without harm.
///
/// @param mode  Any combination of pstdin, pstdout and pstderr.
01504   template <typename C, typename T>
01505     void
01506     basic_pstreambuf<C,T>::destroy_buffers(pmode mode)
01507     {
01508       if (mode & pstdin)
01509       {
01510         this->setp(NULL, NULL);
01511         delete[] wbuffer_;
01512         wbuffer_ = NULL;
01513       }
01514       if (mode & pstdout)
01515       {
01516         if (rsrc_ == rsrc_out)
01517           this->setg(NULL, NULL, NULL);
01518         delete[] rbuffer_[rsrc_out];
01519         rbuffer_[rsrc_out] = NULL;
01520       }
01521       if (mode & pstderr)
01522       {
01523         if (rsrc_ == rsrc_err)
01524           this->setg(NULL, NULL, NULL);
01525         delete[] rbuffer_[rsrc_err];
01526         rbuffer_[rsrc_err] = NULL;
01527       }
01528     }
01529 
/// Make @p src the active read source.
///
/// If @p src differs from the current source, the current get-area pointers
/// (eback, gptr, egptr) are exchanged with the three pointers saved in
/// rbufstate_, and rsrc_ is updated. Otherwise nothing changes.
///
/// @param src  The read source to switch to.
/// @return     The active read source after the call.
01530   template <typename C, typename T>
01531     typename basic_pstreambuf<C,T>::buf_read_src
01532     basic_pstreambuf<C,T>::switch_read_buffer(buf_read_src src)
01533     {
01534       if (rsrc_ != src)
01535       {
              // remember the current pointers before setg() overwrites them
01536         char_type* tmpbufstate[] = {this->eback(), this->gptr(), this->egptr()};
01537         this->setg(rbufstate_[0], rbufstate_[1], rbufstate_[2]);
01538         for (std::size_t i = 0; i < 3; ++i)
01539           rbufstate_[i] = tmpbufstate[i];
01540         rsrc_ = src;
01541       }
01542       return rsrc_;
01543     }
01544 
/// Wait for the child process with waitpid().
///
/// When the child has exited, its status is stored in status_, ppid_ is set
/// to 0, and the write buffer and write pipe are released. The read buffers
/// and read pipes are left in place.
///
/// @param nohang  If true, pass WNOHANG so the call returns immediately when
///                the child is still running.
/// @return  1 if the child has exited, 0 if @p nohang was true and it is
///          still running, -1 if waitpid() failed (error_ holds errno) or if
///          is_open() is false.
01561   template <typename C, typename T>
01562     int
01563     basic_pstreambuf<C,T>::wait(bool nohang)
01564     {
01565       int exited = -1;
01566       if (is_open())
01567       {
01568         int status;
01569         switch(::waitpid(ppid_, &status, nohang ? WNOHANG : 0))
01570         {
01571           case 0 :
01572             // nohang was true and process has not exited
01573             exited = 0;
01574             break;
01575           case -1 :
01576             error_ = errno;
01577             break;
01578           default :
01579             // process has exited
01580             ppid_ = 0;
01581             status_ = status;
01582             exited = 1;
01583             // Close wpipe, would get SIGPIPE if we used it.
01584             destroy_buffers(pstdin);
01585             close_fd(wpipe_);
01586             // Must free read buffers and pipes on destruction
01587             // or next call to open()/close()
01588             break;
01589         }
01590       }
01591       return exited;
01592     }
01593 
01604   template <typename C, typename T>
01605     inline basic_pstreambuf<C,T>*
01606     basic_pstreambuf<C,T>::kill(int signal)
01607     {
01608       basic_pstreambuf<C,T>* ret = NULL;
01609       if (is_open())
01610       {
01611         if (::kill(ppid_, signal))
01612           error_ = errno;
01613         else
01614         {
01615 #if 0
01616           // TODO call exited() to check for exit and clean up? leave to user?
01617           if (signal==SIGTERM || signal==SIGKILL)
01618             this->exited();
01619 #endif
01620           ret = this;
01621         }
01622       }
01623       return ret;
01624     }
01625 
01639   template <typename C, typename T>
01640     inline basic_pstreambuf<C,T>*
01641     basic_pstreambuf<C,T>::killpg(int signal)
01642     {
01643       basic_pstreambuf<C,T>* ret = NULL;
01644 #ifdef _POSIX_JOB_CONTROL
01645       if (is_open())
01646       {
01647         pid_t pgid = ::getpgid(ppid_);
01648         if (pgid == -1)
01649           error_ = errno;
01650         else if (pgid == ::getpgrp())
01651           error_ = EPERM;  // Don't commit suicide
01652         else if (::killpg(pgid, signal))
01653           error_ = errno;
01654         else
01655           ret = this;
01656       }
01657 #else
01658       error_ = ENOTSUP;
01659 #endif
01660       return ret;
01661     }
01662 
01670   template <typename C, typename T>
01671     inline bool
01672     basic_pstreambuf<C,T>::exited()
01673     {
01674       return ppid_ == 0 || wait(true)==1;
01675     }
01676 
01677 
01683   template <typename C, typename T>
01684     inline int
01685     basic_pstreambuf<C,T>::status() const
01686     {
01687       return status_;
01688     }
01689 
01693   template <typename C, typename T>
01694     inline int
01695     basic_pstreambuf<C,T>::error() const
01696     {
01697       return error_;
01698     }
01699 
01704   template <typename C, typename T>
01705     inline void
01706     basic_pstreambuf<C,T>::peof()
01707     {
01708       sync();
01709       destroy_buffers(pstdin);
01710       close_fd(wpipe_);
01711     }
01712 
01723   template <typename C, typename T>
01724     inline bool
01725     basic_pstreambuf<C,T>::is_open() const
01726     {
01727       return ppid_ > 0;
01728     }
01729 
01738   template <typename C, typename T>
01739     inline bool
01740     basic_pstreambuf<C,T>::read_err(bool readerr)
01741     {
01742       buf_read_src src = readerr ? rsrc_err : rsrc_out;
01743       if (rpipe_[src]>=0)
01744       {
01745         switch_read_buffer(src);
01746         return true;
01747       }
01748       return false;
01749     }
01750 
01761   template <typename C, typename T>
01762     typename basic_pstreambuf<C,T>::int_type
01763     basic_pstreambuf<C,T>::overflow(int_type c)
01764     {
01765       if (!empty_buffer())
01766         return traits_type::eof();
01767       else if (!traits_type::eq_int_type(c, traits_type::eof()))
01768         return this->sputc(c);
01769       else
01770         return traits_type::not_eof(c);
01771     }
01772 
01773 
01774   template <typename C, typename T>
01775     int
01776     basic_pstreambuf<C,T>::sync()
01777     {
01778       return !exited() && empty_buffer() ? 0 : -1;
01779     }
01780 
01786   template <typename C, typename T>
01787     std::streamsize
01788     basic_pstreambuf<C,T>::xsputn(const char_type* s, std::streamsize n)
01789     {
01790       std::streamsize done = 0;
01791       while (done < n)
01792       {
01793         if (std::streamsize nbuf = this->epptr() - this->pptr())
01794         {
01795           nbuf = std::min(nbuf, n - done);
01796           traits_type::copy(this->pptr(), s + done, nbuf);
01797           this->pbump(nbuf);
01798           done += nbuf;
01799         }
01800         else if (!empty_buffer())
01801           break;
01802       }
01803       return done;
01804     }
01805 
01809   template <typename C, typename T>
01810     bool
01811     basic_pstreambuf<C,T>::empty_buffer()
01812     {
01813       const std::streamsize count = this->pptr() - this->pbase();
01814       if (count > 0)
01815       {
01816         const std::streamsize written = this->write(this->wbuffer_, count);
01817         if (written > 0)
01818         {
01819           if (const std::streamsize unwritten = count - written)
01820             traits_type::move(this->pbase(), this->pbase()+written, unwritten);
01821           this->pbump(-written);
01822           return true;
01823         }
01824       }
01825       return false;
01826     }
01827 
01835   template <typename C, typename T>
01836     typename basic_pstreambuf<C,T>::int_type
01837     basic_pstreambuf<C,T>::underflow()
01838     {
01839       if (this->gptr() < this->egptr() || fill_buffer())
01840         return traits_type::to_int_type(*this->gptr());
01841       else
01842         return traits_type::eof();
01843     }
01844 
01853   template <typename C, typename T>
01854     typename basic_pstreambuf<C,T>::int_type
01855     basic_pstreambuf<C,T>::pbackfail(int_type c)
01856     {
01857       if (this->gptr() != this->eback())
01858       {
01859         this->gbump(-1);
01860         if (!traits_type::eq_int_type(c, traits_type::eof()))
01861           *this->gptr() = traits_type::to_char_type(c);
01862         return traits_type::not_eof(c);
01863       }
01864       else
01865          return traits_type::eof();
01866     }
01867 
01868   template <typename C, typename T>
01869     std::streamsize
01870     basic_pstreambuf<C,T>::showmanyc()
01871     {
01872       int avail = 0;
01873       if (sizeof(char_type) == 1)
01874         avail = fill_buffer(true) ? this->egptr() - this->gptr() : -1;
01875 #ifdef FIONREAD
01876       else
01877       {
01878         if (::ioctl(rpipe(), FIONREAD, &avail) == -1)
01879           avail = -1;
01880         else if (avail)
01881           avail /= sizeof(char_type);
01882       }
01883 #endif
01884       return std::streamsize(avail);
01885     }
01886 
01890   template <typename C, typename T>
01891     bool
01892     basic_pstreambuf<C,T>::fill_buffer(bool non_blocking)
01893     {
01894       const std::streamsize pb1 = this->gptr() - this->eback();
01895       const std::streamsize pb2 = pbsz;
01896       const std::streamsize npb = std::min(pb1, pb2);
01897 
01898       char_type* const rbuf = rbuffer();
01899 
01900       traits_type::move(rbuf + pbsz - npb, this->gptr() - npb, npb);
01901 
01902       std::streamsize rc = -1;
01903 
01904       if (non_blocking)
01905       {
01906         const int flags = ::fcntl(rpipe(), F_GETFL);
01907         if (flags != -1)
01908         {
01909           const bool blocking = !(flags & O_NONBLOCK);
01910           if (blocking)
01911             ::fcntl(rpipe(), F_SETFL, flags | O_NONBLOCK);  // set non-blocking
01912 
01913           error_ = 0;
01914           rc = read(rbuf + pbsz, bufsz - pbsz);
01915 
01916           if (rc == -1 && error_ == EAGAIN)  // nothing available
01917             rc = 0;
01918           else if (rc == 0)  // EOF
01919             rc = -1;
01920 
01921           if (blocking)
01922             ::fcntl(rpipe(), F_SETFL, flags); // restore
01923         }
01924       }
01925       else
01926         rc = read(rbuf + pbsz, bufsz - pbsz);
01927 
01928       if (rc > 0 || (rc == 0 && non_blocking))
01929       {
01930         this->setg( rbuf + pbsz - npb,
01931                     rbuf + pbsz,
01932                     rbuf + pbsz + rc );
01933         return true;
01934       }
01935       else
01936       {
01937         this->setg(NULL, NULL, NULL);
01938         return false;
01939       }
01940     }
01941 
01949   template <typename C, typename T>
01950     inline std::streamsize
01951     basic_pstreambuf<C,T>::write(const char_type* s, std::streamsize n)
01952     {
01953       std::streamsize nwritten = 0;
01954       if (wpipe() >= 0)
01955       {
01956         nwritten = ::write(wpipe(), s, n * sizeof(char_type));
01957         if (nwritten == -1)
01958           error_ = errno;
01959         else
01960           nwritten /= sizeof(char_type);
01961       }
01962       return nwritten;
01963     }
01964 
01972   template <typename C, typename T>
01973     inline std::streamsize
01974     basic_pstreambuf<C,T>::read(char_type* s, std::streamsize n)
01975     {
01976       std::streamsize nread = 0;
01977       if (rpipe() >= 0)
01978       {
01979         nread = ::read(rpipe(), s, n * sizeof(char_type));
01980         if (nread == -1)
01981           error_ = errno;
01982         else
01983           nread /= sizeof(char_type);
01984       }
01985       return nread;
01986     }
01987 
01989   template <typename C, typename T>
01990     inline pstreams::fd_type&
01991     basic_pstreambuf<C,T>::wpipe()
01992     {
01993       return wpipe_;
01994     }
01995 
01997   template <typename C, typename T>
01998     inline pstreams::fd_type&
01999     basic_pstreambuf<C,T>::rpipe()
02000     {
02001       return rpipe_[rsrc_];
02002     }
02003 
02005   template <typename C, typename T>
02006     inline pstreams::fd_type&
02007     basic_pstreambuf<C,T>::rpipe(buf_read_src which)
02008     {
02009       return rpipe_[which];
02010     }
02011 
02013   template <typename C, typename T>
02014     inline typename basic_pstreambuf<C,T>::char_type*
02015     basic_pstreambuf<C,T>::rbuffer()
02016     {
02017       return rbuffer_[rsrc_];
02018     }
02019 
02020 
02021   /*
02022    * member definitions for pstream_common
02023    */
02024 
02034   template <typename C, typename T>
02035     inline
02036     pstream_common<C,T>::pstream_common()
02037     : std::basic_ios<C,T>(NULL)
02038     , command_()
02039     , buf_()
02040     {
02041       this->std::basic_ios<C,T>::rdbuf(&buf_);
02042     }
02043 
02052   template <typename C, typename T>
02053     inline
02054     pstream_common<C,T>::pstream_common(const std::string& command, pmode mode)
02055     : std::basic_ios<C,T>(NULL)
02056     , command_(command)
02057     , buf_()
02058     {
02059       this->std::basic_ios<C,T>::rdbuf(&buf_);
02060       do_open(command, mode);
02061     }
02062 
02072   template <typename C, typename T>
02073     inline
02074     pstream_common<C,T>::pstream_common( const std::string& file,
02075                                          const argv_type& argv,
02076                                          pmode mode )
02077     : std::basic_ios<C,T>(NULL)
02078     , command_(file)
02079     , buf_()
02080     {
02081       this->std::basic_ios<C,T>::rdbuf(&buf_);
02082       do_open(file, argv, mode);
02083     }
02084 
02094   template <typename C, typename T>
02095     inline
02096     pstream_common<C,T>::~pstream_common()
02097     {
02098     }
02099 
02108   template <typename C, typename T>
02109     inline void
02110     pstream_common<C,T>::do_open(const std::string& command, pmode mode)
02111     {
02112       if (!buf_.open((command_=command), mode))
02113         this->setstate(std::ios_base::failbit);
02114     }
02115 
02125   template <typename C, typename T>
02126     inline void
02127     pstream_common<C,T>::do_open( const std::string& file,
02128                                   const argv_type& argv,
02129                                   pmode mode )
02130     {
02131       if (!buf_.open((command_=file), argv, mode))
02132         this->setstate(std::ios_base::failbit);
02133     }
02134 
02136   template <typename C, typename T>
02137     inline void
02138     pstream_common<C,T>::close()
02139     {
02140       if (!buf_.close())
02141         this->setstate(std::ios_base::failbit);
02142     }
02143 
02148   template <typename C, typename T>
02149     inline bool
02150     pstream_common<C,T>::is_open() const
02151     {
02152       return buf_.is_open();
02153     }
02154 
02156   template <typename C, typename T>
02157     inline const std::string&
02158     pstream_common<C,T>::command() const
02159     {
02160       return command_;
02161     }
02162 
02164   // TODO  document behaviour if buffer replaced.
02165   template <typename C, typename T>
02166     inline typename pstream_common<C,T>::streambuf_type*
02167     pstream_common<C,T>::rdbuf() const
02168     {
02169       return const_cast<streambuf_type*>(&buf_);
02170     }
02171 
02172 
02173 #if REDI_EVISCERATE_PSTREAMS
02174 
02206   template <typename C, typename T>
02207     std::size_t
02208     basic_pstreambuf<C,T>::fopen(FILE*& in, FILE*& out, FILE*& err)
02209     {
02210       in = out = err = NULL;
02211       std::size_t open_files = 0;
02212       if (wpipe() > -1)
02213       {
02214         if ((in = ::fdopen(wpipe(), "w")))
02215         {
02216             open_files |= pstdin;
02217         }
02218       }
02219       if (rpipe(rsrc_out) > -1)
02220       {
02221         if ((out = ::fdopen(rpipe(rsrc_out), "r")))
02222         {
02223             open_files |= pstdout;
02224         }
02225       }
02226       if (rpipe(rsrc_err) > -1)
02227       {
02228         if ((err = ::fdopen(rpipe(rsrc_err), "r")))
02229         {
02230             open_files |= pstderr;
02231         }
02232       }
02233       return open_files;
02234     }
02235 
02246   template <typename C, typename T>
02247     inline std::size_t
02248     pstream_common<C,T>::fopen(FILE*& fin, FILE*& fout, FILE*& ferr)
02249     {
02250       return buf_.fopen(fin, fout, ferr);
02251     }
02252 
02253 #endif // REDI_EVISCERATE_PSTREAMS
02254 
02255 
02256 } // namespace redi
02257 
02263 #endif  // REDI_PSTREAM_H_SEEN
02264 
02265 // vim: ts=2 sw=2 expandtab
02266