import binascii
import collections
import collections.abc
import contextlib
import functools
import glob
import io
import itertools
import json
import keyword
import os
import platform
import shutil
import sys
import tarfile
import tempfile
import time
import token
import tokenize
import uuid
import parso
is_windows = platform.system() == 'Windows'
# gzip support detection
try:
import zlib # pylint: disable=unused-import # noqa: F401
import gzip
gzip.GzipFile # pylint: disable=pointless-statement
except (ImportError, AttributeError): # pragma: no cover
has_gz_support = False
else:
has_gz_support = True
# multiprocessing support detection and CPU_CNT retrieval
try: # try first
import multiprocessing
except ImportError: # pragma: no cover
multiprocessing = None
else: # CPU number if multiprocessing supported
if os.name == 'posix' and 'SC_NPROCESSORS_CONF' in os.sysconf_names: # pylint: disable=no-member # pragma: no cover
CPU_CNT = os.sysconf('SC_NPROCESSORS_CONF') # pylint: disable=no-member
elif 'sched_getaffinity' in os.__all__: # pragma: no cover
CPU_CNT = len(os.sched_getaffinity(0)) # pylint: disable=no-member
else: # pragma: no cover
CPU_CNT = os.cpu_count() or 1
finally: # alias and aftermath
mp = multiprocessing
del multiprocessing
parallel_available = mp is not None and CPU_CNT > 1
try:
from contextlib import nullcontext # novermin
except ImportError: # backport contextlib.nullcontext for Python < 3.7 # pragma: no cover
@contextlib.contextmanager
def nullcontext(enter_result=None):
yield enter_result
LOOKUP_TABLE = '_lookup_table.json'
PARSO_GRAMMAR_VERSIONS = []
for file in glob.iglob(os.path.join(parso.__path__[0], 'python', 'grammar*.txt')):
version = os.path.basename(file)[7:-4]
PARSO_GRAMMAR_VERSIONS.append((int(version[0]), int(version[1:])))
PARSO_GRAMMAR_VERSIONS = sorted(PARSO_GRAMMAR_VERSIONS)
del file, version # pylint: disable=undefined-loop-variable
def get_parso_grammar_versions(minimum=None):
"""Get Python versions that parso supports to parse grammar.
Args:
minimum (str): filter result by this minimum version
Returns:
List[str]: a list of Python versions whose grammar parso is able to parse
Raises:
ValueError: if ``minimum`` is invalid
"""
if minimum is None:
return ['{}.{}'.format(*v) for v in PARSO_GRAMMAR_VERSIONS]
try:
minimum = tuple(map(int, minimum.split('.')))
except Exception:
raise ValueError('invalid minimum version') from None
else:
return ['{}.{}'.format(*v) for v in PARSO_GRAMMAR_VERSIONS if v >= minimum]
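# Rough usage sketch; the exact list depends on the parso release installed:
#     >>> get_parso_grammar_versions()                 # e.g. ['3.6', '3.7', '3.8']
#     >>> get_parso_grammar_versions(minimum='3.7')    # e.g. ['3.7', '3.8']
#     >>> get_parso_grammar_versions(minimum='latest') # raises ValueError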
def first_truthy(*args):
"""Return the first *truthy* value from a list of values.
Args:
*args: variable length argument list
* If one positional argument is provided, it should be an iterable of the values.
* If two or more positional arguments are provided, then the value list is the positional argument list.
Returns:
Any: the first *truthy* value; if no *truthy* value is found or the sequence is empty, return :data:`None`
Raises:
TypeError: if no arguments provided
"""
if not args:
raise TypeError('no arguments provided')
if len(args) == 1:
args = args[0]
return next(filter(bool, args), None) # pylint: disable=filter-builtin-not-iterating
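# Behaviour sketch:
#     >>> first_truthy(0, '', 'hello', 1)    # -> 'hello'
#     >>> first_truthy([0, None, [], 2])     # single iterable argument -> 2
#     >>> first_truthy(0, '')                # no truthy value -> None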
def first_non_none(*args):
"""Return the first non-:data:`None` value from a list of values.
Args:
*args: variable length argument list
* If one positional argument is provided, it should be an iterable of the values.
* If two or more positional arguments are provided, then the value list is the positional argument list.
Returns:
Any: the first non-:data:`None` value; if all values are :data:`None` or the sequence is empty, return :data:`None`
Raises:
TypeError: if no arguments provided
"""
if not args:
raise TypeError('no arguments provided')
if len(args) == 1:
args = args[0]
return next(filter(lambda x: x is not None, args), None) # pylint: disable=filter-builtin-not-iterating
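# Behaviour sketch; falsy but non-None values such as 0 or '' are kept:
#     >>> first_non_none(None, 0, 1)         # -> 0
#     >>> first_non_none([None, '', None])   # single iterable argument -> ''
#     >>> first_non_none(None, None)         # all None -> None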
#: Dict[str, bool]: A mapping from string representation to boolean states.
#: The values are used for :func:`parse_boolean_state`.
_boolean_state_lookup = {
'1': True,
'yes': True,
'y': True,
'true': True,
'on': True,
'0': False,
'no': False,
'n': False,
'false': False,
'off': False,
}
def parse_boolean_state(s):
"""Parse a boolean state from a string representation.
* These values are regarded as :data:`True`: ``'1'``, ``'yes'``, ``'y'``, ``'true'``, ``'on'``
* These values are regarded as :data:`False`: ``'0'``, ``'no'``, ``'n'``, ``'false'``, ``'off'``
Value matching is **case insensitive**.
Args:
s (Optional[str]): string representation of a boolean state
Returns:
Optional[bool]: the parsed boolean result, return :data:`None` if input is :data:`None`
Raises:
ValueError: if ``s`` is an invalid boolean state value
See Also:
See :data:`_boolean_state_lookup` for default lookup mapping values.
"""
if s is None:
return None
try:
return _boolean_state_lookup[s.lower()]
except KeyError:
raise ValueError('invalid boolean state value {!r}'.format(s)) from None
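# Behaviour sketch (matching is case insensitive):
#     >>> parse_boolean_state('Yes')     # -> True
#     >>> parse_boolean_state('OFF')     # -> False
#     >>> parse_boolean_state(None)      # -> None
#     >>> parse_boolean_state('maybe')   # raises ValueError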
#: Dict[str, str]: A mapping from string representation to linesep.
#: The values are used for :func:`parse_linesep`.
_linesep_lookup = {
'\n': '\n',
'lf': '\n',
'\r\n': '\r\n',
'crlf': '\r\n',
'\r': '\r',
'cr': '\r',
}
def parse_linesep(s):
r"""Parse linesep from a string representation.
* These values are regarded as ``'\n'``: ``'\n'``, ``'lf'``
* These values are regarded as ``'\r\n'``: ``'\r\n'``, ``'crlf'``
* These values are regarded as ``'\r'``: ``'\r'``, ``'cr'``
Value matching is **case insensitive**.
Args:
s (Optional[str]): string representation of linesep
Returns:
Optional[Literal['\\n', '\\r\\n', '\\r']]: the parsed linesep result,
return :data:`None` if input is :data:`None` or empty string
Raises:
ValueError: if ``s`` is an invalid linesep value
See Also:
See :data:`_linesep_lookup` for default lookup mapping values.
"""
if not s:
return None
try:
return _linesep_lookup[s.lower()]
except KeyError:
raise ValueError('invalid linesep value {!r}'.format(s)) from None
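# Behaviour sketch:
#     >>> parse_linesep('LF')       # -> '\n'
#     >>> parse_linesep('\r\n')     # -> '\r\n'
#     >>> parse_linesep('')         # -> None
#     >>> parse_linesep('unix')     # raises ValueError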
def parse_indentation(s):
"""Parse indentation from a string representation.
* If a positive integer ``n`` is specified (as a string), then the indentation is ``n`` spaces.
* If ``'t'`` or ``'tab'`` is specified, then the indentation is a tab character.
Value matching is **case insensitive**.
Args:
s (Optional[str]): string representation of indentation
Returns:
Optional[str]: the parsed indentation result, return :data:`None` if input is :data:`None` or empty string
Raises:
ValueError: if ``s`` is an invalid indentation value
"""
if not s:
return None
if s.lower() in {'t', 'tab'}:
return '\t'
try:
n = int(s)
if n <= 0:
raise ValueError
return ' ' * n
except ValueError:
raise ValueError('invalid indentation value {!r}'.format(s)) from None
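# Behaviour sketch:
#     >>> parse_indentation('4')     # -> '    ' (four spaces)
#     >>> parse_indentation('Tab')   # -> '\t'
#     >>> parse_indentation(None)    # -> None
#     >>> parse_indentation('-2')    # raises ValueError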
class BPCSyntaxError(SyntaxError):
"""Syntax error detected when parsing code."""
class UUID4Generator:
"""UUID 4 generator wrapper to prevent UUID collisions."""
def __init__(self, dash=True):
"""Constructor of UUID 4 generator wrapper.
Args:
dash (bool): whether the generated UUID string has dashes or not
"""
self.used_uuids = set()
self.dash = dash
def gen(self):
"""Generate a new UUID 4 string that is guaranteed not to collide with used UUIDs.
Returns:
str: a new UUID 4 string
"""
while True:
nuid = uuid.uuid4()
nuid = str(nuid) if self.dash else nuid.hex
if nuid not in self.used_uuids: # pragma: no cover
break
self.used_uuids.add(nuid)
return nuid
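# Usage sketch; generated values are random, the shapes shown are indicative only:
#     >>> gen = UUID4Generator(dash=False)
#     >>> gen.gen()                        # e.g. '08f24a64199a44cfa4e93fa69d251d0a' (32 hex digits)
#     >>> UUID4Generator(dash=True).gen()  # e.g. 'xxxxxxxx-xxxx-4xxx-xxxx-xxxxxxxxxxxx'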
def is_python_filename(filename):
"""Determine whether a file is a Python source file by its extension.
Args:
filename (str): the name of the file
Returns:
bool: whether the file is a Python source file
"""
if is_windows: # pragma: no cover
filename = filename.lower()
return os.path.splitext(filename)[1] in {'.py', '.pyw'}
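# Behaviour sketch:
#     >>> is_python_filename('example.py')   # -> True
#     >>> is_python_filename('Makefile')     # -> False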
#: Wrapper function to perform glob expansion.
expand_glob_iter = glob.iglob if sys.version_info[:2] < (3, 5) else functools.partial(glob.iglob, recursive=True)
def detect_files(files):
"""Get a list of Python files to be processed according to user input.
This will perform *glob* expansion on Windows, make all paths absolute,
resolve symbolic links and remove duplicates.
Args:
files (List[str]): a list of files and directories to process
(usually provided by users on command-line)
Returns:
List[str]: a list of Python files to be processed
See Also:
See :func:`expand_glob_iter` for more information.
"""
file_list = []
directory_queue = collections.deque()
directory_visited = set()
# perform glob expansion on windows
if is_windows: # pragma: no cover
files = itertools.chain.from_iterable(map(expand_glob_iter, files))
# find top-level files and directories
for file in files: # pylint: disable=redefined-outer-name
file = os.path.realpath(file)
if os.path.isfile(file): # user specified files should be added even without .py extension
file_list.append(file)
elif os.path.isdir(file):
directory_queue.appendleft(file)
directory_visited.add(file)
# find files in subdirectories
while directory_queue:
directory = directory_queue.pop()
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
item_realpath = os.path.realpath(item_path)
if os.path.isfile(item_realpath) and (is_python_filename(item_path) or is_python_filename(item_realpath)):
file_list.append(item_realpath)
elif os.path.isdir(item_realpath):
if item_realpath not in directory_visited: # avoid symlink directory loops
directory_queue.appendleft(item_realpath)
directory_visited.add(item_realpath)
# remove duplicates (including hard links pointing to the same file)
file_dict = {}
for file in file_list:
file_stat = os.stat(file)
file_dict[(file_stat.st_ino, file_stat.st_dev)] = file
return list(file_dict.values())
def archive_files(files, archive_dir):
"""Archive the list of files into a *tar* file.
Args:
files (List[str]): a list of files to be archived (should be *absolute path*)
archive_dir (os.PathLike): the directory to save the archive
Returns:
str: path to the generated *tar* archive
"""
uuid_gen = UUID4Generator()
lookup_table = {uuid_gen.gen() + '.py': file for file in files}
archive_file = 'archive-{}-{}.tar'.format(time.strftime('%Y%m%d%H%M%S'), binascii.hexlify(os.urandom(8)).decode())  # decode so the hex digest is not embedded as a bytes repr
archive_mode = 'w'
if has_gz_support: # pragma: no cover
archive_file += '.gz'
archive_mode += ':gz'
archive_file = os.path.join(archive_dir, archive_file)
os.makedirs(archive_dir, exist_ok=True)
with tarfile.open(archive_file, archive_mode) as tarf:
for arcname, realname in lookup_table.items():
tarf.add(realname, arcname)
with tempfile.NamedTemporaryFile('w', encoding='utf-8', prefix='bpc-archive-lookup-',
suffix='.json', delete=False) as tmpf:
json.dump(lookup_table, tmpf, indent=4)
tarf.add(tmpf.name, LOOKUP_TABLE)
with contextlib.suppress(OSError):
os.remove(tmpf.name)
return archive_file
def recover_files(archive_file):
"""Recover files from a *tar* archive.
Args:
archive_file (os.PathLike): path to the *tar* archive file
"""
with tarfile.open(archive_file, 'r') as tarf:
with tempfile.TemporaryDirectory(prefix='bpc-archive-extract-') as tmpd:
tarf.extractall(tmpd)
with open(os.path.join(tmpd, LOOKUP_TABLE)) as lookupf:
lookup_table = json.load(lookupf)
for arcname, realname in lookup_table.items():
os.makedirs(os.path.dirname(realname), exist_ok=True)
shutil.move(os.path.join(tmpd, arcname), realname)
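# Hypothetical round-trip sketch (all paths are placeholders):
#     >>> archive = archive_files(['/path/to/a.py', '/path/to/b.py'], '/path/to/archive')
#     >>> recover_files(archive)   # moves the archived copies back to their original locations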
def detect_encoding(code):
"""Detect encoding of Python source code as specified in :pep:`263`.
Args:
code (bytes): the code to detect encoding
Returns:
str: the detected encoding, or the default encoding (``utf-8``)
Raises:
TypeError: if ``code`` is not a :obj:`bytes` string
"""
if not isinstance(code, bytes):
raise TypeError("'code' should be bytes")
with io.BytesIO(code) as file: # pylint: disable=redefined-outer-name
return tokenize.detect_encoding(file.readline)[0]
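# Behaviour sketch (PEP 263 coding declarations are honoured):
#     >>> detect_encoding(b'# -*- coding: gbk -*-\nx = 1\n')   # -> 'gbk'
#     >>> detect_encoding(b'x = 1\n')                          # -> 'utf-8'
#     >>> detect_encoding('x = 1')                             # raises TypeError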
class MakeTextIO:
"""Context wrapper class to handle :obj:`str` and *file* objects together.
Attributes:
obj (Union[str, TextIO]): the object to manage in the context
sio (Optional[StringIO]): the I/O object to manage in the context,
only if :attr:`self.obj <MakeTextIO.obj>` is :obj:`str`
pos (Optional[int]): the original offset of :attr:`self.obj <MakeTextIO.obj>`,
only if :attr:`self.obj <MakeTextIO.obj>` is a seekable *file* object
"""
def __init__(self, obj):
"""Initialize context.
Args:
obj (Union[str, TextIO]): the object to manage in the context
"""
self.obj = obj
def __enter__(self):
"""Enter context.
* If :attr:`self.obj <MakeTextIO.obj>` is :obj:`str`, a
:class:`~io.StringIO` will be created and returned.
* If :attr:`self.obj <MakeTextIO.obj>` is a seekable *file* object,
it will be seeked to the beginning and returned.
* If :attr:`self.obj <MakeTextIO.obj>` is an unseekable *file* object,
it will be returned directly.
"""
if isinstance(self.obj, str):
#: StringIO: the I/O object to manage in the context
#: only if :attr:`self.obj <MakeTextIO.obj>` is :obj:`str`
self.sio = io.StringIO(self.obj, newline='') # turn off newline translation # pylint: disable=W0201
return self.sio
if self.obj.seekable():
#: int: the original offset of :attr:`self.obj <MakeTextIO.obj>`,
#: only if :attr:`self.obj <MakeTextIO.obj>` is a seekable
#: :class:`TextIO <io.TextIOWrapper>`
self.pos = self.obj.tell() # pylint: disable=W0201
#: Union[str, TextIO]: the object to manage in the context
self.obj.seek(0)
return self.obj
def __exit__(self, exc_type, exc_value, traceback):
"""Exit context.
* If :attr:`self.obj <MakeTextIO.obj>` is :obj:`str`, the
:class:`~io.StringIO` (:attr:`self.sio <MakeTextIO.sio>`) will be closed.
* If :attr:`self.obj <MakeTextIO.obj>` is a seekable *file* object,
its stream position (:attr:`self.pos <MakeTextIO.pos>`) will be recovered.
"""
if isinstance(self.obj, str):
self.sio.close()
elif self.obj.seekable():
self.obj.seek(self.pos)
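# Usage sketch:
#     >>> with MakeTextIO('hello, world') as file:
#     ...     file.read()              # str input is wrapped in a StringIO -> 'hello, world'
#     >>> with open('script.py') as f:  # 'script.py' is a placeholder for any existing text file
#     ...     with MakeTextIO(f) as file:
#     ...         file.read()          # seekable files are rewound here and restored on exit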
def detect_linesep(code):
r"""Detect linesep of Python source code.
Args:
code (Union[str, bytes, TextIO, parso.tree.NodeOrLeaf]): the code to detect linesep
Returns:
Literal['\\n', '\\r\\n', '\\r']: the detected linesep (one of ``'\n'``, ``'\r\n'`` and ``'\r'``)
Notes:
In case of mixed linesep, try voting by the number of occurrences of each linesep value.
When there is a tie, prefer ``LF`` to ``CRLF``, prefer ``CRLF`` to ``CR``.
"""
if isinstance(code, parso.tree.NodeOrLeaf):
code = code.get_code()
if isinstance(code, bytes):
code = code.decode(detect_encoding(code))
pool = {
'CR': 0,
'CRLF': 0,
'LF': 0,
}
with MakeTextIO(code) as file: # pylint: disable=redefined-outer-name
for line in file:
if line.endswith('\r'):
pool['CR'] += 1
elif line.endswith('\r\n'):
pool['CRLF'] += 1
elif line.endswith('\n'):
pool['LF'] += 1
# when there is a tie, prefer LF to CRLF, prefer CRLF to CR
return max((pool['LF'], 3, '\n'), (pool['CRLF'], 2, '\r\n'), (pool['CR'], 1, '\r'))[2]
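# Behaviour sketch:
#     >>> detect_linesep('foo\nbar\n')             # -> '\n'
#     >>> detect_linesep(b'foo\r\nbar\r\nbaz\n')   # majority CRLF -> '\r\n'
#     >>> detect_linesep('foo\nbar\r\n')           # tie prefers LF -> '\n'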
def detect_indentation(code):
"""Detect indentation of Python source code.
Args:
code (Union[str, bytes, TextIO, parso.tree.NodeOrLeaf]): the code to detect indentation
Returns:
str: the detected indentation sequence
Notes:
In case of mixed indentation, try voting by the number of occurrences of
each indentation value (*spaces* and *tabs*).
When there is a tie between *spaces* and *tabs*, prefer **4 spaces** for :pep:`8`.
"""
if isinstance(code, parso.tree.NodeOrLeaf):
code = code.get_code()
if isinstance(code, bytes):
code = code.decode(detect_encoding(code))
pool = {
'space': 0,
'tab': 0
}
min_spaces = None
with MakeTextIO(code) as file: # pylint: disable=redefined-outer-name
for token_info in tokenize.generate_tokens(file.readline):
if token_info.type == token.INDENT:
if '\t' in token_info.string and ' ' in token_info.string:
continue # skip indentation with mixed spaces and tabs
if '\t' in token_info.string:
pool['tab'] += 1
else:
pool['space'] += 1
if min_spaces is None:
min_spaces = len(token_info.string)
else:
min_spaces = min(min_spaces, len(token_info.string))
if pool['space'] > pool['tab']:
return ' ' * min_spaces
if pool['space'] < pool['tab']:
return '\t'
return ' ' * 4 # same number of spaces and tabs, prefer 4 spaces for PEP 8
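# Behaviour sketch:
#     >>> detect_indentation('if True:\n    pass\n')   # -> '    ' (four spaces)
#     >>> detect_indentation('if True:\n\tpass\n')     # -> '\t'
#     >>> detect_indentation('if True:\n  pass\nif True:\n        pass\n')  # smallest space width wins -> '  '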
def parso_parse(code, filename=None, *, version=None): # pylint: disable=redefined-outer-name
"""Parse Python source code with parso.
Args:
code (Union[str, bytes]): the code to be parsed
filename (str): an optional source file name to provide a context in case of error
version (str): parse the code as this version (uses the latest version by default)
Returns:
parso.python.tree.Module: parso AST
Raises:
:exc:`BPCSyntaxError`: when source code contains syntax errors
"""
grammar = parso.load_grammar(version=version if version is not None else get_parso_grammar_versions()[-1])
if isinstance(code, bytes):
code = code.decode(detect_encoding(code))
module = grammar.parse(code, error_recovery=True)
errors = grammar.iter_errors(module)
if errors:
error_messages = '\n'.join('[L%dC%d] %s' % (error.start_pos + (error.message,)) for error in errors)
raise BPCSyntaxError('source file %r contains the following syntax errors:\n' %
first_non_none(filename, '<unknown>') + error_messages)
return module
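# Usage sketch; the file name is a placeholder and the version example assumes the
# installed parso still ships that grammar:
#     >>> module = parso_parse('print("hello")\n')       # parsed with the latest supported grammar
#     >>> module = parso_parse(b'x = 1\n', version='3.8')
#     >>> parso_parse('def f(:', filename='demo.py')      # raises BPCSyntaxError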
def _mp_map_wrapper(args):
"""Map wrapper function for :mod:`multiprocessing`.
Args:
args (Tuple[Callable, Iterable[Any], Mapping[str, Any]]): the function to execute,
the positional arguments and the keyword arguments packed into a tuple
Returns:
Any: the function execution result
"""
func, posargs, kwargs = args
return func(*posargs, **kwargs)
def _mp_init_lock(lock): # pragma: no cover
"""Initialize lock for :mod:`multiprocessing`.
Args:
lock (multiprocessing.synchronize.Lock): the lock to be shared among tasks
"""
global task_lock # pylint: disable=global-statement
task_lock = lock
def map_tasks(func, iterable, posargs=None, kwargs=None, *, processes=None, chunksize=None):
"""Execute tasks in parallel if :mod:`multiprocessing` is available, otherwise execute them sequentially.
Args:
func (Callable): the task function to execute
iterable (Iterable[Any]): the items to process
posargs (Optional[Iterable[Any]]): additional positional arguments to pass to ``func``
kwargs (Optional[Mapping[str, Any]]): keyword arguments to pass to ``func``
processes (Optional[int]): the number of worker processes (default: auto determine)
chunksize (Optional[int]): chunk size for multiprocessing
Returns:
List[Any]: the return values of the task function applied on the input items and additional arguments
"""
global task_lock # pylint: disable=global-statement
if posargs is None:
posargs = ()
if kwargs is None:
kwargs = {}
if not parallel_available or processes == 1: # sequential execution
return [func(item, *posargs, **kwargs) for item in iterable]
processes = processes or CPU_CNT
lock = mp.Lock()
with mp.Pool(processes=processes, initializer=_mp_init_lock, initargs=(lock,)) as pool: # parallel execution
result = pool.map(_mp_map_wrapper, [(func, (item,) + tuple(posargs), kwargs) for item in iterable], chunksize)
task_lock = nullcontext()
return result
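# Usage sketch; operator.add merely stands in for a real (picklable) task function,
# and results come back in input order:
#     >>> import operator
#     >>> map_tasks(operator.add, [1, 2, 3], posargs=(10,))    # -> [11, 12, 13]
#     >>> map_tasks(str.strip, [' a ', ' b '], processes=1)    # forced sequential -> ['a', 'b']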
task_lock = nullcontext()
def TaskLock():
"""Function that returns a lock for possibly concurrent tasks.
Returns:
Union[contextlib.nullcontext, multiprocessing.synchronize.Lock]: a lock for possibly concurrent tasks
"""
return task_lock
class Config(collections.abc.MutableMapping):
"""Configuration namespace.
This class is inspired by :class:`argparse.Namespace` for storing
internal attributes and/or configuration variables.
"""
def __init__(self, **kwargs):
for name, value in kwargs.items():
setattr(self, name, value)
def __contains__(self, key):
return key in self.__dict__
def __iter__(self):
return iter(self.__dict__)
def __len__(self):
return len(self.__dict__)
def __getitem__(self, key):
return self.__dict__[key]
def __setitem__(self, key, value):
self.__dict__[key] = value
def __delitem__(self, key):
del self.__dict__[key]
def __eq__(self, other):
return isinstance(other, Config) and self.__dict__ == other.__dict__
def __repr__(self):
type_name = type(self).__name__
arg_strings = []
star_args = {}
for name, value in sorted(self.__dict__.items()):
if name.isidentifier() and not keyword.iskeyword(name) and name != '__debug__':
arg_strings.append('%s=%r' % (name, value))
else: # wrap invalid names into a dict to make __repr__ round-trip
star_args[name] = value
if star_args:
arg_strings.append('**%s' % repr(star_args))
return '%s(%s)' % (type_name, ', '.join(arg_strings))
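# Usage sketch:
#     >>> config = Config(linesep='\n', indentation='    ')
#     >>> config.linesep           # attribute access -> '\n'
#     >>> config['indentation']    # mapping access -> '    '
#     >>> config                   # repr sorts names -> Config(indentation='    ', linesep='\n')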
__all__ = ['get_parso_grammar_versions', 'first_truthy', 'first_non_none',
'parse_boolean_state', 'parse_linesep', 'parse_indentation', 'BPCSyntaxError', 'UUID4Generator',
'detect_files', 'archive_files', 'recover_files', 'detect_encoding', 'detect_linesep',
'detect_indentation', 'parso_parse', 'map_tasks', 'TaskLock', 'Config']