tests: Automatically time TAP tests
[lttng-tools.git] / tests / utils / lttngtest / tap_generator.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6 #
7
8 import contextlib
9 import os
10 import sys
11 import time
12 from typing import Iterator, Optional
13
14
class InvalidTestPlan(RuntimeError):
    """Error raised when more tests are reported than the plan announced."""

    def __init__(self, msg):
        # type: (str) -> None
        """Create the error with `msg` as its human-readable description."""
        RuntimeError.__init__(self, msg)
19
20
class BailOut(RuntimeError):
    """Error raised once a `Bail out!` line has been reported."""

    def __init__(self, msg):
        # type: (str) -> None
        """Create the error with `msg`, the reason for bailing out."""
        RuntimeError.__init__(self, msg)
25
26
class TestCase:
    """A single test case whose outcome is reported through a TAP generator.

    The outcome can be recorded exactly once, with `success()` or `fail()`;
    recording it forwards the result and the description to the generator.
    """

    def __init__(
        self,
        tap_generator,  # type: "TapGenerator"
        description,  # type: str
    ):
        self._description = description  # type: str
        self._tap_generator = tap_generator  # type: "TapGenerator"
        # Stays None until success() or fail() records the outcome.
        self._result = None  # type: Optional[bool]

    @property
    def result(self):
        # type: () -> Optional[bool]
        """Recorded outcome: True, False, or None when not yet recorded."""
        return self._result

    @property
    def description(self):
        # type: () -> str
        """Description given at construction."""
        return self._description

    def _set_result(self, result):
        # type: (bool) -> None
        """Record `result` and report it to the generator.

        Raises RuntimeError if an outcome was already recorded.
        """
        already_recorded = self._result is not None
        if already_recorded:
            raise RuntimeError("Can't set test case result twice")

        self._result = result
        self._tap_generator.test(result, self._description)

    def success(self):
        # type: () -> None
        """Mark the test case as passed."""
        self._set_result(result=True)

    def fail(self):
        # type: () -> None
        """Mark the test case as failed."""
        self._set_result(result=False)
62
63
64 # Produces a test execution report in the TAP format.
class TapGenerator:
    """Emit a TAP (Test Anything Protocol) report on standard output.

    The plan line (`1..N`) is printed lazily, right before the first report
    line. Unless the `TAP_AUTOTIME` environment variable is set to `0` or to
    an empty string, every test result is followed by a block giving the
    time elapsed, in milliseconds, since the previous result (or since the
    generator was created).
    """

    def __init__(self, total_test_count):
        # type: (int) -> None
        """Create a generator for a plan of `total_test_count` tests.

        Raises ValueError if `total_test_count` is not greater than zero.
        """
        if total_test_count <= 0:
            raise ValueError("Test count must be greater than zero")

        self._total_test_count = total_test_count  # type: int
        self._last_test_case_id = 0  # type: int
        self._printed_plan = False  # type: bool
        self._has_failure = False  # type: bool
        # Timing is enabled by default; "" and "0" turn it off.
        self._time_tests = os.getenv("TAP_AUTOTIME", "1") not in (
            "",
            "0",
        )  # type: bool
        self._last_time = time.monotonic_ns()

    def __del__(self):
        """Report a bail out if the plan was not completed."""
        # __del__ also runs when __init__ raised before the counters were
        # assigned; there is nothing to report in that case.
        try:
            remaining = self.remaining_test_cases
        except AttributeError:
            return

        if remaining > 0:
            try:
                self.bail_out(
                    "Missing {remaining_test_cases} test cases".format(
                        remaining_test_cases=remaining
                    )
                )
            except BailOut:
                # The "Bail out!" line has been printed. An exception can't
                # propagate out of a finalizer, so don't let it escape.
                pass

    @property
    def remaining_test_cases(self):
        # type: () -> int
        """Number of planned test cases that have not been reported yet."""
        return self._total_test_count - self._last_test_case_id

    def _print(self, msg):
        # type: (str) -> None
        """Print `msg`, preceded by the plan line if it wasn't printed yet."""
        if not self._printed_plan:
            print(
                "1..{total_test_count}".format(total_test_count=self._total_test_count),
                flush=True,
            )
            self._printed_plan = True

        print(msg, flush=True)

    def skip_all(self, reason):
        # type: (str) -> None
        """Skip the whole plan, reporting `reason` when it is not empty.

        Nothing is printed when `reason` is empty. Raises RuntimeError if
        test cases were already reported.
        """
        if self._last_test_case_id != 0:
            raise RuntimeError("Can't skip all tests after running test cases")

        if reason:
            # The "1..0" line is itself the plan: print it directly so that
            # _print() doesn't emit a second, conflicting "1..N" plan.
            print("1..0 # Skip all: {reason}".format(reason=reason), flush=True)
            self._printed_plan = True

        self._last_test_case_id = self._total_test_count

    def skip(self, reason, skip_count=1):
        # type: (str, int) -> None
        """Report the next `skip_count` test cases as skipped for `reason`."""
        for _ in range(skip_count):
            self._last_test_case_id = self._last_test_case_id + 1
            # The counter has just been advanced: it is the test number.
            self._print(
                "ok {test_number} # Skip: {reason}".format(
                    reason=reason, test_number=self._last_test_case_id
                )
            )

    def bail_out(self, reason):
        # type: (str) -> None
        """Print a `Bail out!` line, mark the plan as consumed and raise BailOut."""
        self._print("Bail out! {reason}".format(reason=reason))
        self._last_test_case_id = self._total_test_count
        raise BailOut(reason)

    def test(self, result, description):
        # type: (bool, str) -> None
        """Report the result of the next test case.

        Raises InvalidTestPlan if every planned test was already reported.
        """
        # Elapsed time since the previous result (or construction), in ms.
        duration = (time.monotonic_ns() - self._last_time) / 1_000_000
        if self._last_test_case_id == self._total_test_count:
            raise InvalidTestPlan("Executing too many tests")

        if result is False:
            self._has_failure = True

        result_string = "ok" if result else "not ok"
        self._last_test_case_id = self._last_test_case_id + 1
        self._print(
            "{result_string} {case_id} - {description}".format(
                result_string=result_string,
                case_id=self._last_test_case_id,
                description=description,
            )
        )
        if self._time_tests:
            # NOTE(review): confirm the indentation inside this block matches
            # what the TAP consumer expects.
            self._print("---\n duration_ms: {}\n...\n".format(duration))
        self._last_time = time.monotonic_ns()

    def ok(self, description):
        # type: (str) -> None
        """Report the next test case as passed."""
        self.test(True, description)

    def fail(self, description):
        # type: (str) -> None
        """Report the next test case as failed."""
        self.test(False, description)

    @property
    def is_successful(self):
        # type: () -> bool
        """True when the whole plan was reported and no test failed."""
        return (
            self._last_test_case_id == self._total_test_count and not self._has_failure
        )

    @contextlib.contextmanager
    def case(self, description):
        # type: (str) -> Iterator[TestCase]
        """Context manager yielding a TestCase for the enclosed block.

        An exception derived from Exception raised by the block is reported
        as a diagnostic and the test case is marked as failed. If the block
        ends without recording a result, the test case is marked as passed.
        """
        test_case = TestCase(self, description)
        try:
            yield test_case
        except Exception as e:
            self.diagnostic(
                "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format(
                    description=test_case.description, exception_type=type(e).__name__
                )
            )

            if str(e) != "":
                self.diagnostic(str(e))

            test_case.fail()
        finally:
            if test_case.result is None:
                test_case.success()

    def diagnostic(self, msg):
        # type: (str) -> None
        """Print `msg` as a TAP diagnostic (`# ...`) on standard error."""
        print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)
This page took 0.032153 seconds and 4 git commands to generate.