Commit | Line | Data |
---|---|---|
ef945e4d JG |
1 | #!/usr/bin/env python3 |
2 | # | |
3 | # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
4 | # | |
5 | # SPDX-License-Identifier: GPL-2.0-only | |
6 | # | |
7 | ||
8 | import contextlib | |
9 | import sys | |
10 | from typing import Optional | |
11 | ||
12 | ||
13 | class InvalidTestPlan(RuntimeError): | |
14 | def __init__(self, msg: str): | |
15 | super().__init__(msg) | |
16 | ||
17 | ||
18 | class BailOut(RuntimeError): | |
19 | def __init__(self, msg: str): | |
20 | super().__init__(msg) | |
21 | ||
22 | ||
23 | class TestCase: | |
24 | def __init__(self, tap_generator: "TapGenerator", description: str): | |
25 | self._tap_generator = tap_generator | |
26 | self._result: Optional[bool] = None | |
27 | self._description = description | |
28 | ||
29 | @property | |
30 | def result(self) -> Optional[bool]: | |
31 | return self._result | |
32 | ||
33 | @property | |
34 | def description(self) -> str: | |
35 | return self._description | |
36 | ||
37 | def _set_result(self, result: bool) -> None: | |
38 | if self._result is not None: | |
39 | raise RuntimeError("Can't set test case result twice") | |
40 | ||
41 | self._result = result | |
42 | self._tap_generator.test(result, self._description) | |
43 | ||
44 | def success(self) -> None: | |
45 | self._set_result(True) | |
46 | ||
47 | def fail(self) -> None: | |
48 | self._set_result(False) | |
49 | ||
50 | ||
51 | # Produces a test execution report in the TAP format. | |
52 | class TapGenerator: | |
53 | def __init__(self, total_test_count: int): | |
54 | if total_test_count <= 0: | |
55 | raise ValueError("Test count must be greater than zero") | |
56 | ||
57 | self._total_test_count: int = total_test_count | |
58 | self._last_test_case_id: int = 0 | |
59 | self._printed_plan: bool = False | |
60 | self._has_failure: bool = False | |
61 | ||
62 | def __del__(self): | |
63 | if self.remaining_test_cases > 0: | |
64 | self.bail_out( | |
65 | "Missing {remaining_test_cases} test cases".format( | |
66 | remaining_test_cases=self.remaining_test_cases | |
67 | ) | |
68 | ) | |
69 | ||
70 | @property | |
71 | def remaining_test_cases(self) -> int: | |
72 | return self._total_test_count - self._last_test_case_id | |
73 | ||
74 | def _print(self, msg: str) -> None: | |
75 | if not self._printed_plan: | |
76 | print( | |
77 | "1..{total_test_count}".format(total_test_count=self._total_test_count), | |
78 | flush=True, | |
79 | ) | |
80 | self._printed_plan = True | |
81 | ||
82 | print(msg, flush=True) | |
83 | ||
84 | def skip_all(self, reason) -> None: | |
85 | if self._last_test_case_id != 0: | |
86 | raise RuntimeError("Can't skip all tests after running test cases") | |
87 | ||
88 | if reason: | |
89 | self._print("1..0 # Skip all: {reason}".format(reason=reason)) | |
90 | ||
91 | self._last_test_case_id = self._total_test_count | |
92 | ||
93 | def skip(self, reason, skip_count: int = 1) -> None: | |
94 | for i in range(skip_count): | |
95 | self._last_test_case_id = self._last_test_case_id + 1 | |
96 | self._print( | |
97 | "ok {test_number} # Skip: {reason}".format( | |
98 | reason=reason, test_number=(i + self._last_test_case_id) | |
99 | ) | |
100 | ) | |
101 | ||
102 | def bail_out(self, reason: str) -> None: | |
103 | self._print("Bail out! {reason}".format(reason=reason)) | |
104 | self._last_test_case_id = self._total_test_count | |
105 | raise BailOut(reason) | |
106 | ||
107 | def test(self, result: bool, description: str) -> None: | |
108 | if self._last_test_case_id == self._total_test_count: | |
109 | raise InvalidTestPlan("Executing too many tests") | |
110 | ||
111 | if result is False: | |
112 | self._has_failure = True | |
113 | ||
114 | result_string = "ok" if result else "not ok" | |
115 | self._last_test_case_id = self._last_test_case_id + 1 | |
116 | self._print( | |
117 | "{result_string} {case_id} - {description}".format( | |
118 | result_string=result_string, | |
119 | case_id=self._last_test_case_id, | |
120 | description=description, | |
121 | ) | |
122 | ) | |
123 | ||
124 | def ok(self, description: str) -> None: | |
125 | self.test(True, description) | |
126 | ||
127 | def fail(self, description: str) -> None: | |
128 | self.test(False, description) | |
129 | ||
130 | @property | |
131 | def is_successful(self) -> bool: | |
132 | return ( | |
133 | self._last_test_case_id == self._total_test_count and not self._has_failure | |
134 | ) | |
135 | ||
136 | @contextlib.contextmanager | |
137 | def case(self, description: str): | |
138 | test_case = TestCase(self, description) | |
139 | try: | |
140 | yield test_case | |
141 | except Exception as e: | |
142 | self.diagnostic( | |
143 | "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format( | |
144 | description=test_case.description, exception_type=type(e).__name__ | |
145 | ) | |
146 | ) | |
147 | ||
148 | if str(e) != "": | |
149 | self.diagnostic(str(e)) | |
150 | ||
151 | test_case.fail() | |
152 | finally: | |
153 | if test_case.result is None: | |
154 | test_case.success() | |
155 | ||
156 | def diagnostic(self, msg) -> None: | |
157 | print("# {msg}".format(msg=msg), file=sys.stderr, flush=True) |