libdap  Updated for version 3.17.0
XDRStreamMarshaller.cc
00001 // XDRStreamMarshaller.cc
00002 
00003 // -*- mode: c++; c-basic-offset:4 -*-
00004 
00005 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
00006 // Access Protocol.
00007 
00008 // Copyright (c) 2002,2003 OPeNDAP, Inc.
00009 // Author: Patrick West <pwest@ucar.edu>
00010 //
00011 // This library is free software; you can redistribute it and/or
00012 // modify it under the terms of the GNU Lesser General Public
00013 // License as published by the Free Software Foundation; either
00014 // version 2.1 of the License, or (at your option) any later version.
00015 //
00016 // This library is distributed in the hope that it will be useful,
00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00019 // Lesser General Public License for more details.
00020 //
00021 // You should have received a copy of the GNU Lesser General Public
00022 // License along with this library; if not, write to the Free Software
00023 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00024 //
00025 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
00026 
00027 // (c) COPYRIGHT URI/MIT 1994-1999
00028 // Please read the full copyright statement in the file COPYRIGHT_URI.
00029 //
00030 // Authors:
00031 //      pwest       Patrick West <pwest@ucar.edu>
00032 
00033 #include "config.h"
00034 
00035 #ifdef HAVE_PTHREAD_H
00036 #include <pthread.h>
00037 #endif
00038 
00039 #include <cassert>
00040 
00041 #include <iostream>
00042 #include <sstream>
00043 #include <iomanip>
00044 
00045 // #define DODS_DEBUG
00046 
00047 #include "XDRStreamMarshaller.h"
00048 #ifdef USE_POSIX_THREADS
00049 #include "MarshallerThread.h"
00050 #endif
00051 #include "Vector.h"
00052 #include "XDRUtils.h"
00053 #include "util.h"
00054 
00055 #include "debug.h"
00056 
00057 using namespace std;
00058 
00059 // Build this code so it does not use pthreads to write some kinds of
00060 // data (see the put_vector() and put_vector_part() methods) in a child thread.
00061 // #undef USE_POSIX_THREADS
00062 
00063 namespace libdap {
00064 
00065 char *XDRStreamMarshaller::d_buf = 0;
00066 #define XDR_DAP_BUFF_SIZE 256
00067 
00068 
00077 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
00078     d_out(out), d_partial_put_byte_count(0), tm(0)
00079 {
00080     if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
00081     if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
00082 
00083     xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
00084 
00085 #ifdef USE_POSIX_THREADS
00086     tm = new MarshallerThread;
00087 #endif
00088 }
00089 
00090 XDRStreamMarshaller::~XDRStreamMarshaller()
00091 {
00092     delete tm;
00093 
00094     xdr_destroy(&d_sink);
00095 }
00096 
00097 void XDRStreamMarshaller::put_byte(dods_byte val)
00098 {
00099      if (!xdr_setpos(&d_sink, 0))
00100         throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
00101 
00102     if (!xdr_char(&d_sink, (char *) &val))
00103         throw Error(
00104             "Network I/O Error. Could not send byte data.");
00105 
00106     unsigned int bytes_written = xdr_getpos(&d_sink);
00107     if (!bytes_written)
00108         throw Error(
00109             "Network I/O Error. Could not send byte data - unable to get stream position.");
00110 
00111 #ifdef USE_POSIX_THREADS
00112     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00113 #endif
00114 
00115     d_out.write(d_buf, bytes_written);
00116 }
00117 
00118 void XDRStreamMarshaller::put_int16(dods_int16 val)
00119 {
00120     if (!xdr_setpos(&d_sink, 0))
00121         throw Error(
00122             "Network I/O Error. Could not send int 16 data - unable to set stream position.");
00123 
00124     if (!XDR_INT16(&d_sink, &val))
00125         throw Error(
00126             "Network I/O Error. Could not send int 16 data.");
00127 
00128     unsigned int bytes_written = xdr_getpos(&d_sink);
00129     if (!bytes_written)
00130         throw Error(
00131             "Network I/O Error. Could not send int 16 data - unable to get stream position.");
00132 
00133 #ifdef USE_POSIX_THREADS
00134     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00135 #endif
00136 
00137     d_out.write(d_buf, bytes_written);
00138 }
00139 
00140 void XDRStreamMarshaller::put_int32(dods_int32 val)
00141 {
00142     if (!xdr_setpos(&d_sink, 0))
00143         throw Error(
00144             "Network I/O Error. Could not send int 32 data - unable to set stream position.");
00145 
00146     if (!XDR_INT32(&d_sink, &val))
00147         throw Error(
00148             "Network I/O Error. Culd not read int 32 data.");
00149 
00150     unsigned int bytes_written = xdr_getpos(&d_sink);
00151     if (!bytes_written)
00152         throw Error(
00153             "Network I/O Error. Could not send int 32 data - unable to get stream position.");
00154 
00155 #ifdef USE_POSIX_THREADS
00156     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00157 #endif
00158 
00159     d_out.write(d_buf, bytes_written);
00160 }
00161 
00162 void XDRStreamMarshaller::put_float32(dods_float32 val)
00163 {
00164     if (!xdr_setpos(&d_sink, 0))
00165         throw Error(
00166             "Network I/O Error. Could not send float 32 data - unable to set stream position.");
00167 
00168     if (!xdr_float(&d_sink, &val))
00169         throw Error(
00170             "Network I/O Error. Could not send float 32 data.");
00171 
00172     unsigned int bytes_written = xdr_getpos(&d_sink);
00173     if (!bytes_written)
00174         throw Error(
00175             "Network I/O Error. Could not send float 32 data - unable to get stream position.");
00176 
00177 #ifdef USE_POSIX_THREADS
00178     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00179 #endif
00180 
00181     d_out.write(d_buf, bytes_written);
00182 }
00183 
00184 void XDRStreamMarshaller::put_float64(dods_float64 val)
00185 {
00186     if (!xdr_setpos(&d_sink, 0))
00187         throw Error(
00188             "Network I/O Error. Could not send float 64 data - unable to set stream position.");
00189 
00190     if (!xdr_double(&d_sink, &val))
00191         throw Error(
00192             "Network I/O Error. Could not send float 64 data.");
00193 
00194     unsigned int bytes_written = xdr_getpos(&d_sink);
00195     if (!bytes_written)
00196         throw Error(
00197             "Network I/O Error. Could not send float 64 data - unable to get stream position.");
00198 
00199 #ifdef USE_POSIX_THREADS
00200     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00201 #endif
00202 
00203     d_out.write(d_buf, bytes_written);
00204 }
00205 
00206 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
00207 {
00208     if (!xdr_setpos(&d_sink, 0))
00209         throw Error(
00210             "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
00211 
00212     if (!XDR_UINT16(&d_sink, &val))
00213         throw Error(
00214             "Network I/O Error. Could not send uint 16 data.");
00215 
00216     unsigned int bytes_written = xdr_getpos(&d_sink);
00217     if (!bytes_written)
00218         throw Error(
00219             "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
00220 
00221 #ifdef USE_POSIX_THREADS
00222     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00223 #endif
00224 
00225     d_out.write(d_buf, bytes_written);
00226 }
00227 
00228 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
00229 {
00230     if (!xdr_setpos(&d_sink, 0))
00231         throw Error(
00232             "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
00233 
00234     if (!XDR_UINT32(&d_sink, &val))
00235         throw Error(
00236             "Network I/O Error. Could not send uint 32 data.");
00237 
00238     unsigned int bytes_written = xdr_getpos(&d_sink);
00239     if (!bytes_written)
00240         throw Error(
00241             "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
00242 
00243 #ifdef USE_POSIX_THREADS
00244     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00245 #endif
00246 
00247     d_out.write(d_buf, bytes_written);
00248 }
00249 
00250 void XDRStreamMarshaller::put_str(const string &val)
00251 {
00252     int size = val.length() + 8;
00253 
00254     XDR str_sink;
00255     vector<char> str_buf(size);
00256 
00257     try {
00258         xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
00259 
00260         if (!xdr_setpos(&str_sink, 0))
00261             throw Error(
00262                 "Network I/O Error. Could not send string data - unable to set stream position.");
00263 
00264         const char *out_tmp = val.c_str();
00265         if (!xdr_string(&str_sink, (char **) &out_tmp, size))
00266             throw Error(
00267                 "Network I/O Error. Could not send string data.");
00268 
00269         unsigned int bytes_written = xdr_getpos(&str_sink);
00270         if (!bytes_written)
00271             throw Error(
00272                 "Network I/O Error. Could not send string data - unable to get stream position.");
00273 
00274 #ifdef USE_POSIX_THREADS
00275         Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00276 #endif
00277 
00278         d_out.write(&str_buf[0], bytes_written);
00279 
00280         xdr_destroy(&str_sink);
00281     }
00282     catch (...) {
00283         xdr_destroy(&str_sink);
00284         throw;
00285     }
00286 }
00287 
00288 void XDRStreamMarshaller::put_url(const string &val)
00289 {
00290     put_str(val);
00291 }
00292 
00293 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
00294 {
00295     if (len > XDR_DAP_BUFF_SIZE)
00296         throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
00297 
00298     if (!xdr_setpos(&d_sink, 0))
00299         throw Error(
00300             "Network I/O Error. Could not send opaque data - unable to set stream position.");
00301 
00302     if (!xdr_opaque(&d_sink, val, len))
00303         throw Error(
00304             "Network I/O Error. Could not send opaque data.");
00305 
00306     unsigned int bytes_written = xdr_getpos(&d_sink);
00307     if (!bytes_written)
00308         throw Error(
00309             "Network I/O Error. Could not send opaque data - unable to get stream position.");
00310 
00311 #ifdef USE_POSIX_THREADS
00312     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00313 #endif
00314 
00315     d_out.write(d_buf, bytes_written);
00316 }
00317 
00318 void XDRStreamMarshaller::put_int(int val)
00319 {
00320     if (!xdr_setpos(&d_sink, 0))
00321         throw Error(
00322             "Network I/O Error. Could not send int data - unable to set stream position.");
00323 
00324     if (!xdr_int(&d_sink, &val))
00325         throw Error(
00326             "Network I/O Error(1). Could not send int data.");
00327 
00328     unsigned int bytes_written = xdr_getpos(&d_sink);
00329     if (!bytes_written)
00330         throw Error(
00331             "Network I/O Error. Could not send int data - unable to get stream position.");
00332 
00333 #ifdef USE_POSIX_THREADS
00334     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00335 #endif
00336 
00337     d_out.write(d_buf, bytes_written);
00338 }
00339 
00340 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
00341 {
00342     put_vector(val, num, width, vec.var()->type());
00343 }
00344 
00345 
00353 void XDRStreamMarshaller::put_vector_start(int num)
00354 {
00355     put_int(num);
00356     put_int(num);
00357 
00358     d_partial_put_byte_count = 0;
00359 }
00360 
00367 void XDRStreamMarshaller::put_vector_end()
00368 {
00369 #ifdef USE_POSIX_THREADS
00370     Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00371 #endif
00372 
00373     // Compute the trailing (padding) bytes
00374 
00375     // Note that the XDR standard pads values to 4 byte boundaries.
00376     //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
00377     unsigned int mod_4 = d_partial_put_byte_count & 0x03;
00378     unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
00379 
00380     if (pad) {
00381         vector<char> padding(4, 0); // 4 zeros
00382 
00383         d_out.write(&padding[0], pad);
00384         if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
00385     }
00386 }
00387 
00388 // Start of parallel I/O support. jhrg 8/19/15
00389 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
00390 {
00391     if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
00392 
00393     // write the number of members of the array being written and then set the position to 0
00394     put_int(num);
00395 
00396     // this is the word boundary for writing xdr bytes in a vector.
00397     const unsigned int add_to = 8;
00398     // switch to memory on the heap since the thread will need to access it
00399     // after this code returns.
00400     char *byte_buf = new char[num + add_to];
00401     XDR byte_sink;
00402     try {
00403         xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
00404         if (!xdr_setpos(&byte_sink, 0))
00405             throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
00406 
00407         if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
00408             throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
00409 
00410         unsigned int bytes_written = xdr_getpos(&byte_sink);
00411         if (!bytes_written)
00412             throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
00413 
00414 #ifdef USE_POSIX_THREADS
00415         Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00416         tm->increment_child_thread_count();
00417         tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
00418         xdr_destroy(&byte_sink);
00419 #else
00420         d_out.write(byte_buf, bytes_written);
00421         xdr_destroy(&byte_sink);
00422         delete byte_buf;
00423 #endif
00424 
00425     }
00426     catch (...) {
00427         DBG(cerr << "Caught an exception in put_vector_thread" << endl);
00428         xdr_destroy(&byte_sink);
00429         delete byte_buf;
00430         throw;
00431     }
00432 }
00433 
00434 // private
00445 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
00446 {
00447 #if 0
00448     if (!val) throw InternalErr(__FILE__, __LINE__, "Buffer pointer is not set.");
00449 #endif
00450     assert(val || num == 0);
00451 
00452     // write the number of array members being written, then set the position back to 0
00453     put_int(num);
00454 
00455     if (num == 0)
00456         return;
00457 
00458     int use_width = width;
00459     if (use_width < 4) use_width = 4;
00460 
00461     // the size is the number of elements num times the width of each
00462     // element, then add 4 bytes for the number of elements
00463     int size = (num * use_width) + 4;
00464 
00465     // allocate enough memory for the elements
00466     //vector<char> vec_buf(size);
00467     char *vec_buf = new char[size];
00468     XDR vec_sink;
00469     try {
00470         xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
00471 
00472         // set the position of the sink to 0, we're starting at the beginning
00473         if (!xdr_setpos(&vec_sink, 0))
00474             throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
00475 
00476         // write the array to the buffer
00477         if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
00478             throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
00479 
00480         // how much was written to the buffer
00481         unsigned int bytes_written = xdr_getpos(&vec_sink);
00482         if (!bytes_written)
00483             throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
00484 
00485 #ifdef USE_POSIX_THREADS
00486         Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00487         tm->increment_child_thread_count();
00488         tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
00489         xdr_destroy(&vec_sink);
00490 #else
00491         d_out.write(vec_buf, bytes_written);
00492         xdr_destroy(&vec_sink);
00493         delete vec_buf;
00494 #endif
00495     }
00496     catch (...) {
00497         xdr_destroy(&vec_sink);
00498         delete vec_buf;
00499         throw;
00500     }
00501 }
00502 
00514 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
00515 {
00516     if (width == 1) {
00517         // Add space for the 4 bytes of length info and 4 bytes for padding, even though
00518         // we will not send either of those.
00519         const unsigned int add_to = 8;
00520         unsigned int bufsiz = num + add_to;
00521         //vector<char> byte_buf(bufsiz);
00522         char *byte_buf = new char[bufsiz];
00523         XDR byte_sink;
00524         try {
00525             xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
00526             if (!xdr_setpos(&byte_sink, 0))
00527                 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
00528 
00529             if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
00530                 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
00531 
00532 #ifdef USE_POSIX_THREADS
00533             Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00534             tm->increment_child_thread_count();
00535 
00536             // Increment the element count so we can figure out about the padding in put_vector_last()
00537             d_partial_put_byte_count += num;
00538 
00539             tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
00540             xdr_destroy(&byte_sink);
00541 #else
00542             // Only send the num bytes that follow the 4 bytes of length info - we skip the
00543             // length info because it's already been sent and we don't send any trailing padding
00544             // bytes in this method (see put_vector_last() for that).
00545             d_out.write(byte_buf + 4, num);
00546 
00547             if (d_out.fail())
00548             throw Error ("Network I/O Error. Could not send initial part of byte vector data");
00549 
00550             // Now increment the element count so we can figure out about the padding in put_vector_last()
00551             d_partial_put_byte_count += num;
00552 
00553             xdr_destroy(&byte_sink);
00554             delete byte_buf;
00555 #endif
00556         }
00557         catch (...) {
00558             xdr_destroy(&byte_sink);
00559             delete byte_buf;
00560             throw;
00561         }
00562     }
00563     else {
00564         int use_width = (width < 4) ? 4 : width;
00565 
00566         // the size is the number of elements num times the width of each
00567         // element, then add 4 bytes for the (int) number of elements
00568         int size = (num * use_width) + 4;
00569 
00570         // allocate enough memory for the elements
00571         //vector<char> vec_buf(size);
00572         char *vec_buf = new char[size];
00573         XDR vec_sink;
00574         try {
00575             xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
00576 
00577             // set the position of the sink to 0, we're starting at the beginning
00578             if (!xdr_setpos(&vec_sink, 0))
00579                 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
00580 
00581             // write the array to the buffer
00582             if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
00583                 throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
00584 
00585 #ifdef USE_POSIX_THREADS
00586             Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
00587             tm->increment_child_thread_count();
00588 
00589             // Increment the element count so we can figure out about the padding in put_vector_last()
00590             d_partial_put_byte_count += (size - 4);
00591             tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
00592             xdr_destroy(&vec_sink);
00593 #else
00594             // write that much out to the output stream, skipping the length data that
00595             // XDR writes since we have already written the length info using put_vector_start()
00596             d_out.write(vec_buf + 4, size - 4);
00597 
00598             if (d_out.fail())
00599                 throw Error ("Network I/O Error. Could not send part of vector data");
00600 
00601             // Now increment the element count so we can figure out about the padding in put_vector_last()
00602             d_partial_put_byte_count += (size - 4);
00603 
00604             xdr_destroy(&vec_sink);
00605             delete vec_buf;
00606 #endif
00607         }
00608         catch (...) {
00609             xdr_destroy(&vec_sink);
00610             delete vec_buf;
00611             throw;
00612         }
00613     }
00614 }
00615 
00616 void XDRStreamMarshaller::dump(ostream &strm) const
00617 {
00618     strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
00619 }
00620 
00621 } // namespace libdap
00622