PStreams
|
00001 /* 00002 PStreams - POSIX Process I/O for C++ 00003 Copyright (C) 2001-2014 Jonathan Wakely 00004 00005 This file is part of PStreams. 00006 00007 PStreams is free software; you can redistribute it and/or modify 00008 it under the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 PStreams is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00030 #ifndef REDI_PSTREAM_H_SEEN 00031 #define REDI_PSTREAM_H_SEEN 00032 00033 #include <ios> 00034 #include <streambuf> 00035 #include <istream> 00036 #include <ostream> 00037 #include <string> 00038 #include <vector> 00039 #include <algorithm> // for min() 00040 #include <cerrno> // for errno 00041 #include <cstddef> // for size_t, NULL 00042 #include <cstdlib> // for exit() 00043 #include <sys/types.h> // for pid_t 00044 #include <sys/wait.h> // for waitpid() 00045 #include <sys/ioctl.h> // for ioctl() and FIONREAD 00046 #if defined(__sun) 00047 # include <sys/filio.h> // for FIONREAD on Solaris 2.5 00048 #endif 00049 #include <unistd.h> // for pipe() fork() exec() and filedes functions 00050 #include <signal.h> // for kill() 00051 #include <fcntl.h> // for fcntl() 00052 #if REDI_EVISCERATE_PSTREAMS 00053 # include <stdio.h> // for FILE, fdopen() 00054 #endif 00055 00056 00058 #define PSTREAMS_VERSION 0x0081 // 0.8.1 00059 00073 namespace redi 00074 { 00076 struct pstreams 00077 { 00079 typedef std::ios_base::openmode pmode; 00080 00082 typedef std::vector<std::string> argv_type; 00083 00085 typedef int fd_type; 00086 00087 static const pmode pstdin = std::ios_base::out; 00088 static const pmode pstdout = std::ios_base::in; 00089 static const pmode pstderr = std::ios_base::app; 00090 00092 static const pmode newpg = std::ios_base::trunc; 00093 00094 protected: 00095 enum { bufsz = 32 }; 00096 enum { pbsz = 2 }; 00097 }; 00098 00100 template <typename CharT, typename Traits = std::char_traits<CharT> > 00101 class basic_pstreambuf 00102 : public std::basic_streambuf<CharT, Traits> 00103 , public pstreams 00104 { 00105 public: 00106 // Type definitions for dependent types 00107 typedef CharT char_type; 00108 typedef Traits traits_type; 00109 typedef typename traits_type::int_type int_type; 00110 typedef typename traits_type::off_type off_type; 00111 typedef typename traits_type::pos_type pos_type; 00113 typedef fd_type fd_t; 00114 00116 basic_pstreambuf(); 00117 00119 basic_pstreambuf(const std::string& command, pmode mode); 00120 00122 basic_pstreambuf( const std::string& file, 00123 const argv_type& argv, 00124 pmode mode ); 00125 00127 ~basic_pstreambuf(); 00128 00130 basic_pstreambuf* 00131 open(const std::string& command, pmode mode); 00132 00134 basic_pstreambuf* 00135 open(const std::string& file, const argv_type& argv, pmode mode); 00136 00138 basic_pstreambuf* 00139 close(); 00140 00142 basic_pstreambuf* 00143 kill(int signal = SIGTERM); 00144 00146 basic_pstreambuf* 00147 killpg(int signal = SIGTERM); 00148 00150 void 00151 peof(); 00152 00154 bool 00155 read_err(bool readerr = true); 00156 00158 bool 00159 is_open() const; 00160 00162 bool 00163 exited(); 00164 00165 #if REDI_EVISCERATE_PSTREAMS 00166 00167 std::size_t 00168 fopen(FILE*& in, FILE*& out, FILE*& err); 00169 #endif 00170 00172 int 00173 status() const; 00174 00176 int 00177 error() const; 00178 00179 protected: 00181 int_type 00182 overflow(int_type c); 00183 00185 int_type 00186 underflow(); 00187 00189 int_type 00190 pbackfail(int_type c = traits_type::eof()); 00191 00193 int 00194 sync(); 00195 00197 std::streamsize 00198 xsputn(const char_type* s, std::streamsize n); 00199 00201 std::streamsize 00202 write(const char_type* s, std::streamsize n); 00203 00205 std::streamsize 00206 read(char_type* s, std::streamsize n); 00207 00209 std::streamsize 00210 showmanyc(); 00211 00212 protected: 00214 enum buf_read_src { rsrc_out = 0, rsrc_err = 1 }; 00215 00217 pid_t 00218 fork(pmode mode); 00219 00221 int 00222 wait(bool nohang = false); 00223 00225 fd_type& 00226 wpipe(); 00227 00229 fd_type& 00230 rpipe(); 00231 00233 fd_type& 00234 rpipe(buf_read_src which); 00235 00236 void 00237 create_buffers(pmode mode); 00238 00239 void 00240 destroy_buffers(pmode mode); 00241 00243 bool 00244 empty_buffer(); 00245 00246 bool 00247 fill_buffer(bool non_blocking = false); 00248 00250 char_type* 00251 rbuffer(); 00252 00253 buf_read_src 00254 switch_read_buffer(buf_read_src); 00255 00256 private: 00257 basic_pstreambuf(const basic_pstreambuf&); 00258 basic_pstreambuf& operator=(const basic_pstreambuf&); 00259 00260 void 00261 init_rbuffers(); 00262 00263 pid_t ppid_; // pid of process 00264 fd_type wpipe_; // pipe used to write to process' stdin 00265 fd_type rpipe_[2]; // two pipes to read from, stdout and stderr 00266 char_type* wbuffer_; 00267 char_type* rbuffer_[2]; 00268 char_type* rbufstate_[3]; 00270 buf_read_src rsrc_; 00271 int status_; // hold exit status of child process 00272 int error_; // hold errno if fork() or exec() fails 00273 }; 00274 00276 template <typename CharT, typename Traits = std::char_traits<CharT> > 00277 class pstream_common 00278 : virtual public std::basic_ios<CharT, Traits> 00279 , virtual public pstreams 00280 { 00281 protected: 00282 typedef basic_pstreambuf<CharT, Traits> streambuf_type; 00283 00284 typedef pstreams::pmode pmode; 00285 typedef pstreams::argv_type argv_type; 00286 00288 pstream_common(); 00289 00291 pstream_common(const std::string& command, pmode mode); 00292 00294 pstream_common(const std::string& file, const argv_type& argv, pmode mode); 00295 00297 virtual 00298 ~pstream_common() = 0; 00299 00301 void 00302 do_open(const std::string& command, pmode mode); 00303 00305 void 00306 do_open(const std::string& file, const argv_type& argv, pmode mode); 00307 00308 public: 00310 void 00311 close(); 00312 00314 bool 00315 is_open() const; 00316 00318 const std::string& 00319 command() const; 00320 00322 streambuf_type* 00323 rdbuf() const; 00324 00325 #if REDI_EVISCERATE_PSTREAMS 00326 00327 std::size_t 00328 fopen(FILE*& in, FILE*& out, FILE*& err); 00329 #endif 00330 00331 protected: 00332 std::string command_; 00333 streambuf_type buf_; 00334 }; 00335 00336 00347 template <typename CharT, typename Traits = std::char_traits<CharT> > 00348 class basic_ipstream 00349 : public std::basic_istream<CharT, Traits> 00350 , public pstream_common<CharT, Traits> 00351 , virtual public pstreams 00352 { 00353 typedef std::basic_istream<CharT, Traits> istream_type; 00354 typedef pstream_common<CharT, Traits> pbase_type; 00355 00356 using pbase_type::buf_; // declare name in this scope 00357 00358 // Ensure a basic_ipstream will read from at least one pipe 00359 pmode readable(pmode mode) 00360 { 00361 if (!(mode & (pstdout|pstderr))) 00362 mode |= pstdout; 00363 return mode; 00364 } 00365 00366 public: 00368 typedef typename pbase_type::pmode pmode; 00369 00371 typedef typename pbase_type::argv_type argv_type; 00372 00374 basic_ipstream() 00375 : istream_type(NULL), pbase_type() 00376 { } 00377 00388 explicit 00389 basic_ipstream(const std::string& command, pmode mode = pstdout) 00390 : istream_type(NULL), pbase_type(command, readable(mode)) 00391 { } 00392 00404 basic_ipstream( const std::string& file, 00405 const argv_type& argv, 00406 pmode mode = pstdout ) 00407 : istream_type(NULL), pbase_type(file, argv, readable(mode)) 00408 { } 00409 00420 explicit 00421 basic_ipstream(const argv_type& argv, pmode mode = pstdout) 00422 : istream_type(NULL), pbase_type(argv.at(0), argv, readable(mode)) 00423 { } 00424 00425 #if __cplusplus >= 201103L 00426 template<typename T> 00427 explicit 00428 basic_ipstream(std::initializer_list<T> args, pmode mode = pstdout) 00429 : basic_ipstream(argv_type(args.begin(), args.end()), mode) 00430 { } 00431 #endif 00432 00438 ~basic_ipstream() 00439 { } 00440 00450 void 00451 open(const std::string& command, pmode mode = pstdout) 00452 { 00453 this->do_open(command, readable(mode)); 00454 } 00455 00466 void 00467 open( const std::string& file, 00468 const argv_type& argv, 00469 pmode mode = pstdout ) 00470 { 00471 this->do_open(file, argv, readable(mode)); 00472 } 00473 00478 basic_ipstream& 00479 out() 00480 { 00481 this->buf_.read_err(false); 00482 return *this; 00483 } 00484 00489 basic_ipstream& 00490 err() 00491 { 00492 this->buf_.read_err(true); 00493 return *this; 00494 } 00495 }; 00496 00497 00507 template <typename CharT, typename Traits = std::char_traits<CharT> > 00508 class basic_opstream 00509 : public std::basic_ostream<CharT, Traits> 00510 , public pstream_common<CharT, Traits> 00511 , virtual public pstreams 00512 { 00513 typedef std::basic_ostream<CharT, Traits> ostream_type; 00514 typedef pstream_common<CharT, Traits> pbase_type; 00515 00516 using pbase_type::buf_; // declare name in this scope 00517 00518 public: 00520 typedef typename pbase_type::pmode pmode; 00521 00523 typedef typename pbase_type::argv_type argv_type; 00524 00526 basic_opstream() 00527 : ostream_type(NULL), pbase_type() 00528 { } 00529 00540 explicit 00541 basic_opstream(const std::string& command, pmode mode = pstdin) 00542 : ostream_type(NULL), pbase_type(command, mode|pstdin) 00543 { } 00544 00556 basic_opstream( const std::string& file, 00557 const argv_type& argv, 00558 pmode mode = pstdin ) 00559 : ostream_type(NULL), pbase_type(file, argv, mode|pstdin) 00560 { } 00561 00572 explicit 00573 basic_opstream(const argv_type& argv, pmode mode = pstdin) 00574 : ostream_type(NULL), pbase_type(argv.at(0), argv, mode|pstdin) 00575 { } 00576 00577 #if __cplusplus >= 201103L 00578 00585 template<typename T> 00586 explicit 00587 basic_opstream(std::initializer_list<T> args, pmode mode = pstdin) 00588 : basic_opstream(argv_type(args.begin(), args.end()), mode) 00589 { } 00590 #endif 00591 00597 ~basic_opstream() { } 00598 00608 void 00609 open(const std::string& command, pmode mode = pstdin) 00610 { 00611 this->do_open(command, mode|pstdin); 00612 } 00613 00624 void 00625 open( const std::string& file, 00626 const argv_type& argv, 00627 pmode mode = pstdin) 00628 { 00629 this->do_open(file, argv, mode|pstdin); 00630 } 00631 }; 00632 00633 00647 template <typename CharT, typename Traits = std::char_traits<CharT> > 00648 class basic_pstream 00649 : public std::basic_iostream<CharT, Traits> 00650 , public pstream_common<CharT, Traits> 00651 , virtual public pstreams 00652 { 00653 typedef std::basic_iostream<CharT, Traits> iostream_type; 00654 typedef pstream_common<CharT, Traits> pbase_type; 00655 00656 using pbase_type::buf_; // declare name in this scope 00657 00658 public: 00660 typedef typename pbase_type::pmode pmode; 00661 00663 typedef typename pbase_type::argv_type argv_type; 00664 00666 basic_pstream() 00667 : iostream_type(NULL), pbase_type() 00668 { } 00669 00680 explicit 00681 basic_pstream(const std::string& command, pmode mode = pstdout|pstdin) 00682 : iostream_type(NULL), pbase_type(command, mode) 00683 { } 00684 00696 basic_pstream( const std::string& file, 00697 const argv_type& argv, 00698 pmode mode = pstdout|pstdin ) 00699 : iostream_type(NULL), pbase_type(file, argv, mode) 00700 { } 00701 00712 explicit 00713 basic_pstream(const argv_type& argv, pmode mode = pstdout|pstdin) 00714 : iostream_type(NULL), pbase_type(argv.at(0), argv, mode) 00715 { } 00716 00717 #if __cplusplus >= 201103L 00718 00725 template<typename T> 00726 explicit 00727 basic_pstream(std::initializer_list<T> l, pmode mode = pstdout|pstdin) 00728 : basic_pstream(argv_type(l.begin(), l.end()), mode) 00729 { } 00730 #endif 00731 00737 ~basic_pstream() { } 00738 00748 void 00749 open(const std::string& command, pmode mode = pstdout|pstdin) 00750 { 00751 this->do_open(command, mode); 00752 } 00753 00764 void 00765 open( const std::string& file, 00766 const argv_type& argv, 00767 pmode mode = pstdout|pstdin ) 00768 { 00769 this->do_open(file, argv, mode); 00770 } 00771 00776 basic_pstream& 00777 out() 00778 { 00779 this->buf_.read_err(false); 00780 return *this; 00781 } 00782 00787 basic_pstream& 00788 err() 00789 { 00790 this->buf_.read_err(true); 00791 return *this; 00792 } 00793 }; 00794 00795 00817 template <typename CharT, typename Traits = std::char_traits<CharT> > 00818 class basic_rpstream 00819 : public std::basic_ostream<CharT, Traits> 00820 , private std::basic_istream<CharT, Traits> 00821 , private pstream_common<CharT, Traits> 00822 , virtual public pstreams 00823 { 00824 typedef std::basic_ostream<CharT, Traits> ostream_type; 00825 typedef std::basic_istream<CharT, Traits> istream_type; 00826 typedef pstream_common<CharT, Traits> pbase_type; 00827 00828 using pbase_type::buf_; // declare name in this scope 00829 00830 public: 00832 typedef typename pbase_type::pmode pmode; 00833 00835 typedef typename pbase_type::argv_type argv_type; 00836 00838 basic_rpstream() 00839 : ostream_type(NULL), istream_type(NULL), pbase_type() 00840 { } 00841 00852 explicit 00853 basic_rpstream(const std::string& command, pmode mode = pstdout|pstdin) 00854 : ostream_type(NULL) , istream_type(NULL) , pbase_type(command, mode) 00855 { } 00856 00868 basic_rpstream( const std::string& file, 00869 const argv_type& argv, 00870 pmode mode = pstdout|pstdin ) 00871 : ostream_type(NULL), istream_type(NULL), pbase_type(file, argv, mode) 00872 { } 00873 00884 explicit 00885 basic_rpstream(const argv_type& argv, pmode mode = pstdout|pstdin) 00886 : ostream_type(NULL), istream_type(NULL), 00887 pbase_type(argv.at(0), argv, mode) 00888 { } 00889 00890 #if __cplusplus >= 201103L 00891 00898 template<typename T> 00899 explicit 00900 basic_rpstream(std::initializer_list<T> l, pmode mode = pstdout|pstdin) 00901 : basic_rpstream(argv_type(l.begin(), l.end()), mode) 00902 { } 00903 #endif 00904 00906 ~basic_rpstream() { } 00907 00917 void 00918 open(const std::string& command, pmode mode = pstdout|pstdin) 00919 { 00920 this->do_open(command, mode); 00921 } 00922 00933 void 00934 open( const std::string& file, 00935 const argv_type& argv, 00936 pmode mode = pstdout|pstdin ) 00937 { 00938 this->do_open(file, argv, mode); 00939 } 00940 00946 istream_type& 00947 out() 00948 { 00949 this->buf_.read_err(false); 00950 return *this; 00951 } 00952 00958 istream_type& 00959 err() 00960 { 00961 this->buf_.read_err(true); 00962 return *this; 00963 } 00964 }; 00965 00966 00968 typedef basic_pstreambuf<char> pstreambuf; 00970 typedef basic_ipstream<char> ipstream; 00972 typedef basic_opstream<char> opstream; 00974 typedef basic_pstream<char> pstream; 00976 typedef basic_rpstream<char> rpstream; 00977 00978 00991 template <typename C, typename T> 00992 inline std::basic_ostream<C,T>& 00993 peof(std::basic_ostream<C,T>& s) 00994 { 00995 typedef basic_pstreambuf<C,T> pstreambuf; 00996 if (pstreambuf* p = dynamic_cast<pstreambuf*>(s.rdbuf())) 00997 p->peof(); 00998 return s; 00999 } 01000 01001 01002 /* 01003 * member definitions for pstreambuf 01004 */ 01005 01006 01013 template <typename C, typename T> 01014 inline 01015 basic_pstreambuf<C,T>::basic_pstreambuf() 01016 : ppid_(-1) // initialise to -1 to indicate no process run yet. 01017 , wpipe_(-1) 01018 , wbuffer_(NULL) 01019 , rsrc_(rsrc_out) 01020 , status_(-1) 01021 , error_(0) 01022 { 01023 init_rbuffers(); 01024 } 01025 01034 template <typename C, typename T> 01035 inline 01036 basic_pstreambuf<C,T>::basic_pstreambuf(const std::string& command, pmode mode) 01037 : ppid_(-1) // initialise to -1 to indicate no process run yet. 01038 , wpipe_(-1) 01039 , wbuffer_(NULL) 01040 , rsrc_(rsrc_out) 01041 , status_(-1) 01042 , error_(0) 01043 { 01044 init_rbuffers(); 01045 open(command, mode); 01046 } 01047 01057 template <typename C, typename T> 01058 inline 01059 basic_pstreambuf<C,T>::basic_pstreambuf( const std::string& file, 01060 const argv_type& argv, 01061 pmode mode ) 01062 : ppid_(-1) // initialise to -1 to indicate no process run yet. 01063 , wpipe_(-1) 01064 , wbuffer_(NULL) 01065 , rsrc_(rsrc_out) 01066 , status_(-1) 01067 , error_(0) 01068 { 01069 init_rbuffers(); 01070 open(file, argv, mode); 01071 } 01072 01077 template <typename C, typename T> 01078 inline 01079 basic_pstreambuf<C,T>::~basic_pstreambuf() 01080 { 01081 close(); 01082 } 01083 01111 template <typename C, typename T> 01112 basic_pstreambuf<C,T>* 01113 basic_pstreambuf<C,T>::open(const std::string& command, pmode mode) 01114 { 01115 const char * shell_path = "/bin/sh"; 01116 #if 0 01117 const std::string argv[] = { "sh", "-c", command }; 01118 return this->open(shell_path, argv_type(argv, argv+3), mode); 01119 #else 01120 basic_pstreambuf<C,T>* ret = NULL; 01121 01122 if (!is_open()) 01123 { 01124 switch(fork(mode)) 01125 { 01126 case 0 : 01127 // this is the new process, exec command 01128 ::execl(shell_path, "sh", "-c", command.c_str(), (char*)NULL); 01129 01130 // can only reach this point if exec() failed 01131 01132 // parent can get exit code from waitpid() 01133 ::_exit(errno); 01134 // using std::exit() would make static dtors run twice 01135 01136 case -1 : 01137 // couldn't fork, error already handled in pstreambuf::fork() 01138 break; 01139 01140 default : 01141 // this is the parent process 01142 // activate buffers 01143 create_buffers(mode); 01144 ret = this; 01145 } 01146 } 01147 return ret; 01148 #endif 01149 } 01150 01159 inline void 01160 close_fd(pstreams::fd_type& fd) 01161 { 01162 if (fd >= 0 && ::close(fd) == 0) 01163 fd = -1; 01164 } 01165 01176 template <int N> 01177 inline void 01178 close_fd_array(pstreams::fd_type (&fds)[N]) 01179 { 01180 for (std::size_t i = 0; i < N; ++i) 01181 close_fd(fds[i]); 01182 } 01183 01213 template <typename C, typename T> 01214 basic_pstreambuf<C,T>* 01215 basic_pstreambuf<C,T>::open( const std::string& file, 01216 const argv_type& argv, 01217 pmode mode ) 01218 { 01219 basic_pstreambuf<C,T>* ret = NULL; 01220 01221 if (!is_open()) 01222 { 01223 // constants for read/write ends of pipe 01224 enum { RD, WR }; 01225 01226 // open another pipe and set close-on-exec 01227 fd_type ck_exec[] = { -1, -1 }; 01228 if (-1 == ::pipe(ck_exec) 01229 || -1 == ::fcntl(ck_exec[RD], F_SETFD, FD_CLOEXEC) 01230 || -1 == ::fcntl(ck_exec[WR], F_SETFD, FD_CLOEXEC)) 01231 { 01232 error_ = errno; 01233 close_fd_array(ck_exec); 01234 } 01235 else 01236 { 01237 switch(fork(mode)) 01238 { 01239 case 0 : 01240 // this is the new process, exec command 01241 { 01242 char** arg_v = new char*[argv.size()+1]; 01243 for (std::size_t i = 0; i < argv.size(); ++i) 01244 { 01245 const std::string& src = argv[i]; 01246 char*& dest = arg_v[i]; 01247 dest = new char[src.size()+1]; 01248 dest[ src.copy(dest, src.size()) ] = '\0'; 01249 } 01250 arg_v[argv.size()] = NULL; 01251 01252 ::execvp(file.c_str(), arg_v); 01253 01254 // can only reach this point if exec() failed 01255 01256 // parent can get error code from ck_exec pipe 01257 error_ = errno; 01258 01259 while (::write(ck_exec[WR], &error_, sizeof(error_)) == -1 01260 && errno == EINTR) 01261 { } 01262 01263 ::close(ck_exec[WR]); 01264 ::close(ck_exec[RD]); 01265 01266 ::_exit(error_); 01267 // using std::exit() would make static dtors run twice 01268 } 01269 01270 case -1 : 01271 // couldn't fork, error already handled in pstreambuf::fork() 01272 close_fd_array(ck_exec); 01273 break; 01274 01275 default : 01276 // this is the parent process 01277 01278 // check child called exec() successfully 01279 ::close(ck_exec[WR]); 01280 switch (::read(ck_exec[RD], &error_, sizeof(error_))) 01281 { 01282 case 0: 01283 // activate buffers 01284 create_buffers(mode); 01285 ret = this; 01286 break; 01287 case -1: 01288 error_ = errno; 01289 break; 01290 default: 01291 // error_ contains error code from child 01292 // call wait() to clean up and set ppid_ to 0 01293 this->wait(); 01294 break; 01295 } 01296 ::close(ck_exec[RD]); 01297 } 01298 } 01299 } 01300 return ret; 01301 } 01302 01319 template <typename C, typename T> 01320 pid_t 01321 basic_pstreambuf<C,T>::fork(pmode mode) 01322 { 01323 pid_t pid = -1; 01324 01325 // Three pairs of file descriptors, for pipes connected to the 01326 // process' stdin, stdout and stderr 01327 // (stored in a single array so close_fd_array() can close all at once) 01328 fd_type fd[] = { -1, -1, -1, -1, -1, -1 }; 01329 fd_type* const pin = fd; 01330 fd_type* const pout = fd+2; 01331 fd_type* const perr = fd+4; 01332 01333 // constants for read/write ends of pipe 01334 enum { RD, WR }; 01335 01336 // N.B. 01337 // For the pstreambuf pin is an output stream and 01338 // pout and perr are input streams. 01339 01340 if (!error_ && mode&pstdin && ::pipe(pin)) 01341 error_ = errno; 01342 01343 if (!error_ && mode&pstdout && ::pipe(pout)) 01344 error_ = errno; 01345 01346 if (!error_ && mode&pstderr && ::pipe(perr)) 01347 error_ = errno; 01348 01349 if (!error_) 01350 { 01351 pid = ::fork(); 01352 switch (pid) 01353 { 01354 case 0 : 01355 { 01356 // this is the new process 01357 01358 // for each open pipe close one end and redirect the 01359 // respective standard stream to the other end 01360 01361 if (*pin >= 0) 01362 { 01363 ::close(pin[WR]); 01364 ::dup2(pin[RD], STDIN_FILENO); 01365 ::close(pin[RD]); 01366 } 01367 if (*pout >= 0) 01368 { 01369 ::close(pout[RD]); 01370 ::dup2(pout[WR], STDOUT_FILENO); 01371 ::close(pout[WR]); 01372 } 01373 if (*perr >= 0) 01374 { 01375 ::close(perr[RD]); 01376 ::dup2(perr[WR], STDERR_FILENO); 01377 ::close(perr[WR]); 01378 } 01379 01380 #ifdef _POSIX_JOB_CONTROL 01381 if (mode&newpg) 01382 ::setpgid(0, 0); // Change to a new process group 01383 #endif 01384 01385 break; 01386 } 01387 case -1 : 01388 { 01389 // couldn't fork for some reason 01390 error_ = errno; 01391 // close any open pipes 01392 close_fd_array(fd); 01393 break; 01394 } 01395 default : 01396 { 01397 // this is the parent process, store process' pid 01398 ppid_ = pid; 01399 01400 // store one end of open pipes and close other end 01401 if (*pin >= 0) 01402 { 01403 wpipe_ = pin[WR]; 01404 ::close(pin[RD]); 01405 } 01406 if (*pout >= 0) 01407 { 01408 rpipe_[rsrc_out] = pout[RD]; 01409 ::close(pout[WR]); 01410 } 01411 if (*perr >= 0) 01412 { 01413 rpipe_[rsrc_err] = perr[RD]; 01414 ::close(perr[WR]); 01415 } 01416 } 01417 } 01418 } 01419 else 01420 { 01421 // close any pipes we opened before failure 01422 close_fd_array(fd); 01423 } 01424 return pid; 01425 } 01426 01436 template <typename C, typename T> 01437 basic_pstreambuf<C,T>* 01438 basic_pstreambuf<C,T>::close() 01439 { 01440 const bool running = is_open(); 01441 01442 sync(); // this might call wait() and reap the child process 01443 01444 // rather than trying to work out whether or not we need to clean up 01445 // just do it anyway, all cleanup functions are safe to call twice. 01446 01447 destroy_buffers(pstdin|pstdout|pstderr); 01448 01449 // close pipes before wait() so child gets EOF/SIGPIPE 01450 close_fd(wpipe_); 01451 close_fd_array(rpipe_); 01452 01453 do 01454 { 01455 error_ = 0; 01456 } while (wait() == -1 && error() == EINTR); 01457 01458 return running ? this : NULL; 01459 } 01460 01464 template <typename C, typename T> 01465 inline void 01466 basic_pstreambuf<C,T>::init_rbuffers() 01467 { 01468 rpipe_[rsrc_out] = rpipe_[rsrc_err] = -1; 01469 rbuffer_[rsrc_out] = rbuffer_[rsrc_err] = NULL; 01470 rbufstate_[0] = rbufstate_[1] = rbufstate_[2] = NULL; 01471 } 01472 01473 template <typename C, typename T> 01474 void 01475 basic_pstreambuf<C,T>::create_buffers(pmode mode) 01476 { 01477 if (mode & pstdin) 01478 { 01479 delete[] wbuffer_; 01480 wbuffer_ = new char_type[bufsz]; 01481 this->setp(wbuffer_, wbuffer_ + bufsz); 01482 } 01483 if (mode & pstdout) 01484 { 01485 delete[] rbuffer_[rsrc_out]; 01486 rbuffer_[rsrc_out] = new char_type[bufsz]; 01487 rsrc_ = rsrc_out; 01488 this->setg(rbuffer_[rsrc_out] + pbsz, rbuffer_[rsrc_out] + pbsz, 01489 rbuffer_[rsrc_out] + pbsz); 01490 } 01491 if (mode & pstderr) 01492 { 01493 delete[] rbuffer_[rsrc_err]; 01494 rbuffer_[rsrc_err] = new char_type[bufsz]; 01495 if (!(mode & pstdout)) 01496 { 01497 rsrc_ = rsrc_err; 01498 this->setg(rbuffer_[rsrc_err] + pbsz, rbuffer_[rsrc_err] + pbsz, 01499 rbuffer_[rsrc_err] + pbsz); 01500 } 01501 } 01502 } 01503 01504 template <typename C, typename T> 01505 void 01506 basic_pstreambuf<C,T>::destroy_buffers(pmode mode) 01507 { 01508 if (mode & pstdin) 01509 { 01510 this->setp(NULL, NULL); 01511 delete[] wbuffer_; 01512 wbuffer_ = NULL; 01513 } 01514 if (mode & pstdout) 01515 { 01516 if (rsrc_ == rsrc_out) 01517 this->setg(NULL, NULL, NULL); 01518 delete[] rbuffer_[rsrc_out]; 01519 rbuffer_[rsrc_out] = NULL; 01520 } 01521 if (mode & pstderr) 01522 { 01523 if (rsrc_ == rsrc_err) 01524 this->setg(NULL, NULL, NULL); 01525 delete[] rbuffer_[rsrc_err]; 01526 rbuffer_[rsrc_err] = NULL; 01527 } 01528 } 01529 01530 template <typename C, typename T> 01531 typename basic_pstreambuf<C,T>::buf_read_src 01532 basic_pstreambuf<C,T>::switch_read_buffer(buf_read_src src) 01533 { 01534 if (rsrc_ != src) 01535 { 01536 char_type* tmpbufstate[] = {this->eback(), this->gptr(), this->egptr()}; 01537 this->setg(rbufstate_[0], rbufstate_[1], rbufstate_[2]); 01538 for (std::size_t i = 0; i < 3; ++i) 01539 rbufstate_[i] = tmpbufstate[i]; 01540 rsrc_ = src; 01541 } 01542 return rsrc_; 01543 } 01544 01561 template <typename C, typename T> 01562 int 01563 basic_pstreambuf<C,T>::wait(bool nohang) 01564 { 01565 int exited = -1; 01566 if (is_open()) 01567 { 01568 int status; 01569 switch(::waitpid(ppid_, &status, nohang ? WNOHANG : 0)) 01570 { 01571 case 0 : 01572 // nohang was true and process has not exited 01573 exited = 0; 01574 break; 01575 case -1 : 01576 error_ = errno; 01577 break; 01578 default : 01579 // process has exited 01580 ppid_ = 0; 01581 status_ = status; 01582 exited = 1; 01583 // Close wpipe, would get SIGPIPE if we used it. 01584 destroy_buffers(pstdin); 01585 close_fd(wpipe_); 01586 // Must free read buffers and pipes on destruction 01587 // or next call to open()/close() 01588 break; 01589 } 01590 } 01591 return exited; 01592 } 01593 01604 template <typename C, typename T> 01605 inline basic_pstreambuf<C,T>* 01606 basic_pstreambuf<C,T>::kill(int signal) 01607 { 01608 basic_pstreambuf<C,T>* ret = NULL; 01609 if (is_open()) 01610 { 01611 if (::kill(ppid_, signal)) 01612 error_ = errno; 01613 else 01614 { 01615 #if 0 01616 // TODO call exited() to check for exit and clean up? leave to user? 01617 if (signal==SIGTERM || signal==SIGKILL) 01618 this->exited(); 01619 #endif 01620 ret = this; 01621 } 01622 } 01623 return ret; 01624 } 01625 01639 template <typename C, typename T> 01640 inline basic_pstreambuf<C,T>* 01641 basic_pstreambuf<C,T>::killpg(int signal) 01642 { 01643 basic_pstreambuf<C,T>* ret = NULL; 01644 #ifdef _POSIX_JOB_CONTROL 01645 if (is_open()) 01646 { 01647 pid_t pgid = ::getpgid(ppid_); 01648 if (pgid == -1) 01649 error_ = errno; 01650 else if (pgid == ::getpgrp()) 01651 error_ = EPERM; // Don't commit suicide 01652 else if (::killpg(pgid, signal)) 01653 error_ = errno; 01654 else 01655 ret = this; 01656 } 01657 #else 01658 error_ = ENOTSUP; 01659 #endif 01660 return ret; 01661 } 01662 01670 template <typename C, typename T> 01671 inline bool 01672 basic_pstreambuf<C,T>::exited() 01673 { 01674 return ppid_ == 0 || wait(true)==1; 01675 } 01676 01677 01683 template <typename C, typename T> 01684 inline int 01685 basic_pstreambuf<C,T>::status() const 01686 { 01687 return status_; 01688 } 01689 01693 template <typename C, typename T> 01694 inline int 01695 basic_pstreambuf<C,T>::error() const 01696 { 01697 return error_; 01698 } 01699 01704 template <typename C, typename T> 01705 inline void 01706 basic_pstreambuf<C,T>::peof() 01707 { 01708 sync(); 01709 destroy_buffers(pstdin); 01710 close_fd(wpipe_); 01711 } 01712 01723 template <typename C, typename T> 01724 inline bool 01725 basic_pstreambuf<C,T>::is_open() const 01726 { 01727 return ppid_ > 0; 01728 } 01729 01738 template <typename C, typename T> 01739 inline bool 01740 basic_pstreambuf<C,T>::read_err(bool readerr) 01741 { 01742 buf_read_src src = readerr ? rsrc_err : rsrc_out; 01743 if (rpipe_[src]>=0) 01744 { 01745 switch_read_buffer(src); 01746 return true; 01747 } 01748 return false; 01749 } 01750 01761 template <typename C, typename T> 01762 typename basic_pstreambuf<C,T>::int_type 01763 basic_pstreambuf<C,T>::overflow(int_type c) 01764 { 01765 if (!empty_buffer()) 01766 return traits_type::eof(); 01767 else if (!traits_type::eq_int_type(c, traits_type::eof())) 01768 return this->sputc(c); 01769 else 01770 return traits_type::not_eof(c); 01771 } 01772 01773 01774 template <typename C, typename T> 01775 int 01776 basic_pstreambuf<C,T>::sync() 01777 { 01778 return !exited() && empty_buffer() ? 0 : -1; 01779 } 01780 01786 template <typename C, typename T> 01787 std::streamsize 01788 basic_pstreambuf<C,T>::xsputn(const char_type* s, std::streamsize n) 01789 { 01790 std::streamsize done = 0; 01791 while (done < n) 01792 { 01793 if (std::streamsize nbuf = this->epptr() - this->pptr()) 01794 { 01795 nbuf = std::min(nbuf, n - done); 01796 traits_type::copy(this->pptr(), s + done, nbuf); 01797 this->pbump(nbuf); 01798 done += nbuf; 01799 } 01800 else if (!empty_buffer()) 01801 break; 01802 } 01803 return done; 01804 } 01805 01809 template <typename C, typename T> 01810 bool 01811 basic_pstreambuf<C,T>::empty_buffer() 01812 { 01813 const std::streamsize count = this->pptr() - this->pbase(); 01814 if (count > 0) 01815 { 01816 const std::streamsize written = this->write(this->wbuffer_, count); 01817 if (written > 0) 01818 { 01819 if (const std::streamsize unwritten = count - written) 01820 traits_type::move(this->pbase(), this->pbase()+written, unwritten); 01821 this->pbump(-written); 01822 return true; 01823 } 01824 } 01825 return false; 01826 } 01827 01835 template <typename C, typename T> 01836 typename basic_pstreambuf<C,T>::int_type 01837 basic_pstreambuf<C,T>::underflow() 01838 { 01839 if (this->gptr() < this->egptr() || fill_buffer()) 01840 return traits_type::to_int_type(*this->gptr()); 01841 else 01842 return traits_type::eof(); 01843 } 01844 01853 template <typename C, typename T> 01854 typename basic_pstreambuf<C,T>::int_type 01855 basic_pstreambuf<C,T>::pbackfail(int_type c) 01856 { 01857 if (this->gptr() != this->eback()) 01858 { 01859 this->gbump(-1); 01860 if (!traits_type::eq_int_type(c, traits_type::eof())) 01861 *this->gptr() = traits_type::to_char_type(c); 01862 return traits_type::not_eof(c); 01863 } 01864 else 01865 return traits_type::eof(); 01866 } 01867 01868 template <typename C, typename T> 01869 std::streamsize 01870 basic_pstreambuf<C,T>::showmanyc() 01871 { 01872 int avail = 0; 01873 if (sizeof(char_type) == 1) 01874 avail = fill_buffer(true) ? this->egptr() - this->gptr() : -1; 01875 #ifdef FIONREAD 01876 else 01877 { 01878 if (::ioctl(rpipe(), FIONREAD, &avail) == -1) 01879 avail = -1; 01880 else if (avail) 01881 avail /= sizeof(char_type); 01882 } 01883 #endif 01884 return std::streamsize(avail); 01885 } 01886 01890 template <typename C, typename T> 01891 bool 01892 basic_pstreambuf<C,T>::fill_buffer(bool non_blocking) 01893 { 01894 const std::streamsize pb1 = this->gptr() - this->eback(); 01895 const std::streamsize pb2 = pbsz; 01896 const std::streamsize npb = std::min(pb1, pb2); 01897 01898 char_type* const rbuf = rbuffer(); 01899 01900 traits_type::move(rbuf + pbsz - npb, this->gptr() - npb, npb); 01901 01902 std::streamsize rc = -1; 01903 01904 if (non_blocking) 01905 { 01906 const int flags = ::fcntl(rpipe(), F_GETFL); 01907 if (flags != -1) 01908 { 01909 const bool blocking = !(flags & O_NONBLOCK); 01910 if (blocking) 01911 ::fcntl(rpipe(), F_SETFL, flags | O_NONBLOCK); // set non-blocking 01912 01913 error_ = 0; 01914 rc = read(rbuf + pbsz, bufsz - pbsz); 01915 01916 if (rc == -1 && error_ == EAGAIN) // nothing available 01917 rc = 0; 01918 else if (rc == 0) // EOF 01919 rc = -1; 01920 01921 if (blocking) 01922 ::fcntl(rpipe(), F_SETFL, flags); // restore 01923 } 01924 } 01925 else 01926 rc = read(rbuf + pbsz, bufsz - pbsz); 01927 01928 if (rc > 0 || (rc == 0 && non_blocking)) 01929 { 01930 this->setg( rbuf + pbsz - npb, 01931 rbuf + pbsz, 01932 rbuf + pbsz + rc ); 01933 return true; 01934 } 01935 else 01936 { 01937 this->setg(NULL, NULL, NULL); 01938 return false; 01939 } 01940 } 01941 01949 template <typename C, typename T> 01950 inline std::streamsize 01951 basic_pstreambuf<C,T>::write(const char_type* s, std::streamsize n) 01952 { 01953 std::streamsize nwritten = 0; 01954 if (wpipe() >= 0) 01955 { 01956 nwritten = ::write(wpipe(), s, n * sizeof(char_type)); 01957 if (nwritten == -1) 01958 error_ = errno; 01959 else 01960 nwritten /= sizeof(char_type); 01961 } 01962 return nwritten; 01963 } 01964 01972 template <typename C, typename T> 01973 inline std::streamsize 01974 basic_pstreambuf<C,T>::read(char_type* s, std::streamsize n) 01975 { 01976 std::streamsize nread = 0; 01977 if (rpipe() >= 0) 01978 { 01979 nread = ::read(rpipe(), s, n * sizeof(char_type)); 01980 if (nread == -1) 01981 error_ = errno; 01982 else 01983 nread /= sizeof(char_type); 01984 } 01985 return nread; 01986 } 01987 01989 template <typename C, typename T> 01990 inline pstreams::fd_type& 01991 basic_pstreambuf<C,T>::wpipe() 01992 { 01993 return wpipe_; 01994 } 01995 01997 template <typename C, typename T> 01998 inline pstreams::fd_type& 01999 basic_pstreambuf<C,T>::rpipe() 02000 { 02001 return rpipe_[rsrc_]; 02002 } 02003 02005 template <typename C, typename T> 02006 inline pstreams::fd_type& 02007 basic_pstreambuf<C,T>::rpipe(buf_read_src which) 02008 { 02009 return rpipe_[which]; 02010 } 02011 02013 template <typename C, typename T> 02014 inline typename basic_pstreambuf<C,T>::char_type* 02015 basic_pstreambuf<C,T>::rbuffer() 02016 { 02017 return rbuffer_[rsrc_]; 02018 } 02019 02020 02021 /* 02022 * member definitions for pstream_common 02023 */ 02024 02034 template <typename C, typename T> 02035 inline 02036 pstream_common<C,T>::pstream_common() 02037 : std::basic_ios<C,T>(NULL) 02038 , command_() 02039 , buf_() 02040 { 02041 this->std::basic_ios<C,T>::rdbuf(&buf_); 02042 } 02043 02052 template <typename C, typename T> 02053 inline 02054 pstream_common<C,T>::pstream_common(const std::string& command, pmode mode) 02055 : std::basic_ios<C,T>(NULL) 02056 , command_(command) 02057 , buf_() 02058 { 02059 this->std::basic_ios<C,T>::rdbuf(&buf_); 02060 do_open(command, mode); 02061 } 02062 02072 template <typename C, typename T> 02073 inline 02074 pstream_common<C,T>::pstream_common( const std::string& file, 02075 const argv_type& argv, 02076 pmode mode ) 02077 : std::basic_ios<C,T>(NULL) 02078 , command_(file) 02079 , buf_() 02080 { 02081 this->std::basic_ios<C,T>::rdbuf(&buf_); 02082 do_open(file, argv, mode); 02083 } 02084 02094 template <typename C, typename T> 02095 inline 02096 pstream_common<C,T>::~pstream_common() 02097 { 02098 } 02099 02108 template <typename C, typename T> 02109 inline void 02110 pstream_common<C,T>::do_open(const std::string& command, pmode mode) 02111 { 02112 if (!buf_.open((command_=command), mode)) 02113 this->setstate(std::ios_base::failbit); 02114 } 02115 02125 template <typename C, typename T> 02126 inline void 02127 pstream_common<C,T>::do_open( const std::string& file, 02128 const argv_type& argv, 02129 pmode mode ) 02130 { 02131 if (!buf_.open((command_=file), argv, mode)) 02132 this->setstate(std::ios_base::failbit); 02133 } 02134 02136 template <typename C, typename T> 02137 inline void 02138 pstream_common<C,T>::close() 02139 { 02140 if (!buf_.close()) 02141 this->setstate(std::ios_base::failbit); 02142 } 02143 02148 template <typename C, typename T> 02149 inline bool 02150 pstream_common<C,T>::is_open() const 02151 { 02152 return buf_.is_open(); 02153 } 02154 02156 template <typename C, typename T> 02157 inline const std::string& 02158 pstream_common<C,T>::command() const 02159 { 02160 return command_; 02161 } 02162 02164 // TODO document behaviour if buffer replaced. 02165 template <typename C, typename T> 02166 inline typename pstream_common<C,T>::streambuf_type* 02167 pstream_common<C,T>::rdbuf() const 02168 { 02169 return const_cast<streambuf_type*>(&buf_); 02170 } 02171 02172 02173 #if REDI_EVISCERATE_PSTREAMS 02174 02206 template <typename C, typename T> 02207 std::size_t 02208 basic_pstreambuf<C,T>::fopen(FILE*& in, FILE*& out, FILE*& err) 02209 { 02210 in = out = err = NULL; 02211 std::size_t open_files = 0; 02212 if (wpipe() > -1) 02213 { 02214 if ((in = ::fdopen(wpipe(), "w"))) 02215 { 02216 open_files |= pstdin; 02217 } 02218 } 02219 if (rpipe(rsrc_out) > -1) 02220 { 02221 if ((out = ::fdopen(rpipe(rsrc_out), "r"))) 02222 { 02223 open_files |= pstdout; 02224 } 02225 } 02226 if (rpipe(rsrc_err) > -1) 02227 { 02228 if ((err = ::fdopen(rpipe(rsrc_err), "r"))) 02229 { 02230 open_files |= pstderr; 02231 } 02232 } 02233 return open_files; 02234 } 02235 02246 template <typename C, typename T> 02247 inline std::size_t 02248 pstream_common<C,T>::fopen(FILE*& fin, FILE*& fout, FILE*& ferr) 02249 { 02250 return buf_.fopen(fin, fout, ferr); 02251 } 02252 02253 #endif // REDI_EVISCERATE_PSTREAMS 02254 02255 02256 } // namespace redi 02257 02263 #endif // REDI_PSTREAM_H_SEEN 02264 02265 // vim: ts=2 sw=2 expandtab 02266