#
# Class to represent a PKZIP archive on disk
# Written 19th January 1998, Wim Lewis
# Release 1 --- 20th January 1998
#
"""PKZIP file representation class.

An instance of Zipfile represents one .zip archive.

Example:
    zf = Zipfile()                        # creates a zipfile object
    zf.read(open('blegga.zip', 'rb'))     # finds and reads the
                                          # end-of-directory marker
    fp = zf.openfile('foo.txt')           # returns a file-like object which
                                          # reads the named archive member.

The archive must be opened in binary mode and must support seek() and
tell().  Member data is returned as byte strings.

The classes Subfile and InflatingSubfile are implementations of the
file object protocol which read a segment of an underlying file
(optionally compressed).
"""

import struct


class error(Exception):
    """Raised for malformed, truncated or unsupported archives.

    The lower-case name is kept so that existing ``except error:``
    clauses keep working.
    """


# Handy routines for reading integers
# NOTE! These are in little-endian (non-network) byte order!

def _read_exact(fp, count):
    """Read exactly `count` bytes from fp or raise `error` if fp runs out."""
    data = fp.read(count)
    if len(data) != count:
        raise error('Unexpected end of ZIP file')
    return data


def read_u2(fp):
    """Read an unsigned 16-bit little-endian integer from fp."""
    return struct.unpack('<H', _read_exact(fp, 2))[0]


def read_u4(fp):
    """Read an unsigned 32-bit little-endian integer from fp."""
    return struct.unpack('<L', _read_exact(fp, 4))[0]


def _native_name(raw, flags):
    """Convert a raw member name to the interpreter's native string type.

    Where `str` is the byte-string type the name is returned unchanged.
    Otherwise it is decoded: UTF-8 when general purpose flag bit 11 is
    set, CP437 (the historical PKZIP default) when it is not.
    """
    if str is bytes:
        return raw
    if flags & 0x0800:
        return raw.decode('utf-8', 'replace')
    return raw.decode('cp437')


class Zipfile:
    """Zipfile represents a PKZIP archive.

    Methods:

    zf = Zipfile(fp)
        Creates a new instance, reading the file-object fp
    zf.read(fp)
        Useful if fp was omitted from the initialization
    zf.directory()
        Reads the archive directory if necessary, and returns a
        list of DirEntry instances
    zf.names()
        Returns a list of archive member names.
    zf.findfile('name.txt')
        Returns a DirEntry corresponding to the named archive member,
        or raises KeyError.
    zf.openfile('name.txt')
        Returns a file-like object with which the named archive
        member can be read

    Internal classes:

    DirEntry represents a central-directory entry. Useful members are
    'filename' (a string) and 'getfile()' (which returns the
    corresponding ZipMember instance).

    ZipMember represents a file in the zip archive. Useful method is
    'open()', which returns a file-like object which reads this
    member's contents.
    """

    # Number of bytes examined per step while scanning backwards for
    # the end-of-directory record.
    TOC_SEEK_STEP = 2048

    def __init__(self, fp=None):
        self.fp = None
        self.dir = None
        self.dirend = None
        if fp is not None:
            self.read(fp)

    def read(self, fp):
        """Locate and parse the end-of-directory record of the archive fp.

        The file is scanned backwards from its end in TOC_SEEK_STEP
        sized blocks.  Raises `error` when no record is found.
        """
        self.fp = fp
        self.dir = None          # forget any directory cached from an earlier file
        magic = self.DirEnd.magic
        overlap = len(magic) - 1

        fp.seek(0, 2)
        pos = fp.tell()          # everything from pos onwards has been searched
        tail = b''               # first bytes of the previously searched block
        while pos > 0:
            start = max(0, pos - self.TOC_SEEK_STEP)
            fp.seek(start)
            # Appending the head of the following block lets a magic
            # number that straddles the block boundary be found.
            block = fp.read(pos - start) + tail
            ix = block.rfind(magic)
            if ix != -1:
                fp.seek(start + ix)
                dirend = self.DirEnd()
                dirend.read(fp)
                self.dirend = dirend
                return
            tail = block[:overlap]
            pos = start
        raise error('ZIP Central Directory not found')

    def directory(self):
        """Return the list of DirEntry objects, reading it on first use."""
        if self.dir is None:
            dirend = self.dirend
            if dirend is None:
                raise error('No ZIP archive has been read yet')
            if (dirend.dirdisk != 0 or dirend.disknum != 0 or
                    dirend.thisdircount != dirend.dircount):
                raise error('Multi-disk ZIP archives not supported, sorry')
            self.fp.seek(dirend.diroffset)
            # Build the list locally so that a failure part-way through
            # does not leave a truncated directory cached.
            entries = []
            while len(entries) < dirend.dircount:
                ent = self.DirEntry()
                ent.read(self.fp)
                entries.append(ent)
            self.dir = entries
        return self.dir

    def findfile(self, name):
        """Return the DirEntry called `name`; raise KeyError if absent."""
        for ent in self.directory():
            if ent.filename == name:
                return ent
        raise KeyError('file ' + name + ' not found in archive')

    def openfile(self, name):
        """Return a read-only file-like object for the member `name`."""
        return self.findfile(name).getfile().open()

    def names(self):
        """Return a list of the member names, in directory order."""
        return [ent.filename for ent in self.directory()]

    class DirEntry:
        """One central-directory record."""

        def read(self, fp):
            """Parse a central-directory record starting at fp's position."""
            magic = fp.read(4)
            if magic != b'PK\x01\x02':
                raise error('Bad magic for directory entry')
            self.creator = read_u2(fp)
            self.extractor = read_u2(fp)
            self.flags = read_u2(fp)
            self.algorithm = read_u2(fp)
            modtime = read_u2(fp)
            moddate = read_u2(fp)
            self.mtime = MSDOSDate(moddate, modtime)
            self.crc = fp.read(4)       # kept as the raw four bytes
            self.csize = read_u4(fp)
            self.size = read_u4(fp)
            namelen = read_u2(fp)
            eflen = read_u2(fp)
            commentlen = read_u2(fp)
            self.startdisk = read_u2(fp)
            self.iattrs = read_u2(fp)
            self.attrs = read_u4(fp)
            self.header_offset = read_u4(fp)
            self.filename = _native_name(fp.read(namelen), self.flags)
            self.extra = fp.read(eflen)
            self.comment = fp.read(commentlen)
            self.localfile = None
            self.fp = fp

        def getfile(self):
            """Return the ZipMember for this entry, reading it on first use."""
            if self.localfile is None:
                self.fp.seek(self.header_offset)
                local = Zipfile.ZipMember()
                local.read(self.fp)
                if local.flags & 0x0008:
                    # Flag bit 3: the member was written with a trailing
                    # data descriptor, so the sizes and CRC in its local
                    # header are zero.  The directory holds the real ones.
                    local.crc = self.crc
                    local.csize = self.csize
                    local.size = self.size
                self.localfile = local
            return self.localfile

    class ZipMember:
        """One local file header plus the location of its data."""

        def read(self, fp):
            """Parse a local file header starting at fp's position."""
            magic = fp.read(4)
            if magic != b'PK\x03\x04':
                raise error('Bad magic for zip file member')
            self.extractor = read_u2(fp)
            self.flags = read_u2(fp)
            self.algorithm = read_u2(fp)
            modtime = read_u2(fp)
            moddate = read_u2(fp)
            self.mtime = MSDOSDate(moddate, modtime)
            self.crc = fp.read(4)       # kept as the raw four bytes
            self.csize = read_u4(fp)
            self.size = read_u4(fp)
            fnlen = read_u2(fp)
            eflen = read_u2(fp)
            self.filename = _native_name(fp.read(fnlen), self.flags)
            self.extra = fp.read(eflen)
            self.datastart = fp.tell()
            self.fp = fp

        def open(self):
            """Return a file-like object that reads this member's data.

            Raises `error` for any compression method other than 0
            (stored) or 8 (deflated).
            """
            # Only supports algorithms 0 and 8, but they're pretty
            # much the only algorithms used any more.
            if self.algorithm == 0:
                return Subfile(self.fp, self.datastart, self.csize)
            if self.algorithm == 8:
                return InflatingSubfile(self.fp, self.datastart, self.csize)
            raise error('ZIP algorithm %d not supported' % self.algorithm)

    class DirEnd:
        """The end-of-central-directory record."""

        magic = b'PK\x05\x06'

        def read(self, fp, magic=None):
            """Parse the record at fp's position.

            Pass `magic` when the four signature bytes have already been
            consumed from fp; otherwise they are read here.
            """
            if magic is None:
                magic = fp.read(4)
            if magic != self.magic:
                raise error('Bad magic for master directory')
            self.disknum = read_u2(fp)
            self.dirdisk = read_u2(fp)
            self.thisdircount = read_u2(fp)
            self.dircount = read_u2(fp)
            self.dirsize = read_u4(fp)
            self.diroffset = read_u4(fp)
            commentlen = read_u2(fp)
            self.comment = fp.read(commentlen)
            self.fp = fp


#
# Partial, read-only subfile object
#
class Subfile:
    """Read-only view of `length` bytes of fp starting at offset `start`."""

    def __init__(self, fp, start, length):
        self.fp = fp
        self.offset = start      # where the segment begins in fp; never changes
        self.length = length
        self.seekpos = 0         # current position relative to self.offset

    def close(self):
        self.fp = None

    def flush(self):
        self.fp.flush()

    def isatty(self):
        return 0

    def read(self, size=-1):
        """Read up to `size` bytes (all remaining bytes if negative)."""
        if size < 0 or (size + self.seekpos) > self.length:
            size = self.length - self.seekpos
        self.fp.seek(self.offset + self.seekpos)
        retval = self.fp.read(size)
        self.seekpos = self.fp.tell() - self.offset
        return retval

    def readline(self, size=-1):
        """Read one line, never reading past `size` bytes or the segment end."""
        if size < 0 or (size + self.seekpos) > self.length:
            size = self.length - self.seekpos
        self.fp.seek(self.offset + self.seekpos)
        if size == 0:
            return b''
        retval = self.fp.readline(size)
        self.seekpos = self.fp.tell() - self.offset
        return retval

    def seek(self, pos, whence=0):
        """Move the read position within the segment.

        `whence` is 0 (from the start), 1 (from the current position) or
        2 (from the end).  A target before the start is clamped to 0.  A
        target past the end leaves the position at the end and raises
        IOError.
        """
        if whence == 0:
            target = pos
        elif whence == 1:
            target = self.seekpos + pos
        elif whence == 2:
            target = self.length + pos
        else:
            raise ValueError('invalid whence (%r, should be 0, 1 or 2)'
                             % (whence,))
        if target > self.length:
            self.seekpos = self.length
            raise IOError('Seek past end of read-only Subfile')
        if target < 0:
            target = 0
        self.seekpos = target

    def tell(self):
        return self.seekpos


#
# Read-only, non-seekable subfile object
#
class InflatingSubfile:
    """Read-only, non-seekable view of a deflated segment of fp."""

    # Number of compressed bytes fetched from fp per step.
    BLOCKSIZE = 8192

    def __init__(self, fp, start, length):
        import zlib
        self.fp = fp
        self.curpos = start
        self.endpos = start + length
        self.outbuf = b''
        # NB the negative argument means that the compressed data
        # does not have the standard zlib header or checksum. 15
        # means that the window size is 32K.
        self.zobj = zlib.decompressobj(-15)

    def close(self):
        self.fp = None
        self.zobj = None
        self.outbuf = None

    def isatty(self):
        return 0

    def flush(self):
        # I don't think this needs to do anything for a read only file?
        pass

    def _fillbuf(self, size=-1):
        """Inflate until outbuf holds `size` bytes or the input is used up.

        A negative `size` inflates everything that is left.  self.zobj
        is set to None once the compressed data has been exhausted.
        """
        if self.zobj is None:
            return
        # Collect the pieces and join once instead of repeatedly
        # concatenating onto an ever-growing buffer.
        chunks = [self.outbuf]
        have = len(self.outbuf)
        while self.zobj is not None and (size < 0 or have < size):
            getsize = min(self.BLOCKSIZE, self.endpos - self.curpos)
            if getsize > 0:
                self.fp.seek(self.curpos)
                getbuf = self.fp.read(getsize)
                self.curpos = self.curpos + getsize
                data = self.zobj.decompress(getbuf)
            else:
                # bug(?) in zlib sometimes requires an extra byte
                data = self.zobj.decompress(b'\x00') + self.zobj.flush()
                self.zobj = None
            chunks.append(data)
            have = have + len(data)
        self.outbuf = b''.join(chunks)

    def read(self, size=-1):
        """Read up to `size` inflated bytes (everything left if negative)."""
        if size < 0 or size > len(self.outbuf):
            self._fillbuf(size)
        if size < 0 or size >= len(self.outbuf):
            retval = self.outbuf
            self.outbuf = b''
        else:
            retval = self.outbuf[:size]
            self.outbuf = self.outbuf[size:]
        return retval

    def readline(self, size=-1):
        """Read one line of inflated data, at most `size` bytes if size >= 0.

        Returns an empty byte string once the data is exhausted.
        """
        scanned = 0     # outbuf[:scanned] is already known to hold no newline
        while 1:
            avail = len(self.outbuf)
            if size < 0:
                limit = avail
            else:
                limit = min(size, avail)
            pos = self.outbuf.find(b'\n', scanned, limit)
            if pos >= 0:
                cut = pos + 1
                break
            if (0 <= size <= avail) or self.zobj is None:
                # Size limit reached, or nothing left to inflate.
                cut = limit
                break
            scanned = limit
            self._fillbuf(avail + self.BLOCKSIZE)
        retval = self.outbuf[:cut]
        self.outbuf = self.outbuf[cut:]
        return retval


#
# Represents date and time actually
#
class MSDOSDate:
    """A timestamp decoded from packed 16-bit MS-DOS date and time words.

    Attributes: year (offset from 1980), month (0-based, so -1 when the
    stored month field is 0), mday, hours, mins and disecs (seconds
    divided by two).
    """

    monthnames = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                  'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

    def __init__(self, dosdate, dostime):
        self.dosdate = dosdate
        self.dostime = dostime
        self._parse()

    def _parse(self):
        self.disecs = self.dostime & 0x001F
        self.mins = (self.dostime & 0x07E0) >> 5
        self.hours = (self.dostime & 0xF800) >> 11
        self.mday = self.dosdate & 0x001F
        self.month = ((self.dosdate & 0x01E0) >> 5) - 1
        self.year = (self.dosdate & 0xFE00) >> 9

    def __str__(self):
        # The 4-bit month field can hold values that are not months;
        # show those as '???' rather than a wrong name or an IndexError.
        if 0 <= self.month < len(self.monthnames):
            monthname = self.monthnames[self.month]
        else:
            monthname = '???'
        return '%s %2d %2d:%02d:%02d %d' % (
            monthname, self.mday,
            self.hours, self.mins, 2 * self.disecs,
            self.year + 1980)