160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
import subprocess
|
|
import sys
|
|
import itertools
|
|
from typing import List, Dict
|
|
from colorama import Fore, Back, Style
|
|
import re
|
|
import time
|
|
|
|
# https://stackoverflow.com/a/14693789/15675011
|
|
ansi_escape = re.compile(r'''
|
|
\x1B # ESC
|
|
(?: # 7-bit C1 Fe (except CSI)
|
|
[@-Z\\-_]
|
|
| # or [ for CSI, followed by a control sequence
|
|
\[
|
|
[0-?]* # Parameter bytes
|
|
[ -/]* # Intermediate bytes
|
|
[@-~] # Final byte
|
|
)
|
|
''', re.VERBOSE)
|
|
|
|
class MatrixRunner:
|
|
def __init__(self, matrix, exclude):
|
|
self.matrix = matrix
|
|
self.exclude = exclude
|
|
self.include = self.parse_includes()
|
|
self.keys = [*matrix.keys()]
|
|
self.values = [*matrix.values()]
|
|
self.results = {} # insertion-ordered
|
|
self.failed = False
|
|
self.work = self.get_work()
|
|
|
|
self.last_matrix_config = None
|
|
self.current_matrix_config = None
|
|
|
|
def parse_includes(self) -> Dict[str, List[str]]:
|
|
includes: Dict[str, List[str]] = dict()
|
|
for arg in sys.argv:
|
|
if arg.startswith("--slice="):
|
|
rest = arg[len("--slice="):]
|
|
key, value = rest.split(":")
|
|
if key not in includes:
|
|
includes[key] = []
|
|
includes[key].append(value)
|
|
return includes
|
|
|
|
def run_command(self, *args: List[str], always_output=False, output_matcher=None) -> bool:
|
|
self.log(f"{Fore.CYAN}{Style.BRIGHT}Running Command \"{' '.join(args)}\"{Style.RESET_ALL}")
|
|
start_time = time.time()
|
|
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = p.communicate()
|
|
runtime = time.time() - start_time
|
|
self.log(Style.RESET_ALL, end="") # makefile in parallel sometimes messes up colors
|
|
if p.returncode != 0:
|
|
self.log(f"{Fore.RED}{Style.BRIGHT}Command failed{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
|
|
self.log("stdout:")
|
|
self.log(stdout.decode("utf-8"), end="")
|
|
self.log("stderr:")
|
|
self.log(stderr.decode("utf-8"), end="")
|
|
self.failed = True
|
|
return False
|
|
else:
|
|
self.log(f"{Fore.GREEN}{Style.BRIGHT}Command succeeded{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
|
|
if always_output:
|
|
self.log("stdout:")
|
|
self.log(stdout.decode("utf-8"), end="")
|
|
self.log("stderr:")
|
|
self.log(stderr.decode("utf-8"), end="")
|
|
elif len(stderr) != 0:
|
|
self.log("stderr:")
|
|
self.log(stderr.decode("utf-8"), end="")
|
|
if output_matcher is not None:
|
|
if not output_matcher(stdout.decode("utf-8")):
|
|
self.failed = True
|
|
return False
|
|
return True
|
|
|
|
def set_fail(self):
|
|
self.failed = True
|
|
|
|
def current_config(self):
|
|
return self.current_matrix_config
|
|
|
|
def last_config(self):
|
|
return self.last_matrix_config
|
|
|
|
def log(self, *args, **kwargs):
|
|
print(*args, **kwargs, flush=True)
|
|
|
|
def do_exclude(self, matrix_config, exclude):
|
|
return all(map(lambda k: matrix_config[k] == exclude[k], exclude.keys()))
|
|
|
|
def do_include(self, matrix_config, include):
|
|
if len(include) == 0:
|
|
return True
|
|
return all(map(lambda k: matrix_config[k] in include[k], include.keys()))
|
|
|
|
def assignment_to_matrix_config(self, assignment):
|
|
matrix_config = {}
|
|
for k, v in zip(self.matrix.keys(), assignment):
|
|
matrix_config[k] = v
|
|
return matrix_config
|
|
|
|
def get_work(self):
|
|
work = []
|
|
for assignment in itertools.product(*self.matrix.values()):
|
|
config = self.assignment_to_matrix_config(assignment)
|
|
if any(map(lambda ex: self.do_exclude(config, ex), self.exclude)):
|
|
continue
|
|
if not self.do_include(config, self.include):
|
|
continue
|
|
work.append(assignment)
|
|
return work
|
|
|
|
def run(self, fn):
|
|
for i, assignment in enumerate(self.work):
|
|
matrix_config = self.assignment_to_matrix_config(assignment)
|
|
config_tuple = tuple(self.values[i].index(p) for i, p in enumerate(assignment))
|
|
config_str = ', '.join(map(lambda v: str(v), matrix_config.values()))
|
|
if config_str == "":
|
|
self.log(f"{Fore.BLUE}{Style.BRIGHT}{'=' * 10} [{i + 1}/{len(self.work)}] Running with blank config {'=' * 10}{Style.RESET_ALL}")
|
|
else:
|
|
self.log(f"{Fore.BLUE}{Style.BRIGHT}{'=' * 10} [{i + 1}/{len(self.work)}] Running with config {config_str} {'=' * 10}{Style.RESET_ALL}")
|
|
self.last_matrix_config = self.current_matrix_config
|
|
self.current_matrix_config = matrix_config
|
|
self.results[config_tuple] = fn(self)
|
|
self.print_results()
|
|
if self.failed:
|
|
self.log("🔴 Some checks failed")
|
|
sys.exit(1)
|
|
else:
|
|
self.log("🟢 All checks passed")
|
|
|
|
def adj_width(self, text):
|
|
return len(text) - len(ansi_escape.sub("", text))
|
|
|
|
def print_table(self, table):
|
|
columns = len(table[0])
|
|
column_widths = [1 for _ in range(columns)]
|
|
for row in table:
|
|
for i, cell in enumerate(row):
|
|
column_widths[i] = max(column_widths[i], len(ansi_escape.sub("", cell)))
|
|
for j, cell in enumerate(table[0]):
|
|
self.log("| {cell:{width}} ".format(cell=cell, width=column_widths[j] + self.adj_width(cell)), end="")
|
|
self.log("|")
|
|
for i, row in enumerate(table[1:]):
|
|
for j, cell in enumerate(row):
|
|
self.log("| {cell:{width}} ".format(cell=cell, width=column_widths[j] + self.adj_width(cell)), end="")
|
|
self.log("|")
|
|
|
|
def print_results(self):
|
|
self.log("Results:")
|
|
table = [self.keys]
|
|
for result in self.results:
|
|
table.append([
|
|
f"{Fore.GREEN if self.results[result] else Fore.RED}{Style.BRIGHT}{self.values[i][v]}{Style.RESET_ALL}"
|
|
for i, v in enumerate(result)
|
|
])
|
|
self.print_table(table)
|