# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Signal the run-build process to dump its run file and print it"""

import dataclasses
import functools
import json
import logging
import os
import pathlib
import random
import signal
import sys
import time

import lib.litani
import lib.litani_report
import lib.pid_file
"""When run-build receives this Unix signal, it will write the run file"""
DUMP_SIGNAL = signal.SIGUSR1
_DUMPED_RUN = "dumped-run.json"
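

# Sleeps for an exponentially growing duration on each call; the random
# per-instance jitter keeps concurrent callers from retrying in lock-step.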
@dataclasses.dataclass
class BackoffSleeper:
    jitter: float
    duration: float = 0.2
    multiplier: int = 2

    def sleep(self):
        time.sleep(self.duration)
        self.duration += self.jitter
        self.duration *= self.multiplier
        self.jitter *= self.multiplier
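

# Raised when the dumped run does not contain the requesting job, or when a
# job that produces one of its inputs is not yet marked as complete.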
class InconsistentRunError(RuntimeError):
    pass


def run_consistent_to_job(run, job_id):
    """Return True if all jobs producing job_id's inputs are complete.

    Raise InconsistentRunError if job_id is not present in the run, or if a
    job that produces one of job_id's inputs is not yet marked as complete.
    """
    out_to_status = {}
    job_ins = []
    found = False
    for pipe in run["pipelines"]:
        for stage in pipe["ci_stages"]:
            for job in stage["jobs"]:
                current_id = job["wrapper_arguments"]["job_id"]
                if current_id == job_id:
                    found = True
                    job_ins = job["wrapper_arguments"]["inputs"] or []
                for out in job["wrapper_arguments"]["outputs"] or []:
                    if out in out_to_status:
                        logging.warning(
                            "Two jobs share an output '%s'. Jobs: %s and %s",
                            out, out_to_status[out]["job_id"], current_id)
                    out_to_status[out] = {
                        "job_id": current_id,
                        "complete": job["complete"],
                    }
    if not found:
        logging.error(
            "Could not find job with ID '%s' in run", job_id)
        raise InconsistentRunError()
    for inputt in job_ins:
        if inputt not in out_to_status:
            continue
        if not out_to_status[inputt]["complete"]:
            logging.debug(
                "Run inconsistent: job '%s' is a reverse-dependency of "
                "job '%s' through input '%s', but the reverse-dependency "
                "job is not marked as complete.",
                out_to_status[inputt]["job_id"], job_id, inputt)
            raise InconsistentRunError()
    return True


def get_run_checker():
    # If we're being run from inside a litani job, we should check that the
    # run is consistent with the job before printing out the run. Otherwise,
    # no check is required, just print out the run.
    parent_job_id = os.getenv(lib.litani.ENV_VAR_JOB_ID)
    if parent_job_id:
        return functools.partial(run_consistent_to_job, job_id=parent_job_id)
    return lambda run: True
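

# Both exit helpers terminate with status 0; callers are expected to tell
# success from failure by whether the printed JSON document is null.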
def _exit_success(run):
    print(json.dumps(run, indent=2))
    sys.exit(0)


def _exit_error():
    print(json.dumps(None))
    sys.exit(0)
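

# Signal the run-build process (pid) to dump a fresh run file, then try to
# read and validate it. On any failure, back off so that the caller can retry.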
def _try_dump_run(cache_dir, pid, sleeper, check_run):
    os.kill(pid, DUMP_SIGNAL)
    try:
        with open(cache_dir / _DUMPED_RUN) as handle:
            run = json.load(handle)
        check_run(run)
        _exit_success(run)
    except (
            FileNotFoundError, json.decoder.JSONDecodeError,
            InconsistentRunError):
        sleeper.sleep()
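

# Repeatedly ask the run-build process named in the pid file to dump its run,
# printing the run as soon as a consistent copy can be read. args.retries
# bounds the number of attempts; when it is unset, keep trying indefinitely.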
async def dump_run(args):
    random.seed()
    try:
        pid = lib.pid_file.read()
    except FileNotFoundError:
        _exit_error()
    cache_dir = lib.litani.get_cache_dir()
    sleeper = BackoffSleeper(jitter=random.random())
    check_run = get_run_checker()
    if args.retries:
        for _ in range(args.retries):
            _try_dump_run(cache_dir, pid, sleeper, check_run)
    else:
        while True:
            _try_dump_run(cache_dir, pid, sleeper, check_run)
    _exit_error()
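

# Intended to be installed by the run-build process: whenever DUMP_SIGNAL
# arrives, it writes the current run to the cache directory. The write goes
# through lib.litani.atomic_write so a reader never sees a partial file.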
@dataclasses.dataclass
class DumpRunSignalHandler:
    """Signal handler matching the API of the argument to signal.signal()"""
    cache_dir: pathlib.Path

    def __call__(self, _signum, _frame):
        run = lib.litani_report.get_run_data(self.cache_dir)
        with lib.litani.atomic_write(
                self.cache_dir / _DUMPED_RUN) as handle:
            print(json.dumps(run, indent=2), file=handle)
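

# A minimal sketch of the expected wiring, assuming run-build installs the
# handler itself (this file only defines it):
#
#     signal.signal(DUMP_SIGNAL, DumpRunSignalHandler(lib.litani.get_cache_dir()))
#
# after which the os.kill() in _try_dump_run makes the run file appear at
# <cache_dir>/dumped-run.json for dump_run() to read.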