libdap  Updated for version 3.17.0
MarshallerThread.cc
00001 // -*- mode: c++; c-basic-offset:4 -*-
00002 
00003 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
00004 // Access Protocol.
00005 
00006 // Copyright (c) 2015 OPeNDAP, Inc.
00007 // Author: James Gallagher <jgallagher@opendap.org>
00008 //
00009 // This library is free software; you can redistribute it and/or
00010 // modify it under the terms of the GNU Lesser General Public
00011 // License as published by the Free Software Foundation; either
00012 // version 2.1 of the License, or (at your option) any later version.
00013 //
00014 // This library is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 // Lesser General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU Lesser General Public
00020 // License along with this library; if not, write to the Free Software
00021 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00022 //
00023 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
00024 
00025 /*
00026  * MarshallerThread.cc
00027  *
00028  *  Created on: Aug 27, 2015
00029  *      Author: jimg
00030  */
00031 
00032 #include "config.h"
00033 
00034 #include <pthread.h>
00035 #include <sys/time.h>
00036 #include <fcntl.h>
00037 #include <unistd.h>
00038 
00039 #include <ostream>
00040 #include <sstream>
00041 
00042 #include "MarshallerThread.h"
00043 #include "Error.h"
00044 #include "InternalErr.h"
00045 #include "debug.h"
00046 
00047 using namespace libdap;
00048 using namespace std;
00049 
// When true, write_thread() prints the elapsed time of each child-thread
// write on cerr. Timing output is off by default.
00050 bool MarshallerThread::print_time = false;
00051 
/**
 * Compute the elapsed time between two timestamps.
 *
 * Despite the function's name, the value returned is expressed in seconds
 * (with a fractional part), not in hundredths of a second.
 *
 * @param stop  The later timestamp; it is only read.
 * @param start The earlier timestamp; it is only read.
 * @return (stop - start) in seconds.
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    // The result is a double, so the seconds and microseconds fields can be
    // subtracted independently; no integer carry between the two fields is
    // needed, and neither argument has to be modified to get the answer.
    const double whole_seconds = static_cast<double>(stop->tv_sec) - static_cast<double>(start->tv_sec);
    const double micro_seconds = static_cast<double>(stop->tv_usec) - static_cast<double>(start->tv_usec);

    return whole_seconds + micro_seconds / 1000000.0;
}
00075 
00085 Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
00086     m_mutex(lock)
00087 {
00088     int status = pthread_mutex_lock(&m_mutex);
00089 
00090     DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
00091 
00092     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
00093     while (count != 0) {
00094         status = pthread_cond_wait(&cond, &m_mutex);
00095         if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
00096     }
00097     if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
00098 
00099     DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
00100 }
00101 
00105 Locker::~Locker()
00106 {
00107     DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
00108 
00109     int status = pthread_mutex_unlock(&m_mutex);
00110     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
00111 }
00112 
00113 
00127 ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
00128     m_mutex(lock), m_cond(cond), m_count(count)
00129 {
00130     int status = pthread_mutex_lock(&m_mutex);
00131 
00132     DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
00133 
00134     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
00135 
00136     DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
00137 }
00138 
00139 ChildLocker::~ChildLocker()
00140 {
00141     DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
00142 
00143     m_count = 0;
00144     int status = pthread_cond_signal(&m_cond);
00145     if (status != 0)
00146         throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");
00147 
00148     status = pthread_mutex_unlock(&m_mutex);
00149     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
00150 }
00151 
00152 MarshallerThread::MarshallerThread() :
00153     d_thread(0), d_child_thread_count(0)
00154 {
00155     if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");
00156     if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
00157         throw Error(internal_error, "Failed to complete pthread attribute initialization.");
00158 
00159     if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex.");
00160     if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond.");
00161 }
00162 
00163 MarshallerThread::~MarshallerThread()
00164 {
00165     int status = pthread_mutex_lock(&d_out_mutex);
00166     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
00167     while (d_child_thread_count != 0) {
00168         status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
00169         if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
00170     }
00171     if (d_child_thread_count != 0)
00172         throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
00173 
00174     status = pthread_mutex_unlock(&d_out_mutex);
00175     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
00176 
00177     pthread_mutex_destroy(&d_out_mutex);
00178     pthread_cond_destroy(&d_out_cond);
00179 
00180     pthread_attr_destroy(&d_thread_attr);
00181 }
00182 
00183 // not a static method
00189 void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
00190     unsigned int bytes)
00191 {
00192     write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
00193         bytes);
00194     int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
00195     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
00196 }
00197 
00201 void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
00202 {
00203     write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
00204         bytes);
00205     int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
00206     if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
00207 }
00208 
00218 void *
00219 MarshallerThread::write_thread(void *arg)
00220 {
00221     write_args *args = reinterpret_cast<write_args *>(arg);
00222 
00223     ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
00224 
00225     struct timeval tp_s;
00226     if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
00227 
00228     // force an error
00229     // return (void*)-1;
00230 
00231     if (args->d_out_file != -1) {
00232         int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
00233         if (bytes_written != args->d_num)
00234             return (void*) -1;
00235     }
00236     else {
00237         args->d_out.write(args->d_buf, args->d_num);
00238         if (args->d_out.fail()) {
00239             ostringstream oss;
00240             oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
00241             args->d_error = oss.str();
00242             return (void*) -1;
00243         }
00244     }
00245 
00246     delete args->d_buf;
00247     delete args;
00248 
00249     struct timeval tp_e;
00250     if (print_time) {
00251         if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
00252 
00253         cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
00254     }
00255 
00256     return 0;
00257 }
00258 
00271 void *
00272 MarshallerThread::write_thread_part(void *arg)
00273 {
00274     write_args *args = reinterpret_cast<write_args *>(arg);
00275 
00276     ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
00277 
00278     if (args->d_out_file != -1) {
00279         int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
00280         if (bytes_written != args->d_num) return (void*) -1;
00281     }
00282     else {
00283         args->d_out.write(args->d_buf + 4, args->d_num);
00284         if (args->d_out.fail()) {
00285             ostringstream oss;
00286             oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
00287             args->d_error = oss.str();
00288             return (void*) -1;
00289         }
00290     }
00291 
00292     delete args->d_buf;
00293     delete args;
00294 
00295     return 0;
00296 }
00297