#!/usr/bin/env python3

# Sample backup and restore utility that simply backs up EOS files under /eos/pathName
# to local directories /BackupPrefix/<cloneId>
#
# eos-backup clone -B /BackupPrefix [-P <parentId>] /eos/pathName
#   - performs the actual clone, based on parentId or full, creates catalogFile
#     Outputs "cloneId <cloneId> catalog <catalogFile>"
#
# eos-backup backup -B /BackupPrefix [-F catalogFile] [-P <parentId>] /eos/pathName
#   - unless specified with '-F' uses "clone" to produce a clone and catalogFile
#     and then backs up files into /BackupPrefix/<cloneId> and deletes the clone.
#     Outputs "cloneId <cloneId> backup_media <backup_media>" followed by whatever
#     the deletion of the clone says.
#
# eos-backup restore -F /inputCatalog1[,/inputCatalog2[,...]] /outputDirectory
#   - performs a restore into outputDirectory based on 1 or more catalogFiles
#
# For clone/backup the data are stored in blob files /BackupPrefix/<cloneId>/b.<n>,
# the catalog in /BackupPrefix/<cloneId>/catalog. The /eos pathnames are as seen by
# the MGM, not necessarily fuse-mounted paths in the local file system.
#
# Environment:
#   EOS_MGM_URL=root://eos-mgm-test.eoscluster.cern.ch must point to the MGM serving /eos/pathName
#
# A primitive full_backup/make_changes/incremental_backup/restore example in bash:
#
#   # full clone and backup in one go
#   read xx cloneId1 xxx media1 <<<$(eos-backup backup -B /tmp/Backup /eos/dockertest/backuptest)
#
#   # ... make some changes, delete/add/modify files ...
#
#   # an incremental clone based on the first backup
#   read xx cloneId2 xxx cloneFile2 <<<$(eos-backup clone -B /tmp/Backup -P $cloneId1 /eos/dockertest/backuptest)
#
#   # back those files up
#   read xx cloneId2 xxx media2 <<<$(eos-backup backup -B /tmp/Backup -F $cloneFile2 /eos/dockertest/backuptest)
#
#   # restore the lot
#   eos-backup restore -F $media1/catalog,$media2/catalog /tmp/Restore2

from __future__ import print_function

import getopt, json, os, re, signal, stat, subprocess, sys, tempfile, threading, time, traceback
import pdb

# allow for orderly termination
stop_requested = False

def request_stop(signum, frame):
    global stop_requested
    stop_requested = True

signal.signal(signal.SIGINT, request_stop)

# options
try:
    subcmd = sys.argv[1]
    opts, args = getopt.getopt(sys.argv[2:], "B:F:P:U:bvS:")
except:
    print("Unable to parse arguments")
    sys.exit(1)

# default backupDirPrefix, parentId and blob size limit
dflt = [('-B', "/tmp/backupDir"), ('-P', 10), ('-S', 1024*1024*1024)]

# resulting options
opt = dict(dflt + opts)

if '-U' not in opt:
    opt['-U'] = os.getenv("EOS_MGM_URL")
    if opt['-U'] is None:
        print("specify a valid URL ('-U root://my_mgm')", file=sys.stderr)
        sys.exit(1)

try:
    os.putenv("XRD_WORKERTHREADS", "50")        # os.environ would not work here
    from XRootD import client
    from XRootD.client.flags import MkDirFlags, OpenFlags
    canXroot = True
    copyprocess = client.CopyProcess()
    copyjobs = []
    Ufs = client.FileSystem(opt['-U'])
except:
    canXroot = False

if '-P' in opt:
    try:
        parentId = int(opt['-P'])
    except ValueError:
        print("invalid parent Id", file=sys.stderr)
        sys.exit(22)

opt['-S'] = int(opt['-S'])

if opt['-B'].startswith("root:"):
    if not canXroot:
        print("backup media specifies 'root://' but the XRootD.client module did not load", file=sys.stderr)
        sys.exit(1)

eos_instance = os.popen("eos version").readlines()[0].split('=')[1].rstrip()

def localpath(url):
    xx = re.match(r"^root://\S+/(/\S+)", url)
    if xx:
        return xx.group(1)
    else:
        return url

# create the clone, results in a catalogFile
def do_clone(subcmd):
    catalogFile = tempfile.mkstemp()
    if '-P' not in opt:
        opt['-P'] = 10
    cmd = "eos oldfind -j -x sys.clone=+%d %s > %s" % (int(opt['-P']), args[0], catalogFile[1])
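    # 'eos oldfind -j' emits one JSON object per line; the first line describes the
    # clone itself, e.g. (illustrative values only): {"c": "4711", "n": "/eos/some/dir/"}.
    # Its 'c' field is the cloneId extracted below; the following lines describe the
    # individual files and directories.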
    try:
        os.system(cmd)
    except:
        print("Unexpected error:", sys.exc_info()[0], file=sys.stderr)
        print("cmd:", cmd, file=sys.stderr)
        raise

    line1 = open(catalogFile[1]).readline()
    try:
        xx = json.loads(line1)
        cloneId = xx['c']
    except Exception as e:
        print(e, "catalogFile", catalogFile[1], file=sys.stderr)
        if hasattr(e, 'msg'): print(e.msg, file=sys.stderr)
        if hasattr(e, 'message'): print(e.message, file=sys.stderr)
        print("line #1: '%s'" % line1, file=sys.stderr)
        raise

    tCatFile = "%s.%s.clone" % (catalogFile[1], cloneId)
    os.rename(catalogFile[1], tCatFile)
    if subcmd == "clone":
        print("cloneId %s catalog %s" % (cloneId, tCatFile))
    return (cloneId, tCatFile)

# blob: holds a single BIG file, a flat set of files copied with xrdcp,
# or a flat set of files copied asynchronously
BLOBS = []
blobCV = threading.Condition()
blobq = []
blob_slots = 6          # run slots
blobs_active = []

class Blob:
    # the files the data are stored in
    def __init__(self, prefix, isCopy=False):
        # blob chain management
        self.blob_num = len(BLOBS)
        BLOBS.append(self)
        self.prefix = prefix
        self.isCopy = isCopy
        self.closing = False
        self.blobname = "%sb.%d" % (prefix, self.blob_num)
        self.size = 0           # current file size
        self.csize = 0          # current committed size

        # files to be transferred into this blob (or placeholder for copy-blobs and xrdcp)
        self.bufsz = 1 << 24    # 16 MB
        self.items = []         # list of (jsonTag, path, cpath, urlopts)
        self.currItem = -1      # item currently being backed up
        self.state = -1         # stopped=-1, idle=0, copy_clone=1, copy_live=2, override=3, ... in close=98, closed=99
        self.blXroot = canXroot and prefix.startswith("root:")
        if self.blXroot:
            if not self.isCopy:
                self.bfile = client.File()
                self.bfile.open(self.blobname, OpenFlags.NEW)

            def cb(st, resp, hotlist):
                with blobCV:
                    blobq.append((self, (st, resp, hotlist,)))
                    blobCV.notify()
            self.XrdCB = cb
        else:
            self.bfile = open(self.blobname, "wb")      # 'wb' or 'w' hardly matters, we don't write ourselves
            os.lseek(self.bfile.fileno(), 0, 0)         # this may be implicit, but vital for the subprocess writes
        print("blob", self.blob_num, f"created as {self.blobname} copy={self.isCopy}", file=sys.stderr)

    def getBlob(self, sz, noCopy=False):
        if sz >= opt['-S']/10 and not noCopy:
            # create a copyjob item, the blob will be a single file
            new_blob = Blob(self.prefix, isCopy=True)
            new_blob.csize = sz         # save the projected size, we'll start at offset 0 anyway
            return (self, new_blob)
        elif sz > opt['-S']:
            new_blob = Blob(self.prefix)
            return (self, new_blob)

        if self.csize > opt['-S']:
            if "-v" in opt:
                print(f"blob {self.blob_num} committed {self.csize} files {len(self.items)}", file=sys.stderr)
            new_blob = Blob(self.prefix)
            new_blob.csize = sz
            blobs_active.remove(self)
            blobs_active.append(new_blob)
            self.close()
            return (new_blob, new_blob)

        self.csize += sz
        return (self, self)

    def config_copyjob(self, src, altsrc):      # clone / live file
        self.state = 0
        self.src = (src, altsrc)
        self.trg = "%sb.%d" % (self.prefix, self.blob_num)
        if "-v" in opt:
            print(f"*debug* copyjob {self.src} -> {self.trg}", file=sys.stderr)
        self.srcIndex = -1
        self.size = self.csize
        # There is a complication here: Ufs.stat() on the clone works (if it exists);
        # Ufs.stat() on the "/.fxid:xxxx" was not always supported in EOS. But since reading
        # from the live file is plan-B anyway no stat is really needed - just read until csize.
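        # src[0] addresses the live file by fxid ("/.fxid:<hex>", see addItem),
        # src[1] the clone copy; the stat below prefers the clone and falls back
        # to the live file, recording the choice in srcIndex.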
        if self.srcIndex < 0:
            st, info = Ufs.stat(self.src[1])            # typically tries the clone first
            print(f"copyjob st={st.ok}/{st.code} info={info}", file=sys.stderr)     # debug
            if st.ok:
                self.srcIndex = 1
            else:                                       # else the live file
                self.srcIndex = 0
                if "-v" in opt:
                    print(f"Copyjob live file {self.src[0]}", file=sys.stderr)
                st, info = Ufs.stat(self.src[0])        # this should be supported now
                if "-v" in opt:
                    print(f"st {st}, info {info}")
                if st.ok:
                    self.size = info.size
        if self.srcIndex < 0:
            return False

        copyjobs.append(self)
        if "-v" in opt:
            print("copy job", self.src[self.srcIndex], self.blobname, "size", self.size, file=sys.stderr)
        copyprocess.add_job(self.src[self.srcIndex], self.blobname, thirdparty="only")      # could be "first"
        return True

    def close(self):
        global blob_slots
        self.closing = True
        if self.state == 0:
            self.state_machine()
        if self.state == 98:
            self.state = 99
            if not self.isCopy:
                self.bfile.close()
            blob_slots += 1
            print(f"blob {self.blobname} closed size {self.size} slots {blob_slots}", file=sys.stderr)
            for n in range(self.blob_num, len(BLOBS)):
                if BLOBS[n].state == -1:
                    BLOBS[n].run()
                    break

    def addItem(self, jsonTag, path, cpath, urlopts=""):
        if path is not None:
            path = "/.fxid:" + cpath.split('/')[-1]     # access by fxid, protects against renames
        if self.isCopy:
            # use a more efficient (3rd-party) xrootd copy into a single blob
            jsonTag['b'] = "%d:%d:%d" % (self.blob_num, 0, self.csize)
            self.items.append((jsonTag, None, None, None,))
            self.config_copyjob("%s/%s" % (opt['-U'], path), "%s/%s" % (opt['-U'], cpath))
        else:
            self.items.append((jsonTag, path, cpath, urlopts,))
        self.run()

    def run(self):
        global blob_slots
        if self.state == -1:
            if blob_slots <= 0:
                return
            blob_slots -= 1
            self.state = 0      # set to idle
        if self.state == 0:     # is idle
            self.state_machine()

    def state_machine(self, cbargs=()):
        if len(cbargs) == 3 and self.state >= 0 and self.state <= 9:
            st, resp, hotlist = cbargs
            i = self.items[self.currItem]
            self.continue_read(*cbargs)
        if self.state == 0:     # idle
            self.pick_next_element()
        if self.state == 98:
            self.close()
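
    # State flow (cf. the state codes in __init__): run() takes a blob from
    # stopped (-1) to idle (0) once a slot is free; pick_next_element() opens the
    # clone (state 1) or falls back to the live file (state 2); when a live read
    # hits EOF the clone is retried and, if it now exists, overrides the data just
    # written (state 3); an idle blob with no items left and closing set moves to
    # 98, and close() takes it to 99.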
    # This is the part that reads files or clones
    def pick_next_element(self):        # only called with self.state == 0!
        item = None
        for i in range(self.currItem+1, len(self.items)):
            self.currItem = i
            if self.items[i][1] is not None:    # this could be a non-File (e.g. Directory) entry
                item = self.items[i]            # (jsonTag, path, cpath, urlopts="")
                break
        if item is None:
            if self.closing:
                self.state = 98
            return

        self.sm_oldSize = self.size     # remember where this file started in the blob

        # try to copy the clone first
        self.sm_src = client.File()
        self.state = 1                  # copy clone
        path = "%s/%s%s" % (opt['-U'], item[2], item[3])
        st, xx = self.sm_src.open(path)
        if st.ok:
            if "-v" in opt:
                print(f"Copying clone {path} for {item[1]}", file=sys.stderr)
        else:
            # copy the live file
            self.sm_src = client.File()
            path = "%s/%s%s" % (opt['-U'], item[1], item[3])
            st, xx = self.sm_src.open(path)
            self.state = 2              # copying live file

        # start the first read
        self.sm_offset = 0
        try:
            st = self.sm_src.read(offset=self.sm_offset, size=self.bufsz, callback=self.XrdCB)
            if "-v" in opt:
                print(f"1st read blob {self.blob_num} file {path} st.ok {st.ok} st.code {st.code}", file=sys.stderr)
            if st.ok:
                return                  # the state machine continues after the callback
        except Exception as e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            print(f"line {exc_tb.tb_lineno} error {e} reading {item[1]} @{self.sm_offset}:{self.bufsz}", file=sys.stderr)
            if hasattr(e, 'msg'): print(e.msg, file=sys.stderr)
            if hasattr(e, 'message'): print(e.message, file=sys.stderr)
        self.state = 0
        self.state_machine()

    def continue_read(self, st, buff, hl):      # handle callback with data
        if not st.ok:
            print(f"failed read {st} blob {self.blob_num} file {self.currItem} '{self.items[self.currItem]}'", file=sys.stderr)
            self.sm_src.close()
            self.state = 0
        else:
            # read completed, buff holds the data
            # sanity checks
            try:
                sz = int(eval(self.items[self.currItem][0]['st'])[stat.ST_SIZE])
                if len(buff) > sz:
                    raise AssertionError("buffer exceeds file size")
                if self.sm_offset > sz:
                    raise AssertionError("sm_offset %d exceeds file size: %s" % (self.sm_offset, self.items[self.currItem][0]))
                if self.size - self.sm_oldSize > sz:
                    raise AssertionError("blob space used exceeds file size")
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                print(f"line {exc_tb.tb_lineno} error {e}", file=sys.stderr)
                pdb.set_trace()

            if len(buff) > 0:
                # write this buffer, read the next one
                st2, xx = self.bfile.write(buff, offset=self.size)
                self.size += len(buff)
                self.sm_offset += len(buff)
                # sanity check
                try:
                    sz = int(eval(self.items[self.currItem][0]['st'])[stat.ST_SIZE])
                    if self.sm_offset > sz:
                        raise AssertionError("sm_offset %d exceeds file size: %s" % (self.sm_offset, self.items[self.currItem][0]))
                except Exception as e:
                    exc_type, exc_obj, exc_tb = sys.exc_info()
                    print(f"line {exc_tb.tb_lineno} error {e}", file=sys.stderr)
                    pdb.set_trace()
                st2 = self.sm_src.read(offset=self.sm_offset, size=self.bufsz, callback=self.XrdCB)
            else:
                # a zero length read signals EOF
                item = self.items[self.currItem]
                rc = st.errno
                self.sm_src.close()
                item[0]['b'] = "%d:%d:%d" % (self.blob_num, self.sm_oldSize, self.size - self.sm_oldSize)
                if self.state == 2:     # finished the live file, try the clone again
                    st2, xx = self.sm_src.open("%s/%s%s" % (opt['-U'], item[2], item[3]))
                    if not st2.ok:      # can't read the clone, the live file be it then
                        self.state = 0
                    else:
                        # the clone should overwrite what we just copied
                        self.state = 3                  # override with clone
                        self.sm_offset = 0
                        self.size = self.sm_oldSize
                        self.bfile.truncate(self.size)
                        try:
                            st2 = self.sm_src.read(offset=self.sm_offset, size=self.bufsz, callback=self.XrdCB)
                            if "-v" in opt:
                                print("1st read clone ok", st.ok, "code", st.code, file=sys.stderr)
                        except Exception as e:
                            exc_type, exc_obj, exc_tb = sys.exc_info()
                            print(f"line {exc_tb.tb_lineno} error {e} reading {item[2]} @{self.sm_offset}:{self.bufsz}",
                                  file=sys.stderr)
                            if hasattr(e, 'msg'): print(e.msg, file=sys.stderr)
                            if hasattr(e, 'message'): print(e.message, file=sys.stderr)
                            self.state = 0
                elif self.state == 3:
                    self.state = 0
                else:
                    self.state = 0
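
# Each catalog entry that carries data locates it through its 'b' field, encoded
# as "blobNum:offset:length"; do_restore() splits that triple and reads 'length'
# bytes at 'offset' from <backupDir>/b.<blobNum>. E.g. "b": "2:1048576:4096"
# (illustrative values) means 4096 bytes at offset 1 MB in blob file b.2.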

def copy_xrdcp(path, cpath, blob):
    startpos = blob.size        # blob.bfile.tell() would be useless, perturbed by the subprocess writes

    # try to copy the clone first
    p1 = subprocess.run(["xrdcp", "-s", "%s/%s" % (opt['-U'], cpath), "-"],
                        stdout=blob.bfile.fileno(), stderr=subprocess.PIPE)
    x1 = p1.stderr
    rc = p1.returncode
    if rc > 256: rc = rc >> 8                   # recover the xrdcp return code
    if rc == 0:                                 # 54 would mean clone not found
        blob.size = os.lseek(blob.bfile.fileno(), 0, 1)         # get current (new) position
    else:
        # copy the live file
        p2 = subprocess.run(["xrdcp", "-s", "%s/%s" % (opt['-U'], path), "-"],
                            stdout=blob.bfile.fileno(), stderr=subprocess.PIPE)
        x2 = p2.stderr
        rc = p2.returncode
        if rc > 256: rc = rc >> 8               # recover the xrdcp return code
        copied_live = True
        newpos = os.lseek(blob.bfile.fileno(), 0, 1)            # get current (new) position

        # try the clone again
        os.lseek(blob.bfile.fileno(), startpos, 0)              # rewind to the previous pos
        p3 = subprocess.run(["xrdcp", "-s", "%s/%s" % (opt['-U'], cpath), "-"],
                            stdout=blob.bfile.fileno(), stderr=subprocess.PIPE)
        x3 = p3.stderr
        rc = p3.returncode
        if rc > 256: rc = rc >> 8               # recover the xrdcp return code
        if rc > 0:                              # the clone still does not exist
            os.lseek(blob.bfile.fileno(), newpos, 0)            # rewind to the previous pos
            blob.size = newpos
        else:
            copied_live = False                 # correction: copied the clone after all
            blob.size = os.lseek(blob.bfile.fileno(), 0, 1)     # new position
            # truncate whatever the live copy may have written
            if blob.size < newpos:
                os.ftruncate(blob.bfile.fileno(), blob.size)
    return rc

# the progress handler can only be defined when the XRootD bindings are loaded
if canXroot:
    class MyCopyProgressHandler(client.utils.CopyProgressHandler):
        def begin(self, id, total, source, target):
            if "-v" in opt:
                print("%s CopyProgress %s/%s" % (time.strftime("%Y%m%d %H%M%S"), id, total), source, target, file=sys.stderr)

        def end(self, status):
            if "-v" in opt:
                print("CopyProgress end", file=sys.stderr)
                #print('end status:', status, file=sys.stderr)

        def update(self, processed, total):
            if "-v" in opt:
                print("CopyProgress update", file=sys.stderr)
                #print('processed: %d, total: %d' % (processed, total), file=sys.stderr)

def myCopyprocess():
    try:
        if len(copyjobs) > 0:
            if "-v" in opt:
                print("running %d xrootd copies" % len(copyjobs), file=sys.stderr)
            copyprocess.parallel(8)
            copyprocess.prepare()
            copies = [None]
            try:
                copies = copyprocess.run(MyCopyProgressHandler())
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                print("Exception %s %r running copy jobs in line %d:" % (type(e), e, exc_tb.tb_lineno), file=sys.stderr)
                traceback.print_tb(exc_tb, None, sys.stderr)

            xst = copies[0]
            if xst is None:
                raise NameError
            if "-v" in opt:
                print("xrootd copies first pass finished, ok=%r message='%s'" % (xst.ok, xst.message), file=sys.stderr)

            copyjobs2 = []
            copyprocess2 = client.CopyProcess()
            try:
                sts = copies[1]
                for i in range(len(sts)):
                    st = sts[i]['status']
                    if not st.ok:
                        blob = copyjobs[i]
                        print(blob.src[blob.srcIndex], "to", blob.trg, ": ok", st.ok, st.message, file=sys.stderr)
                        blob.srcIndex = (blob.srcIndex+1) % len(blob.src)   # cycle through (=flip) the sources
                        copyjobs2.append(blob)
                        print("retry as:", blob.src[blob.srcIndex], "to", blob.trg, file=sys.stderr)
                        copyprocess2.add_job(blob.src[blob.srcIndex], blob.trg, force=True)
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                print("Exception %s re-running copy jobs in line %d:" % (type(e), exc_tb.tb_lineno), file=sys.stderr)
                for cc in copies[1]:
                    print(cc, file=sys.stderr)
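
            # Second pass: each failed job is retried once with the alternate
            # source (clone <-> live file, via the srcIndex flip above), with
            # force=True so any partial first-pass target is overwritten.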
            if len(copyjobs2) > 0:
                if "-v" in opt:
                    print("re-running %d xrootd copies" % len(copyjobs2), file=sys.stderr)
                copyprocess2.prepare()
                xst = None
                try:
                    copies2 = copyprocess2.run(MyCopyProgressHandler())
                    xst = copies2[0]
                except Exception as e:
                    exc_type, exc_obj, exc_tb = sys.exc_info()
                    print("Exception %s %r retrying copy jobs in line %d" % (type(e), e, exc_tb.tb_lineno), file=sys.stderr)
                if xst is None:
                    for blob in copyjobs2:
                        print("failed:", blob.src[0], "or", blob.src[1], "to", blob.trg, file=sys.stderr)
                elif not xst.ok:
                    print("failed to re-run copy jobs", xst, file=sys.stderr)
                    try:
                        for i in range(len(copyjobs2)):
                            st = copies2[1][i]['status']
                            if not st.ok:
                                blob = copyjobs2[i]
                                print("failed:", blob.src[0], "or", blob.src[1], "to", blob.trg, file=sys.stderr)
                    except Exception as e:
                        exc_type, exc_obj, exc_tb = sys.exc_info()
                        print("Exception %s running copy jobs in line %d:" % (type(e), exc_tb.tb_lineno), file=sys.stderr)
                        for cc in copies2[1]:
                            print(cc, file=sys.stderr)
    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        print("Oops %s %r running copy jobs in line %d:" % (type(e), e, exc_tb.tb_lineno), file=sys.stderr)
        traceback.print_tb(exc_tb, None, sys.stderr)

# perform a clone unless a catalogFile has been given, then back up the files
def do_backup():
    xxxdebugxxx = True
    b_Xroot = canXroot and opt['-B'].startswith("root:")
    if '-U' not in opt and not b_Xroot:
        try:
            opt['-U'] = os.environ['EOS_MGM_URL']
            if opt['-U'] == '':
                raise KeyError
        except KeyError:
            print("need to specify '-U' or define EOS_MGM_URL in the environment", file=sys.stderr)
            sys.exit(1)

    if '-F' in opt:
        catalogFile = opt['-F']
    else:
        cloneId, catalogFile = do_clone("backup")

    c_Xroot = canXroot and catalogFile.startswith("root:")
    try:
        if c_Xroot:
            f = client.File()
            f.open(catalogFile)
        else:
            f = open(catalogFile, "r")
    except IOError as e:
        print("cannot open catalog: %s" % e, file=sys.stderr)
        sys.exit(5)

    #findOut = f.readlines()
    line1 = f.readline()
    jj = json.loads(line1)
    cloneId = jj['c']
    rootPath = jj['n']

    backupDir = "%s/%s/" % (opt['-B'].rstrip('/'), cloneId)
    if b_Xroot:
        b_fs = client.FileSystem(backupDir)
        xx = re.match(r"^root://\S+/(/\S+)", backupDir)
        backupDirPath = xx.group(1)
        b_fs.mkdir(backupDirPath, MkDirFlags.MAKEPATH)
        newCatalog = client.File()
        status, xxx = newCatalog.open(backupDir + "catalog", OpenFlags.NEW)
        newCatalog.write(line1, offset=0)
        newCatPos = len(line1)
    else:
        if not os.path.isdir(backupDir):
            os.mkdir(backupDir)
        newCatalog = open(backupDir + "catalog", "w")
        newCatalog.write(line1)

    num_files = 0
    for n in range(3):
        blobs_active.append(Blob(backupDir))

    print("cloneId %s backup_media %s" % (cloneId, backupDir[:-1]))
    cloneDir = "/eos/%s/proc/clone/" % (eos_instance[3:] if eos_instance.startswith("eos") else eos_instance)
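
    # Main loop: first drain pending xrootd read callbacks (blobq) so the blob
    # state machines advance, then read the next catalog line and either record
    # it as a catalog-only entry or hand the file to a blob, distributing files
    # round-robin over blobs_active.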
    while not stop_requested:
        while True:
            with blobCV:
                tblob = None            # shouldn't be needed!
                if len(blobq) == 0:
                    break
                tblob, cbargs = blobq.pop(0)
            tblob.state_machine(cbargs)

        l = f.readline()
        if len(l) == 0:
            break
        curr_blob = blobs_active[int(num_files/1) % len(blobs_active)]

        jj = json.loads(l)
        fCloneId = jj['c']              # id of the clone the file belongs to, might be from an earlier dump
        path = jj['n']
        if not path.endswith('/'):      # this is a file
            clonePath = "%s/%s" % (fCloneId, jj['p'])
        stime = jj['t']
        # jj['st']: "(st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, st_atime, st_mtime, st_ctime)"

        # insert a catalog-only entry for a dir, a file unmodified in an incremental dump, or a symlink
        if path.endswith('/') or fCloneId != cloneId or 'S' in jj:
            if b_Xroot:
                curr_blob.addItem(jj, None, None, "")
            else:
                newCatalog.write(l)
            continue

        # this is a plain file
        num_files += 1
        st = eval(jj['st'])
        curr_blob, tblob = curr_blob.getBlob(st[stat.ST_SIZE], noCopy='H' in jj)
        cpath = cloneDir + clonePath
        o_opt = ''
        if 'H' in jj:   # problem encountered: cksum on git hard-links not (always?) updated ??
            o_opt = "?eos.checksum=ignore"
            if "-v" in opt:
                print("Hard link %s copied with %s" % (path, o_opt), file=sys.stderr)

        if tblob.isCopy or tblob.blXroot:
            # add the file to the list
            tblob.addItem(jj, path, cpath, o_opt)
        else:
            startpos = tblob.size       # blob.bfile.tell() would be useless, perturbed by the subprocess writes
            copy_xrdcp(path, cpath, tblob)
            jj['b'] = "%d:%d:%d" % (tblob.blob_num, startpos, tblob.size - startpos)
            jxx = json.dumps(jj) + "\n"
            newCatalog.write(jxx)

        if tblob is not curr_blob:      # an individual blob for a BIG file
            tblob.close()

    f.close()
    for b in blobs_active:
        b.close()

    copythread = threading.Thread(target=myCopyprocess)
    copythread.start()

    if b_Xroot:
        for b in BLOBS:
            while (not b.isCopy) and b.state != 99:
                with blobCV:
                    if blobCV.wait_for(lambda: len(blobq) > 0, timeout=5):
                        tblob, cbargs = blobq.pop(0)
                    else:
                        tblob = None
                if tblob:
                    tblob.state_machine(cbargs)
                elif "-v" in opt:
                    print(f"Waiting for close on blob {b.blob_num} closing {b.closing} state {b.state} current item {b.currItem+1} (of {len(b.items)})", file=sys.stderr)
            if "-v" in opt:
                print(f"blob {b.blob_num} state {b.state} items {len(b.items)}", file=sys.stderr)
            for i in b.items:
                jxx = json.dumps(i[0]) + "\n"
                newCatalog.write(jxx, offset=newCatPos)
                newCatPos += len(jxx)

    copythread.join()
    newCatalog.close()

    cmd = "eos oldfind -f -x sys.clone=-%s %s > /dev/null" % (cloneId, args[0] if len(args) > 0 else rootPath)
    os.system(cmd)

# perform a restore based on multiple catalogFiles
def do_restore():
    trg = args[0] + "/"
    catalogFiles = opt['-F'].split(',')
    cat = dict()
    backupDirs = dict()

    # build a dictionary of the files to be restored, implementing the logic to select the most recent versions
    for fn in catalogFiles:
        lastCatFile = fn == catalogFiles[-1]
        c_Xroot = canXroot and fn.startswith("root:")
        if c_Xroot:
            cFile = client.File()
            cFile.open(fn)
        else:
            cFile = open(fn, "r")

        line1 = cFile.readline()
        jj = json.loads(line1)
        cloneId = jj['c']
        rootPath = jj['n']
        backupDirs[cloneId] = os.path.dirname(fn)

        lineNum = 0
        while True:
            if lineNum > 0:             # next line (except the 1st)
                l = cFile.readline()
                if len(l) == 0:
                    break
                try:
                    jj = json.loads(l)
                except Exception as e:
                    print("error %r line %d ='%s'" % (e, lineNum, l), file=sys.stderr)
                    sys.exit(1)
            lineNum += 1

            path = jj['n']
            stime = jj['t']
            rpath = path[len(rootPath):]        # skip the "root" part
            if rpath in cat and not path.endswith('/'):
                if stime < cat[rpath]['stime'] or jj['c'] != cloneId:   # file exists but is not part of this backup
                    if lastCatFile:
                        cat[rpath]['keep'] = True
                    continue
                # else: restore the latest version
            else:
                cat[rpath] = dict()
            cat[rpath]['stime'] = stime
            cat[rpath]['c'] = cloneId
            for k in ['b', 'S', 'H', 'L']:
                if k in jj:
                    cat[rpath][k] = jj[k]
            if lastCatFile:
                cat[rpath]['keep'] = True
        cFile.close()
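
    # At this point cat maps each path relative to rootPath to a small dict, e.g.
    # (illustrative): {'stime': ..., 'c': '<cloneId>', 'b': '0:0:4096', 'keep': True},
    # with optional 'S' (symlink target), 'H' and 'L' carried over from the catalog;
    # only entries marked 'keep' are restored below.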
    # restore the files
    for p in sorted(cat.keys()):        # sorted so that dirs come before their files
        if stop_requested:
            break
        if "/...eos.ino..." in p:       # not a plain file: a deleted hard link target
            continue
        if 'keep' not in cat[p]:
            continue
        if p.endswith('/') or p == '':  # a directory
            os.makedirs(trg + p, exist_ok=True)     # tolerate pre-existing directories
        elif 'S' in cat[p]:             # this is a symlink
            os.symlink(cat[p]['S'], f"{trg}/{p}")
        else:
            try:
                bIndex, bOffset, bLen = map(int, cat[p]['b'].split(":"))
                blobFn = "%s/b.%d" % (backupDirs[cat[p]['c']], bIndex)
                bl_Xroot = canXroot and blobFn.startswith("root:")
                if bl_Xroot:
                    blobFile = client.File()
                    st, xx = blobFile.open(blobFn)
                    blobOff = bOffset
                    fileOff = 0
                    if not st.ok:
                        print(f"Failed to open blob {blobFn}: {st.message}", file=sys.stderr)
                        raise IOError
                else:
                    blobFile = open(blobFn, "rb")
                    blobFile.seek(bOffset, 0)
                trgFile = open("%s/%s" % (trg, p), "wb")
                bufsz = 1 << 20         # 1 MB
                while bLen > 0:
                    if bufsz > bLen:
                        bufsz = bLen
                    bLen -= bufsz
                    if bl_Xroot:
                        st, buff = blobFile.read(offset=blobOff, size=bufsz)
                        trgFile.write(buff)
                        blobOff += bufsz
                        fileOff += bufsz
                    else:
                        trgFile.write(blobFile.read(bufsz))
                blobFile.close()
                trgFile.close()
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                print("Exception %s in line %d:" % (type(e), exc_tb.tb_lineno), file=sys.stderr)
                print(f"p {p} bl_Xroot {bl_Xroot} blobFn {blobFn}", file=sys.stderr)
                if bl_Xroot:
                    print(f"offset {blobOff} size {bufsz}", file=sys.stderr)
                if p in cat:
                    print(f"cat: {cat[p]}", file=sys.stderr)

# main
if subcmd == 'clone':
    do_clone(subcmd)
elif subcmd == 'backup':
    do_backup()
elif subcmd == 'restore':
    do_restore()
else:
    print("unknown subcommand", subcmd, file=sys.stderr)
    sys.exit(1)