00001 #ifndef __XRDFILECACHE_FILE_HH__
00002 #define __XRDFILECACHE_FILE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "XrdCl/XrdClXRootDResponses.hh"
00022 #include "XrdCl/XrdClDefaultEnv.hh"
00023
00024 #include "XrdOuc/XrdOucCache2.hh"
00025 #include "XrdOuc/XrdOucIOVec.hh"
00026
00027 #include "XrdFileCacheInfo.hh"
00028 #include "XrdFileCacheStats.hh"
00029
00030 #include <string>
00031 #include <map>
00032
00033 class XrdJob;
00034 class XrdOucIOVec;
00035
00036 namespace XrdCl
00037 {
00038 class Log;
00039 }
00040
00041 namespace XrdFileCache
00042 {
00043 class BlockResponseHandler;
00044 class DirectResponseHandler;
00045 class IO;
00046
00047 struct ReadVBlockListRAM;
00048 struct ReadVChunkListRAM;
00049 struct ReadVBlockListDisk;
00050 struct ReadVChunkListDisk;
00051 }
00052
00053
00054 namespace XrdFileCache
00055 {
00056
// Forward declarations: Block only stores pointers to these types, so their
// full definitions are not required here.
class File;
class IO;

//----------------------------------------------------------------------------
//! One block of file data held in memory, together with the state flags and
//! back-pointers that travel with it.
//!
//! All members are public. Block itself never deletes m_file or m_io and
//! never modifies m_refcnt after construction.
//----------------------------------------------------------------------------
class Block
{
public:
   std::vector<char>  m_buff;        //!< Payload buffer; sized (and zero-filled) by the constructor.
   long long          m_offset;      //!< Offset value supplied at construction.
   File              *m_file;        //!< File pointer supplied at construction.
   IO                *m_io;          //!< IO pointer; replaceable via reset_error_and_set_io().

   int                m_refcnt;      //!< Starts at 0.
   int                m_errno;       //!< 0 means no error has been recorded.
   bool               m_downloaded;  //!< Set by set_downloaded().
   bool               m_prefetch;    //!< Prefetch flag supplied at construction.

   //-------------------------------------------------------------------------
   //! Create a block with a buffer of \p size bytes.
   //!
   //! @param f        file pointer stored in m_file
   //! @param io       IO pointer stored in m_io
   //! @param off      offset stored in m_offset
   //! @param size     number of bytes to allocate for the buffer
   //! @param prefetch value stored in m_prefetch
   //-------------------------------------------------------------------------
   Block(File *f, IO *io, long long off, int size, bool prefetch) :
      m_buff(size),
      m_offset(off), m_file(f), m_io(io), m_refcnt(0),
      m_errno(0), m_downloaded(false), m_prefetch(prefetch)
   {}

   //! Pointer to byte \p pos of the buffer. Uses data() rather than
   //! operator[] so that an empty buffer or a one-past-the-end position
   //! does not index out of range.
   char*       get_buff(long long pos = 0)       { return m_buff.data() + pos; }
   //! Read-only variant of get_buff().
   const char* get_buff(long long pos = 0) const { return m_buff.data() + pos; }

   //! Buffer size in bytes, narrowed to int.
   int       get_size()   const { return static_cast<int>(m_buff.size()); }
   //! Offset value given to the constructor.
   long long get_offset() const { return m_offset; }

   //! IO pointer currently associated with the block.
   IO* get_io() const { return m_io; }

   //! True once the block is either downloaded or has an error recorded.
   bool is_finished() const { return m_downloaded || m_errno != 0; }
   //! True when the downloaded flag is set.
   bool is_ok()       const { return m_downloaded; }
   //! True when a non-zero error code is recorded.
   bool is_failed()   const { return m_errno != 0; }

   //! Mark the block as downloaded.
   void set_downloaded()    { m_downloaded = true; }
   //! Record an error code (0 clears the failed state).
   void set_error(int err)  { m_errno = err; }

   //! Clear the recorded error and associate the block with \p io.
   void reset_error_and_set_io(IO *io)
   {
      m_errno = 0;
      m_io    = io;
   }
};
00098
00099
00100
00101 class BlockResponseHandler : public XrdOucCacheIOCB
00102 {
00103 public:
00104 Block *m_block;
00105 bool m_for_prefetch;
00106
00107 BlockResponseHandler(Block *b, bool prefetch) :
00108 m_block(b), m_for_prefetch(prefetch) {}
00109
00110 virtual void Done(int result);
00111 };
00112
00113
00114
00115 class DirectResponseHandler : public XrdOucCacheIOCB
00116 {
00117 public:
00118 XrdSysCondVar m_cond;
00119 int m_to_wait;
00120 int m_errno;
00121
00122 DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {}
00123
00124 bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; }
00125 bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; }
00126 bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }
00127
00128 virtual void Done(int result);
00129 };
00130
00131
00132
00133 class File
00134 {
00135 public:
00136
00138
00139 File(const std::string &path, long long offset, long long fileSize);
00140
00141
00143
00144 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
00145
00146
00148
00149 ~File();
00150
00152 void BlockRemovedFromWriteQ(Block*);
00153
00155 bool Open();
00156
00158 int ReadV(IO *io, const XrdOucIOVec *readV, int n);
00159
00161 int Read (IO *io, char* buff, long long offset, int size);
00162
00163
00165
00166 bool isOpen() const { return m_is_open; }
00167
00168
00171
00172 bool ioActive(IO *io);
00173
00174
00177
00178 void RequestSyncOfDetachStats();
00179
00180
00183
00184 bool FinalizeSyncBeforeExit();
00185
00186
00188
00189 void Sync();
00190
00191
00193
00194 Stats& GetStats() { return m_stats; }
00195
00196 void ProcessBlockResponse(BlockResponseHandler* brh, int res);
00197 void WriteBlockToDisk(Block* b);
00198
00199 void Prefetch();
00200
00201 float GetPrefetchScore() const;
00202
00204 const char* lPath() const;
00205
00206 std::string& GetLocalPath() { return m_filename; }
00207
00208 XrdSysError* GetLog();
00209 XrdSysTrace* GetTrace();
00210
00211 long long GetFileSize() { return m_fileSize; }
00212
00213 void AddIO(IO *io);
00214 int GetPrefetchCountOnIO(IO *io);
00215 void StopPrefetchingOnIO(IO *io);
00216 void RemoveIO(IO *io);
00217
00218
00219 int get_ref_cnt() { return m_ref_cnt; }
00220 int inc_ref_cnt() { return ++m_ref_cnt; }
00221 int dec_ref_cnt() { return --m_ref_cnt; }
00222
00223 private:
00224 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
00225
00226 int m_ref_cnt;
00227
00228 bool m_is_open;
00229
00230 XrdOssDF *m_output;
00231 XrdOssDF *m_infoFile;
00232 Info m_cfi;
00233
00234 std::string m_filename;
00235 long long m_offset;
00236 long long m_fileSize;
00237
00238
00239
00240 struct IODetails
00241 {
00242 int m_active_prefetches;
00243 bool m_allow_prefetching;
00244
00245 IODetails() : m_active_prefetches(0), m_allow_prefetching(true) {}
00246 };
00247
00248 typedef std::map<IO*, IODetails> IoMap_t;
00249 typedef IoMap_t::iterator IoMap_i;
00250
00251 IoMap_t m_io_map;
00252 IoMap_i m_current_io;
00253 int m_ios_in_detach;
00254
00255
00256 std::vector<int> m_writes_during_sync;
00257 int m_non_flushed_cnt;
00258 bool m_in_sync;
00259
00260 typedef std::list<int> IntList_t;
00261 typedef IntList_t::iterator IntList_i;
00262
00263 typedef std::list<Block*> BlockList_t;
00264 typedef BlockList_t::iterator BlockList_i;
00265
00266 typedef std::map<int, Block*> BlockMap_t;
00267 typedef BlockMap_t::iterator BlockMap_i;
00268
00269
00270 BlockMap_t m_block_map;
00271
00272 XrdSysCondVar m_downloadCond;
00273
00274 Stats m_stats;
00275
00276 PrefetchState_e m_prefetchState;
00277
00278 int m_prefetchReadCnt;
00279 int m_prefetchHitCnt;
00280 float m_prefetchScore;
00281
00282 bool m_detachTimeIsLogged;
00283
00284 static const char *m_traceID;
00285 bool overlap(int blk,
00286 long long blk_size,
00287 long long req_off,
00288 int req_size,
00289
00290 long long &off,
00291 long long &blk_off,
00292 long long &size);
00293
00294
00295 Block* PrepareBlockRequest(int i, IO *io, bool prefetch);
00296
00297 void ProcessBlockRequest (Block *b, bool prefetch);
00298 void ProcessBlockRequests(BlockList_t& blks, bool prefetch);
00299
00300 int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks,
00301 char* buff, long long req_off, long long req_size);
00302
00303 int ReadBlocksFromDisk(IntList_t& blocks,
00304 char* req_buf, long long req_off, long long req_size);
00305
00306
00307 bool VReadValidate (const XrdOucIOVec *readV, int n);
00308 bool VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n,
00309 ReadVBlockListRAM& blks_to_process,
00310 ReadVBlockListDisk& blks_on_disk,
00311 std::vector<XrdOucIOVec>& chunkVec);
00312 int VReadFromDisk (const XrdOucIOVec *readV, int n,
00313 ReadVBlockListDisk& blks_on_disk);
00314 int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n,
00315 std::vector<ReadVChunkListRAM>& blks_to_process,
00316 std::vector<ReadVChunkListRAM>& blks_rocessed);
00317
00318 long long BufferSize();
00319
00320 void inc_ref_count(Block*);
00321 void dec_ref_count(Block*);
00322 void free_block(Block*);
00323
00324 bool select_current_io_or_disable_prefetching(bool skip_current);
00325
00326 int offsetIdx(int idx);
00327 };
00328
00329 }
00330
00331 #endif