from typing import Callable, Iterator, Optional, Tuple, List, Generator
import sys
import pathlib
+import pwd
+import random
import signal
+import socket
import subprocess
import shlex
import shutil
+import stat
+import string
import os
import queue
import tempfile
import threading
import contextlib
+import bt2
+
class TemporaryDirectory:
def __init__(self, prefix):
self._directory_path = tempfile.mkdtemp(prefix=prefix)
def __del__(self):
- shutil.rmtree(self._directory_path, ignore_errors=True)
+ if os.getenv("LTTNG_TEST_PRESERVE_TEST_ENV", "0") != "1":
+ shutil.rmtree(self._directory_path, ignore_errors=True)
@property
def path(self):
signal.signal(signal_number, original_handler)
+class _LiveViewer:
+ """
+ Create a babeltrace2 live viewer.
+ """
+
+ def __init__(
+ self,
+ environment, # type: Environment
+ session, # type: str
+ hostname=None, # type: Optional[str]
+ ):
+ self._environment = environment
+ self._session = session
+ self._hostname = hostname
+ if self._hostname is None:
+ self._hostname = socket.gethostname()
+ self._events = []
+
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._live_iterator = bt2.TraceCollectionMessageIterator(
+ bt2.ComponentSpec(
+ ctf_live_cc,
+ {
+ "inputs": [
+ "net://localhost:{}/host/{}/{}".format(
+ environment.lttng_relayd_live_port,
+ self._hostname,
+ session,
+ )
+ ],
+ "session-not-found-action": "end",
+ },
+ )
+ )
+
+ try:
+ # Cause the connection to be initiated since tests
+ # tend to wait for a viewer to be connected before proceeding.
+ msg = next(self._live_iterator)
+ self._events.append(msg)
+ except bt2.TryAgain:
+ pass
+
+ @property
+ def output(self):
+ return self._events
+
+ @property
+ def messages(self):
+ return [x for x in self._events if type(x) is bt2._EventMessageConst]
+
+ def _drain(self, retry=False):
+ while True:
+ try:
+ for msg in self._live_iterator:
+ self._events.append(msg)
+ break
+ except bt2.TryAgain as e:
+ if retry:
+ time.sleep(0.01)
+ continue
+ else:
+ break
+
+ def wait_until_connected(self, timeout=0):
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._environment._log(
+ "Checking for connected clients at 'net://localhost:{}'".format(
+ self._environment.lttng_relayd_live_port
+ )
+ )
+ query_executor = bt2.QueryExecutor(
+ ctf_live_cc,
+ "sessions",
+ params={
+ "url": "net://localhost:{}".format(
+ self._environment.lttng_relayd_live_port
+ )
+ },
+ )
+ connected = False
+ started = time.time()
+ while not connected:
+ try:
+ if timeout != 0 and (time.time() - started) > timeout:
+ raise RuntimeError(
+ "Timed out waiting for connected clients on session '{}' after {}s".format(
+ self._session, time.time() - started
+ )
+ )
+ query_result = query_executor.query()
+ except bt2._Error:
+ time.sleep(0.01)
+ continue
+ for live_session in query_result:
+ if (
+ live_session["session-name"] == self._session
+ and live_session["client-count"] >= 1
+ ):
+ connected = True
+ self._environment._log(
+ "Session '{}' has {} connected clients".format(
+ live_session["session-name"], live_session["client-count"]
+ )
+ )
+ break
+ return connected
+
+ def wait(self):
+ if self._live_iterator:
+ self._drain(retry=True)
+ del self._live_iterator
+ self._live_iterator = None
+
+ def __del__(self):
+ pass
+
+
class _WaitTraceTestApplication:
"""
Create an application that waits before tracing. This allows a test to
wait_time_between_events_us=0, # type: int
wait_before_exit=False, # type: bool
wait_before_exit_file_path=None, # type: Optional[pathlib.Path]
+ run_as=None, # type: Optional[str]
):
+ self._process = None
self._environment = environment # type: Environment
self._iteration_count = event_count
# File that the application will wait to see before tracing its events.
+ dir = self._compat_pathlike(environment.lttng_home_location)
+ if run_as is not None:
+ dir = os.path.join(dir, run_as)
self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
- dir=self._compat_pathlike(environment.lttng_home_location),
+ dir=dir,
)
)
+
# File that the application will create when all events have been emitted.
self._app_tracing_done_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_done_tracing",
- dir=self._compat_pathlike(environment.lttng_home_location),
+ dir=dir,
)
)
tempfile.mktemp(
prefix="app_",
suffix="_exit",
- dir=self._compat_pathlike(environment.lttng_home_location),
+ dir=dir,
)
)
-
+ self._wait_before_exit_file_path = wait_before_exit_file_path
self._has_returned = False
+ self._tracing_started = False
test_app_env = os.environ.copy()
test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
app_ready_file_path = tempfile.mktemp(
prefix="app_",
suffix="_ready",
- dir=self._compat_pathlike(environment.lttng_home_location),
+ dir=dir,
) # type: str
test_app_args = [str(binary_path)]
test_app_args.extend(
["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
)
+ if wait_before_exit:
+ test_app_args.extend(
+ ["--sync-before-exit", str(self._wait_before_exit_file_path)]
+ )
if wait_time_between_events_us != 0:
test_app_args.extend(["--wait", str(wait_time_between_events_us)])
+ if run_as is not None:
+ # When running as root and reducing the permissions to run as another
+ # user, the test binary needs to be readable and executable by the
+ # world; however, the file may be in a deep path or on systems where
+ # we don't want to modify the filesystem state (eg. for a person who
+ # has downloaded and ran the tests manually).
+ # Therefore, the binary_path is copied to a temporary file in the
+ # `run_as` user's home directory
+ new_binary_path = os.path.join(
+ str(environment.lttng_home_location),
+ run_as,
+ os.path.basename(str(binary_path)),
+ )
+
+ if not os.path.exists(new_binary_path):
+ shutil.copy(str(binary_path), new_binary_path)
+
+ test_app_args[0] = new_binary_path
+
+ lib_dir = environment.lttng_home_location / run_as / "lib"
+ if not os.path.isdir(str(lib_dir)):
+ os.mkdir(str(lib_dir))
+ # When running dropping privileges, the libraries built in the
+ # root-owned directories may not be reachable and readable by
+ # the loader running as an unprivileged user. These should also be
+ # copied.
+ _ldd = subprocess.Popen(
+ ["ldd", new_binary_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ if _ldd.wait() != 0:
+ raise RuntimeError(
+ "Error while using `ldd` to determine test application dependencies: `{}`".format(
+ stderr.read().decode("utf-8")
+ )
+ )
+ libs = [
+ x.decode("utf-8").split(sep="=>") for x in _ldd.stdout.readlines()
+ ]
+ libs = [
+ x[1].split(sep=" ")[1]
+ for x in libs
+ if len(x) >= 2 and x[1].find("lttng") != -1
+ ]
+ for lib in libs:
+ shutil.copy(lib, lib_dir)
+
+ test_app_env["LD_LIBRARY_PATH"] = "{}:{}".format(
+ test_app_env["LD_LIBRARY_PATH"],
+ str(lib_dir),
+ )
+
+ # As of python 3.9, subprocess.Popen supports a user parameter which
+ # runs `setreuid()` before executing the proces and will be preferable
+ # when support for older python versions is no longer required.
+ test_app_args = [
+ "runuser",
+ "-u",
+ run_as,
+ "--",
+ ] + test_app_args
+
+ self._environment._log(
+ "Launching test application: '{}'".format(
+ self._compat_shlex_join(test_app_args)
+ )
+ )
self._process = subprocess.Popen(
test_app_args,
env=test_app_env,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
+ stderr=subprocess.PIPE,
) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
- "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`".format(
- sync_file=sync_file_path, return_code=self._process.returncode
+ "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`, output=`{output}`".format(
+ sync_file=sync_file_path,
+ return_code=self._process.returncode,
+ output=self._process.stderr.read().decode("utf-8"),
)
)
time.sleep(0.001)
+ def touch_exit_file(self):
+ open(self._compat_pathlike(self._wait_before_exit_file_path), mode="x")
+
def trace(self):
# type: () -> None
if self._process.poll() is not None:
)
)
open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x")
+ self._environment._log("[{}] Tracing started".format(self.vpid))
+ self._tracing_started = True
def wait_for_tracing_done(self):
# type: () -> None
+ if not self._tracing_started:
+ raise RuntimeError("Tracing hasn't been started")
self._wait_for_file_to_be_created(self._app_tracing_done_file_path)
+ self._environment._log("[{}] Tracing done".format(self.vpid))
def wait_for_exit(self):
# type: () -> None
if self._process.wait() != 0:
raise RuntimeError(
- "Test application has exit with return code `{return_code}`".format(
- return_code=self._process.returncode
+ "Test application [{pid}] has exit with return code `{return_code}`, output=`{output}`".format(
+ pid=self.vpid,
+ return_code=self._process.returncode,
+ output=self._process.stderr.read().decode("utf-8"),
)
)
self._has_returned = True
else:
return str(path)
+ @staticmethod
+ def _compat_shlex_join(args):
+ # type: list[str] -> str
+ # shlex.join was added in python 3.8
+ return " ".join([shlex.quote(x) for x in args])
+
def __del__(self):
- if not self._has_returned:
+ if self._process is not None and not self._has_returned:
# This is potentially racy if the pid has been recycled. However,
# we can't use pidfd_open since it is only available in python >= 3.9.
self._process.kill()
for app in self._apps:
app.wait_for_tracing_done()
- open(
- _WaitTraceTestApplication._compat_pathlike(
- self._wait_before_exit_file_path
- ),
- mode="x",
- )
+ self._apps[0].touch_exit_file()
+
# Performed in two passes to allow tests to stress the unregistration of many applications.
# Waiting for each app to exit turn-by-turn would defeat the purpose here.
if wait_for_apps:
def __init__(self, binary_path, environment):
# type: (pathlib.Path, Environment)
+ self._process = None
self._environment = environment # type: Environment
self._has_returned = False
self._has_returned = True
def __del__(self):
- if not self._has_returned:
+ if self._process is not None and not self._has_returned:
# This is potentially racy if the pid has been recycled. However,
# we can't use pidfd_open since it is only available in python >= 3.9.
self._process.kill()
self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+class SavingProcessOutputConsumer(ProcessOutputConsumer):
+ def __init__(self, process, name, log):
+ self._lines = []
+ super().__init__(process=process, name=name, log=log)
+
+ def run(self):
+ # type: () -> None
+ while self._process.poll() is None:
+ assert self._process.stdout
+ line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
+ if len(line) != 0:
+ self._lines.append(line)
+ self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+
+ @property
+ def output(self):
+ return self._lines
+
+
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
signal.signal(signal.SIGTERM, self._handle_termination_signal)
signal.signal(signal.SIGINT, self._handle_termination_signal)
+ if os.getenv("LTTNG_TEST_VERBOSE_BABELTRACE", "0") == "1":
+ # @TODO: Is there a way to feed the logging output to
+ # the logger._Logger instead of directly to stderr?
+ bt2.set_global_logging_level(bt2.LoggingLevel.TRACE)
+
# Assumes the project's hierarchy to this file is:
# tests/utils/python/this_file
self._project_root = (
pathlib.Path(__file__).absolute().parents[3]
) # type: pathlib.Path
+
self._lttng_home = TemporaryDirectory(
"lttng_test_env_home"
- ) # type: Optional[TemporaryDirectory]
+ ) # type: Optional[str]
+ os.chmod(
+ str(self._lttng_home.path),
+ stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IROTH | stat.S_IXOTH,
+ )
self._relayd = (
self._launch_lttng_relayd() if with_relayd else None
self._launch_lttng_sessiond() if with_sessiond else None
) # type: Optional[subprocess.Popen[bytes]]
+ self._dummy_users = {} # type: Dictionary[int, string]
+ self._preserve_test_env = os.getenv("LTTNG_TEST_PRESERVE_TEST_ENV", "0") != "1"
+
@property
def lttng_home_location(self):
# type: () -> pathlib.Path
# type: () -> int
return 5402
+ @property
+ def preserve_test_env(self):
+ # type: () -> bool
+ return self._preserve_test_env
+
+ @staticmethod
+ def allows_destructive():
+ # type: () -> bool
+ return os.getenv("LTTNG_ENABLE_DESTRUCTIVE_TESTS", "") == "will-break-my-system"
+
+ def create_dummy_user(self):
+ # type: () -> (int, str)
+ # Create a dummy user. The uid and username will be eturned in a tuple.
+ # If the name already exists, an exception will be thrown.
+ # The users will be removed when the environment is cleaned up.
+ name = "".join([random.choice(string.ascii_lowercase) for x in range(10)])
+
+ try:
+ entry = pwd.getpwnam(name)
+ raise Exception("User '{}' already exists".format(name))
+ except KeyError:
+ pass
+
+ # Create user
+ proc = subprocess.Popen(
+ [
+ "useradd",
+ "--base-dir",
+ str(self._lttng_home.path),
+ "--create-home",
+ "--no-user-group",
+ "--shell",
+ "/bin/sh",
+ name,
+ ]
+ )
+ proc.wait()
+ if proc.returncode != 0:
+ raise Exception(
+ "Failed to create user '{}', useradd returned {}".format(
+ name, proc.returncode
+ )
+ )
+
+ entry = pwd.getpwnam(name)
+ self._dummy_users[entry[2]] = name
+ return (entry[2], name)
+
def create_temporary_directory(self, prefix=None):
# type: (Optional[str]) -> pathlib.Path
# Simply return a path that is contained within LTTNG_HOME; it will
self._log(
"Launching relayd with LTTNG_HOME='${}'".format(str(self._lttng_home.path))
)
+ verbose = []
+ if os.environ.get("LTTNG_TEST_VERBOSE_RELAYD") is not None:
+ verbose = ["-vvv"]
process = subprocess.Popen(
[
str(relayd_path),
"tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
"-L",
"tcp://localhost:{}".format(self.lttng_relayd_live_port),
- ],
+ ]
+ + verbose,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=relayd_env,
home_dir=str(self._lttng_home.path)
)
)
+ verbose = []
+ if os.environ.get("LTTNG_TEST_VERBOSE_SESSIOND") is not None:
+ verbose = ["-vvv", "--verbose-consumer"]
process = subprocess.Popen(
[
str(sessiond_path),
consumerd_path_option_name,
str(consumerd_path),
"--sig-parent",
- ],
+ ]
+ + verbose,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=sessiond_env,
)
self._cleanup()
+ def launch_live_viewer(self, session, hostname=None):
+ # Make sure the relayd is ready
+ ready = False
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ query_executor = bt2.QueryExecutor(
+ ctf_live_cc,
+ "sessions",
+ params={"url": "net://localhost:{}".format(self.lttng_relayd_live_port)},
+ )
+ while not ready:
+ try:
+ query_result = query_executor.query()
+ except bt2._Error:
+ time.sleep(0.1)
+ continue
+ for live_session in query_result:
+ if live_session["session-name"] == session:
+ ready = True
+ self._log(
+ "Session '{}' is available at net://localhost:{}".format(
+ session, self.lttng_relayd_live_port
+ )
+ )
+ break
+ return _LiveViewer(self, session, hostname)
+
def launch_wait_trace_test_application(
self,
event_count, # type: int
wait_time_between_events_us=0,
wait_before_exit=False,
wait_before_exit_file_path=None,
+ run_as=None,
):
- # type: (int, int, bool, Optional[pathlib.Path]) -> _WaitTraceTestApplication
+ # type: (int, int, bool, Optional[pathlib.Path], Optional[str]) -> _WaitTraceTestApplication
"""
Launch an application that will wait before tracing `event_count` events.
"""
wait_time_between_events_us,
wait_before_exit,
wait_before_exit_file_path,
+ run_as,
)
- def launch_trace_test_constructor_application(self):
+ def launch_test_application(self, subpath):
# type () -> TraceTestApplication
"""
Launch an application that will trace from within constructors.
"""
return _TraceTestApplication(
- self._project_root
- / "tests"
- / "utils"
- / "testapp"
- / "gen-ust-events-constructor"
- / "gen-ust-events-constructor",
+ self._project_root / "tests" / "utils" / "testapp" / subpath,
self,
)
+ def _terminate_relayd(self):
+ if self._relayd and self._relayd.poll() is None:
+ self._relayd.terminate()
+ self._relayd.wait()
+ if self._relayd_output_consumer:
+ self._relayd_output_consumer.join()
+ self._relayd_output_consumer = None
+ self._log("Relayd killed")
+ self._relayd = None
+
# Clean-up managed processes
def _cleanup(self):
# type: () -> None
self._log("Session daemon killed")
self._sessiond = None
- if self._relayd and self._relayd.poll() is None:
- self._relayd.terminate()
- self._relayd.wait()
- if self._relayd_output_consumer:
- self._relayd_output_consumer.join()
- self._relayd_output_consumer = None
- self._log("Relayd killed")
- self._relayd = None
+ self._terminate_relayd()
+
+ # The user accounts will always be deleted, but the home directories will
+ # be retained unless the user has opted to preserve the test environment.
+ userdel = ["userdel"]
+ if not self.preserve_test_env:
+ userdel += ["--remove"]
+ for uid, name in self._dummy_users.items():
+ # When subprocess is run during the interpreter teardown, ImportError
+ # may be raised; however, the commands seem to execute correctly.
+ # Eg.
+ #
+ # Exception ignored in: <function _Environment.__del__ at 0x7f2d62e3b9c0>
+ # Traceback (most recent call last):
+ # File "tests/utils/lttngtest/environment.py", line 1024, in __del__
+ # File "tests/utils/lttngtest/environment.py", line 1016, in _cleanup
+ # File "/usr/lib/python3.11/subprocess.py", line 1026, in __init__
+ # File "/usr/lib/python3.11/subprocess.py", line 1880, in _execute_child
+ # File "<frozen os>", line 629, in get_exec_path
+ # ImportError: sys.meta_path is None, Python is likely shutting down
+ #
+ try:
+ _proc = subprocess.Popen(
+ ["pkill", "--uid", str(uid)], stderr=subprocess.PIPE
+ )
+ _proc.wait()
+ except ImportError:
+ pass
+ try:
+ _proc = subprocess.Popen(userdel + [name], stderr=subprocess.PIPE)
+ _proc.wait()
+ except ImportError:
+ pass
self._lttng_home = None