#
# SPDX-License-Identifier: GPL-2.0-only
-from concurrent.futures import process
+
from . import lttngctl, logger, environment
import pathlib
import os
import shlex
import subprocess
import enum
+import xml.etree.ElementTree
"""
Implementation of the lttngctl interface based on the `lttng` command line client.
class Unsupported(lttngctl.ControlException):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
-def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
+class InvalidMI(lttngctl.ControlException):
+ def __init__(self, msg):
+ # type: (str) -> None
+ super().__init__(msg)
+
+
+def _get_domain_option_name(domain):
+ # type: (lttngctl.TracingDomain) -> str
if domain == lttngctl.TracingDomain.User:
return "userspace"
elif domain == lttngctl.TracingDomain.Kernel:
raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
-def _get_context_type_name(context: lttngctl.ContextType) -> str:
+def _get_context_type_name(context):
+ # type: (lttngctl.ContextType) -> str
if isinstance(context, lttngctl.VgidContextType):
return "vgid"
elif isinstance(context, lttngctl.VuidContextType):
class _Channel(lttngctl.Channel):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ name, # type: str
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
):
- self._client: LTTngClient = client
- self._name: str = name
- self._domain: lttngctl.TracingDomain = domain
- self._session: _Session = session
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
domain_option_name = _get_domain_option_name(self.domain)
context_type_name = _get_context_type_name(context_type)
self._client._run_cmd(
- "add-context --{domain_option_name} --type {context_type_name}".format(
+ "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
domain_option_name=domain_option_name,
+ channel_name=self.name,
context_type_name=context_type_name,
)
)
- def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
+ def add_recording_rule(self, rule):
+ # type: (Type[lttngctl.EventRule]) -> None
client_args = (
"enable-event --session {session_name} --channel {channel_name}".format(
session_name=self._session.name, channel_name=self.name
self._client._run_cmd(client_args)
@property
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
return self._name
@property
- def domain(self) -> lttngctl.TracingDomain:
+ def domain(self):
+ # type: () -> lttngctl.TracingDomain
return self._domain
+@enum.unique
class _ProcessAttribute(enum.Enum):
- PID = (enum.auto(),)
- VPID = (enum.auto(),)
- UID = (enum.auto(),)
- VUID = (enum.auto(),)
- GID = (enum.auto(),)
- VGID = (enum.auto(),)
+ PID = "Process ID"
+ VPID = "Virtual Process ID"
+ UID = "User ID"
+ VUID = "Virtual User ID"
+ GID = "Group ID"
+ VGID = "Virtual Group ID"
+
+ def __repr__(self):
+ return "<%s.%s>" % (self.__class__.__name__, self.name)
-def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
+def _get_process_attribute_option_name(attribute):
+ # type: (_ProcessAttribute) -> str
return {
_ProcessAttribute.PID: "pid",
_ProcessAttribute.VPID: "vpid",
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
def __init__(
self,
- client: "LTTngClient",
- attribute: _ProcessAttribute,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ attribute, # type: _ProcessAttribute
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
):
- self._client: LTTngClient = client
- self._tracked_attribute: _ProcessAttribute = attribute
- self._domain: lttngctl.TracingDomain = domain
- self._session: "_Session" = session
+ self._client = client # type: LTTngClient
+ self._tracked_attribute = attribute # type: _ProcessAttribute
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
- self._allowed_value_types: list[type] = [int, str]
+ self._allowed_value_types = [int, str] # type: list[type]
else:
- self._allowed_value_types: list[type] = [int]
+ self._allowed_value_types = [int] # type: list[type]
- def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
+ def _call_client(self, cmd_name, value):
+ # type: (str, Union[int, str]) -> None
if type(value) not in self._allowed_value_types:
raise TypeError(
"Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
)
domain_name = _get_domain_option_name(self._domain)
self._client._run_cmd(
- "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
+ "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
cmd_name=cmd_name,
session_name=self._session.name,
domain_name=domain_name,
)
)
- def track(self, value: Union[int, str]) -> None:
+ def track(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("track", value)
- def untrack(self, value: Union[int, str]) -> None:
+ def untrack(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("untrack", value)
class _Session(lttngctl.Session):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- output: Optional[Type[lttngctl.SessionOutputLocation]],
+ client, # type: LTTngClient
+ name, # type: str
+ output, # type: Optional[lttngctl.SessionOutputLocation]
):
- self._client: LTTngClient = client
- self._name: str = name
- self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._output = output # type: Optional[lttngctl.SessionOutputLocation]
@property
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
return self._name
def add_channel(
- self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
- ) -> lttngctl.Channel:
+ self,
+ domain,
+ channel_name=None,
+ buffer_sharing_policy=lttngctl.BufferSharingPolicy.PerUID,
+ ):
+ # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
- "enable-channel --{domain_name} {channel_name}".format(
- domain_name=domain_option_name, channel_name=channel_name
+ "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format(
+ session_name=self.name,
+ domain_name=domain_option_name,
+ channel_name=channel_name,
+ buffer_sharing_policy="--buffers-uid"
+ if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
+ else "--buffers-pid",
)
)
return _Channel(self._client, channel_name, domain, self)
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
pass
@property
- def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
- return self._output
+ def output(self):
+ # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
+ return self._output # type: ignore
- def start(self) -> None:
- self._client._run_cmd("start {session_name}".format(session_name=self.name))
+ def start(self):
+ # type: () -> None
+ self._client._run_cmd("start '{session_name}'".format(session_name=self.name))
- def stop(self) -> None:
- self._client._run_cmd("stop {session_name}".format(session_name=self.name))
+ def stop(self):
+ # type: () -> None
+ self._client._run_cmd("stop '{session_name}'".format(session_name=self.name))
- def destroy(self) -> None:
- self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
+ def destroy(self):
+ # type: () -> None
+ self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name))
+
+ def rotate(self, wait=True):
+ # type: (bool) -> None
+ self._client.rotate_session_by_name(self.name, wait)
@property
- def kernel_pid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
+ def is_active(self):
+ # type: () -> bool
+ list_session_xml = self._client._run_cmd(
+ "list '{session_name}'".format(session_name=self.name),
+ LTTngClient.CommandOutputFormat.MI_XML,
+ )
+
+ root = xml.etree.ElementTree.fromstring(list_session_xml)
+ command_output = LTTngClient._mi_find_in_element(root, "output")
+ sessions = LTTngClient._mi_find_in_element(command_output, "sessions")
+ session_mi = LTTngClient._mi_find_in_element(sessions, "session")
+
+ enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text
+ if enabled_text not in ["true", "false"]:
+ raise InvalidMI(
+ "Expected boolean value in element '{}': value='{}'".format(
+ session_mi.tag, enabled_text
+ )
+ )
+
+ return enabled_text == "true"
+
+ @property
+ def kernel_pid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def kernel_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def user_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_gid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
+ def kernel_gid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def kernel_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def user_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_uid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
+ def kernel_uid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def kernel_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def user_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
class LTTngClientError(lttngctl.ControlException):
- def __init__(self, command_args: str, error_output: str):
- self._command_args: str = command_args
- self._output: str = error_output
+ def __init__(
+ self,
+ command_args, # type: str
+ error_output, # type: str
+ ):
+ self._command_args = command_args # type: str
+ self._output = error_output # type: str
class LTTngClient(logger._Logger, lttngctl.Controller):
Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
"""
+ class CommandOutputFormat(enum.Enum):
+ MI_XML = 0
+ HUMAN = 1
+
+ _MI_NS = "{https://lttng.org/xml/ns/lttng-mi}"
+
def __init__(
self,
- test_environment: environment._Environment,
- log: Optional[Callable[[str], None]],
+ test_environment, # type: environment._Environment
+ log, # type: Optional[Callable[[str], None]]
):
logger._Logger.__init__(self, log)
- self._environment: environment._Environment = test_environment
+ self._environment = test_environment # type: environment._Environment
- def _run_cmd(self, command_args: str) -> None:
+ @staticmethod
+ def _namespaced_mi_element(property):
+ # type: (str) -> str
+ return LTTngClient._MI_NS + property
+
+ def _run_cmd(self, command_args, output_format=CommandOutputFormat.MI_XML):
+ # type: (str, CommandOutputFormat) -> str
"""
Invoke the `lttng` client with a set of arguments. The command is
executed in the context of the client's test environment.
"""
- args: list[str] = [str(self._environment.lttng_client_path)]
+ args = [str(self._environment.lttng_client_path)] # type: list[str]
+ if output_format == LTTngClient.CommandOutputFormat.MI_XML:
+ args.extend(["--mi", "xml"])
+
args.extend(shlex.split(command_args))
self._log("lttng {command_args}".format(command_args=command_args))
- client_env: dict[str, str] = os.environ.copy()
+ client_env = os.environ.copy() # type: dict[str, str]
client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
process = subprocess.Popen(
for error_line in decoded_output.splitlines():
self._log(error_line)
raise LTTngClientError(command_args, decoded_output)
+ else:
+ return out.decode("utf-8")
- def create_session(
- self,
- name: Optional[str] = None,
- output: Optional[lttngctl.SessionOutputLocation] = None,
- ) -> lttngctl.Session:
+ def create_session(self, name=None, output=None):
+ # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
name = name if name else lttngctl.Session._generate_name()
if isinstance(output, lttngctl.LocalSessionOutputLocation):
raise TypeError("LTTngClient only supports local or no output")
self._run_cmd(
- "create {session_name} {output_option}".format(
+ "create '{session_name}' {output_option}".format(
session_name=name, output_option=output_option
)
)
return _Session(self, name, output)
+
+ def start_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("start '{session_name}'".format(session_name=name))
+
+ def start_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("start --glob '{pattern}'".format(pattern=pattern))
+
+ def start_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("start --all")
+
+ def stop_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("stop '{session_name}'".format(session_name=name))
+
+ def stop_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("stop --glob '{pattern}'".format(pattern=pattern))
+
+ def stop_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("stop --all")
+
+ def destroy_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("destroy '{session_name}'".format(session_name=name))
+
+ def destroy_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("destroy --glob '{pattern}'".format(pattern=pattern))
+
+ def destroy_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("destroy --all")
+
+ def rotate_session_by_name(self, name, wait=True):
+ self._run_cmd(
+ "rotate '{session_name}' {wait_option}".format(
+ session_name=name, wait_option="-n" if wait is False else ""
+ )
+ )
+
+ def schedule_size_based_rotation(self, name, size_bytes):
+ # type (str, int) -> None
+ self._run_cmd(
+ "enable-rotation --session '{session_name}' --size {size}".format(
+ session_name=name, size=size_bytes
+ )
+ )
+
+ def schedule_time_based_rotation(self, name, period_seconds):
+ # type (str, int) -> None
+ self._run_cmd(
+ "enable-rotation --session '{session_name}' --timer {period_seconds}s".format(
+ session_name=name, period_seconds=period_seconds
+ )
+ )
+
+ @staticmethod
+ def _mi_find_in_element(element, sub_element_name):
+ # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
+ result = element.find(LTTngClient._namespaced_mi_element(sub_element_name))
+ if result is None:
+ raise InvalidMI(
+ "Failed to find element '{}' within command MI element '{}'".format(
+ element.tag, sub_element_name
+ )
+ )
+
+ return result
+
+ def list_sessions(self):
+ # type () -> List[Session]
+ list_sessions_xml = self._run_cmd(
+ "list", LTTngClient.CommandOutputFormat.MI_XML
+ )
+
+ root = xml.etree.ElementTree.fromstring(list_sessions_xml)
+ command_output = self._mi_find_in_element(root, "output")
+ sessions = self._mi_find_in_element(command_output, "sessions")
+
+ ctl_sessions = [] # type: list[lttngctl.Session]
+
+ for session_mi in sessions:
+ name = self._mi_find_in_element(session_mi, "name").text
+ path = self._mi_find_in_element(session_mi, "path").text
+
+ if name is None:
+ raise InvalidMI(
+ "Invalid empty 'name' element in '{}'".format(session_mi.tag)
+ )
+ if path is None:
+ raise InvalidMI(
+ "Invalid empty 'path' element in '{}'".format(session_mi.tag)
+ )
+ if not path.startswith("/"):
+ raise Unsupported(
+ "{} does not support non-local session outputs".format(type(self))
+ )
+
+ ctl_sessions.append(
+ _Session(self, name, lttngctl.LocalSessionOutputLocation(path))
+ )
+
+ return ctl_sessions