libdap
Updated for version 3.17.0
|
00001 // -*- mode: c++; c-basic-offset:4 -*- 00002 00003 // This file is part of libdap, A C++ implementation of the OPeNDAP Data 00004 // Access Protocol. 00005 00006 // Copyright (c) 2015 OPeNDAP, Inc. 00007 // Author: James Gallagher <jgallagher@opendap.org> 00008 // 00009 // This library is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // This library is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 00022 // 00023 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112. 00024 00025 /* 00026 * MarshallerThread.cc 00027 * 00028 * Created on: Aug 27, 2015 00029 * Author: jimg 00030 */ 00031 00032 #include "config.h" 00033 00034 #include <pthread.h> 00035 #include <sys/time.h> 00036 #include <fcntl.h> 00037 #include <unistd.h> 00038 00039 #include <ostream> 00040 #include <sstream> 00041 00042 #include "MarshallerThread.h" 00043 #include "Error.h" 00044 #include "InternalErr.h" 00045 #include "debug.h" 00046 00047 using namespace libdap; 00048 using namespace std; 00049 00050 bool MarshallerThread::print_time = false; 00051 00057 static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start) 00058 { 00059 /* Perform the carry for the later subtraction by updating y. */ 00060 if (stop->tv_usec < start->tv_usec) { 00061 int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1; 00062 start->tv_usec -= 1000000 * nsec; 00063 start->tv_sec += nsec; 00064 } 00065 if (stop->tv_usec - start->tv_usec > 1000000) { 00066 int nsec = (start->tv_usec - stop->tv_usec) / 1000000; 00067 start->tv_usec += 1000000 * nsec; 00068 start->tv_sec -= nsec; 00069 } 00070 00071 double result = stop->tv_sec - start->tv_sec; 00072 result += double(stop->tv_usec - start->tv_usec) / 1000000; 00073 return result; 00074 } 00075 00085 Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) : 00086 m_mutex(lock) 00087 { 00088 int status = pthread_mutex_lock(&m_mutex); 00089 00090 DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl); 00091 00092 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex"); 00093 while (count != 0) { 00094 status = pthread_cond_wait(&cond, &m_mutex); 00095 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond"); 00096 } 00097 if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count"); 00098 00099 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl); 00100 } 00101 00105 Locker::~Locker() 00106 { 00107 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl); 00108 00109 int status = pthread_mutex_unlock(&m_mutex); 00110 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex"); 00111 } 00112 00113 00127 ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) : 00128 m_mutex(lock), m_cond(cond), m_count(count) 00129 { 00130 int status = pthread_mutex_lock(&m_mutex); 00131 00132 DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl); 00133 00134 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex"); 00135 00136 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl); 00137 } 00138 00139 ChildLocker::~ChildLocker() 00140 { 00141 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl); 00142 00143 m_count = 0; 00144 int status = pthread_cond_signal(&m_cond); 00145 if (status != 0) 00146 throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!"); 00147 00148 status = pthread_mutex_unlock(&m_mutex); 00149 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex"); 00150 } 00151 00152 MarshallerThread::MarshallerThread() : 00153 d_thread(0), d_child_thread_count(0) 00154 { 00155 if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes."); 00156 if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0) 00157 throw Error(internal_error, "Failed to complete pthread attribute initialization."); 00158 00159 if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex."); 00160 if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond."); 00161 } 00162 00163 MarshallerThread::~MarshallerThread() 00164 { 00165 int status = pthread_mutex_lock(&d_out_mutex); 00166 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex"); 00167 while (d_child_thread_count != 0) { 00168 status = pthread_cond_wait(&d_out_cond, &d_out_mutex); 00169 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond"); 00170 } 00171 if (d_child_thread_count != 0) 00172 throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count"); 00173 00174 status = pthread_mutex_unlock(&d_out_mutex); 00175 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex"); 00176 00177 pthread_mutex_destroy(&d_out_mutex); 00178 pthread_cond_destroy(&d_out_cond); 00179 00180 pthread_attr_destroy(&d_thread_attr); 00181 } 00182 00183 // not a static method 00189 void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf, 00190 unsigned int bytes) 00191 { 00192 write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, 00193 bytes); 00194 int status = pthread_create(&d_thread, &d_thread_attr, thread, args); 00195 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread"); 00196 } 00197 00201 void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes) 00202 { 00203 write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, 00204 bytes); 00205 int status = pthread_create(&d_thread, &d_thread_attr, thread, args); 00206 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread"); 00207 } 00208 00218 void * 00219 MarshallerThread::write_thread(void *arg) 00220 { 00221 write_args *args = reinterpret_cast<write_args *>(arg); 00222 00223 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit 00224 00225 struct timeval tp_s; 00226 if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl; 00227 00228 // force an error 00229 // return (void*)-1; 00230 00231 if (args->d_out_file != -1) { 00232 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num); 00233 if (bytes_written != args->d_num) 00234 return (void*) -1; 00235 } 00236 else { 00237 args->d_out.write(args->d_buf, args->d_num); 00238 if (args->d_out.fail()) { 00239 ostringstream oss; 00240 oss << "Could not write data: " << __FILE__ << ":" << __LINE__; 00241 args->d_error = oss.str(); 00242 return (void*) -1; 00243 } 00244 } 00245 00246 delete args->d_buf; 00247 delete args; 00248 00249 struct timeval tp_e; 00250 if (print_time) { 00251 if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl; 00252 00253 cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl; 00254 } 00255 00256 return 0; 00257 } 00258 00271 void * 00272 MarshallerThread::write_thread_part(void *arg) 00273 { 00274 write_args *args = reinterpret_cast<write_args *>(arg); 00275 00276 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit 00277 00278 if (args->d_out_file != -1) { 00279 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num); 00280 if (bytes_written != args->d_num) return (void*) -1; 00281 } 00282 else { 00283 args->d_out.write(args->d_buf + 4, args->d_num); 00284 if (args->d_out.fail()) { 00285 ostringstream oss; 00286 oss << "Could not write data: " << __FILE__ << ":" << __LINE__; 00287 args->d_error = oss.str(); 00288 return (void*) -1; 00289 } 00290 } 00291 00292 delete args->d_buf; 00293 delete args; 00294 00295 return 0; 00296 } 00297