173 lines
4.7 KiB
Python
173 lines
4.7 KiB
Python
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License").
|
|
# You may not use this file except in compliance with the License.
|
|
# A copy of the License is located at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# or in the "license" file accompanying this file. This file is distributed
|
|
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
|
# express or implied. See the License for the specific language governing
|
|
# permissions and limitations under the License.
|
|
|
|
|
|
"""Signal the run-build process to dump its run file and print it"""
|
|
|
|
|
|
import dataclasses
|
|
import functools
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import random
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
import lib.litani
|
|
import lib.litani_report
|
|
import lib.pid_file
|
|
|
|
|
|
"""When run-build receives this Unix signal, it will write the run file"""
|
|
DUMP_SIGNAL = signal.SIGUSR1
|
|
_DUMPED_RUN = "dumped-run.json"
|
|
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class BackoffSleeper:
|
|
jitter: float
|
|
duration: float = 0.2
|
|
multiplier: int = 2
|
|
|
|
|
|
def sleep(self):
|
|
time.sleep(self.duration)
|
|
|
|
self.duration += self.jitter
|
|
self.duration *= self.multiplier
|
|
self.jitter *= self.multiplier
|
|
|
|
|
|
|
|
class InconsistentRunError(RuntimeError):
|
|
pass
|
|
|
|
|
|
|
|
def run_consistent_to_job(run, job_id):
|
|
"""True iff the reverse-dependencies of job_id are marked as complete"""
|
|
|
|
out_to_status = {}
|
|
job_ins = []
|
|
found = False
|
|
|
|
for pipe in run["pipelines"]:
|
|
for stage in pipe["ci_stages"]:
|
|
for job in stage["jobs"]:
|
|
current_id = job["wrapper_arguments"]["job_id"]
|
|
if current_id == job_id:
|
|
found = True
|
|
job_ins = job["wrapper_arguments"]["inputs"] or []
|
|
|
|
for out in job["wrapper_arguments"]["outputs"] or []:
|
|
if out in out_to_status:
|
|
logging.warning(
|
|
"Two jobs share an output '%s'. Jobs: %s and %s",
|
|
out, out_to_status[out]["job_id"], current_id)
|
|
out_to_status[out] = {
|
|
"job_id": current_id,
|
|
"complete": job["complete"],
|
|
}
|
|
if not found:
|
|
logging.error(
|
|
"Could not find job with ID '%s' in run", job_id)
|
|
raise InconsistentRunError()
|
|
|
|
for inputt in job_ins:
|
|
if inputt not in out_to_status:
|
|
continue
|
|
if not out_to_status[inputt]["complete"]:
|
|
logging.debug(
|
|
"Run inconsistent: job '%s' is a reverse-dependency of "
|
|
"job '%s' through input '%s', but the reverse-dependency "
|
|
"job is not marked as complete.",
|
|
out_to_status[inputt]["job_id"], job_id, inputt)
|
|
raise InconsistentRunError()
|
|
return True
|
|
|
|
|
|
def get_run_checker():
|
|
# If we're being run from inside a litani job, we should check that the run
|
|
# is consistent with the job before printing out the run. Otherwise, no
|
|
# check is required, just print out the run.
|
|
|
|
parent_job_id = os.getenv(lib.litani.ENV_VAR_JOB_ID)
|
|
if parent_job_id:
|
|
return functools.partial(run_consistent_to_job, job_id=parent_job_id)
|
|
return lambda run: True
|
|
|
|
|
|
|
|
def _exit_success(run):
|
|
print(json.dumps(run, indent=2))
|
|
sys.exit(0)
|
|
|
|
|
|
def _exit_error():
|
|
print(json.dumps(None))
|
|
sys.exit(0)
|
|
|
|
|
|
def _try_dump_run(cache_dir, pid, sleeper, check_run):
|
|
os.kill(pid, DUMP_SIGNAL)
|
|
try:
|
|
with open(cache_dir / _DUMPED_RUN) as handle:
|
|
run = json.load(handle)
|
|
check_run(run)
|
|
_exit_success(run)
|
|
except (
|
|
FileNotFoundError, json.decoder.JSONDecodeError,
|
|
InconsistentRunError):
|
|
sleeper.sleep()
|
|
|
|
|
|
async def dump_run(args):
|
|
random.seed()
|
|
|
|
try:
|
|
pid = lib.pid_file.read()
|
|
except FileNotFoundError:
|
|
_exit_error()
|
|
|
|
cache_dir = lib.litani.get_cache_dir()
|
|
|
|
sleeper = BackoffSleeper(jitter=random.random())
|
|
check_run = get_run_checker()
|
|
|
|
if args.retries:
|
|
for _ in range(args.retries):
|
|
_try_dump_run(cache_dir, pid, sleeper, check_run)
|
|
else:
|
|
while True:
|
|
_try_dump_run(cache_dir, pid, sleeper, check_run)
|
|
_exit_error()
|
|
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class DumpRunSignalHandler:
|
|
"""Signal handler matching the API of the argument to signal.signal()"""
|
|
|
|
cache_dir: pathlib.Path
|
|
|
|
|
|
def __call__(self, _signum, _frame):
|
|
run = lib.litani_report.get_run_data(self.cache_dir)
|
|
with lib.litani.atomic_write(
|
|
self.cache_dir / _DUMPED_RUN) as handle:
|
|
print(json.dumps(run, indent=2), file=handle)
|