Fix: leftover python inline type hint
[lttng-tools.git] / tests / utils / lttngtest / environment.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6 #
7
8 from types import FrameType
9 from typing import Callable, Iterator, Optional, Tuple, List
10 import sys
11 import pathlib
12 import signal
13 import subprocess
14 import shlex
15 import shutil
16 import os
17 import queue
18 import tempfile
19 from . import logger
20 import time
21 import threading
22 import contextlib
23
24
class TemporaryDirectory:
    """
    Owner of a freshly created temporary directory.

    The directory is created with `tempfile.mkdtemp()` when the object is
    constructed and is removed, along with its contents, when the object is
    finalized.
    """

    def __init__(self, prefix):
        # type: (str) -> None
        created_path = tempfile.mkdtemp(prefix=prefix)
        self._directory_path = created_path

    def __del__(self):
        # Best-effort removal: errors encountered while deleting are ignored.
        shutil.rmtree(self._directory_path, ignore_errors=True)

    @property
    def path(self):
        # type: () -> pathlib.Path
        """
        Location of the managed directory, as a `pathlib.Path`.
        """
        directory = pathlib.Path(self._directory_path)
        return directory
37
38
class _SignalWaitQueue:
    """
    Helper used to block until a signal has been received.

    Install the `signal` method as the handler of the signal of interest,
    then call `wait_for_signal` to block until that handler has run.

    Registering a signal:
        signal.signal(signal.SIGWHATEVER, queue.signal)

    Waiting for the signal:
        queue.wait_for_signal()
    """

    def __init__(self):
        # Unbounded queue: every received signal number is recorded.
        self._queue = queue.Queue()  # type: queue.Queue

    def signal(
        self,
        signal_number,
        frame,  # type: Optional[FrameType]
    ):
        # Signal handler entry point: record the signal number without
        # blocking. `frame` is accepted to match the handler signature and
        # is not used.
        self._queue.put(signal_number, block=False)

    def wait_for_signal(self):
        # Block until one recorded signal is available and consume it.
        self._queue.get()
65
66
class WaitTraceTestApplication:
    """
    Create an application that waits before tracing. This allows a test to
    launch an application, get its PID, and get it to start tracing when it
    has completed its setup.
    """

    def __init__(
        self,
        binary_path,  # type: pathlib.Path
        event_count,  # type: int
        environment,  # type: _Environment
        wait_time_between_events_us=0,  # type: int
    ):
        """
        Launch the test application and wait until it reports being ready.

        binary_path: executable to launch.
        event_count: number of events to produce; must be a multiple of 5.
        environment: test environment providing `lttng_home_location`.
        wait_time_between_events_us: value forwarded to the application's
            `--wait` option.

        Raises ValueError if `event_count` is not a multiple of 5 and
        RuntimeError if the application exits before signalling readiness.
        """
        self._environment = environment  # type: _Environment
        if event_count % 5:
            # The test application currently produces 5 different events per iteration.
            raise ValueError("event count must be a multiple of 5")
        # Floor division keeps the computation exact (no float round-trip).
        self._iteration_count = int(event_count // 5)  # type: int
        # File that the application will wait to see before tracing its events.
        self._app_start_tracing_file_path = pathlib.Path(
            tempfile.mktemp(
                prefix="app_",
                suffix="_start_tracing",
                dir=self._compat_open_path(environment.lttng_home_location),
            )
        )
        self._has_returned = False

        test_app_env = os.environ.copy()
        test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
        # Make sure the app is blocked until it is properly registered to
        # the session daemon.
        test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"

        # File that the application will create to indicate it has completed its initialization.
        app_ready_file_path = tempfile.mktemp(
            prefix="app_",
            suffix="_ready",
            dir=self._compat_open_path(environment.lttng_home_location),
        )  # type: str

        # Build the argument vector directly rather than formatting a command
        # line and re-splitting it: paths containing whitespace or quotes
        # are then passed to the application untouched.
        test_app_args = [
            str(binary_path),
            "--iter",
            str(self._iteration_count),
            "--create-in-main",
            str(app_ready_file_path),
            "--wait-before-first-event",
            str(self._app_start_tracing_file_path),
            "--wait",
            str(wait_time_between_events_us),
        ]

        self._process = subprocess.Popen(
            test_app_args,
            env=test_app_env,
        )  # type: subprocess.Popen

        # Wait for the application to create the file indicating it has fully
        # initialized. Make sure the app hasn't crashed in order to not wait
        # forever.
        while True:
            if os.path.exists(app_ready_file_path):
                break

            if self._process.poll() is not None:
                # Application has unexpectedly returned.
                raise RuntimeError(
                    "Test application has unexepectedly returned during its initialization with return code `{return_code}`".format(
                        return_code=self._process.returncode
                    )
                )

            time.sleep(0.1)

    def trace(self):
        # type: () -> None
        """
        Allow the application to start tracing by creating the file it is
        waiting for.

        Raises RuntimeError if the application has already exited.
        """
        if self._process.poll() is not None:
            # Application has unexpectedly returned.
            raise RuntimeError(
                "Test application has unexepectedly before tracing with return code `{return_code}`".format(
                    return_code=self._process.returncode
                )
            )
        # Only the creation of the file matters ("x" fails if it already
        # exists); close the handle right away instead of leaking it.
        with open(
            self._compat_open_path(self._app_start_tracing_file_path), mode="x"
        ):
            pass

    def wait_for_exit(self):
        # type: () -> None
        """
        Block until the application exits.

        Raises RuntimeError if its return code is not 0.
        """
        if self._process.wait() != 0:
            raise RuntimeError(
                "Test application has exit with return code `{return_code}`".format(
                    return_code=self._process.returncode
                )
            )
        self._has_returned = True

    @property
    def vpid(self):
        # type: () -> int
        """
        PID of the launched application process.
        """
        return self._process.pid

    @staticmethod
    def _compat_open_path(path):
        # type: (pathlib.Path) -> pathlib.Path | str
        """
        The builtin open() in python >= 3.6 expects a path-like object while
        prior versions expect a string or bytes object. Return the correct type
        based on the presence of the "__fspath__" attribute specified in PEP-519.
        """
        if hasattr(path, "__fspath__"):
            return path
        else:
            return str(path)

    def __del__(self):
        # __del__ also runs on instances whose __init__ raised part-way
        # through (invalid event count, failure to launch the process);
        # those may not have these attributes yet.
        process = getattr(self, "_process", None)
        if process is None or getattr(self, "_has_returned", False):
            return

        # This is potentially racy if the pid has been recycled. However,
        # we can't use pidfd_open since it is only available in python >= 3.9.
        process.kill()
        process.wait()
188
189
class ProcessOutputConsumer(threading.Thread, logger._Logger):
    """
    Thread that reads the standard output of a process line by line and
    forwards each non-empty line, prefixed with a name, to a logging function.
    """

    def __init__(
        self,
        process,  # type: subprocess.Popen
        name,  # type: str
        log,  # type: Callable[[str], None]
    ):
        """
        process: process whose `stdout` is consumed.
        name: prefix prepended to every forwarded line.
        log: logging function handed to `logger._Logger`.
        """
        threading.Thread.__init__(self)
        self._prefix = name
        logger._Logger.__init__(self, log)
        self._process = process

    def run(self):
        # type: () -> None
        # NOTE(review): the loop stops as soon as poll() reports that the
        # process has exited, so output still pending in the pipe at that
        # point is not read by this loop.
        while self._process.poll() is None:
            assert self._process.stdout
            raw_line = self._process.stdout.readline()
            # Replace undecodable bytes rather than raising: a
            # UnicodeDecodeError would terminate this thread and the
            # process' output would no longer be drained.
            line = raw_line.decode("utf-8", errors="replace").replace("\n", "")
            if line:
                self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
209
210
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
    """
    Temporary test environment: a private LTTNG_HOME directory and,
    optionally, a session daemon launched from the project's build tree.
    """

    def __init__(
        self,
        with_sessiond,  # type: bool
        log=None,  # type: Optional[Callable[[str], None]]
    ):
        """
        with_sessiond: launch a session daemon as part of the environment.
        log: optional logging function handed to `logger._Logger`.
        """
        super().__init__(log)

        # Define every attribute read by _cleanup() before anything can
        # invoke it (the signal handlers installed below, or __del__ should
        # the remainder of the initialization raise).
        self._sessiond = None  # type: Optional[subprocess.Popen[bytes]]
        self._sessiond_output_consumer = (
            None
        )  # type: Optional[ProcessOutputConsumer]
        self._lttng_home = None  # type: Optional[TemporaryDirectory]

        signal.signal(signal.SIGTERM, self._handle_termination_signal)
        signal.signal(signal.SIGINT, self._handle_termination_signal)

        # Assumes the project's hierarchy to this file is:
        # tests/utils/lttngtest/this_file
        self._project_root = (
            pathlib.Path(__file__).absolute().parents[3]
        )  # type: pathlib.Path
        self._lttng_home = TemporaryDirectory("lttng_test_env_home")

        if with_sessiond:
            self._sessiond = self._launch_lttng_sessiond()

    @property
    def lttng_home_location(self):
        # type: () -> pathlib.Path
        """
        Path of the environment's LTTNG_HOME directory.

        Raises RuntimeError once the environment has been cleaned up.
        """
        if self._lttng_home is None:
            raise RuntimeError("Attempt to access LTTng home after clean-up")
        return self._lttng_home.path

    @property
    def lttng_client_path(self):
        # type: () -> pathlib.Path
        """
        Path of the `lttng` client binary within the project tree.
        """
        return self._project_root / "src" / "bin" / "lttng" / "lttng"

    def create_temporary_directory(self, prefix=None):
        # type: (Optional[str]) -> pathlib.Path
        """
        Create a new directory (prefix defaults to "tmp") and return its path.
        """
        # Simply return a path that is contained within LTTNG_HOME; it will
        # be destroyed when the temporary home goes out of scope.
        assert self._lttng_home
        return pathlib.Path(
            tempfile.mkdtemp(
                prefix="tmp" if prefix is None else prefix,
                dir=str(self._lttng_home.path),
            )
        )

    # Unpack a list of environment variables from a string
    # such as "HELLO=is_it ME='/you/are/looking/for'"
    @staticmethod
    def _unpack_env_vars(env_vars_string):
        # type: (str) -> List[Tuple[str, str]]
        """
        Return the (name, value) pairs found in `env_vars_string`.

        Raises ValueError for an entry without an equal sign or with nothing
        after its first equal sign.
        """
        unpacked_vars = []
        for var in shlex.split(env_vars_string):
            var_name, separator, var_value = var.partition("=")
            # Must have an equal sign and not end with an equal sign
            if not separator or not var_value:
                raise ValueError(
                    "Invalid sessiond environment variable: `{}`".format(var)
                )

            # Unquote any paths
            var_value = var_value.replace("'", "").replace('"', "")
            unpacked_vars.append((var_name, var_value))

        return unpacked_vars

    def _launch_lttng_sessiond(self):
        # type: () -> Optional[subprocess.Popen]
        """
        Launch the session daemon and wait for it to signal (SIGUSR1) that
        it is ready.

        Returns the launched process, or None when the TEST_NO_SESSIOND
        environment variable is set to "1".
        """
        if os.environ.get("TEST_NO_SESSIOND") == "1":
            # Run test without a session daemon; the user probably
            # intends to run one under gdb for example.
            return None

        is_64bits_host = sys.maxsize > 2**32

        sessiond_path = (
            self._project_root / "src" / "bin" / "lttng-sessiond" / "lttng-sessiond"
        )
        consumerd_path_option_name = "--consumerd{bitness}-path".format(
            bitness="64" if is_64bits_host else "32"
        )
        consumerd_path = (
            self._project_root / "src" / "bin" / "lttng-consumerd" / "lttng-consumerd"
        )

        # Setup the session daemon's environment
        sessiond_env_vars = os.environ.get("LTTNG_SESSIOND_ENV_VARS")
        sessiond_env = os.environ.copy()
        if sessiond_env_vars:
            self._log("Additional lttng-sessiond environment variables:")
            additional_vars = self._unpack_env_vars(sessiond_env_vars)
            for var_name, var_value in additional_vars:
                self._log("  {name}={value}".format(name=var_name, value=var_value))
                sessiond_env[var_name] = var_value

        sessiond_env["LTTNG_SESSION_CONFIG_XSD_PATH"] = str(
            self._project_root / "src" / "common"
        )

        assert self._lttng_home is not None
        sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)

        # The handler must be in place before the daemon is launched so that
        # its readiness signal cannot be missed.
        wait_queue = _SignalWaitQueue()
        signal.signal(signal.SIGUSR1, wait_queue.signal)

        self._log(
            "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
                home_dir=str(self._lttng_home.path)
            )
        )
        process = subprocess.Popen(
            [
                str(sessiond_path),
                consumerd_path_option_name,
                str(consumerd_path),
                "--sig-parent",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=sessiond_env,
        )

        if self._logging_function:
            self._sessiond_output_consumer = ProcessOutputConsumer(
                process, "lttng-sessiond", self._logging_function
            )
            self._sessiond_output_consumer.daemon = True
            self._sessiond_output_consumer.start()

        # Wait for SIGUSR1, indicating the sessiond is ready to proceed
        wait_queue.wait_for_signal()
        # NOTE(review): this re-installs the same handler rather than
        # restoring the previous one; kept as-is to preserve behaviour.
        signal.signal(signal.SIGUSR1, wait_queue.signal)

        return process

    def _handle_termination_signal(self, signal_number, frame):
        # type: (int, Optional[FrameType]) -> None
        """
        SIGTERM/SIGINT handler: log the signal and clean up the environment.
        """
        self._log(
            "Killed by {signal_name} signal, cleaning-up".format(
                signal_name=signal.strsignal(signal_number)
            )
        )
        self._cleanup()

    def launch_wait_trace_test_application(self, event_count):
        # type: (int) -> WaitTraceTestApplication
        """
        Launch an application that will wait before tracing `event_count` events.
        """
        return WaitTraceTestApplication(
            self._project_root
            / "tests"
            / "utils"
            / "testapp"
            / "gen-ust-nevents"
            / "gen-ust-nevents",
            event_count,
            self,
        )

    # Clean-up managed processes
    def _cleanup(self):
        # type: () -> None
        """
        Terminate the session daemon if it is still running, wait for its
        output consumer thread, and drop the reference to the temporary
        LTTNG_HOME.
        """
        if self._sessiond and self._sessiond.poll() is None:
            # The session daemon is alive; kill it.
            self._log(
                "Killing session daemon (pid = {sessiond_pid})".format(
                    sessiond_pid=self._sessiond.pid
                )
            )

            self._sessiond.terminate()
            self._sessiond.wait()
            # The consumer only exists when a logging function was provided.
            if self._sessiond_output_consumer:
                self._sessiond_output_consumer.join()
                self._sessiond_output_consumer = None

            self._log("Session daemon killed")
            self._sessiond = None

        self._lttng_home = None

    def __del__(self):
        self._cleanup()
401
402
@contextlib.contextmanager
def test_environment(with_sessiond, log=None):
    # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
    """
    Context manager yielding an `_Environment` built from `with_sessiond`
    and `log`; the environment's `_cleanup()` is invoked when the `with`
    block is left, whether or not it raised.
    """
    environment = _Environment(with_sessiond, log)
    try:
        yield environment
    finally:
        environment._cleanup()
This page took 0.042275 seconds and 4 git commands to generate.