1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
build / zip_helpers.py [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper functions for dealing with .zip files."""
import os
import pathlib
import posixpath
import stat
import time
import zipfile
_FIXED_ZIP_HEADER_LEN = 30
def _set_alignment(zip_obj, zip_info, alignment):
"""Sets a ZipInfo's extra field such that the file will be aligned.
Args:
zip_obj: The ZipFile object that is being written.
zip_info: The ZipInfo object about to be written.
alignment: The amount of alignment (e.g. 4, or 4*1024).
"""
header_size = _FIXED_ZIP_HEADER_LEN + len(zip_info.filename)
pos = zip_obj.fp.tell() + header_size
padding_needed = (alignment - (pos % alignment)) % alignment
# Python writes |extra| to both the local file header and the central
# directory's file header. Android's zipalign tool writes only to the
# local file header, so there is more overhead in using Python to align.
zip_info.extra = b'\0' * padding_needed
def _hermetic_date_time(timestamp=None):
if not timestamp:
return (2001, 1, 1, 0, 0, 0)
utc_time = time.gmtime(timestamp)
return (utc_time.tm_year, utc_time.tm_mon, utc_time.tm_mday, utc_time.tm_hour,
utc_time.tm_min, utc_time.tm_sec)
def add_to_zip_hermetic(zip_file,
zip_path,
*,
src_path=None,
data=None,
compress=None,
alignment=None,
timestamp=None):
"""Adds a file to the given ZipFile with a hard-coded modified time.
Args:
zip_file: ZipFile instance to add the file to.
zip_path: Destination path within the zip file (or ZipInfo instance).
src_path: Path of the source file. Mutually exclusive with |data|.
data: File data as a string.
compress: Whether to enable compression. Default is taken from ZipFile
constructor.
alignment: If set, align the data of the entry to this many bytes.
timestamp: The last modification date and time for the archive member.
"""
assert (src_path is None) != (data is None), (
'|src_path| and |data| are mutually exclusive.')
if isinstance(zip_path, zipfile.ZipInfo):
zipinfo = zip_path
zip_path = zipinfo.filename
else:
zipinfo = zipfile.ZipInfo(filename=zip_path)
zipinfo.external_attr = 0o644 << 16
zipinfo.date_time = _hermetic_date_time(timestamp)
if alignment:
_set_alignment(zip_file, zipinfo, alignment)
# Filenames can contain backslashes, but it is more likely that we've
# forgotten to use forward slashes as a directory separator.
assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
assert posixpath.normpath(zip_path) == zip_path, (
f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
assert zip_path not in zip_file.namelist(), (
'Tried to add a duplicate zip entry: ' + zip_path)
if src_path and os.path.islink(src_path):
zipinfo.external_attr |= stat.S_IFLNK << 16 # mark as a symlink
zip_file.writestr(zipinfo, os.readlink(src_path))
return
# Maintain the executable bit.
if src_path:
st = os.stat(src_path)
for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
if st.st_mode & mode:
zipinfo.external_attr |= mode << 16
if src_path:
with open(src_path, 'rb') as f:
data = f.read()
# zipfile will deflate even when it makes the file bigger. To avoid
# growing files, disable compression at an arbitrary cut off point.
if len(data) < 16:
compress = False
# None converts to ZIP_STORED, when passed explicitly rather than the
# default passed to the ZipFile constructor.
compress_type = zip_file.compression
if compress is not None:
compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_file.writestr(zipinfo, data, compress_type)
def add_files_to_zip(inputs,
output,
*,
base_dir=None,
compress=None,
zip_prefix_path=None,
timestamp=None):
"""Creates a zip file from a list of files.
Args:
inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
output: Path, fileobj, or ZipFile instance to add files to.
base_dir: Prefix to strip from inputs.
compress: Whether to compress
zip_prefix_path: Path prepended to file path in zip file.
timestamp: Unix timestamp to use for files in the archive.
"""
if base_dir is None:
base_dir = '.'
input_tuples = []
for tup in inputs:
if isinstance(tup, str):
src_path = tup
zip_path = os.path.relpath(src_path, base_dir)
# Zip files always use / as path separator.
if os.path.sep != posixpath.sep:
zip_path = str(pathlib.Path(zip_path).as_posix())
tup = (zip_path, src_path)
input_tuples.append(tup)
# Sort by zip path to ensure stable zip ordering.
input_tuples.sort(key=lambda tup: tup[0])
out_zip = output
if not isinstance(output, zipfile.ZipFile):
out_zip = zipfile.ZipFile(output, 'w')
try:
for zip_path, fs_path in input_tuples:
if zip_prefix_path:
zip_path = posixpath.join(zip_prefix_path, zip_path)
add_to_zip_hermetic(out_zip,
zip_path,
src_path=fs_path,
compress=compress,
timestamp=timestamp)
finally:
if output is not out_zip:
out_zip.close()
def zip_directory(output, base_dir, **kwargs):
"""Zips all files in the given directory."""
inputs = []
for root, _, files in os.walk(base_dir):
for f in files:
inputs.append(os.path.join(root, f))
add_files_to_zip(inputs, output, base_dir=base_dir, **kwargs)
def merge_zips(output, input_zips, path_transform=None, compress=None):
"""Combines all files from |input_zips| into |output|.
Args:
output: Path, fileobj, or ZipFile instance to add files to.
input_zips: Iterable of paths to zip files to merge.
path_transform: Called for each entry path. Returns a new path, or None to
skip the file.
compress: Overrides compression setting from origin zip entries.
"""
assert not isinstance(input_zips, str) # Easy mistake to make.
if isinstance(output, zipfile.ZipFile):
out_zip = output
out_filename = output.filename
else:
assert isinstance(output, str), 'Was: ' + repr(output)
out_zip = zipfile.ZipFile(output, 'w')
out_filename = output
# Include paths in the existing zip here to avoid adding duplicate files.
crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()}
try:
for in_file in input_zips:
with zipfile.ZipFile(in_file, 'r') as in_zip:
for info in in_zip.infolist():
# Ignore directories.
if info.filename[-1] == '/':
continue
if path_transform:
dst_name = path_transform(info.filename)
if dst_name is None:
continue
else:
dst_name = info.filename
data = in_zip.read(info)
# If there's a duplicate file, ensure contents is the same and skip
# adding it multiple times.
if dst_name in crc_by_name:
orig_filename, orig_crc = crc_by_name[dst_name]
new_crc = zipfile.crc32(data)
if new_crc == orig_crc:
continue
msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
raise Exception(msg)
if compress is not None:
compress_entry = compress
else:
compress_entry = info.compress_type != zipfile.ZIP_STORED
add_to_zip_hermetic(out_zip,
dst_name,
data=data,
compress=compress_entry)
crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
finally:
if output is not out_zip:
out_zip.close()