#
from types import FrameType
-from typing import Callable, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List, Generator
import sys
import pathlib
import signal
class TemporaryDirectory:
- def __init__(self, prefix: str):
+ def __init__(self, prefix):
+ # type: (str) -> None
self._directory_path = tempfile.mkdtemp(prefix=prefix)
def __del__(self):
shutil.rmtree(self._directory_path, ignore_errors=True)
@property
- def path(self) -> pathlib.Path:
+ def path(self):
+ # type: () -> pathlib.Path
return pathlib.Path(self._directory_path)
"""
def __init__(self):
- self._queue: queue.Queue = queue.Queue()
+ self._queue = queue.Queue() # type: queue.Queue
- def signal(self, signal_number, frame: Optional[FrameType]):
+ def signal(
+ self,
+ signal_number,
+ frame, # type: Optional[FrameType]
+ ):
self._queue.put_nowait(signal_number)
def wait_for_signal(self):
self._queue.get(block=True)
-
-class WaitTraceTestApplication:
+ @contextlib.contextmanager
+ def intercept_signal(self, signal_number):
+ # type: (int) -> Generator[None, None, None]
+ original_handler = signal.getsignal(signal_number)
+ signal.signal(signal_number, self.signal)
+ try:
+ yield
+ except:
+ # Restore the original signal handler and forward the exception.
+ raise
+ finally:
+ signal.signal(signal_number, original_handler)
+
+
+class _WaitTraceTestApplication:
"""
Create an application that waits before tracing. This allows a test to
launch an application, get its PID, and get it to start tracing when it
def __init__(
self,
- binary_path: pathlib.Path,
- event_count: int,
- environment: "Environment",
- wait_time_between_events_us: int = 0,
+ binary_path, # type: pathlib.Path
+ event_count, # type: int
+ environment, # type: Environment
+ wait_time_between_events_us=0, # type: int
+ wait_before_exit=False, # type: bool
+ wait_before_exit_file_path=None, # type: Optional[pathlib.Path]
):
- self._environment: Environment = environment
- if event_count % 5:
- # The test application currently produces 5 different events per iteration.
- raise ValueError("event count must be a multiple of 5")
- self._iteration_count: int = int(event_count / 5)
+ self._process = None
+ self._environment = environment # type: Environment
+ self._iteration_count = event_count
# File that the application will wait to see before tracing its events.
- self._app_start_tracing_file_path: pathlib.Path = pathlib.Path(
+ self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
- dir=self._compat_open_path(environment.lttng_home_location),
+ dir=self._compat_pathlike(environment.lttng_home_location),
+ )
+ )
+ # File that the application will create when all events have been emitted.
+ self._app_tracing_done_file_path = pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_",
+ suffix="_done_tracing",
+ dir=self._compat_pathlike(environment.lttng_home_location),
)
)
+
+ if wait_before_exit and wait_before_exit_file_path is None:
+ wait_before_exit_file_path = pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_",
+ suffix="_exit",
+ dir=self._compat_pathlike(environment.lttng_home_location),
+ )
+ )
+
self._has_returned = False
test_app_env = os.environ.copy()
test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
# File that the application will create to indicate it has completed its initialization.
- app_ready_file_path: str = tempfile.mktemp(
+ app_ready_file_path = tempfile.mktemp(
prefix="app_",
suffix="_ready",
- dir=self._compat_open_path(environment.lttng_home_location),
- )
+ dir=self._compat_pathlike(environment.lttng_home_location),
+ ) # type: str
test_app_args = [str(binary_path)]
+ test_app_args.extend(["--iter", str(event_count)])
test_app_args.extend(
- shlex.split(
- "--iter {iteration_count} --create-in-main {app_ready_file_path} --wait-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
- iteration_count=self._iteration_count,
- app_ready_file_path=app_ready_file_path,
- app_start_tracing_file_path=self._app_start_tracing_file_path,
- wait_time_between_events_us=wait_time_between_events_us,
- )
- )
+ ["--sync-application-in-main-touch", str(app_ready_file_path)]
+ )
+ test_app_args.extend(
+ ["--sync-before-first-event", str(self._app_start_tracing_file_path)]
)
+ test_app_args.extend(
+ ["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
+ )
+ if wait_time_between_events_us != 0:
+ test_app_args.extend(["--wait", str(wait_time_between_events_us)])
- self._process: subprocess.Popen = subprocess.Popen(
+ self._process = subprocess.Popen(
test_app_args,
env=test_app_env,
- )
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ ) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
# initialized. Make sure the app hasn't crashed in order to not wait
# forever.
+ self._wait_for_file_to_be_created(pathlib.Path(app_ready_file_path))
+
+ def _wait_for_file_to_be_created(self, sync_file_path):
+ # type: (pathlib.Path) -> None
while True:
- if os.path.exists(app_ready_file_path):
+ if os.path.exists(self._compat_pathlike(sync_file_path)):
break
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
- "Test application has unexepectedly returned during its initialization with return code `{return_code}`".format(
- return_code=self._process.returncode
+ "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`".format(
+ sync_file=sync_file_path, return_code=self._process.returncode
)
)
- time.sleep(0.1)
+ time.sleep(0.001)
- def trace(self) -> None:
+ def trace(self):
+ # type: () -> None
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
return_code=self._process.returncode
)
)
- open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
+ open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x")
- def wait_for_exit(self) -> None:
+ def wait_for_tracing_done(self):
+ # type: () -> None
+ self._wait_for_file_to_be_created(self._app_tracing_done_file_path)
+
+ def wait_for_exit(self):
+ # type: () -> None
if self._process.wait() != 0:
raise RuntimeError(
"Test application has exit with return code `{return_code}`".format(
self._has_returned = True
@property
- def vpid(self) -> int:
+ def vpid(self):
+ # type: () -> int
return self._process.pid
@staticmethod
- def _compat_open_path(path):
- # type: (pathlib.Path)
+ def _compat_pathlike(path):
+ # type: (pathlib.Path) -> pathlib.Path | str
"""
- The builtin open() in python >= 3.6 expects a path-like object while
- prior versions expect a string or bytes object. Return the correct type
- based on the presence of the "__fspath__" attribute specified in PEP-519.
+ The builtin open() and many methods of the 'os' library in Python >= 3.6
+ expect a path-like object while prior versions expect a string or
+ bytes object. Return the correct type based on the presence of the
+ "__fspath__" attribute specified in PEP-519.
"""
if hasattr(path, "__fspath__"):
return path
return str(path)
def __del__(self):
- if not self._has_returned:
+ if self._process is not None and not self._has_returned:
+ # This is potentially racy if the pid has been recycled. However,
+ # we can't use pidfd_open since it is only available in python >= 3.9.
+ self._process.kill()
+ self._process.wait()
+
+
+class WaitTraceTestApplicationGroup:
+ def __init__(
+ self,
+ environment, # type: Environment
+ application_count, # type: int
+ event_count, # type: int
+ wait_time_between_events_us=0, # type: int
+ wait_before_exit=False, # type: bool
+ ):
+ self._wait_before_exit_file_path = (
+ pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_group_",
+ suffix="_exit",
+ dir=_WaitTraceTestApplication._compat_pathlike(
+ environment.lttng_home_location
+ ),
+ )
+ )
+ if wait_before_exit
+ else None
+ )
+
+ self._apps = []
+ self._consumers = []
+ for i in range(application_count):
+ new_app = environment.launch_wait_trace_test_application(
+ event_count,
+ wait_time_between_events_us,
+ wait_before_exit,
+ self._wait_before_exit_file_path,
+ )
+
+ # Attach an output consumer to log the application's error output (if any).
+ if environment._logging_function:
+ app_output_consumer = ProcessOutputConsumer(
+ new_app._process,
+ "app-{}".format(str(new_app.vpid)),
+ environment._logging_function,
+ ) # type: Optional[ProcessOutputConsumer]
+ app_output_consumer.daemon = True
+ app_output_consumer.start()
+ self._consumers.append(app_output_consumer)
+
+ self._apps.append(new_app)
+
+ def trace(self):
+ # type: () -> None
+ for app in self._apps:
+ app.trace()
+
+ def exit(
+ self, wait_for_apps=False # type: bool
+ ):
+ if self._wait_before_exit_file_path is None:
+ raise RuntimeError(
+ "Can't call exit on an application group created with `wait_before_exit=False`"
+ )
+
+ # Wait for apps to have produced all of their events so that we can
+ # cause the death of all apps to happen within a short time span.
+ for app in self._apps:
+ app.wait_for_tracing_done()
+
+ open(
+ _WaitTraceTestApplication._compat_pathlike(
+ self._wait_before_exit_file_path
+ ),
+ mode="x",
+ )
+ # Performed in two passes to allow tests to stress the unregistration of many applications.
+ # Waiting for each app to exit turn-by-turn would defeat the purpose here.
+ if wait_for_apps:
+ for app in self._apps:
+ app.wait_for_exit()
+
+
+class _TraceTestApplication:
+ """
+ Create an application that emits events as soon as it is launched. In most
+ scenarios, it is preferable to use a WaitTraceTestApplication.
+ """
+
+ def __init__(self, binary_path, environment):
+ # type: (pathlib.Path, Environment)
+ self._process = None
+ self._environment = environment # type: Environment
+ self._has_returned = False
+
+ test_app_env = os.environ.copy()
+ test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
+ # Make sure the app is blocked until it is properly registered to
+ # the session daemon.
+ test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
+
+ test_app_args = [str(binary_path)]
+
+ self._process = subprocess.Popen(
+ test_app_args, env=test_app_env
+ ) # type: subprocess.Popen
+
+ def wait_for_exit(self):
+ # type: () -> None
+ if self._process.wait() != 0:
+ raise RuntimeError(
+ "Test application has exit with return code `{return_code}`".format(
+ return_code=self._process.returncode
+ )
+ )
+ self._has_returned = True
+
+ def __del__(self):
+ if self._process is not None and not self._has_returned:
# This is potentially racy if the pid has been recycled. However,
# we can't use pidfd_open since it is only available in python >= 3.9.
self._process.kill()
class ProcessOutputConsumer(threading.Thread, logger._Logger):
def __init__(
- self, process: subprocess.Popen, name: str, log: Callable[[str], None]
+ self,
+ process, # type: subprocess.Popen
+ name, # type: str
+ log, # type: Callable[[str], None]
):
threading.Thread.__init__(self)
self._prefix = name
logger._Logger.__init__(self, log)
self._process = process
- def run(self) -> None:
+ def run(self):
+ # type: () -> None
while self._process.poll() is None:
assert self._process.stdout
line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
- self, with_sessiond: bool, log: Optional[Callable[[str], None]] = None
+ self,
+ with_sessiond, # type: bool
+ log=None, # type: Optional[Callable[[str], None]]
+ with_relayd=False, # type: bool
):
super().__init__(log)
signal.signal(signal.SIGTERM, self._handle_termination_signal)
# Assumes the project's hierarchy to this file is:
# tests/utils/python/this_file
- self._project_root: pathlib.Path = pathlib.Path(__file__).absolute().parents[3]
- self._lttng_home: Optional[TemporaryDirectory] = TemporaryDirectory(
+ self._project_root = (
+ pathlib.Path(__file__).absolute().parents[3]
+ ) # type: pathlib.Path
+ self._lttng_home = TemporaryDirectory(
"lttng_test_env_home"
- )
+ ) # type: Optional[TemporaryDirectory]
+
+ self._relayd = (
+ self._launch_lttng_relayd() if with_relayd else None
+ ) # type: Optional[subprocess.Popen[bytes]]
+ self._relayd_output_consumer = None
- self._sessiond: Optional[subprocess.Popen[bytes]] = (
+ self._sessiond = (
self._launch_lttng_sessiond() if with_sessiond else None
- )
+ ) # type: Optional[subprocess.Popen[bytes]]
@property
- def lttng_home_location(self) -> pathlib.Path:
+ def lttng_home_location(self):
+ # type: () -> pathlib.Path
if self._lttng_home is None:
raise RuntimeError("Attempt to access LTTng home after clean-up")
return self._lttng_home.path
@property
- def lttng_client_path(self) -> pathlib.Path:
+ def lttng_client_path(self):
+ # type: () -> pathlib.Path
return self._project_root / "src" / "bin" / "lttng" / "lttng"
- def create_temporary_directory(self, prefix: Optional[str] = None) -> pathlib.Path:
+ @property
+ def lttng_relayd_control_port(self):
+ # type: () -> int
+ return 5400
+
+ @property
+ def lttng_relayd_data_port(self):
+ # type: () -> int
+ return 5401
+
+ @property
+ def lttng_relayd_live_port(self):
+ # type: () -> int
+ return 5402
+
+ def create_temporary_directory(self, prefix=None):
+ # type: (Optional[str]) -> pathlib.Path
# Simply return a path that is contained within LTTNG_HOME; it will
# be destroyed when the temporary home goes out of scope.
assert self._lttng_home
# Unpack a list of environment variables from a string
# such as "HELLO=is_it ME='/you/are/looking/for'"
@staticmethod
- def _unpack_env_vars(env_vars_string: str) -> List[Tuple[str, str]]:
+ def _unpack_env_vars(env_vars_string):
+ # type: (str) -> List[Tuple[str, str]]
unpacked_vars = []
for var in shlex.split(env_vars_string):
equal_position = var.find("=")
return unpacked_vars
- def _launch_lttng_sessiond(self) -> Optional[subprocess.Popen]:
+ def _launch_lttng_relayd(self):
+ # type: () -> Optional[subprocess.Popen]
+ relayd_path = (
+ self._project_root / "src" / "bin" / "lttng-relayd" / "lttng-relayd"
+ )
+ if os.environ.get("LTTNG_TEST_NO_RELAYD", "0") == "1":
+ # Run without a relay daemon; the user may be running one
+ # under gdb, for example.
+ return None
+
+ relayd_env_vars = os.environ.get("LTTNG_RELAYD_ENV_VARS")
+ relayd_env = os.environ.copy()
+ if relayd_env_vars:
+ self._log("Additional lttng-relayd environment variables:")
+ for name, value in self._unpack_env_vars(relayd_env_vars):
+ self._log("{}={}".format(name, value))
+ relayd_env[name] = value
+
+ assert self._lttng_home is not None
+ relayd_env["LTTNG_HOME"] = str(self._lttng_home.path)
+ self._log(
+ "Launching relayd with LTTNG_HOME='${}'".format(str(self._lttng_home.path))
+ )
+ process = subprocess.Popen(
+ [
+ str(relayd_path),
+ "-C",
+ "tcp://0.0.0.0:{}".format(self.lttng_relayd_control_port),
+ "-D",
+ "tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
+ "-L",
+ "tcp://localhost:{}".format(self.lttng_relayd_live_port),
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=relayd_env,
+ )
+
+ if self._logging_function:
+ self._relayd_output_consumer = ProcessOutputConsumer(
+ process, "lttng-relayd", self._logging_function
+ )
+ self._relayd_output_consumer.daemon = True
+ self._relayd_output_consumer.start()
+
+ return process
+
+ def _launch_lttng_sessiond(self):
+ # type: () -> Optional[subprocess.Popen]
is_64bits_host = sys.maxsize > 2**32
sessiond_path = (
sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)
wait_queue = _SignalWaitQueue()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
-
- self._log(
- "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
- home_dir=str(self._lttng_home.path)
+ with wait_queue.intercept_signal(signal.SIGUSR1):
+ self._log(
+ "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
+ home_dir=str(self._lttng_home.path)
+ )
+ )
+ process = subprocess.Popen(
+ [
+ str(sessiond_path),
+ consumerd_path_option_name,
+ str(consumerd_path),
+ "--sig-parent",
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=sessiond_env,
)
- )
- process = subprocess.Popen(
- [
- str(sessiond_path),
- consumerd_path_option_name,
- str(consumerd_path),
- "--sig-parent",
- ],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- env=sessiond_env,
- )
- if self._logging_function:
- self._sessiond_output_consumer: Optional[
- ProcessOutputConsumer
- ] = ProcessOutputConsumer(process, "lttng-sessiond", self._logging_function)
- self._sessiond_output_consumer.daemon = True
- self._sessiond_output_consumer.start()
+ if self._logging_function:
+ self._sessiond_output_consumer = ProcessOutputConsumer(
+ process, "lttng-sessiond", self._logging_function
+ ) # type: Optional[ProcessOutputConsumer]
+ self._sessiond_output_consumer.daemon = True
+ self._sessiond_output_consumer.start()
- # Wait for SIGUSR1, indicating the sessiond is ready to proceed
- wait_queue.wait_for_signal()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
+ # Wait for SIGUSR1, indicating the sessiond is ready to proceed
+ wait_queue.wait_for_signal()
return process
- def _handle_termination_signal(
- self, signal_number: int, frame: Optional[FrameType]
- ) -> None:
+ def _handle_termination_signal(self, signal_number, frame):
+ # type: (int, Optional[FrameType]) -> None
self._log(
"Killed by {signal_name} signal, cleaning-up".format(
signal_name=signal.strsignal(signal_number)
self._cleanup()
def launch_wait_trace_test_application(
- self, event_count: int
- ) -> WaitTraceTestApplication:
+ self,
+ event_count, # type: int
+ wait_time_between_events_us=0,
+ wait_before_exit=False,
+ wait_before_exit_file_path=None,
+ ):
+ # type: (int, int, bool, Optional[pathlib.Path]) -> _WaitTraceTestApplication
"""
Launch an application that will wait before tracing `event_count` events.
"""
- return WaitTraceTestApplication(
+ return _WaitTraceTestApplication(
self._project_root
/ "tests"
/ "utils"
/ "testapp"
- / "gen-ust-nevents"
- / "gen-ust-nevents",
+ / "gen-ust-events"
+ / "gen-ust-events",
event_count,
self,
+ wait_time_between_events_us,
+ wait_before_exit,
+ wait_before_exit_file_path,
)
+ def launch_test_application(self, subpath):
+ # type () -> TraceTestApplication
+ """
+ Launch an application that will trace from within constructors.
+ """
+ return _TraceTestApplication(
+ self._project_root / "tests" / "utils" / "testapp" / subpath,
+ self,
+ )
+
+ def _terminate_relayd(self):
+ if self._relayd and self._relayd.poll() is None:
+ self._relayd.terminate()
+ self._relayd.wait()
+ if self._relayd_output_consumer:
+ self._relayd_output_consumer.join()
+ self._relayd_output_consumer = None
+ self._log("Relayd killed")
+ self._relayd = None
+
# Clean-up managed processes
- def _cleanup(self) -> None:
+ def _cleanup(self):
+ # type: () -> None
if self._sessiond and self._sessiond.poll() is None:
# The session daemon is alive; kill it.
self._log(
self._log("Session daemon killed")
self._sessiond = None
+ self._terminate_relayd()
+
self._lttng_home = None
def __del__(self):
@contextlib.contextmanager
-def test_environment(with_sessiond: bool, log: Optional[Callable[[str], None]] = None):
- env = _Environment(with_sessiond, log)
+def test_environment(with_sessiond, log=None, with_relayd=False):
+ # type: (bool, Optional[Callable[[str], None]], bool) -> Iterator[_Environment]
+ env = _Environment(with_sessiond, log, with_relayd)
try:
yield env
finally: