Use compiler-agnostic defines to silence warning
[lttng-tools.git] / tests / utils / lttngtest / environment.py
index 2710a7efb957dd46994f20454054bbc1acc3ddfd..c6dd0397a773e6bf263c1082456576f422b4dc9c 100644 (file)
@@ -9,10 +9,15 @@ from types import FrameType
 from typing import Callable, Iterator, Optional, Tuple, List, Generator
 import sys
 import pathlib
 from typing import Callable, Iterator, Optional, Tuple, List, Generator
 import sys
 import pathlib
+import pwd
+import random
 import signal
 import signal
+import socket
 import subprocess
 import shlex
 import shutil
 import subprocess
 import shlex
 import shutil
+import stat
+import string
 import os
 import queue
 import tempfile
 import os
 import queue
 import tempfile
@@ -21,6 +26,8 @@ import time
 import threading
 import contextlib
 
 import threading
 import contextlib
 
+import bt2
+
 
 class TemporaryDirectory:
     def __init__(self, prefix):
 
 class TemporaryDirectory:
     def __init__(self, prefix):
@@ -28,7 +35,8 @@ class TemporaryDirectory:
         self._directory_path = tempfile.mkdtemp(prefix=prefix)
 
     def __del__(self):
         self._directory_path = tempfile.mkdtemp(prefix=prefix)
 
     def __del__(self):
-        shutil.rmtree(self._directory_path, ignore_errors=True)
+        if os.getenv("LTTNG_TEST_PRESERVE_TEST_ENV", "0") != "1":
+            shutil.rmtree(self._directory_path, ignore_errors=True)
 
     @property
     def path(self):
 
     @property
     def path(self):
@@ -77,7 +85,125 @@ class _SignalWaitQueue:
             signal.signal(signal_number, original_handler)
 
 
             signal.signal(signal_number, original_handler)
 
 
-class WaitTraceTestApplication:
+class _LiveViewer:
+    """
+    Create a babeltrace2 live viewer.
+    """
+
+    def __init__(
+        self,
+        environment,  # type: Environment
+        session,  # type: str
+        hostname=None,  # type: Optional[str]
+    ):
+        self._environment = environment
+        self._session = session
+        self._hostname = hostname
+        if self._hostname is None:
+            self._hostname = socket.gethostname()
+        self._events = []
+
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        self._live_iterator = bt2.TraceCollectionMessageIterator(
+            bt2.ComponentSpec(
+                ctf_live_cc,
+                {
+                    "inputs": [
+                        "net://localhost:{}/host/{}/{}".format(
+                            environment.lttng_relayd_live_port,
+                            self._hostname,
+                            session,
+                        )
+                    ],
+                    "session-not-found-action": "end",
+                },
+            )
+        )
+
+        try:
+            # Cause the connection to be initiated since tests
+            # tend to wait for a viewer to be connected before proceeding.
+            msg = next(self._live_iterator)
+            self._events.append(msg)
+        except bt2.TryAgain:
+            pass
+
+    @property
+    def output(self):
+        return self._events
+
+    @property
+    def messages(self):
+        return [x for x in self._events if type(x) is bt2._EventMessageConst]
+
+    def _drain(self, retry=False):
+        while True:
+            try:
+                for msg in self._live_iterator:
+                    self._events.append(msg)
+                break
+            except bt2.TryAgain as e:
+                if retry:
+                    time.sleep(0.01)
+                    continue
+                else:
+                    break
+
+    def wait_until_connected(self, timeout=0):
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        self._environment._log(
+            "Checking for connected clients at 'net://localhost:{}'".format(
+                self._environment.lttng_relayd_live_port
+            )
+        )
+        query_executor = bt2.QueryExecutor(
+            ctf_live_cc,
+            "sessions",
+            params={
+                "url": "net://localhost:{}".format(
+                    self._environment.lttng_relayd_live_port
+                )
+            },
+        )
+        connected = False
+        started = time.time()
+        while not connected:
+            try:
+                if timeout != 0 and (time.time() - started) > timeout:
+                    raise RuntimeError(
+                        "Timed out waiting for connected clients on session '{}' after {}s".format(
+                            self._session, time.time() - started
+                        )
+                    )
+                query_result = query_executor.query()
+            except bt2._Error:
+                time.sleep(0.01)
+                continue
+            for live_session in query_result:
+                if (
+                    live_session["session-name"] == self._session
+                    and live_session["client-count"] >= 1
+                ):
+                    connected = True
+                    self._environment._log(
+                        "Session '{}' has {} connected clients".format(
+                            live_session["session-name"], live_session["client-count"]
+                        )
+                    )
+                    break
+        return connected
+
+    def wait(self):
+        if self._live_iterator:
+            self._drain(retry=True)
+            del self._live_iterator
+            self._live_iterator = None
+
+    def __del__(self):
+        pass
+
+
+class _WaitTraceTestApplication:
     """
     Create an application that waits before tracing. This allows a test to
     launch an application, get its PID, and get it to start tracing when it
     """
     Create an application that waits before tracing. This allows a test to
     launch an application, get its PID, and get it to start tracing when it
@@ -90,21 +216,45 @@ class WaitTraceTestApplication:
         event_count,  # type: int
         environment,  # type: Environment
         wait_time_between_events_us=0,  # type: int
         event_count,  # type: int
         environment,  # type: Environment
         wait_time_between_events_us=0,  # type: int
+        wait_before_exit=False,  # type: bool
+        wait_before_exit_file_path=None,  # type: Optional[pathlib.Path]
+        run_as=None,  # type: Optional[str]
     ):
     ):
+        self._process = None
         self._environment = environment  # type: Environment
         self._environment = environment  # type: Environment
-        if event_count % 5:
-            # The test application currently produces 5 different events per iteration.
-            raise ValueError("event count must be a multiple of 5")
-        self._iteration_count = int(event_count / 5)  # type: int
+        self._iteration_count = event_count
         # File that the application will wait to see before tracing its events.
         # File that the application will wait to see before tracing its events.
+        dir = self._compat_pathlike(environment.lttng_home_location)
+        if run_as is not None:
+            dir = os.path.join(dir, run_as)
         self._app_start_tracing_file_path = pathlib.Path(
             tempfile.mktemp(
                 prefix="app_",
                 suffix="_start_tracing",
         self._app_start_tracing_file_path = pathlib.Path(
             tempfile.mktemp(
                 prefix="app_",
                 suffix="_start_tracing",
-                dir=self._compat_open_path(environment.lttng_home_location),
+                dir=dir,
             )
         )
             )
         )
+
+        # File that the application will create when all events have been emitted.
+        self._app_tracing_done_file_path = pathlib.Path(
+            tempfile.mktemp(
+                prefix="app_",
+                suffix="_done_tracing",
+                dir=dir,
+            )
+        )
+
+        if wait_before_exit and wait_before_exit_file_path is None:
+            wait_before_exit_file_path = pathlib.Path(
+                tempfile.mktemp(
+                    prefix="app_",
+                    suffix="_exit",
+                    dir=dir,
+                )
+            )
+        self._wait_before_exit_file_path = wait_before_exit_file_path
         self._has_returned = False
         self._has_returned = False
+        self._tracing_started = False
 
         test_app_env = os.environ.copy()
         test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
 
         test_app_env = os.environ.copy()
         test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
@@ -116,42 +266,127 @@ class WaitTraceTestApplication:
         app_ready_file_path = tempfile.mktemp(
             prefix="app_",
             suffix="_ready",
         app_ready_file_path = tempfile.mktemp(
             prefix="app_",
             suffix="_ready",
-            dir=self._compat_open_path(environment.lttng_home_location),
+            dir=dir,
         )  # type: str
 
         test_app_args = [str(binary_path)]
         )  # type: str
 
         test_app_args = [str(binary_path)]
+        test_app_args.extend(["--iter", str(event_count)])
         test_app_args.extend(
         test_app_args.extend(
-            shlex.split(
-                "--iter {iteration_count} --create-in-main {app_ready_file_path} --wait-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
-                    iteration_count=self._iteration_count,
-                    app_ready_file_path=app_ready_file_path,
-                    app_start_tracing_file_path=self._app_start_tracing_file_path,
-                    wait_time_between_events_us=wait_time_between_events_us,
+            ["--sync-application-in-main-touch", str(app_ready_file_path)]
+        )
+        test_app_args.extend(
+            ["--sync-before-first-event", str(self._app_start_tracing_file_path)]
+        )
+        test_app_args.extend(
+            ["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
+        )
+        if wait_before_exit:
+            test_app_args.extend(
+                ["--sync-before-exit", str(self._wait_before_exit_file_path)]
+            )
+        if wait_time_between_events_us != 0:
+            test_app_args.extend(["--wait", str(wait_time_between_events_us)])
+
+        if run_as is not None:
+            # When running as root and reducing the permissions to run as another
+            # user, the test binary needs to be readable and executable by the
+            # world; however, the file may be in a deep path or on systems where
+            # we don't want to modify the filesystem state (eg. for a person who
+            # has downloaded and ran the tests manually).
+            # Therefore, the binary_path is copied to a temporary file in the
+            # `run_as` user's home directory
+            new_binary_path = os.path.join(
+                str(environment.lttng_home_location),
+                run_as,
+                os.path.basename(str(binary_path)),
+            )
+
+            if not os.path.exists(new_binary_path):
+                shutil.copy(str(binary_path), new_binary_path)
+
+            test_app_args[0] = new_binary_path
+
+            lib_dir = environment.lttng_home_location / run_as / "lib"
+            if not os.path.isdir(str(lib_dir)):
+                os.mkdir(str(lib_dir))
+                # When running dropping privileges, the libraries built in the
+                # root-owned directories may not be reachable and readable by
+                # the loader running as an unprivileged user. These should also be
+                # copied.
+                _ldd = subprocess.Popen(
+                    ["ldd", new_binary_path],
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
                 )
                 )
+                if _ldd.wait() != 0:
+                    raise RuntimeError(
+                        "Error while using `ldd` to determine test application dependencies: `{}`".format(
+                            stderr.read().decode("utf-8")
+                        )
+                    )
+                libs = [
+                    x.decode("utf-8").split(sep="=>") for x in _ldd.stdout.readlines()
+                ]
+                libs = [
+                    x[1].split(sep=" ")[1]
+                    for x in libs
+                    if len(x) >= 2 and x[1].find("lttng") != -1
+                ]
+                for lib in libs:
+                    shutil.copy(lib, lib_dir)
+
+            test_app_env["LD_LIBRARY_PATH"] = "{}:{}".format(
+                test_app_env["LD_LIBRARY_PATH"],
+                str(lib_dir),
             )
             )
-        )
 
 
+            # As of python 3.9, subprocess.Popen supports a user parameter which
+            # runs `setreuid()` before executing the proces and will be preferable
+            # when support for older python versions is no longer required.
+            test_app_args = [
+                "runuser",
+                "-u",
+                run_as,
+                "--",
+            ] + test_app_args
+
+        self._environment._log(
+            "Launching test application: '{}'".format(
+                self._compat_shlex_join(test_app_args)
+            )
+        )
         self._process = subprocess.Popen(
             test_app_args,
             env=test_app_env,
         self._process = subprocess.Popen(
             test_app_args,
             env=test_app_env,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
         )  # type: subprocess.Popen
 
         # Wait for the application to create the file indicating it has fully
         # initialized. Make sure the app hasn't crashed in order to not wait
         # forever.
         )  # type: subprocess.Popen
 
         # Wait for the application to create the file indicating it has fully
         # initialized. Make sure the app hasn't crashed in order to not wait
         # forever.
+        self._wait_for_file_to_be_created(pathlib.Path(app_ready_file_path))
+
+    def _wait_for_file_to_be_created(self, sync_file_path):
+        # type: (pathlib.Path) -> None
         while True:
         while True:
-            if os.path.exists(app_ready_file_path):
+            if os.path.exists(self._compat_pathlike(sync_file_path)):
                 break
 
             if self._process.poll() is not None:
                 # Application has unexepectedly returned.
                 raise RuntimeError(
                 break
 
             if self._process.poll() is not None:
                 # Application has unexepectedly returned.
                 raise RuntimeError(
-                    "Test application has unexepectedly returned during its initialization with return code `{return_code}`".format(
-                        return_code=self._process.returncode
+                    "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`, output=`{output}`".format(
+                        sync_file=sync_file_path,
+                        return_code=self._process.returncode,
+                        output=self._process.stderr.read().decode("utf-8"),
                     )
                 )
 
                     )
                 )
 
-            time.sleep(0.1)
+            time.sleep(0.001)
+
+    def touch_exit_file(self):
+        open(self._compat_pathlike(self._wait_before_exit_file_path), mode="x")
 
     def trace(self):
         # type: () -> None
 
     def trace(self):
         # type: () -> None
@@ -162,14 +397,25 @@ class WaitTraceTestApplication:
                     return_code=self._process.returncode
                 )
             )
                     return_code=self._process.returncode
                 )
             )
-        open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
+        open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x")
+        self._environment._log("[{}] Tracing started".format(self.vpid))
+        self._tracing_started = True
+
+    def wait_for_tracing_done(self):
+        # type: () -> None
+        if not self._tracing_started:
+            raise RuntimeError("Tracing hasn't been started")
+        self._wait_for_file_to_be_created(self._app_tracing_done_file_path)
+        self._environment._log("[{}] Tracing done".format(self.vpid))
 
     def wait_for_exit(self):
         # type: () -> None
         if self._process.wait() != 0:
             raise RuntimeError(
 
     def wait_for_exit(self):
         # type: () -> None
         if self._process.wait() != 0:
             raise RuntimeError(
-                "Test application has exit with return code `{return_code}`".format(
-                    return_code=self._process.returncode
+                "Test application [{pid}] has exit with return code `{return_code}`, output=`{output}`".format(
+                    pid=self.vpid,
+                    return_code=self._process.returncode,
+                    output=self._process.stderr.read().decode("utf-8"),
                 )
             )
         self._has_returned = True
                 )
             )
         self._has_returned = True
@@ -180,27 +426,107 @@ class WaitTraceTestApplication:
         return self._process.pid
 
     @staticmethod
         return self._process.pid
 
     @staticmethod
-    def _compat_open_path(path):
+    def _compat_pathlike(path):
         # type: (pathlib.Path) -> pathlib.Path | str
         """
         # type: (pathlib.Path) -> pathlib.Path | str
         """
-        The builtin open() in python >= 3.6 expects a path-like object while
-        prior versions expect a string or bytes object. Return the correct type
-        based on the presence of the "__fspath__" attribute specified in PEP-519.
+        The builtin open() and many methods of the 'os' library in Python >= 3.6
+        expect a path-like object while prior versions expect a string or
+        bytes object. Return the correct type based on the presence of the
+        "__fspath__" attribute specified in PEP-519.
         """
         if hasattr(path, "__fspath__"):
             return path
         else:
             return str(path)
 
         """
         if hasattr(path, "__fspath__"):
             return path
         else:
             return str(path)
 
+    @staticmethod
+    def _compat_shlex_join(args):
+        # type: list[str] -> str
+        # shlex.join was added in python 3.8
+        return " ".join([shlex.quote(x) for x in args])
+
     def __del__(self):
     def __del__(self):
-        if not self._has_returned:
+        if self._process is not None and not self._has_returned:
             # This is potentially racy if the pid has been recycled. However,
             # we can't use pidfd_open since it is only available in python >= 3.9.
             self._process.kill()
             self._process.wait()
 
 
             # This is potentially racy if the pid has been recycled. However,
             # we can't use pidfd_open since it is only available in python >= 3.9.
             self._process.kill()
             self._process.wait()
 
 
-class TraceTestApplication:
+class WaitTraceTestApplicationGroup:
+    def __init__(
+        self,
+        environment,  # type: Environment
+        application_count,  # type: int
+        event_count,  # type: int
+        wait_time_between_events_us=0,  # type: int
+        wait_before_exit=False,  # type: bool
+    ):
+        self._wait_before_exit_file_path = (
+            pathlib.Path(
+                tempfile.mktemp(
+                    prefix="app_group_",
+                    suffix="_exit",
+                    dir=_WaitTraceTestApplication._compat_pathlike(
+                        environment.lttng_home_location
+                    ),
+                )
+            )
+            if wait_before_exit
+            else None
+        )
+
+        self._apps = []
+        self._consumers = []
+        for i in range(application_count):
+            new_app = environment.launch_wait_trace_test_application(
+                event_count,
+                wait_time_between_events_us,
+                wait_before_exit,
+                self._wait_before_exit_file_path,
+            )
+
+            # Attach an output consumer to log the application's error output (if any).
+            if environment._logging_function:
+                app_output_consumer = ProcessOutputConsumer(
+                    new_app._process,
+                    "app-{}".format(str(new_app.vpid)),
+                    environment._logging_function,
+                )  # type: Optional[ProcessOutputConsumer]
+                app_output_consumer.daemon = True
+                app_output_consumer.start()
+                self._consumers.append(app_output_consumer)
+
+            self._apps.append(new_app)
+
+    def trace(self):
+        # type: () -> None
+        for app in self._apps:
+            app.trace()
+
+    def exit(
+        self, wait_for_apps=False  # type: bool
+    ):
+        if self._wait_before_exit_file_path is None:
+            raise RuntimeError(
+                "Can't call exit on an application group created with `wait_before_exit=False`"
+            )
+
+        # Wait for apps to have produced all of their events so that we can
+        # cause the death of all apps to happen within a short time span.
+        for app in self._apps:
+            app.wait_for_tracing_done()
+
+        self._apps[0].touch_exit_file()
+
+        # Performed in two passes to allow tests to stress the unregistration of many applications.
+        # Waiting for each app to exit turn-by-turn would defeat the purpose here.
+        if wait_for_apps:
+            for app in self._apps:
+                app.wait_for_exit()
+
+
+class _TraceTestApplication:
     """
     Create an application that emits events as soon as it is launched. In most
     scenarios, it is preferable to use a WaitTraceTestApplication.
     """
     Create an application that emits events as soon as it is launched. In most
     scenarios, it is preferable to use a WaitTraceTestApplication.
@@ -208,6 +534,7 @@ class TraceTestApplication:
 
     def __init__(self, binary_path, environment):
         # type: (pathlib.Path, Environment)
 
     def __init__(self, binary_path, environment):
         # type: (pathlib.Path, Environment)
+        self._process = None
         self._environment = environment  # type: Environment
         self._has_returned = False
 
         self._environment = environment  # type: Environment
         self._has_returned = False
 
@@ -234,7 +561,7 @@ class TraceTestApplication:
         self._has_returned = True
 
     def __del__(self):
         self._has_returned = True
 
     def __del__(self):
-        if not self._has_returned:
+        if self._process is not None and not self._has_returned:
             # This is potentially racy if the pid has been recycled. However,
             # we can't use pidfd_open since it is only available in python >= 3.9.
             self._process.kill()
             # This is potentially racy if the pid has been recycled. However,
             # we can't use pidfd_open since it is only available in python >= 3.9.
             self._process.kill()
@@ -262,30 +589,68 @@ class ProcessOutputConsumer(threading.Thread, logger._Logger):
                 self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
 
 
                 self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
 
 
+class SavingProcessOutputConsumer(ProcessOutputConsumer):
+    def __init__(self, process, name, log):
+        self._lines = []
+        super().__init__(process=process, name=name, log=log)
+
+    def run(self):
+        # type: () -> None
+        while self._process.poll() is None:
+            assert self._process.stdout
+            line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
+            if len(line) != 0:
+                self._lines.append(line)
+                self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+
+    @property
+    def output(self):
+        return self._lines
+
+
 # Generate a temporary environment in which to execute a test.
 class _Environment(logger._Logger):
     def __init__(
         self,
         with_sessiond,  # type: bool
         log=None,  # type: Optional[Callable[[str], None]]
 # Generate a temporary environment in which to execute a test.
 class _Environment(logger._Logger):
     def __init__(
         self,
         with_sessiond,  # type: bool
         log=None,  # type: Optional[Callable[[str], None]]
+        with_relayd=False,  # type: bool
     ):
         super().__init__(log)
         signal.signal(signal.SIGTERM, self._handle_termination_signal)
         signal.signal(signal.SIGINT, self._handle_termination_signal)
 
     ):
         super().__init__(log)
         signal.signal(signal.SIGTERM, self._handle_termination_signal)
         signal.signal(signal.SIGINT, self._handle_termination_signal)
 
+        if os.getenv("LTTNG_TEST_VERBOSE_BABELTRACE", "0") == "1":
+            # @TODO: Is there a way to feed the logging output to
+            # the logger._Logger instead of directly to stderr?
+            bt2.set_global_logging_level(bt2.LoggingLevel.TRACE)
+
         # Assumes the project's hierarchy to this file is:
         # tests/utils/python/this_file
         self._project_root = (
             pathlib.Path(__file__).absolute().parents[3]
         )  # type: pathlib.Path
         # Assumes the project's hierarchy to this file is:
         # tests/utils/python/this_file
         self._project_root = (
             pathlib.Path(__file__).absolute().parents[3]
         )  # type: pathlib.Path
+
         self._lttng_home = TemporaryDirectory(
             "lttng_test_env_home"
         self._lttng_home = TemporaryDirectory(
             "lttng_test_env_home"
-        )  # type: Optional[TemporaryDirectory]
+        )  # type: Optional[str]
+        os.chmod(
+            str(self._lttng_home.path),
+            stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IROTH | stat.S_IXOTH,
+        )
+
+        self._relayd = (
+            self._launch_lttng_relayd() if with_relayd else None
+        )  # type: Optional[subprocess.Popen[bytes]]
+        self._relayd_output_consumer = None
 
         self._sessiond = (
             self._launch_lttng_sessiond() if with_sessiond else None
         )  # type: Optional[subprocess.Popen[bytes]]
 
 
         self._sessiond = (
             self._launch_lttng_sessiond() if with_sessiond else None
         )  # type: Optional[subprocess.Popen[bytes]]
 
+        self._dummy_users = {}  # type: Dictionary[int, string]
+        self._preserve_test_env = os.getenv("LTTNG_TEST_PRESERVE_TEST_ENV", "0") != "1"
+
     @property
     def lttng_home_location(self):
         # type: () -> pathlib.Path
     @property
     def lttng_home_location(self):
         # type: () -> pathlib.Path
@@ -298,6 +663,69 @@ class _Environment(logger._Logger):
         # type: () -> pathlib.Path
         return self._project_root / "src" / "bin" / "lttng" / "lttng"
 
         # type: () -> pathlib.Path
         return self._project_root / "src" / "bin" / "lttng" / "lttng"
 
+    @property
+    def lttng_relayd_control_port(self):
+        # type: () -> int
+        return 5400
+
+    @property
+    def lttng_relayd_data_port(self):
+        # type: () -> int
+        return 5401
+
+    @property
+    def lttng_relayd_live_port(self):
+        # type: () -> int
+        return 5402
+
+    @property
+    def preserve_test_env(self):
+        # type: () -> bool
+        return self._preserve_test_env
+
+    @staticmethod
+    def allows_destructive():
+        # type: () -> bool
+        return os.getenv("LTTNG_ENABLE_DESTRUCTIVE_TESTS", "") == "will-break-my-system"
+
+    def create_dummy_user(self):
+        # type: () -> (int, str)
+        # Create a dummy user. The uid and username will be eturned in a tuple.
+        # If the name already exists, an exception will be thrown.
+        # The users will be removed when the environment is cleaned up.
+        name = "".join([random.choice(string.ascii_lowercase) for x in range(10)])
+
+        try:
+            entry = pwd.getpwnam(name)
+            raise Exception("User '{}' already exists".format(name))
+        except KeyError:
+            pass
+
+        # Create user
+        proc = subprocess.Popen(
+            [
+                "useradd",
+                "--base-dir",
+                str(self._lttng_home.path),
+                "--create-home",
+                "--no-user-group",
+                "--shell",
+                "/bin/sh",
+                name,
+            ]
+        )
+        proc.wait()
+        if proc.returncode != 0:
+            raise Exception(
+                "Failed to create user '{}', useradd returned {}".format(
+                    name, proc.returncode
+                )
+            )
+
+        entry = pwd.getpwnam(name)
+        self._dummy_users[entry[2]] = name
+        return (entry[2], name)
+
     def create_temporary_directory(self, prefix=None):
         # type: (Optional[str]) -> pathlib.Path
         # Simply return a path that is contained within LTTNG_HOME; it will
     def create_temporary_directory(self, prefix=None):
         # type: (Optional[str]) -> pathlib.Path
         # Simply return a path that is contained within LTTNG_HOME; it will
@@ -333,6 +761,57 @@ class _Environment(logger._Logger):
 
         return unpacked_vars
 
 
         return unpacked_vars
 
+    def _launch_lttng_relayd(self):
+        # type: () -> Optional[subprocess.Popen]
+        relayd_path = (
+            self._project_root / "src" / "bin" / "lttng-relayd" / "lttng-relayd"
+        )
+        if os.environ.get("LTTNG_TEST_NO_RELAYD", "0") == "1":
+            # Run without a relay daemon; the user may be running one
+            # under gdb, for example.
+            return None
+
+        relayd_env_vars = os.environ.get("LTTNG_RELAYD_ENV_VARS")
+        relayd_env = os.environ.copy()
+        if relayd_env_vars:
+            self._log("Additional lttng-relayd environment variables:")
+            for name, value in self._unpack_env_vars(relayd_env_vars):
+                self._log("{}={}".format(name, value))
+                relayd_env[name] = value
+
+        assert self._lttng_home is not None
+        relayd_env["LTTNG_HOME"] = str(self._lttng_home.path)
+        self._log(
+            "Launching relayd with LTTNG_HOME='${}'".format(str(self._lttng_home.path))
+        )
+        verbose = []
+        if os.environ.get("LTTNG_TEST_VERBOSE_RELAYD") is not None:
+            verbose = ["-vvv"]
+        process = subprocess.Popen(
+            [
+                str(relayd_path),
+                "-C",
+                "tcp://0.0.0.0:{}".format(self.lttng_relayd_control_port),
+                "-D",
+                "tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
+                "-L",
+                "tcp://localhost:{}".format(self.lttng_relayd_live_port),
+            ]
+            + verbose,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            env=relayd_env,
+        )
+
+        if self._logging_function:
+            self._relayd_output_consumer = ProcessOutputConsumer(
+                process, "lttng-relayd", self._logging_function
+            )
+            self._relayd_output_consumer.daemon = True
+            self._relayd_output_consumer.start()
+
+        return process
+
     def _launch_lttng_sessiond(self):
         # type: () -> Optional[subprocess.Popen]
         is_64bits_host = sys.maxsize > 2**32
     def _launch_lttng_sessiond(self):
         # type: () -> Optional[subprocess.Popen]
         is_64bits_host = sys.maxsize > 2**32
@@ -377,13 +856,17 @@ class _Environment(logger._Logger):
                     home_dir=str(self._lttng_home.path)
                 )
             )
                     home_dir=str(self._lttng_home.path)
                 )
             )
+            verbose = []
+            if os.environ.get("LTTNG_TEST_VERBOSE_SESSIOND") is not None:
+                verbose = ["-vvv", "--verbose-consumer"]
             process = subprocess.Popen(
                 [
                     str(sessiond_path),
                     consumerd_path_option_name,
                     str(consumerd_path),
                     "--sig-parent",
             process = subprocess.Popen(
                 [
                     str(sessiond_path),
                     consumerd_path_option_name,
                     str(consumerd_path),
                     "--sig-parent",
-                ],
+                ]
+                + verbose,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 env=sessiond_env,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 env=sessiond_env,
@@ -410,37 +893,79 @@ class _Environment(logger._Logger):
         )
         self._cleanup()
 
         )
         self._cleanup()
 
-    def launch_wait_trace_test_application(self, event_count):
-        # type: (int) -> WaitTraceTestApplication
+    def launch_live_viewer(self, session, hostname=None):
+        # Make sure the relayd is ready
+        ready = False
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        query_executor = bt2.QueryExecutor(
+            ctf_live_cc,
+            "sessions",
+            params={"url": "net://localhost:{}".format(self.lttng_relayd_live_port)},
+        )
+        while not ready:
+            try:
+                query_result = query_executor.query()
+            except bt2._Error:
+                time.sleep(0.1)
+                continue
+            for live_session in query_result:
+                if live_session["session-name"] == session:
+                    ready = True
+                    self._log(
+                        "Session '{}' is available at net://localhost:{}".format(
+                            session, self.lttng_relayd_live_port
+                        )
+                    )
+                    break
+        return _LiveViewer(self, session, hostname)
+
+    def launch_wait_trace_test_application(
+        self,
+        event_count,  # type: int
+        wait_time_between_events_us=0,
+        wait_before_exit=False,
+        wait_before_exit_file_path=None,
+        run_as=None,
+    ):
+        # type: (int, int, bool, Optional[pathlib.Path], Optional[str]) -> _WaitTraceTestApplication
         """
         Launch an application that will wait before tracing `event_count` events.
         """
         """
         Launch an application that will wait before tracing `event_count` events.
         """
-        return WaitTraceTestApplication(
+        return _WaitTraceTestApplication(
             self._project_root
             / "tests"
             / "utils"
             / "testapp"
             self._project_root
             / "tests"
             / "utils"
             / "testapp"
-            / "gen-ust-nevents"
-            / "gen-ust-nevents",
+            / "gen-ust-events"
+            / "gen-ust-events",
             event_count,
             self,
             event_count,
             self,
+            wait_time_between_events_us,
+            wait_before_exit,
+            wait_before_exit_file_path,
+            run_as,
         )
 
         )
 
-    def launch_trace_test_constructor_application(self):
+    def launch_test_application(self, subpath):
         # type () -> TraceTestApplication
         """
         Launch an application that will trace from within constructors.
         """
         # type () -> TraceTestApplication
         """
         Launch an application that will trace from within constructors.
         """
-        return TraceTestApplication(
-            self._project_root
-            / "tests"
-            / "utils"
-            / "testapp"
-            / "gen-ust-events-constructor"
-            / "gen-ust-events-constructor",
+        return _TraceTestApplication(
+            self._project_root / "tests" / "utils" / "testapp" / subpath,
             self,
         )
 
             self,
         )
 
+    def _terminate_relayd(self):
+        if self._relayd and self._relayd.poll() is None:
+            self._relayd.terminate()
+            self._relayd.wait()
+            if self._relayd_output_consumer:
+                self._relayd_output_consumer.join()
+                self._relayd_output_consumer = None
+            self._log("Relayd killed")
+            self._relayd = None
+
     # Clean-up managed processes
     def _cleanup(self):
         # type: () -> None
     # Clean-up managed processes
     def _cleanup(self):
         # type: () -> None
@@ -461,6 +986,40 @@ class _Environment(logger._Logger):
             self._log("Session daemon killed")
             self._sessiond = None
 
             self._log("Session daemon killed")
             self._sessiond = None
 
+        self._terminate_relayd()
+
+        # The user accounts will always be deleted, but the home directories will
+        # be retained unless the user has opted to preserve the test environment.
+        userdel = ["userdel"]
+        if not self.preserve_test_env:
+            userdel += ["--remove"]
+        for uid, name in self._dummy_users.items():
+            # When subprocess is run during the interpreter teardown, ImportError
+            # may be raised; however, the commands seem to execute correctly.
+            # Eg.
+            #
+            # Exception ignored in: <function _Environment.__del__ at 0x7f2d62e3b9c0>
+            # Traceback (most recent call last):
+            #   File "tests/utils/lttngtest/environment.py", line 1024, in __del__
+            #   File "tests/utils/lttngtest/environment.py", line 1016, in _cleanup
+            #   File "/usr/lib/python3.11/subprocess.py", line 1026, in __init__
+            #   File "/usr/lib/python3.11/subprocess.py", line 1880, in _execute_child
+            #   File "<frozen os>", line 629, in get_exec_path
+            # ImportError: sys.meta_path is None, Python is likely shutting down
+            #
+            try:
+                _proc = subprocess.Popen(
+                    ["pkill", "--uid", str(uid)], stderr=subprocess.PIPE
+                )
+                _proc.wait()
+            except ImportError:
+                pass
+            try:
+                _proc = subprocess.Popen(userdel + [name], stderr=subprocess.PIPE)
+                _proc.wait()
+            except ImportError:
+                pass
+
         self._lttng_home = None
 
     def __del__(self):
         self._lttng_home = None
 
     def __del__(self):
@@ -468,9 +1027,9 @@ class _Environment(logger._Logger):
 
 
 @contextlib.contextmanager
 
 
 @contextlib.contextmanager
-def test_environment(with_sessiond, log=None):
-    # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
-    env = _Environment(with_sessiond, log)
+def test_environment(with_sessiond, log=None, with_relayd=False):
+    # type: (bool, Optional[Callable[[str], None]], bool) -> Iterator[_Environment]
+    env = _Environment(with_sessiond, log, with_relayd)
     try:
         yield env
     finally:
     try:
         yield env
     finally:
This page took 0.038425 seconds and 4 git commands to generate.