/usr/lib/python2.7/dist-packages/pex/util.py is in python-pex 1.1.14-2ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | # Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import absolute_import
import contextlib
import os
import shutil
import tempfile
import uuid
from hashlib import sha1
from threading import Lock
from pkg_resources import find_distributions, resource_isdir, resource_listdir, resource_string
from .common import rename_if_empty, safe_mkdir, safe_mkdtemp, safe_open
from .finders import register_finders
class DistributionHelper(object):
@classmethod
def walk_data(cls, dist, path='/'):
"""Yields filename, stream for files identified as data in the distribution"""
for rel_fn in filter(None, dist.resource_listdir(path)):
full_fn = os.path.join(path, rel_fn)
if dist.resource_isdir(full_fn):
for fn, stream in cls.walk_data(dist, full_fn):
yield fn, stream
else:
yield full_fn[1:], dist.get_resource_stream(dist._provider, full_fn)
@staticmethod
def zipsafe(dist):
"""Returns whether or not we determine a distribution is zip-safe."""
# zip-safety is only an attribute of eggs. wheels are considered never
# zip safe per implications of PEP 427.
if hasattr(dist, 'egg_info') and dist.egg_info.endswith('EGG-INFO'):
egg_metadata = dist.metadata_listdir('')
return 'zip-safe' in egg_metadata and 'native_libs.txt' not in egg_metadata
else:
return False
@classmethod
def access_zipped_assets(cls, static_module_name, static_path, dir_location=None):
"""
Create a copy of static resource files as we can't serve them from within the pex file.
:param static_module_name: Module name containing module to cache in a tempdir
:type static_module_name: string, for example 'twitter.common.zookeeper' or similar
:param static_path: Module name, for example 'serverset'
:param dir_location: create a new temporary directory inside, or None to have one created
:returns temp_dir: Temporary directory with the zipped assets inside
:rtype: str
"""
# asset_path is initially a module name that's the same as the static_path, but will be
# changed to walk the directory tree
def walk_zipped_assets(static_module_name, static_path, asset_path, temp_dir):
for asset in resource_listdir(static_module_name, asset_path):
asset_target = os.path.normpath(
os.path.join(os.path.relpath(asset_path, static_path), asset))
if resource_isdir(static_module_name, os.path.join(asset_path, asset)):
safe_mkdir(os.path.join(temp_dir, asset_target))
walk_zipped_assets(static_module_name, static_path, os.path.join(asset_path, asset),
temp_dir)
else:
with open(os.path.join(temp_dir, asset_target), 'wb') as fp:
path = os.path.join(static_path, asset_target)
file_data = resource_string(static_module_name, path)
fp.write(file_data)
if dir_location is None:
temp_dir = safe_mkdtemp()
else:
temp_dir = dir_location
walk_zipped_assets(static_module_name, static_path, static_path, temp_dir)
return temp_dir
@classmethod
def distribution_from_path(cls, path, name=None):
"""Return a distribution from a path.
If name is provided, find the distribution. If none is found matching the name,
return None. If name is not provided and there is unambiguously a single
distribution, return that distribution otherwise None.
"""
# Monkeypatch pkg_resources finders should it not already be so.
register_finders()
if name is None:
distributions = list(find_distributions(path))
if len(distributions) == 1:
return distributions[0]
else:
for dist in find_distributions(path):
if dist.project_name == name:
return dist
class CacheHelper(object):
@classmethod
def update_hash(cls, filelike, digest):
"""Update the digest of a single file in a memory-efficient manner."""
block_size = digest.block_size * 1024
for chunk in iter(lambda: filelike.read(block_size), b''):
digest.update(chunk)
@classmethod
def hash(cls, path, digest=None, hasher=sha1):
"""Return the digest of a single file in a memory-efficient manner."""
if digest is None:
digest = hasher()
with open(path, 'rb') as fh:
cls.update_hash(fh, digest)
return digest.hexdigest()
@classmethod
def _compute_hash(cls, names, stream_factory):
digest = sha1()
# Always use / as the path separator, since that's what zip uses.
hashed_names = [n.replace(os.sep, '/') for n in names]
digest.update(''.join(hashed_names).encode('utf-8'))
for name in names:
with contextlib.closing(stream_factory(name)) as fp:
cls.update_hash(fp, digest)
return digest.hexdigest()
@classmethod
def zip_hash(cls, zf, prefix=''):
"""Return the hash of the contents of a zipfile, comparable with a cls.dir_hash."""
prefix_length = len(prefix)
names = sorted(name[prefix_length:] for name in zf.namelist()
if name.startswith(prefix) and not name.endswith('.pyc') and not name.endswith('/'))
def stream_factory(name):
return zf.open(prefix + name)
return cls._compute_hash(names, stream_factory)
@classmethod
def _iter_files(cls, directory):
normpath = os.path.realpath(os.path.normpath(directory))
for root, _, files in os.walk(normpath):
for f in files:
yield os.path.relpath(os.path.join(root, f), normpath)
@classmethod
def pex_hash(cls, d):
"""Return a reproducible hash of the contents of a directory."""
names = sorted(f for f in cls._iter_files(d) if not (f.endswith('.pyc') or f.startswith('.')))
def stream_factory(name):
return open(os.path.join(d, name), 'rb') # noqa: T802
return cls._compute_hash(names, stream_factory)
@classmethod
def dir_hash(cls, d):
"""Return a reproducible hash of the contents of a directory."""
names = sorted(f for f in cls._iter_files(d) if not f.endswith('.pyc'))
def stream_factory(name):
return open(os.path.join(d, name), 'rb') # noqa: T802
return cls._compute_hash(names, stream_factory)
@classmethod
def cache_distribution(cls, zf, source, target_dir):
"""Possibly cache an egg from within a zipfile into target_cache.
Given a zipfile handle and a filename corresponding to an egg distribution within
that zip, maybe write to the target cache and return a Distribution."""
dependency_basename = os.path.basename(source)
if not os.path.exists(target_dir):
target_dir_tmp = target_dir + '.' + uuid.uuid4().hex
for name in zf.namelist():
if name.startswith(source) and not name.endswith('/'):
# strip off prefix + '/'
target_name = os.path.join(dependency_basename, name[len(source) + 1:])
with contextlib.closing(zf.open(name)) as zi:
with safe_open(os.path.join(target_dir_tmp, target_name), 'wb') as fp:
shutil.copyfileobj(zi, fp)
rename_if_empty(target_dir_tmp, target_dir)
dist = DistributionHelper.distribution_from_path(target_dir)
assert dist is not None, 'Failed to cache distribution %s' % source
return dist
class Memoizer(object):
"""A thread safe class for memoizing the results of a computation."""
def __init__(self):
self._data = {}
self._lock = Lock()
def get(self, key, default=None):
with self._lock:
return self._data.get(key, default)
def store(self, key, value):
with self._lock:
self._data[key] = value
@contextlib.contextmanager
def named_temporary_file(*args, **kwargs):
"""
Due to a bug in python (https://bugs.python.org/issue14243), we need
this to be able to use the temporary file without deleting it.
"""
assert 'delete' not in kwargs
kwargs['delete'] = False
fp = tempfile.NamedTemporaryFile(*args, **kwargs)
try:
with fp:
yield fp
finally:
os.remove(fp.name)
|