163 lines
4.8 KiB
Python
163 lines
4.8 KiB
Python
|
import os
|
||
|
import sys
|
||
|
import time
|
||
|
|
||
|
from subprocess import CalledProcessError, check_call
|
||
|
from typing import List, IO, Optional, Tuple
|
||
|
|
||
|
|
||
|
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
    """Resolve *command* to a file system path.

    The command is returned unchanged when it already names an existing
    path.  Otherwise every directory of *paths* (an ``os.pathsep``-separated
    string) is probed in order.  When *paths* is ``None`` the ``PATH``
    environment variable is used, and when the resulting search string is
    empty ``os.defpath`` is used instead.

    Returns the first existing candidate, or ``None`` if nothing matches.
    """
    # A command that already points at something on disk wins outright.
    if os.path.exists(command):
        return command

    search_path = os.environ.get('PATH', '') if paths is None else paths
    # An empty search string falls back to the platform default.
    search_path = search_path or os.defpath

    # Suffixes to try.  'PATHEXT' is only consulted when the path separator
    # is ';' -- on Cygwin the variable may exist but must not be used.
    if os.pathsep == ';':
        suffixes = os.environ.get('PATHEXT', '').split(';')
    else:
        suffixes = ['']

    # Directories are the outer loop, suffixes the inner one, so an earlier
    # directory always takes precedence over a later one.
    candidates = (
        os.path.join(directory, command + suffix)
        for directory in search_path.split(os.pathsep)
        for suffix in suffixes
    )
    return next((found for found in candidates if os.path.exists(found)),
                None)
|
||
|
|
||
|
|
||
|
def has_no_extension(file_name: str) -> bool:
    """Return True when ``os.path.splitext`` finds no extension."""
    extension = os.path.splitext(file_name)[1]
    return not extension
|
||
|
|
||
|
|
||
|
def is_valid_single_input_file(file_name: str) -> bool:
    """Return True when the file's extension is an accepted input kind.

    Accepted extensions are ``.i``, ``.ii``, ``.c``, ``.cpp``, ``.m`` and
    the empty extension; the comparison is case-sensitive.
    """
    accepted_extensions = (".i", ".ii", ".c", ".cpp", ".m", "")
    _, extension = os.path.splitext(file_name)
    return extension in accepted_extensions
|
||
|
|
||
|
|
||
|
def time_to_str(time: float) -> str:
    """
    Render a duration given in seconds with two decimals and an 's' suffix.
    """
    # Alias the argument: its name shadows the `time` module in this scope.
    seconds = time
    return f"{seconds:.2f}s"
|
||
|
|
||
|
|
||
|
def memory_to_str(memory: int) -> str:
    """
    Render a byte count as a human-readable string.

    A falsy value (0) yields "N/A".  Otherwise the optional `humanize`
    package formats the number; without it the raw byte count followed by
    'B' is returned.
    """
    # A zero reading means the measurement did not succeed.
    if not memory:
        return "N/A"

    try:
        import humanize
        return humanize.naturalsize(memory, gnu=True)
    except ImportError:
        # No formatter installed: report plain bytes.
        return f"{memory}B"
|
||
|
|
||
|
|
||
|
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
    """
    Run a command to completion, measuring wall-clock time and peak memory.

    The arguments are the same as for the call and check_call functions.

    With `psutil` available the command is polled every half second and the
    resident set size of the process plus all of its descendants is summed;
    the largest sample is reported as peak memory.  Without `psutil` the
    command is run through `check_call` and peak memory is reported as 0.

    Raise CalledProcessError (carrying the exit status in its returncode
    attribute) if the command exits with a non-zero status.

    Return a tuple of execution time in seconds and peak memory in bytes.
    """
    started_at = time.time()
    peak_mem = 0

    try:
        import psutil as ps

        def get_memory(process: ps.Process) -> int:
            # Sum over the whole process tree, the root process included.
            tree = [*process.children(recursive=True), process]

            total = 0
            for proc in tree:
                try:
                    total += proc.memory_info().rss
                except (ps.NoSuchProcess, ps.AccessDenied):
                    # Gone or off-limits since the tree was listed: skip it.
                    pass
            return total

        with ps.Popen(*popenargs, **kwargs) as process:
            # Sample resource utilization for as long as the process lives.
            while True:
                still_alive = (process.is_running() and
                               process.status() != ps.STATUS_ZOMBIE)
                if not still_alive:
                    break

                # Keep the highest sample seen so far.
                peak_mem = max(peak_mem, get_memory(process))
                time.sleep(.5)

            if process.is_running():
                process.kill()

        exit_code = process.returncode
        if exit_code != 0:
            # Mirror check_call: prefer the `args` keyword, otherwise the
            # first positional argument.
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise CalledProcessError(exit_code, cmd)

    except ImportError:
        # psutil is not installed: fall back to subprocess, no memory data.
        peak_mem = 0
        check_call(*popenargs, **kwargs)

    return time.time() - started_at, peak_mem
|
||
|
|
||
|
|
||
|
def run_script(script_path: str, build_log_file: IO, cwd: str,
               out=sys.stdout, err=sys.stderr, verbose: int = 0):
    """
    Run the provided script if it exists.

    The script is first made executable (`chmod +x`) and then executed
    through the shell with `cwd` as working directory.  Both commands send
    their stdout and stderr to `build_log_file`.  A missing script is
    silently ignored.

    :param script_path: path of the script to run.
    :param build_log_file: open file receiving the commands' output; its
        `name` is mentioned in the error message on failure.
    :param cwd: working directory for both commands.
    :param out: stream for the progress message.
    :param err: stream for the failure message.
    :param verbose: the progress message is written only when this is 1.

    On failure of either command an error is written to `err` and the
    process exits with status -1 (SystemExit).
    """
    # Function-scope import, like the other optional imports in this file.
    import shlex

    if not os.path.exists(script_path):
        return

    # Quote the path for the shell properly.  Wrapping it in literal single
    # quotes breaks (and allows shell injection) as soon as the path itself
    # contains a single quote.
    quoted_script = shlex.quote(script_path)

    try:
        if verbose == 1:
            out.write(f" Executing: {script_path}\n")

        check_call(f"chmod +x {quoted_script}", cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

        check_call(quoted_script, cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

    except CalledProcessError:
        err.write(f"Error: Running {script_path} failed. "
                  f"See {build_log_file.name} for details.\n")
        sys.exit(-1)
|
||
|
|
||
|
|
||
|
def is_comment_csv_line(entries: List[str]) -> bool:
    """
    Tell whether a parsed CSV row is a comment, i.e. its first entry
    starts with '#'.  An empty row is not a comment.
    """
    if not entries:
        return False
    first_entry = entries[0]
    return first_entry.startswith("#")
|