123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- # -*- coding: utf-8 -*-
- __version__ = '6.0.5'
- # v
- import io
- import stat
- import zipfile
- ZIP64_LIMIT = (1 << 31) + 1
- class LargePredictionSize(Exception):
- """
- Raised when Buffer is larger than ZIP64
- """
- class ZipflyStream(io.RawIOBase):
- """
- The RawIOBase ABC extends IOBase. It deals with
- the reading and writing of bytes to a stream. FileIO subclasses
- RawIOBase to provide an interface to files in the machine’s file system.
- """
- def __init__(self):
- self._buffer = b''
- self._size = 0
- def writable(self):
- return True
- def write(self, b):
- if self.closed:
- raise RuntimeError("ZipFly stream was closed!")
- self._buffer += b
- return len(b)
- def get(self):
- chunk = self._buffer
- self._buffer = b''
- self._size += len(chunk)
- return chunk
- def size(self):
- return self._size
- class ZipFly:
- def __init__(self,
- mode = 'w',
- paths = [],
- chunksize = 0x8000,
- compression = zipfile.ZIP_STORED,
- allowZip64 = True,
- compresslevel = None,
- storesize = 0,
- filesystem = 'fs',
- arcname = 'n',
- encode = 'utf-8',):
- """
- @param store size : int : size of all files
- in paths without compression
- """
- if mode not in ('w',):
- raise RuntimeError("ZipFly requires 'w' mode")
- if compression not in ( zipfile.ZIP_STORED,):
- raise RuntimeError("Not compression supported")
- if compresslevel not in (None, ):
- raise RuntimeError("Not compression level supported")
- if isinstance(chunksize, str):
- chunksize = int(chunksize, 16)
- self.comment = f'Written using Zipfly v{__version__}'
- self.mode = mode
- self.paths = paths
- self.filesystem = filesystem
- self.arcname = arcname
- self.compression = compression
- self.chunksize = chunksize
- self.allowZip64 = allowZip64
- self.compresslevel = compresslevel
- self.storesize = storesize
- self.encode = encode
- self.ezs = int('0x8e', 16) # empty zip size in bytes
- def set_comment(self, comment):
- if not isinstance(comment, bytes):
- comment = str.encode(comment)
- if len(comment) >= zipfile.ZIP_MAX_COMMENT:
- # trunk comment
- comment = comment[:zipfile.ZIP_MAX_COMMENT]
- self.comment = comment
- def reader(self, entry):
- def get_chunk():
- return entry.read( self.chunksize )
- return get_chunk()
- def buffer_size(self):
- '''
- FOR UNIT TESTING (not used)
- using to get the buffer size
- this size is different from the size of each file added
- '''
- for i in self.generator(): pass
- return self._buffer_size
- def buffer_prediction_size(self):
- if not self.allowZip64:
- raise RuntimeError("ZIP64 extensions required")
- # End of Central Directory Record
- EOCD = int('0x16', 16)
- FILE_OFFSET = int('0x5e', 16) * len(self.paths)
- tmp_comment = self.comment
- if isinstance(self.comment, bytes):
- tmp_comment = ( self.comment ).decode()
- size_comment = len(tmp_comment.encode( self.encode ))
- # path-name
- size_paths = 0
- #for path in self.paths:
- for idx in range(len(self.paths)):
- '''
- getting bytes from character in UTF-8 format
- example:
- '传' has 3 bytes in utf-8 format ( b'\xe4\xbc\xa0' )
- '''
- #path = paths[idx]
- name = self.arcname
- if not self.arcname in self.paths[idx]:
- name = self.filesystem
- tmp_name = self.paths[idx][name]
- if (tmp_name)[0] in ('/', ):
- # is dir then trunk
- tmp_name = (tmp_name)[ 1 : len( tmp_name ) ]
- size_paths += (len(tmp_name.encode( self.encode )) - int( '0x1', 16)) * int('0x2', 16)
- # zipsize
- zs = sum([EOCD,FILE_OFFSET,size_comment,size_paths,self.storesize,])
- if zs > ZIP64_LIMIT:
- raise LargePredictionSize(
- "Prediction size for zip file greater than 2 GB not supported"
- )
- return zs
- def generator(self):
- # stream
- stream = ZipflyStream()
- with zipfile.ZipFile(
- stream,
- mode = self.mode,
- compression = self.compression,
- allowZip64 = self.allowZip64,) as zf:
- for path in self.paths:
- if not self.filesystem in path:
- raise RuntimeError(f"'{self.filesystem}' key is required")
- """
- filesystem should be the path to a file or directory on the filesystem.
- arcname is the name which it will have within the archive (by default,
- this will be the same as filename
- """
- if not self.arcname in path:
- # arcname will be default path
- path[self.arcname] = path[self.filesystem]
- z_info = zipfile.ZipInfo.from_file(
- path[self.filesystem],
- path[self.arcname]
- )
- with open( path[self.filesystem], 'rb' ) as e:
- # Read from filesystem:
- with zf.open( z_info, mode=self.mode ) as d:
- for chunk in iter( lambda: e.read(self.chunksize), b'' ):
- d.write(chunk)
- yield stream.get()
- self.set_comment(self.comment)
- zf.comment = self.comment
- yield stream.get()
- self._buffer_size = stream.size()
- # Flush and close this stream.
- stream.close()
- def get_size(self):
- return self._buffer_size
|