"""Takes a 4 byte integer and converts it into a string of 4 characters.
Returns the characters in a string."""
a = array.array('B')
- a.append((myint >> 24 ) & 0xff)
- a.append((myint >> 16 ) & 0xff)
- a.append((myint >> 8 ) & 0xff)
+ a.append((myint >> 24) & 0xff)
+ a.append((myint >> 16) & 0xff)
+ a.append((myint >> 8) & 0xff)
a.append(myint & 0xff)
try:
- # Python >=3.2
+ # Python >= 3.2
return a.tobytes()
except AttributeError:
return a.tostring()
myint += mystring[0] << 24
return myint
-def xpak(rootdir,outfile=None):
- """(rootdir,outfile) -- creates an xpak segment of the directory 'rootdir'
+def xpak(rootdir, outfile=None):
+ """(rootdir, outfile) -- creates an xpak segment of the directory 'rootdir'
and under the name 'outfile' if it is specified. Otherwise it returns the
xpak segment."""
- mylist=[]
+ mylist = []
addtolist(mylist, rootdir)
mylist.sort()
del mydata_encoded
indexglob = b''
- indexpos=0
+ indexpos = 0
dataglob = b''
- datapos=0
+ datapos = 0
for x, newglob in mydata.items():
- mydatasize=len(newglob)
- indexglob=indexglob+encodeint(len(x))+x+encodeint(datapos)+encodeint(mydatasize)
- indexpos=indexpos+4+len(x)+4+4
- dataglob=dataglob+newglob
- datapos=datapos+mydatasize
+ mydatasize = len(newglob)
+ indexglob = indexglob + encodeint(len(x)) + x + encodeint(datapos) + encodeint(mydatasize)
+ indexpos = indexpos + 4 + len(x) + 4 + 4
+ dataglob = dataglob + newglob
+ datapos = datapos + mydatasize
return b'XPAKPACK' \
- + encodeint(len(indexglob)) \
- + encodeint(len(dataglob)) \
- + indexglob \
- + dataglob \
- + b'XPAKSTOP'
+ + encodeint(len(indexglob)) \
+ + encodeint(len(dataglob)) \
+ + indexglob \
+ + dataglob \
+ + b'XPAKSTOP'
def xsplit(infile):
"""(infile) -- Splits the infile into two files.
encoding=_encodings['fs'], errors='strict')
myfile = open(_unicode_encode(infile,
encoding=_encodings['fs'], errors='strict'), 'rb')
- mydat=myfile.read()
+ mydat = myfile.read()
myfile.close()
splits = xsplit_mem(mydat)
return None
if mydat[-8:] != b'XPAKSTOP':
return None
- indexsize=decodeint(mydat[8:12])
- return (mydat[16:indexsize+16], mydat[indexsize+16:-8])
+ indexsize = decodeint(mydat[8:12])
+ return (mydat[16:indexsize + 16], mydat[indexsize + 16:-8])
def getindex(infile):
"""(infile) -- grabs the index segment from the infile and returns it."""
myfile = open(_unicode_encode(infile,
encoding=_encodings['fs'], errors='strict'), 'rb')
- myheader=myfile.read(16)
+ myheader = myfile.read(16)
if myheader[0:8] != b'XPAKPACK':
myfile.close()
return
- indexsize=decodeint(myheader[8:12])
- myindex=myfile.read(indexsize)
+ indexsize = decodeint(myheader[8:12])
+ myindex = myfile.read(indexsize)
myfile.close()
return myindex
def getboth(infile):
"""(infile) -- grabs the index and data segments from the infile.
- Returns an array [indexSegment,dataSegment]"""
+ Returns an array [indexSegment, dataSegment]"""
myfile = open(_unicode_encode(infile,
encoding=_encodings['fs'], errors='strict'), 'rb')
- myheader=myfile.read(16)
+ myheader = myfile.read(16)
if myheader[0:8] != b'XPAKPACK':
myfile.close()
return
- indexsize=decodeint(myheader[8:12])
- datasize=decodeint(myheader[12:16])
- myindex=myfile.read(indexsize)
- mydata=myfile.read(datasize)
+ indexsize = decodeint(myheader[8:12])
+ datasize = decodeint(myheader[12:16])
+ myindex = myfile.read(indexsize)
+ mydata = myfile.read(datasize)
myfile.close()
return myindex, mydata
def getindex_mem(myindex):
"""Returns the filenames listed in the indexglob passed in."""
- myindexlen=len(myindex)
- startpos=0
- myret=[]
- while ((startpos+8)<myindexlen):
- mytestlen=decodeint(myindex[startpos:startpos+4])
- myret=myret+[myindex[startpos+4:startpos+4+mytestlen]]
- startpos=startpos+mytestlen+12
+ myindexlen = len(myindex)
+ startpos = 0
+ myret = []
+ while ((startpos + 8) < myindexlen):
+ mytestlen = decodeint(myindex[startpos:startpos + 4])
+ myret = myret + [myindex[startpos + 4:startpos + 4 + mytestlen]]
+ startpos = startpos + mytestlen + 12
return myret
-def searchindex(myindex,myitem):
- """(index,item) -- Finds the offset and length of the file 'item' in the
+def searchindex(myindex, myitem):
+ """(index, item) -- Finds the offset and length of the file 'item' in the
datasegment via the index 'index' provided."""
myitem = _unicode_encode(myitem,
encoding=_encodings['repo.content'], errors='backslashreplace')
- mylen=len(myitem)
- myindexlen=len(myindex)
- startpos=0
- while ((startpos+8)<myindexlen):
- mytestlen=decodeint(myindex[startpos:startpos+4])
- if mytestlen==mylen:
- if myitem==myindex[startpos+4:startpos+4+mytestlen]:
+ mylen = len(myitem)
+ myindexlen = len(myindex)
+ startpos = 0
+ while ((startpos + 8) < myindexlen):
+ mytestlen = decodeint(myindex[startpos:startpos + 4])
+ if mytestlen == mylen:
+ if myitem == myindex[startpos + 4:startpos + 4 + mytestlen]:
#found
- datapos=decodeint(myindex[startpos+4+mytestlen:startpos+8+mytestlen]);
- datalen=decodeint(myindex[startpos+8+mytestlen:startpos+12+mytestlen]);
+ datapos = decodeint(myindex[startpos + 4 + mytestlen:startpos + 8 + mytestlen])
+ datalen = decodeint(myindex[startpos + 8 + mytestlen:startpos + 12 + mytestlen])
return datapos, datalen
- startpos=startpos+mytestlen+12
+ startpos = startpos + mytestlen + 12
-def getitem(myid,myitem):
- myindex=myid[0]
- mydata=myid[1]
- myloc=searchindex(myindex,myitem)
+def getitem(myid, myitem):
+ myindex = myid[0]
+ mydata = myid[1]
+ myloc = searchindex(myindex, myitem)
if not myloc:
return None
- return mydata[myloc[0]:myloc[0]+myloc[1]]
+ return mydata[myloc[0]:myloc[0] + myloc[1]]
-def xpand(myid,mydest):
+def xpand(myid, mydest):
mydest = normalize_path(mydest) + os.sep
- myindex=myid[0]
- mydata=myid[1]
- myindexlen=len(myindex)
- startpos=0
- while ((startpos+8)<myindexlen):
- namelen=decodeint(myindex[startpos:startpos+4])
- datapos=decodeint(myindex[startpos+4+namelen:startpos+8+namelen]);
- datalen=decodeint(myindex[startpos+8+namelen:startpos+12+namelen]);
- myname=myindex[startpos+4:startpos+4+namelen]
+ myindex = myid[0]
+ mydata = myid[1]
+ myindexlen = len(myindex)
+ startpos = 0
+ while ((startpos + 8) < myindexlen):
+ namelen = decodeint(myindex[startpos:startpos + 4])
+ datapos = decodeint(myindex[startpos + 4 + namelen:startpos + 8 + namelen])
+ datalen = decodeint(myindex[startpos + 8 + namelen:startpos + 12 + namelen])
+ myname = myindex[startpos + 4:startpos + 4 + namelen]
myname = _unicode_decode(myname,
encoding=_encodings['repo.content'], errors='replace')
filename = os.path.join(mydest, myname.lstrip(os.sep))
os.makedirs(dirname)
mydat = open(_unicode_encode(filename,
encoding=_encodings['fs'], errors='strict'), 'wb')
- mydat.write(mydata[datapos:datapos+datalen])
+ mydat.write(mydata[datapos:datapos + datalen])
mydat.close()
- startpos=startpos+namelen+12
+ startpos = startpos + namelen + 12
class tbz2(object):
- def __init__(self,myfile):
- self.file=myfile
- self.filestat=None
+ def __init__(self, myfile):
+ self.file = myfile
+ self.filestat = None
self.index = b''
- self.infosize=0
- self.xpaksize=0
- self.indexsize=None
- self.datasize=None
- self.indexpos=None
- self.datapos=None
-
- def decompose(self,datadir,cleanup=1):
+ self.infosize = 0
+ self.xpaksize = 0
+ self.indexsize = None
+ self.datasize = None
+ self.indexpos = None
+ self.datapos = None
+
+ def decompose(self, datadir, cleanup=1):
"""Alias for unpackinfo() --- Complement to recompose() but optionally
deletes the destination directory. Extracts the xpak from the tbz2 into
the directory provided. Raises IOError if scan() fails.
if not os.path.exists(datadir):
os.makedirs(datadir)
return self.unpackinfo(datadir)
- def compose(self,datadir,cleanup=0):
+ def compose(self, datadir, cleanup=0):
"""Alias for recompose()."""
- return self.recompose(datadir,cleanup)
+ return self.recompose(datadir, cleanup)
def recompose(self, datadir, cleanup=0, break_hardlinks=True):
"""Creates an xpak segment from the datadir provided, truncates the tbz2
encoding=_encodings['fs'], errors='strict'), 'ab+')
if not myfile:
raise IOError
- myfile.seek(-self.xpaksize,2) # 0,2 or -0,2 just mean EOF.
+ myfile.seek(-self.xpaksize, 2) # 0,2 or -0,2 just mean EOF.
myfile.truncate()
- myfile.write(xpdata+encodeint(len(xpdata)) + b'STOP')
+ myfile.write(xpdata + encodeint(len(xpdata)) + b'STOP')
myfile.flush()
myfile.close()
return 1
"""Scans the tbz2 to locate the xpak segment and setup internal values.
This function is called by relevant functions already."""
try:
- mystat=os.stat(self.file)
+ mystat = os.stat(self.file)
if self.filestat:
- changed=0
+ changed = 0
if mystat.st_size != self.filestat.st_size \
or mystat.st_mtime != self.filestat.st_mtime \
or mystat.st_ctime != self.filestat.st_ctime:
changed = True
if not changed:
return 1
- self.filestat=mystat
+ self.filestat = mystat
a = open(_unicode_encode(self.file,
encoding=_encodings['fs'], errors='strict'), 'rb')
- a.seek(-16,2)
- trailer=a.read()
- self.infosize=0
- self.xpaksize=0
+ a.seek(-16, 2)
+ trailer = a.read()
+ self.infosize = 0
+ self.xpaksize = 0
if trailer[-4:] != b'STOP':
a.close()
return 0
if trailer[0:8] != b'XPAKSTOP':
a.close()
return 0
- self.infosize=decodeint(trailer[8:12])
- self.xpaksize=self.infosize+8
- a.seek(-(self.xpaksize),2)
- header=a.read(16)
+ self.infosize = decodeint(trailer[8:12])
+ self.xpaksize = self.infosize + 8
+ a.seek(-(self.xpaksize), 2)
+ header = a.read(16)
if header[0:8] != b'XPAKPACK':
a.close()
return 0
- self.indexsize=decodeint(header[8:12])
- self.datasize=decodeint(header[12:16])
- self.indexpos=a.tell()
- self.index=a.read(self.indexsize)
- self.datapos=a.tell()
+ self.indexsize = decodeint(header[8:12])
+ self.datasize = decodeint(header[12:16])
+ self.indexpos = a.tell()
+ self.index = a.read(self.indexsize)
+ self.datapos = a.tell()
a.close()
return 2
except SystemExit:
return None
return getindex_mem(self.index)
- def getfile(self,myfile,mydefault=None):
+ def getfile(self, myfile, mydefault=None):
"""Finds 'myfile' in the data segment and returns it."""
if not self.scan():
return None
- myresult=searchindex(self.index,myfile)
+ myresult = searchindex(self.index, myfile)
if not myresult:
return mydefault
a = open(_unicode_encode(self.file,
encoding=_encodings['fs'], errors='strict'), 'rb')
- a.seek(self.datapos+myresult[0],0)
- myreturn=a.read(myresult[1])
+ a.seek(self.datapos + myresult[0], 0)
+ myreturn = a.read(myresult[1])
a.close()
return myreturn
- def getelements(self,myfile):
+ def getelements(self, myfile):
"""A split/array representation of tbz2.getfile()"""
- mydat=self.getfile(myfile)
+ mydat = self.getfile(myfile)
if not mydat:
return []
return mydat.split()
- def unpackinfo(self,mydest):
+ def unpackinfo(self, mydest):
"""Unpacks all the files from the dataSegment into 'mydest'."""
if not self.scan():
return 0
encoding=_encodings['fs'], errors='strict'), 'rb')
if not os.path.exists(mydest):
os.makedirs(mydest)
- startpos=0
- while ((startpos+8)<self.indexsize):
- namelen=decodeint(self.index[startpos:startpos+4])
- datapos=decodeint(self.index[startpos+4+namelen:startpos+8+namelen]);
- datalen=decodeint(self.index[startpos+8+namelen:startpos+12+namelen]);
- myname=self.index[startpos+4:startpos+4+namelen]
+ startpos = 0
+ while ((startpos + 8) < self.indexsize):
+ namelen = decodeint(self.index[startpos:startpos + 4])
+ datapos = decodeint(self.index[startpos + 4 + namelen:startpos + 8 + namelen])
+ datalen = decodeint(self.index[startpos + 8 + namelen:startpos + 12 + namelen])
+ myname = self.index[startpos + 4:startpos + 4 + namelen]
myname = _unicode_decode(myname,
encoding=_encodings['repo.content'], errors='replace')
filename = os.path.join(mydest, myname.lstrip(os.sep))
os.makedirs(dirname)
mydat = open(_unicode_encode(filename,
encoding=_encodings['fs'], errors='strict'), 'wb')
- a.seek(self.datapos+datapos)
+ a.seek(self.datapos + datapos)
mydat.write(a.read(datalen))
mydat.close()
- startpos=startpos+namelen+12
+ startpos = startpos + namelen + 12
a.close()
return 1
a = open(_unicode_encode(self.file,
encoding=_encodings['fs'], errors='strict'), 'rb')
mydata = {}
- startpos=0
- while ((startpos+8)<self.indexsize):
- namelen=decodeint(self.index[startpos:startpos+4])
- datapos=decodeint(self.index[startpos+4+namelen:startpos+8+namelen]);
- datalen=decodeint(self.index[startpos+8+namelen:startpos+12+namelen]);
- myname=self.index[startpos+4:startpos+4+namelen]
- a.seek(self.datapos+datapos)
+ startpos = 0
+ while ((startpos + 8) < self.indexsize):
+ namelen = decodeint(self.index[startpos:startpos + 4])
+ datapos = decodeint(self.index[startpos + 4 + namelen:startpos + 8 + namelen])
+ datalen = decodeint(self.index[startpos + 8 + namelen:startpos + 12 + namelen])
+ myname = self.index[startpos + 4:startpos + 4 + namelen]
+ a.seek(self.datapos + datapos)
mydata[myname] = a.read(datalen)
- startpos=startpos+namelen+12
+ startpos = startpos + namelen + 12
a.close()
return mydata
def getboth(self):
- """Returns an array [indexSegment,dataSegment]"""
+ """Returns an array [indexSegment, dataSegment]"""
if not self.scan():
return None
a = open(_unicode_encode(self.file,
encoding=_encodings['fs'], errors='strict'), 'rb')
a.seek(self.datapos)
- mydata =a.read(self.datasize)
+ mydata = a.read(self.datasize)
a.close()
return self.index, mydata
-