libdap
Updated for version 3.17.0
|
00001 // XDRStreamMarshaller.cc 00002 00003 // -*- mode: c++; c-basic-offset:4 -*- 00004 00005 // This file is part of libdap, A C++ implementation of the OPeNDAP Data 00006 // Access Protocol. 00007 00008 // Copyright (c) 2002,2003 OPeNDAP, Inc. 00009 // Author: Patrick West <pwest@ucar.edu> 00010 // 00011 // This library is free software; you can redistribute it and/or 00012 // modify it under the terms of the GNU Lesser General Public 00013 // License as published by the Free Software Foundation; either 00014 // version 2.1 of the License, or (at your option) any later version. 00015 // 00016 // This library is distributed in the hope that it will be useful, 00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00019 // Lesser General Public License for more details. 00020 // 00021 // You should have received a copy of the GNU Lesser General Public 00022 // License along with this library; if not, write to the Free Software 00023 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 00024 // 00025 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112. 00026 00027 // (c) COPYRIGHT URI/MIT 1994-1999 00028 // Please read the full copyright statement in the file COPYRIGHT_URI. 00029 // 00030 // Authors: 00031 // pwest Patrick West <pwest@ucar.edu> 00032 00033 #include "config.h" 00034 00035 #ifdef HAVE_PTHREAD_H 00036 #include <pthread.h> 00037 #endif 00038 00039 #include <cassert> 00040 00041 #include <iostream> 00042 #include <sstream> 00043 #include <iomanip> 00044 00045 // #define DODS_DEBUG 00046 00047 #include "XDRStreamMarshaller.h" 00048 #ifdef USE_POSIX_THREADS 00049 #include "MarshallerThread.h" 00050 #endif 00051 #include "Vector.h" 00052 #include "XDRUtils.h" 00053 #include "util.h" 00054 00055 #include "debug.h" 00056 00057 using namespace std; 00058 00059 // Build this code so it does not use pthreads to write some kinds of 00060 // data (see the put_vector() and put_vector_part() methods) in a child thread. 00061 // #undef USE_POSIX_THREADS 00062 00063 namespace libdap { 00064 00065 char *XDRStreamMarshaller::d_buf = 0; 00066 #define XDR_DAP_BUFF_SIZE 256 00067 00068 00077 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) : 00078 d_out(out), d_partial_put_byte_count(0), tm(0) 00079 { 00080 if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE); 00081 if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization."); 00082 00083 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE); 00084 00085 #ifdef USE_POSIX_THREADS 00086 tm = new MarshallerThread; 00087 #endif 00088 } 00089 00090 XDRStreamMarshaller::~XDRStreamMarshaller() 00091 { 00092 delete tm; 00093 00094 xdr_destroy(&d_sink); 00095 } 00096 00097 void XDRStreamMarshaller::put_byte(dods_byte val) 00098 { 00099 if (!xdr_setpos(&d_sink, 0)) 00100 throw Error("Network I/O Error. Could not send byte data - unable to set stream position."); 00101 00102 if (!xdr_char(&d_sink, (char *) &val)) 00103 throw Error( 00104 "Network I/O Error. Could not send byte data."); 00105 00106 unsigned int bytes_written = xdr_getpos(&d_sink); 00107 if (!bytes_written) 00108 throw Error( 00109 "Network I/O Error. Could not send byte data - unable to get stream position."); 00110 00111 #ifdef USE_POSIX_THREADS 00112 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00113 #endif 00114 00115 d_out.write(d_buf, bytes_written); 00116 } 00117 00118 void XDRStreamMarshaller::put_int16(dods_int16 val) 00119 { 00120 if (!xdr_setpos(&d_sink, 0)) 00121 throw Error( 00122 "Network I/O Error. Could not send int 16 data - unable to set stream position."); 00123 00124 if (!XDR_INT16(&d_sink, &val)) 00125 throw Error( 00126 "Network I/O Error. Could not send int 16 data."); 00127 00128 unsigned int bytes_written = xdr_getpos(&d_sink); 00129 if (!bytes_written) 00130 throw Error( 00131 "Network I/O Error. Could not send int 16 data - unable to get stream position."); 00132 00133 #ifdef USE_POSIX_THREADS 00134 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00135 #endif 00136 00137 d_out.write(d_buf, bytes_written); 00138 } 00139 00140 void XDRStreamMarshaller::put_int32(dods_int32 val) 00141 { 00142 if (!xdr_setpos(&d_sink, 0)) 00143 throw Error( 00144 "Network I/O Error. Could not send int 32 data - unable to set stream position."); 00145 00146 if (!XDR_INT32(&d_sink, &val)) 00147 throw Error( 00148 "Network I/O Error. Culd not read int 32 data."); 00149 00150 unsigned int bytes_written = xdr_getpos(&d_sink); 00151 if (!bytes_written) 00152 throw Error( 00153 "Network I/O Error. Could not send int 32 data - unable to get stream position."); 00154 00155 #ifdef USE_POSIX_THREADS 00156 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00157 #endif 00158 00159 d_out.write(d_buf, bytes_written); 00160 } 00161 00162 void XDRStreamMarshaller::put_float32(dods_float32 val) 00163 { 00164 if (!xdr_setpos(&d_sink, 0)) 00165 throw Error( 00166 "Network I/O Error. Could not send float 32 data - unable to set stream position."); 00167 00168 if (!xdr_float(&d_sink, &val)) 00169 throw Error( 00170 "Network I/O Error. Could not send float 32 data."); 00171 00172 unsigned int bytes_written = xdr_getpos(&d_sink); 00173 if (!bytes_written) 00174 throw Error( 00175 "Network I/O Error. Could not send float 32 data - unable to get stream position."); 00176 00177 #ifdef USE_POSIX_THREADS 00178 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00179 #endif 00180 00181 d_out.write(d_buf, bytes_written); 00182 } 00183 00184 void XDRStreamMarshaller::put_float64(dods_float64 val) 00185 { 00186 if (!xdr_setpos(&d_sink, 0)) 00187 throw Error( 00188 "Network I/O Error. Could not send float 64 data - unable to set stream position."); 00189 00190 if (!xdr_double(&d_sink, &val)) 00191 throw Error( 00192 "Network I/O Error. Could not send float 64 data."); 00193 00194 unsigned int bytes_written = xdr_getpos(&d_sink); 00195 if (!bytes_written) 00196 throw Error( 00197 "Network I/O Error. Could not send float 64 data - unable to get stream position."); 00198 00199 #ifdef USE_POSIX_THREADS 00200 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00201 #endif 00202 00203 d_out.write(d_buf, bytes_written); 00204 } 00205 00206 void XDRStreamMarshaller::put_uint16(dods_uint16 val) 00207 { 00208 if (!xdr_setpos(&d_sink, 0)) 00209 throw Error( 00210 "Network I/O Error. Could not send uint 16 data - unable to set stream position."); 00211 00212 if (!XDR_UINT16(&d_sink, &val)) 00213 throw Error( 00214 "Network I/O Error. Could not send uint 16 data."); 00215 00216 unsigned int bytes_written = xdr_getpos(&d_sink); 00217 if (!bytes_written) 00218 throw Error( 00219 "Network I/O Error. Could not send uint 16 data - unable to get stream position."); 00220 00221 #ifdef USE_POSIX_THREADS 00222 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00223 #endif 00224 00225 d_out.write(d_buf, bytes_written); 00226 } 00227 00228 void XDRStreamMarshaller::put_uint32(dods_uint32 val) 00229 { 00230 if (!xdr_setpos(&d_sink, 0)) 00231 throw Error( 00232 "Network I/O Error. Could not send uint 32 data - unable to set stream position."); 00233 00234 if (!XDR_UINT32(&d_sink, &val)) 00235 throw Error( 00236 "Network I/O Error. Could not send uint 32 data."); 00237 00238 unsigned int bytes_written = xdr_getpos(&d_sink); 00239 if (!bytes_written) 00240 throw Error( 00241 "Network I/O Error. Could not send uint 32 data - unable to get stream position."); 00242 00243 #ifdef USE_POSIX_THREADS 00244 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00245 #endif 00246 00247 d_out.write(d_buf, bytes_written); 00248 } 00249 00250 void XDRStreamMarshaller::put_str(const string &val) 00251 { 00252 int size = val.length() + 8; 00253 00254 XDR str_sink; 00255 vector<char> str_buf(size); 00256 00257 try { 00258 xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE); 00259 00260 if (!xdr_setpos(&str_sink, 0)) 00261 throw Error( 00262 "Network I/O Error. Could not send string data - unable to set stream position."); 00263 00264 const char *out_tmp = val.c_str(); 00265 if (!xdr_string(&str_sink, (char **) &out_tmp, size)) 00266 throw Error( 00267 "Network I/O Error. Could not send string data."); 00268 00269 unsigned int bytes_written = xdr_getpos(&str_sink); 00270 if (!bytes_written) 00271 throw Error( 00272 "Network I/O Error. Could not send string data - unable to get stream position."); 00273 00274 #ifdef USE_POSIX_THREADS 00275 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00276 #endif 00277 00278 d_out.write(&str_buf[0], bytes_written); 00279 00280 xdr_destroy(&str_sink); 00281 } 00282 catch (...) { 00283 xdr_destroy(&str_sink); 00284 throw; 00285 } 00286 } 00287 00288 void XDRStreamMarshaller::put_url(const string &val) 00289 { 00290 put_str(val); 00291 } 00292 00293 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len) 00294 { 00295 if (len > XDR_DAP_BUFF_SIZE) 00296 throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed"); 00297 00298 if (!xdr_setpos(&d_sink, 0)) 00299 throw Error( 00300 "Network I/O Error. Could not send opaque data - unable to set stream position."); 00301 00302 if (!xdr_opaque(&d_sink, val, len)) 00303 throw Error( 00304 "Network I/O Error. Could not send opaque data."); 00305 00306 unsigned int bytes_written = xdr_getpos(&d_sink); 00307 if (!bytes_written) 00308 throw Error( 00309 "Network I/O Error. Could not send opaque data - unable to get stream position."); 00310 00311 #ifdef USE_POSIX_THREADS 00312 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00313 #endif 00314 00315 d_out.write(d_buf, bytes_written); 00316 } 00317 00318 void XDRStreamMarshaller::put_int(int val) 00319 { 00320 if (!xdr_setpos(&d_sink, 0)) 00321 throw Error( 00322 "Network I/O Error. Could not send int data - unable to set stream position."); 00323 00324 if (!xdr_int(&d_sink, &val)) 00325 throw Error( 00326 "Network I/O Error(1). Could not send int data."); 00327 00328 unsigned int bytes_written = xdr_getpos(&d_sink); 00329 if (!bytes_written) 00330 throw Error( 00331 "Network I/O Error. Could not send int data - unable to get stream position."); 00332 00333 #ifdef USE_POSIX_THREADS 00334 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00335 #endif 00336 00337 d_out.write(d_buf, bytes_written); 00338 } 00339 00340 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec) 00341 { 00342 put_vector(val, num, width, vec.var()->type()); 00343 } 00344 00345 00353 void XDRStreamMarshaller::put_vector_start(int num) 00354 { 00355 put_int(num); 00356 put_int(num); 00357 00358 d_partial_put_byte_count = 0; 00359 } 00360 00367 void XDRStreamMarshaller::put_vector_end() 00368 { 00369 #ifdef USE_POSIX_THREADS 00370 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00371 #endif 00372 00373 // Compute the trailing (padding) bytes 00374 00375 // Note that the XDR standard pads values to 4 byte boundaries. 00376 //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4); 00377 unsigned int mod_4 = d_partial_put_byte_count & 0x03; 00378 unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4; 00379 00380 if (pad) { 00381 vector<char> padding(4, 0); // 4 zeros 00382 00383 d_out.write(&padding[0], pad); 00384 if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding"); 00385 } 00386 } 00387 00388 // Start of parallel I/O support. jhrg 8/19/15 00389 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &) 00390 { 00391 if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set."); 00392 00393 // write the number of members of the array being written and then set the position to 0 00394 put_int(num); 00395 00396 // this is the word boundary for writing xdr bytes in a vector. 00397 const unsigned int add_to = 8; 00398 // switch to memory on the heap since the thread will need to access it 00399 // after this code returns. 00400 char *byte_buf = new char[num + add_to]; 00401 XDR byte_sink; 00402 try { 00403 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE); 00404 if (!xdr_setpos(&byte_sink, 0)) 00405 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position."); 00406 00407 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to)) 00408 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data."); 00409 00410 unsigned int bytes_written = xdr_getpos(&byte_sink); 00411 if (!bytes_written) 00412 throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position."); 00413 00414 #ifdef USE_POSIX_THREADS 00415 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00416 tm->increment_child_thread_count(); 00417 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written); 00418 xdr_destroy(&byte_sink); 00419 #else 00420 d_out.write(byte_buf, bytes_written); 00421 xdr_destroy(&byte_sink); 00422 delete byte_buf; 00423 #endif 00424 00425 } 00426 catch (...) { 00427 DBG(cerr << "Caught an exception in put_vector_thread" << endl); 00428 xdr_destroy(&byte_sink); 00429 delete byte_buf; 00430 throw; 00431 } 00432 } 00433 00434 // private 00445 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type) 00446 { 00447 #if 0 00448 if (!val) throw InternalErr(__FILE__, __LINE__, "Buffer pointer is not set."); 00449 #endif 00450 assert(val || num == 0); 00451 00452 // write the number of array members being written, then set the position back to 0 00453 put_int(num); 00454 00455 if (num == 0) 00456 return; 00457 00458 int use_width = width; 00459 if (use_width < 4) use_width = 4; 00460 00461 // the size is the number of elements num times the width of each 00462 // element, then add 4 bytes for the number of elements 00463 int size = (num * use_width) + 4; 00464 00465 // allocate enough memory for the elements 00466 //vector<char> vec_buf(size); 00467 char *vec_buf = new char[size]; 00468 XDR vec_sink; 00469 try { 00470 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE); 00471 00472 // set the position of the sink to 0, we're starting at the beginning 00473 if (!xdr_setpos(&vec_sink, 0)) 00474 throw Error("Network I/O Error. Could not send vector data - unable to set stream position."); 00475 00476 // write the array to the buffer 00477 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type))) 00478 throw Error("Network I/O Error(2). Could not send vector data - unable to encode."); 00479 00480 // how much was written to the buffer 00481 unsigned int bytes_written = xdr_getpos(&vec_sink); 00482 if (!bytes_written) 00483 throw Error("Network I/O Error. Could not send vector data - unable to get stream position."); 00484 00485 #ifdef USE_POSIX_THREADS 00486 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00487 tm->increment_child_thread_count(); 00488 tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written); 00489 xdr_destroy(&vec_sink); 00490 #else 00491 d_out.write(vec_buf, bytes_written); 00492 xdr_destroy(&vec_sink); 00493 delete vec_buf; 00494 #endif 00495 } 00496 catch (...) { 00497 xdr_destroy(&vec_sink); 00498 delete vec_buf; 00499 throw; 00500 } 00501 } 00502 00514 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type) 00515 { 00516 if (width == 1) { 00517 // Add space for the 4 bytes of length info and 4 bytes for padding, even though 00518 // we will not send either of those. 00519 const unsigned int add_to = 8; 00520 unsigned int bufsiz = num + add_to; 00521 //vector<char> byte_buf(bufsiz); 00522 char *byte_buf = new char[bufsiz]; 00523 XDR byte_sink; 00524 try { 00525 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE); 00526 if (!xdr_setpos(&byte_sink, 0)) 00527 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position."); 00528 00529 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz)) 00530 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data."); 00531 00532 #ifdef USE_POSIX_THREADS 00533 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00534 tm->increment_child_thread_count(); 00535 00536 // Increment the element count so we can figure out about the padding in put_vector_last() 00537 d_partial_put_byte_count += num; 00538 00539 tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num); 00540 xdr_destroy(&byte_sink); 00541 #else 00542 // Only send the num bytes that follow the 4 bytes of length info - we skip the 00543 // length info because it's already been sent and we don't send any trailing padding 00544 // bytes in this method (see put_vector_last() for that). 00545 d_out.write(byte_buf + 4, num); 00546 00547 if (d_out.fail()) 00548 throw Error ("Network I/O Error. Could not send initial part of byte vector data"); 00549 00550 // Now increment the element count so we can figure out about the padding in put_vector_last() 00551 d_partial_put_byte_count += num; 00552 00553 xdr_destroy(&byte_sink); 00554 delete byte_buf; 00555 #endif 00556 } 00557 catch (...) { 00558 xdr_destroy(&byte_sink); 00559 delete byte_buf; 00560 throw; 00561 } 00562 } 00563 else { 00564 int use_width = (width < 4) ? 4 : width; 00565 00566 // the size is the number of elements num times the width of each 00567 // element, then add 4 bytes for the (int) number of elements 00568 int size = (num * use_width) + 4; 00569 00570 // allocate enough memory for the elements 00571 //vector<char> vec_buf(size); 00572 char *vec_buf = new char[size]; 00573 XDR vec_sink; 00574 try { 00575 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE); 00576 00577 // set the position of the sink to 0, we're starting at the beginning 00578 if (!xdr_setpos(&vec_sink, 0)) 00579 throw Error("Network I/O Error. Could not send vector data - unable to set stream position."); 00580 00581 // write the array to the buffer 00582 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type))) 00583 throw Error("Network I/O Error(2). Could not send vector data -unable to encode data."); 00584 00585 #ifdef USE_POSIX_THREADS 00586 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count()); 00587 tm->increment_child_thread_count(); 00588 00589 // Increment the element count so we can figure out about the padding in put_vector_last() 00590 d_partial_put_byte_count += (size - 4); 00591 tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4); 00592 xdr_destroy(&vec_sink); 00593 #else 00594 // write that much out to the output stream, skipping the length data that 00595 // XDR writes since we have already written the length info using put_vector_start() 00596 d_out.write(vec_buf + 4, size - 4); 00597 00598 if (d_out.fail()) 00599 throw Error ("Network I/O Error. Could not send part of vector data"); 00600 00601 // Now increment the element count so we can figure out about the padding in put_vector_last() 00602 d_partial_put_byte_count += (size - 4); 00603 00604 xdr_destroy(&vec_sink); 00605 delete vec_buf; 00606 #endif 00607 } 00608 catch (...) { 00609 xdr_destroy(&vec_sink); 00610 delete vec_buf; 00611 throw; 00612 } 00613 } 00614 } 00615 00616 void XDRStreamMarshaller::dump(ostream &strm) const 00617 { 00618 strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl; 00619 } 00620 00621 } // namespace libdap 00622