svcore  1.9
FileReadThread.cpp
Go to the documentation of this file.
00001 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */
00002 
00003 /*
00004     Sonic Visualiser
00005     An audio file viewer and annotation editor.
00006     Centre for Digital Music, Queen Mary, University of London.
00007     This file copyright 2006 Chris Cannam.
00008     
00009     This program is free software; you can redistribute it and/or
00010     modify it under the terms of the GNU General Public License as
00011     published by the Free Software Foundation; either version 2 of the
00012     License, or (at your option) any later version.  See the file
00013     COPYING included with this distribution for more information.
00014 */
00015 
00016 #include "FileReadThread.h"
00017 
00018 #include "base/Profiler.h"
00019 #include "base/Thread.h"
00020 
00021 #include <iostream>
00022 #include <unistd.h>
00023 #include <cstdio>
00024 
00025 //#define DEBUG_FILE_READ_THREAD 1
00026 
00027 FileReadThread::FileReadThread() :
00028     m_nextToken(0),
00029     m_exiting(false)
00030 {
00031 }
00032 
00033 void
00034 FileReadThread::run()
00035 {
00036     MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
00037 
00038     while (!m_exiting) {
00039         if (m_queue.empty()) {
00040             m_condition.wait(&m_mutex, 1000);
00041         } else {
00042             process();
00043         }
00044         notifyCancelled();
00045     }
00046 
00047     notifyCancelled();
00048 
00049 #ifdef DEBUG_FILE_READ_THREAD
00050     SVDEBUG << "FileReadThread::run() exiting" << endl;
00051 #endif
00052 }
00053 
00054 void
00055 FileReadThread::finish()
00056 {
00057 #ifdef DEBUG_FILE_READ_THREAD
00058     SVDEBUG << "FileReadThread::finish()" << endl;
00059 #endif
00060 
00061     {
00062         MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
00063 
00064         while (!m_queue.empty()) {
00065             m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
00066             m_newlyCancelled.insert(m_queue.begin()->first);
00067             m_queue.erase(m_queue.begin());
00068         }
00069         
00070         m_exiting = true;
00071     }
00072 
00073     m_condition.wakeAll();
00074 
00075 #ifdef DEBUG_FILE_READ_THREAD
00076     SVDEBUG << "FileReadThread::finish() exiting" << endl;
00077 #endif
00078 }
00079 
00080 int
00081 FileReadThread::request(const Request &request)
00082 {
00083     int token;
00084 
00085     {
00086         MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
00087     
00088         token = m_nextToken++;
00089         m_queue[token] = request;
00090     }
00091 
00092     m_condition.wakeAll();
00093 
00094     return token;
00095 }
00096 
00097 void
00098 FileReadThread::cancel(int token)
00099 {
00100     {
00101         MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
00102 
00103         if (m_queue.find(token) != m_queue.end()) {
00104             m_cancelledRequests[token] = m_queue[token];
00105             m_queue.erase(token);
00106             m_newlyCancelled.insert(token);
00107         } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00108             m_cancelledRequests[token] = m_readyRequests[token];
00109             m_readyRequests.erase(token);
00110         } else {
00111             cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
00112         }
00113     }
00114 
00115 #ifdef DEBUG_FILE_READ_THREAD
00116     SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
00117 #endif
00118 
00119     m_condition.wakeAll();
00120 }
00121 
00122 bool
00123 FileReadThread::isReady(int token)
00124 {
00125     MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
00126 
00127     bool ready = m_readyRequests.find(token) != m_readyRequests.end();
00128 
00129     return ready;
00130 }
00131 
00132 bool
00133 FileReadThread::isCancelled(int token)
00134 {
00135     MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
00136 
00137     bool cancelled = 
00138         m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
00139         m_newlyCancelled.find(token) == m_newlyCancelled.end();
00140 
00141     return cancelled;
00142 }
00143 
00144 bool
00145 FileReadThread::haveRequest(int token)
00146 {
00147     MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
00148 
00149     bool found = false;
00150 
00151     if (m_queue.find(token) != m_queue.end()) {
00152         found = true;
00153     } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00154         found = true;
00155     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00156         found = true;
00157     }
00158 
00159     return found;
00160 }
00161 
00162 bool
00163 FileReadThread::getRequest(int token, Request &request)
00164 {
00165     MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
00166 
00167     bool found = false;
00168 
00169     if (m_queue.find(token) != m_queue.end()) {
00170         request = m_queue[token];
00171         found = true;
00172     } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00173         request = m_cancelledRequests[token];
00174         found = true;
00175     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00176         request = m_readyRequests[token];
00177         found = true;
00178     }
00179 
00180     return found;
00181 }
00182 
00183 void
00184 FileReadThread::done(int token)
00185 {
00186     MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
00187 
00188     bool found = false;
00189 
00190     if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00191         m_cancelledRequests.erase(token);
00192         m_newlyCancelled.erase(token);
00193         found = true;
00194     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00195         m_readyRequests.erase(token);
00196         found = true;
00197     } else if (m_queue.find(token) != m_queue.end()) {
00198         cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
00199     }
00200 
00201     if (!found) {
00202         cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
00203     }
00204 }
00205 
00206 void
00207 FileReadThread::process()
00208 {
00209     // entered with m_mutex locked and m_queue non-empty
00210 
00211     Profiler profiler("FileReadThread::process", true);
00212 
00213     int token = m_queue.begin()->first;
00214     Request request = m_queue.begin()->second;
00215 
00216     m_mutex.unlock();
00217 
00218 #ifdef DEBUG_FILE_READ_THREAD
00219     SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
00220 #endif
00221 
00222     bool successful = false;
00223     bool seekFailed = false;
00224     ssize_t r = 0;
00225 
00226     { 
00227         MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
00228 
00229         if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
00230             seekFailed = true;
00231         } else {
00232         
00233             // if request.size is large, we want to avoid making a single
00234             // system call to read it all as it may block too much
00235             
00236             static const size_t blockSize = 256 * 1024;
00237             
00238             size_t size = request.size;
00239             char *destination = request.data;
00240             
00241             while (size > 0) {
00242                 size_t readSize = size;
00243                 if (readSize > blockSize) readSize = blockSize;
00244                 ssize_t br = ::read(request.fd, destination, readSize);
00245                 if (br < 0) { 
00246                     r = br;
00247                     break;
00248                 } else {
00249                     r += br;
00250                     if (br < ssize_t(readSize)) break;
00251                 }
00252                 destination += readSize;
00253                 size -= readSize;
00254             }
00255         }
00256     }
00257 
00258     if (seekFailed) {
00259         ::perror("Seek failed");
00260         cerr << "ERROR: FileReadThread::process: seek to "
00261                   << request.start << " failed" << endl;
00262         request.size = 0;
00263     } else {
00264         if (r < 0) {
00265             ::perror("ERROR: FileReadThread::process: Read failed");
00266             cerr << "ERROR: FileReadThread::process: read of "
00267                       << request.size << " at "
00268                       << request.start << " failed" << endl;
00269             request.size = 0;
00270         } else if (r < ssize_t(request.size)) {
00271             cerr << "WARNING: FileReadThread::process: read "
00272                       << request.size << " returned only " << r << " bytes"
00273                       << endl;
00274             request.size = r;
00275             usleep(100000);
00276         } else {
00277             successful = true;
00278         }
00279     }
00280         
00281     // Check that the token hasn't been cancelled and the thread
00282     // hasn't been asked to finish
00283     
00284     m_mutex.lock();
00285 
00286     request.successful = successful;
00287         
00288     if (m_queue.find(token) != m_queue.end() && !m_exiting) {
00289         m_queue.erase(token);
00290         m_readyRequests[token] = request;
00291 #ifdef DEBUG_FILE_READ_THREAD
00292         SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
00293 #endif
00294     } else {
00295 #ifdef DEBUG_FILE_READ_THREAD
00296         if (m_exiting) {
00297             SVDEBUG << "FileReadThread::process: exiting" << endl;
00298         } else {
00299             SVDEBUG << "FileReadThread::process: request disappeared" << endl;
00300         }
00301 #endif
00302     }
00303 }
00304 
00305 void
00306 FileReadThread::notifyCancelled()
00307 {
00308     // entered with m_mutex locked
00309 
00310     while (!m_newlyCancelled.empty()) {
00311 
00312         int token = *m_newlyCancelled.begin();
00313 
00314 #ifdef DEBUG_FILE_READ_THREAD
00315         SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
00316 #endif
00317 
00318         m_newlyCancelled.erase(token);
00319     }
00320 }
00321         
00322