svcore  1.9
FileReadThread.cpp
Go to the documentation of this file.
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2 
3 /*
4  Sonic Visualiser
5  An audio file viewer and annotation editor.
6  Centre for Digital Music, Queen Mary, University of London.
7  This file copyright 2006 Chris Cannam.
8 
9  This program is free software; you can redistribute it and/or
10  modify it under the terms of the GNU General Public License as
11  published by the Free Software Foundation; either version 2 of the
12  License, or (at your option) any later version. See the file
13  COPYING included with this distribution for more information.
14 */
15 
16 #include "FileReadThread.h"
17 
18 #include "base/Profiler.h"
19 #include "base/Thread.h"
20 
21 #include <iostream>
22 #include <unistd.h>
23 #include <cstdio>
24 
25 //#define DEBUG_FILE_READ_THREAD 1
26 
// Constructor member-initialiser list and empty body.
// NOTE(review): the line that opens this definition (listing line 27)
// is not present in this extract -- confirm against the original file.
// m_nextToken starts at 0 (request() hands tokens out by post-increment)
// and m_exiting starts false (run() loops while it is false).
28  m_nextToken(0),
29  m_exiting(false)
30 {
31 }
32 
/**
 * Thread main loop: until m_exiting becomes true, either wait on
 * m_condition (when m_queue is empty) or call process() to service
 * the queue.
 *
 * NOTE(review): this listing skips its own line numbers 34, 44 and 47,
 * so the function-name line and possibly two statements of the body
 * are missing from this extract -- confirm against the original file.
 */
33 void
35 {
// Locker is constructed on m_mutex for the scope of this function;
// process() documents that it is entered with m_mutex locked.
36  MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
37 
38  while (!m_exiting) {
39  if (m_queue.empty()) {
// Nothing queued: wait to be woken by request()/cancel()/finish(),
// or time out after 1000 (presumably milliseconds, as for
// QWaitCondition::wait -- TODO confirm) and re-check m_exiting.
40  m_condition.wait(&m_mutex, 1000);
41  } else {
42  process();
43  }
45  }
46 
48 
49 #ifdef DEBUG_FILE_READ_THREAD
50  SVDEBUG << "FileReadThread::run() exiting" << endl;
51 #endif
52 }
53 
54 void
56 {
57 #ifdef DEBUG_FILE_READ_THREAD
58  SVDEBUG << "FileReadThread::finish()" << endl;
59 #endif
60 
61  {
62  MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
63 
64  while (!m_queue.empty()) {
65  m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
66  m_newlyCancelled.insert(m_queue.begin()->first);
67  m_queue.erase(m_queue.begin());
68  }
69 
70  m_exiting = true;
71  }
72 
73  m_condition.wakeAll();
74 
75 #ifdef DEBUG_FILE_READ_THREAD
76  SVDEBUG << "FileReadThread::finish() exiting" << endl;
77 #endif
78 }
79 
80 int
82 {
83  int token;
84 
85  {
86  MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
87 
88  token = m_nextToken++;
89  m_queue[token] = request;
90  }
91 
92  m_condition.wakeAll();
93 
94  return token;
95 }
96 
97 void
99 {
100  {
101  MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
102 
103  if (m_queue.find(token) != m_queue.end()) {
104  m_cancelledRequests[token] = m_queue[token];
105  m_queue.erase(token);
106  m_newlyCancelled.insert(token);
107  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
108  m_cancelledRequests[token] = m_readyRequests[token];
109  m_readyRequests.erase(token);
110  } else {
111  cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
112  }
113  }
114 
115 #ifdef DEBUG_FILE_READ_THREAD
116  SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
117 #endif
118 
119  m_condition.wakeAll();
120 }
121 
122 bool
124 {
125  MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
126 
127  bool ready = m_readyRequests.find(token) != m_readyRequests.end();
128 
129  return ready;
130 }
131 
132 bool
134 {
135  MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
136 
137  bool cancelled =
138  m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
139  m_newlyCancelled.find(token) == m_newlyCancelled.end();
140 
141  return cancelled;
142 }
143 
144 bool
146 {
147  MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
148 
149  bool found = false;
150 
151  if (m_queue.find(token) != m_queue.end()) {
152  found = true;
153  } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
154  found = true;
155  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
156  found = true;
157  }
158 
159  return found;
160 }
161 
162 bool
164 {
165  MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
166 
167  bool found = false;
168 
169  if (m_queue.find(token) != m_queue.end()) {
170  request = m_queue[token];
171  found = true;
172  } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
173  request = m_cancelledRequests[token];
174  found = true;
175  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
176  request = m_readyRequests[token];
177  found = true;
178  }
179 
180  return found;
181 }
182 
183 void
185 {
186  MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
187 
188  bool found = false;
189 
190  if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
191  m_cancelledRequests.erase(token);
192  m_newlyCancelled.erase(token);
193  found = true;
194  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
195  m_readyRequests.erase(token);
196  found = true;
197  } else if (m_queue.find(token) != m_queue.end()) {
198  cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
199  }
200 
201  if (!found) {
202  cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
203  }
204 }
205 
/**
 * Service the request at the front of m_queue.
 *
 * Locking: entered with m_mutex locked. m_mutex is released while the
 * seek and read are carried out and is re-acquired before the result
 * is recorded, so the function also returns with m_mutex locked.
 *
 * Outcome: if the token is still in m_queue afterwards and m_exiting
 * is false, the request (with its successful flag and possibly
 * adjusted size) is moved into m_readyRequests. Otherwise nothing is
 * stored; the debug messages below describe these cases as "exiting"
 * and "request disappeared".
 */
206 void
208 {
209  // entered with m_mutex locked and m_queue non-empty
210 
211  Profiler profiler("FileReadThread::process", true);
212 
// Take copies of the front entry so they can be used after m_mutex
// has been released.
213  int token = m_queue.begin()->first;
214  Request request = m_queue.begin()->second;
215 
// Release the queue lock for the duration of the file I/O.
216  m_mutex.unlock();
217 
218 #ifdef DEBUG_FILE_READ_THREAD
219  SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
220 #endif
221 
222  bool successful = false;
223  bool seekFailed = false;
// r: total bytes read so far, or the negative return of a failed read()
224  ssize_t r = 0;
225 
226  {
// request.mutex is locked for the seek + read sequence only.
227  MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
228 
229  if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
230  seekFailed = true;
231  } else {
232 
233  // if request.size is large, we want to avoid making a single
234  // system call to read it all as it may block too much
235 
// Upper bound on the size of any single read() call: 256 KiB.
236  static const size_t blockSize = 256 * 1024;
237 
238  size_t size = request.size;
239  char *destination = request.data;
240 
241  while (size > 0) {
242  size_t readSize = size;
243  if (readSize > blockSize) readSize = blockSize;
244  ssize_t br = ::read(request.fd, destination, readSize);
245  if (br < 0) {
// Read error: discard the running total and report the error value.
246  r = br;
247  break;
248  } else {
249  r += br;
// A short read ends the loop; the shortfall is handled below.
250  if (br < ssize_t(readSize)) break;
251  }
// Reached only after a full-sized chunk (br == readSize).
252  destination += readSize;
253  size -= readSize;
254  }
255  }
256  }
257 
// Classify the outcome. On seek or read failure request.size is set
// to 0; on a short read it is set to the number of bytes obtained.
258  if (seekFailed) {
259  ::perror("Seek failed");
260  cerr << "ERROR: FileReadThread::process: seek to "
261  << request.start << " failed" << endl;
262  request.size = 0;
263  } else {
264  if (r < 0) {
265  ::perror("ERROR: FileReadThread::process: Read failed");
266  cerr << "ERROR: FileReadThread::process: read of "
267  << request.size << " at "
268  << request.start << " failed" << endl;
269  request.size = 0;
270  } else if (r < ssize_t(request.size)) {
271  cerr << "WARNING: FileReadThread::process: read "
272  << request.size << " returned only " << r << " bytes"
273  << endl;
274  request.size = r;
// Pause for 100000 microseconds after a short read.
// NOTE(review): the reason for this delay is not stated here.
275  usleep(100000);
276  } else {
277  successful = true;
278  }
279  }
280 
281  // Check that the token hasn't been cancelled and the thread
282  // hasn't been asked to finish
283 
// Re-acquire the queue lock; it stays held when this function returns.
284  m_mutex.lock();
285 
286  request.successful = successful;
287 
288  if (m_queue.find(token) != m_queue.end() && !m_exiting) {
// Still wanted: move the local copy from the queue to the ready set.
289  m_queue.erase(token);
290  m_readyRequests[token] = request;
291 #ifdef DEBUG_FILE_READ_THREAD
292  SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
293 #endif
294  } else {
// The local copy of the request is not stored anywhere in this case.
295 #ifdef DEBUG_FILE_READ_THREAD
296  if (m_exiting) {
297  SVDEBUG << "FileReadThread::process: exiting" << endl;
298  } else {
299  SVDEBUG << "FileReadThread::process: request disappeared" << endl;
300  }
301 #endif
302  }
303 }
304 
/**
 * Empty m_newlyCancelled, one token at a time. Apart from the optional
 * debug output, no other per-token action is taken in this listing.
 * Once a token has been removed here, isCancelled() can report true
 * for it (that check requires the token to be absent from
 * m_newlyCancelled).
 *
 * NOTE(review): the function-name line (listing line 306) is missing
 * from this extract -- confirm the signature against the original file.
 */
305 void
307 {
308  // entered with m_mutex locked
309 
310  while (!m_newlyCancelled.empty()) {
311 
312  int token = *m_newlyCancelled.begin();
313 
314 #ifdef DEBUG_FILE_READ_THREAD
315  SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
316 #endif
317 
318  m_newlyCancelled.erase(token);
319  }
320 }
321 
322 
QWaitCondition m_condition
virtual bool haveRequest(int token)
virtual void cancel(int token)
RequestQueue m_cancelledRequests
RequestQueue m_readyRequests
std::set< int > m_newlyCancelled
virtual void finish()
RequestQueue m_queue
virtual int request(const Request &request)
#define SVDEBUG
Definition: Debug.h:42
virtual bool isCancelled(int token)
virtual void done(int token)
virtual bool getRequest(int token, Request &request)
virtual bool isReady(int token)
virtual void run()
Profile point instance class.
Definition: Profiler.h:86