1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180
  181
  182
  183
  184
  185
  186
  187
  188
  189
  190
  191
  192
  193
  194
  195
  196
  197
  198
  199
  200
  201
  202
  203
  204
  205
  206
  207
  208
  209
  210
  211
  212
  213
  214
  215
  216
  217
  218
  219
  220
  221
  222
  223
  224
  225
  226
  227
  228
  229
  230
  231
  232
  233
  234
  235
  236
  237
  238

build / zip_helpers.py [blame]

# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper functions for dealing with .zip files."""

import os
import pathlib
import posixpath
import stat
import time
import zipfile

# Size in bytes of the fixed-length portion of a zip local file header, i.e.
# everything that precedes the variable-length filename and extra fields.
_FIXED_ZIP_HEADER_LEN = 30


def _set_alignment(zip_obj, zip_info, alignment):
  """Sets a ZipInfo's extra field such that the file will be aligned.

  The entry's data begins right after its local file header, which consists
  of a fixed-size part, the encoded filename, and the extra field. This pads
  the extra field with zero bytes so that the data offset becomes a multiple
  of |alignment|. Any value previously stored in |zip_info.extra| is replaced.

  Must be called immediately before the entry is written, since the padding
  is derived from the current write position of |zip_obj|.

  Args:
    zip_obj: The ZipFile object that is being written.
    zip_info: The ZipInfo object about to be written.
    alignment: The amount of alignment (e.g. 4, or 4*1024). Must be non-zero.
  """
  # The header stores the *encoded* filename, and zipfile encodes non-ASCII
  # names as UTF-8, so measure bytes rather than characters. For pure-ASCII
  # names the two lengths are identical.
  filename_len = len(zip_info.filename.encode('utf-8'))
  header_size = _FIXED_ZIP_HEADER_LEN + filename_len
  pos = zip_obj.fp.tell() + header_size
  # The outer modulo maps "already aligned" to 0 rather than |alignment|.
  padding_needed = (alignment - (pos % alignment)) % alignment

  # Python writes |extra| to both the local file header and the central
  # directory's file header. Android's zipalign tool writes only to the
  # local file header, so there is more overhead in using Python to align.
  zip_info.extra = b'\0' * padding_needed


def _hermetic_date_time(timestamp=None):
  """Returns a (year, month, day, hour, minute, second) tuple for zip entries.

  Args:
    timestamp: Seconds since the epoch, interpreted as UTC. When falsy (None
        or 0), a fixed date of 2001-01-01 00:00:00 is returned instead.
  """
  if timestamp:
    # The first six fields of a struct_time are exactly year through second.
    return tuple(time.gmtime(timestamp)[:6])
  return (2001, 1, 1, 0, 0, 0)


def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        alignment=None,
                        timestamp=None):
  """Writes one entry into |zip_file| using a deterministic modified time.

  Exactly one of |src_path| and |data| must be supplied. Symlinks given via
  |src_path| are stored as symlinks (the link target becomes the entry's
  contents); for regular files the executable bits of the source are carried
  over into the entry's permissions.

  Args:
    zip_file: ZipFile instance to add the entry to.
    zip_path: Destination path within the zip file, or a ZipInfo instance
        describing the entry. Paths must be relative, canonical, use forward
        slashes, and not already exist in |zip_file|.
    src_path: Path of the source file. Mutually exclusive with |data|.
    data: Contents of the entry. Mutually exclusive with |src_path|.
    compress: Whether to enable compression. When None, the default given to
        the ZipFile constructor is used. Entries shorter than 16 bytes are
        never compressed.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: The last modification date and time for the archive member.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if not isinstance(zip_path, zipfile.ZipInfo):
    # Entries created here default to rw-r--r-- permissions.
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    zipinfo.external_attr = 0o644 << 16
  else:
    zipinfo, zip_path = zip_path, zip_path.filename

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # A backslash is technically a legal filename character, but far more often
  # it means a Windows-style separator slipped through.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
  assert zip_path not in zip_file.namelist(), (
      'Tried to add a duplicate zip entry: ' + zip_path)

  if src_path and os.path.islink(src_path):
    # Flag the entry as a symlink and store the link target as its payload.
    zipinfo.external_attr |= stat.S_IFLNK << 16
    zip_file.writestr(zipinfo, os.readlink(src_path))
    return

  if src_path:
    # Carry over whichever executable bits the source file has.
    exec_mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    source_mode = os.stat(src_path).st_mode
    zipinfo.external_attr |= (source_mode & exec_mask) << 16

    with open(src_path, 'rb') as source_file:
      data = source_file.read()

  # Deflate is applied even when it inflates the output, so tiny payloads are
  # always stored; the threshold is an arbitrary cut off point.
  if len(data) < 16:
    compress = False

  # writestr() treats an explicit None as ZIP_STORED rather than falling back
  # to the ZipFile's default, so resolve the default here.
  if compress is None:
    method = zip_file.compression
  else:
    method = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, method)


def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     compress=None,
                     zip_prefix_path=None,
                     timestamp=None):
  """Writes the given files into a zip, ordered by their in-zip path.

  Args:
    inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
        Plain paths are stored relative to |base_dir|.
    output: Path, fileobj, or ZipFile instance to add files to. A ZipFile
        passed in by the caller is left open; one created here is closed.
    base_dir: Prefix to strip from inputs. Defaults to the current directory.
    compress: Whether to compress
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  base_dir = '.' if base_dir is None else base_dir

  def _as_pair(entry):
    # Tuples are taken verbatim; bare paths get a zip path derived from them.
    if not isinstance(entry, str):
      return entry
    relative = os.path.relpath(entry, base_dir)
    # Zip entries always use '/' regardless of the host's separator.
    if os.path.sep != posixpath.sep:
      relative = str(pathlib.Path(relative).as_posix())
    return (relative, entry)

  # Ordering by zip path keeps the archive layout stable across runs.
  pairs = sorted((_as_pair(entry) for entry in inputs),
                 key=lambda pair: pair[0])

  owns_zip = not isinstance(output, zipfile.ZipFile)
  out_zip = zipfile.ZipFile(output, 'w') if owns_zip else output

  try:
    for zip_path, fs_path in pairs:
      entry_path = zip_path
      if zip_prefix_path:
        entry_path = posixpath.join(zip_prefix_path, zip_path)
      add_to_zip_hermetic(out_zip,
                          entry_path,
                          src_path=fs_path,
                          compress=compress,
                          timestamp=timestamp)
  finally:
    # Only close archives that were opened by this function.
    if owns_zip:
      out_zip.close()


def zip_directory(output, base_dir, **kwargs):
  """Zips every file found under |base_dir|, recursively.

  Args:
    output: Destination, forwarded to add_files_to_zip().
    base_dir: Directory to walk. Entry paths are stored relative to it.
    **kwargs: Extra keyword arguments forwarded to add_files_to_zip().
  """
  found = [
      os.path.join(dirpath, filename)
      for dirpath, _, filenames in os.walk(base_dir)
      for filename in filenames
  ]

  add_files_to_zip(found, output, base_dir=base_dir, **kwargs)


def merge_zips(output, input_zips, path_transform=None, compress=None):
  """Copies the entries of every zip in |input_zips| into |output|.

  Directory entries are dropped. An entry whose destination path is already
  present is skipped when its contents have the same CRC as the existing one.

  Args:
    output: Path string of the zip to create, or an open ZipFile instance to
        add files to. A ZipFile passed in by the caller is left open.
    input_zips: Iterable of paths to zip files to merge.
    path_transform: Called for each entry path. Returns a new path, or None to
        skip the file.
    compress: Overrides compression setting from origin zip entries.

  Raises:
    Exception: If the same destination path appears with differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  if not isinstance(output, zipfile.ZipFile):
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_filename = output
    out_zip = zipfile.ZipFile(output, 'w')
  else:
    out_zip = output
    out_filename = output.filename

  # Maps each destination path to (zip it came from, CRC). Seeded with what
  # |out_zip| already holds so that pre-existing entries are not re-added.
  seen = {}
  for existing in out_zip.infolist():
    seen[existing.filename] = (out_filename, existing.CRC)

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          src_name = info.filename
          # Directory entries carry a trailing slash; they are not copied.
          if src_name[-1] == '/':
            continue

          dst_name = path_transform(src_name) if path_transform else src_name
          if dst_name is None:
            continue

          data = in_zip.read(info)

          # A repeated path is tolerated only when the contents match, in
          # which case the entry is not written a second time.
          previous = seen.get(dst_name)
          if previous is not None:
            orig_filename, orig_crc = previous
            if zipfile.crc32(data) == orig_crc:
              continue
            msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
            raise Exception(msg)

          # Without an override, keep each entry's stored/compressed state.
          if compress is None:
            compress_entry = info.compress_type != zipfile.ZIP_STORED
          else:
            compress_entry = compress
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry)
          seen[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    # Only close archives that were opened by this function.
    if output is not out_zip:
      out_zip.close()