svcore
1.9
|
00001 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ 00002 00003 /* 00004 Sonic Visualiser 00005 An audio file viewer and annotation editor. 00006 Centre for Digital Music, Queen Mary, University of London. 00007 This file copyright 2006 Chris Cannam. 00008 00009 This program is free software; you can redistribute it and/or 00010 modify it under the terms of the GNU General Public License as 00011 published by the Free Software Foundation; either version 2 of the 00012 License, or (at your option) any later version. See the file 00013 COPYING included with this distribution for more information. 00014 */ 00015 00016 #include "FileReadThread.h" 00017 00018 #include "base/Profiler.h" 00019 #include "base/Thread.h" 00020 00021 #include <iostream> 00022 #include <unistd.h> 00023 #include <cstdio> 00024 00025 //#define DEBUG_FILE_READ_THREAD 1 00026 00027 FileReadThread::FileReadThread() : 00028 m_nextToken(0), 00029 m_exiting(false) 00030 { 00031 } 00032 00033 void 00034 FileReadThread::run() 00035 { 00036 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex"); 00037 00038 while (!m_exiting) { 00039 if (m_queue.empty()) { 00040 m_condition.wait(&m_mutex, 1000); 00041 } else { 00042 process(); 00043 } 00044 notifyCancelled(); 00045 } 00046 00047 notifyCancelled(); 00048 00049 #ifdef DEBUG_FILE_READ_THREAD 00050 SVDEBUG << "FileReadThread::run() exiting" << endl; 00051 #endif 00052 } 00053 00054 void 00055 FileReadThread::finish() 00056 { 00057 #ifdef DEBUG_FILE_READ_THREAD 00058 SVDEBUG << "FileReadThread::finish()" << endl; 00059 #endif 00060 00061 { 00062 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex"); 00063 00064 while (!m_queue.empty()) { 00065 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; 00066 m_newlyCancelled.insert(m_queue.begin()->first); 00067 m_queue.erase(m_queue.begin()); 00068 } 00069 00070 m_exiting = true; 00071 } 00072 00073 m_condition.wakeAll(); 00074 00075 #ifdef DEBUG_FILE_READ_THREAD 00076 SVDEBUG << "FileReadThread::finish() exiting" << endl; 00077 #endif 00078 } 00079 00080 int 00081 FileReadThread::request(const Request &request) 00082 { 00083 int token; 00084 00085 { 00086 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex"); 00087 00088 token = m_nextToken++; 00089 m_queue[token] = request; 00090 } 00091 00092 m_condition.wakeAll(); 00093 00094 return token; 00095 } 00096 00097 void 00098 FileReadThread::cancel(int token) 00099 { 00100 { 00101 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex"); 00102 00103 if (m_queue.find(token) != m_queue.end()) { 00104 m_cancelledRequests[token] = m_queue[token]; 00105 m_queue.erase(token); 00106 m_newlyCancelled.insert(token); 00107 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 00108 m_cancelledRequests[token] = m_readyRequests[token]; 00109 m_readyRequests.erase(token); 00110 } else { 00111 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl; 00112 } 00113 } 00114 00115 #ifdef DEBUG_FILE_READ_THREAD 00116 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl; 00117 #endif 00118 00119 m_condition.wakeAll(); 00120 } 00121 00122 bool 00123 FileReadThread::isReady(int token) 00124 { 00125 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex"); 00126 00127 bool ready = m_readyRequests.find(token) != m_readyRequests.end(); 00128 00129 return ready; 00130 } 00131 00132 bool 00133 FileReadThread::isCancelled(int token) 00134 { 00135 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex"); 00136 00137 bool cancelled = 00138 m_cancelledRequests.find(token) != m_cancelledRequests.end() && 00139 m_newlyCancelled.find(token) == m_newlyCancelled.end(); 00140 00141 return cancelled; 00142 } 00143 00144 bool 00145 FileReadThread::haveRequest(int token) 00146 { 00147 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex"); 00148 00149 bool found = false; 00150 00151 if (m_queue.find(token) != m_queue.end()) { 00152 found = true; 00153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { 00154 found = true; 00155 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 00156 found = true; 00157 } 00158 00159 return found; 00160 } 00161 00162 bool 00163 FileReadThread::getRequest(int token, Request &request) 00164 { 00165 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex"); 00166 00167 bool found = false; 00168 00169 if (m_queue.find(token) != m_queue.end()) { 00170 request = m_queue[token]; 00171 found = true; 00172 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { 00173 request = m_cancelledRequests[token]; 00174 found = true; 00175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 00176 request = m_readyRequests[token]; 00177 found = true; 00178 } 00179 00180 return found; 00181 } 00182 00183 void 00184 FileReadThread::done(int token) 00185 { 00186 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex"); 00187 00188 bool found = false; 00189 00190 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { 00191 m_cancelledRequests.erase(token); 00192 m_newlyCancelled.erase(token); 00193 found = true; 00194 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 00195 m_readyRequests.erase(token); 00196 found = true; 00197 } else if (m_queue.find(token) != m_queue.end()) { 00198 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl; 00199 } 00200 00201 if (!found) { 00202 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl; 00203 } 00204 } 00205 00206 void 00207 FileReadThread::process() 00208 { 00209 // entered with m_mutex locked and m_queue non-empty 00210 00211 Profiler profiler("FileReadThread::process", true); 00212 00213 int token = m_queue.begin()->first; 00214 Request request = m_queue.begin()->second; 00215 00216 m_mutex.unlock(); 00217 00218 #ifdef DEBUG_FILE_READ_THREAD 00219 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl; 00220 #endif 00221 00222 bool successful = false; 00223 bool seekFailed = false; 00224 ssize_t r = 0; 00225 00226 { 00227 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex"); 00228 00229 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { 00230 seekFailed = true; 00231 } else { 00232 00233 // if request.size is large, we want to avoid making a single 00234 // system call to read it all as it may block too much 00235 00236 static const size_t blockSize = 256 * 1024; 00237 00238 size_t size = request.size; 00239 char *destination = request.data; 00240 00241 while (size > 0) { 00242 size_t readSize = size; 00243 if (readSize > blockSize) readSize = blockSize; 00244 ssize_t br = ::read(request.fd, destination, readSize); 00245 if (br < 0) { 00246 r = br; 00247 break; 00248 } else { 00249 r += br; 00250 if (br < ssize_t(readSize)) break; 00251 } 00252 destination += readSize; 00253 size -= readSize; 00254 } 00255 } 00256 } 00257 00258 if (seekFailed) { 00259 ::perror("Seek failed"); 00260 cerr << "ERROR: FileReadThread::process: seek to " 00261 << request.start << " failed" << endl; 00262 request.size = 0; 00263 } else { 00264 if (r < 0) { 00265 ::perror("ERROR: FileReadThread::process: Read failed"); 00266 cerr << "ERROR: FileReadThread::process: read of " 00267 << request.size << " at " 00268 << request.start << " failed" << endl; 00269 request.size = 0; 00270 } else if (r < ssize_t(request.size)) { 00271 cerr << "WARNING: FileReadThread::process: read " 00272 << request.size << " returned only " << r << " bytes" 00273 << endl; 00274 request.size = r; 00275 usleep(100000); 00276 } else { 00277 successful = true; 00278 } 00279 } 00280 00281 // Check that the token hasn't been cancelled and the thread 00282 // hasn't been asked to finish 00283 00284 m_mutex.lock(); 00285 00286 request.successful = successful; 00287 00288 if (m_queue.find(token) != m_queue.end() && !m_exiting) { 00289 m_queue.erase(token); 00290 m_readyRequests[token] = request; 00291 #ifdef DEBUG_FILE_READ_THREAD 00292 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl; 00293 #endif 00294 } else { 00295 #ifdef DEBUG_FILE_READ_THREAD 00296 if (m_exiting) { 00297 SVDEBUG << "FileReadThread::process: exiting" << endl; 00298 } else { 00299 SVDEBUG << "FileReadThread::process: request disappeared" << endl; 00300 } 00301 #endif 00302 } 00303 } 00304 00305 void 00306 FileReadThread::notifyCancelled() 00307 { 00308 // entered with m_mutex locked 00309 00310 while (!m_newlyCancelled.empty()) { 00311 00312 int token = *m_newlyCancelled.begin(); 00313 00314 #ifdef DEBUG_FILE_READ_THREAD 00315 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl; 00316 #endif 00317 00318 m_newlyCancelled.erase(token); 00319 } 00320 } 00321 00322