3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
7 from concurrent
.futures
import process
8 from . import lttngctl
, logger
, environment
11 from typing
import Callable
, Optional
, Type
, Union
"""
Implementation of the lttngctl interface based on the `lttng` command line client.
"""
class Unsupported(lttngctl.ControlException):
    """
    Raised when a request cannot be expressed with the `lttng` command line
    client (e.g. an unknown tracing domain, context type or event rule type).
    """

    def __init__(self, msg: str):
        # Forward the message to the base exception so that it is shown when
        # the exception is printed or logged.
        super().__init__(msg)
def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
    """
    Return the `lttng` client option name (without the leading `--`) that
    selects the tracing domain `domain`.

    Raises `Unsupported` when the domain has no matching client option.
    """
    if domain == lttngctl.TracingDomain.User:
        return "userspace"
    if domain == lttngctl.TracingDomain.Kernel:
        return "kernel"
    if domain == lttngctl.TracingDomain.Log4j:
        return "log4j"
    if domain == lttngctl.TracingDomain.JUL:
        return "jul"
    if domain == lttngctl.TracingDomain.Python:
        return "python"

    # The placeholder must actually be substituted; the message previously
    # contained a literal `{domain_name}`.
    raise Unsupported(
        "Domain `{domain_name}` is not supported by the LTTng client".format(
            domain_name=domain
        )
    )
def _get_context_type_name(context: lttngctl.ContextType) -> str:
    """
    Return the value to pass to the `--type` option of `lttng add-context`
    for the context type `context`.

    Raises `Unsupported` when the context type is not handled here.
    """
    if isinstance(context, lttngctl.VgidContextType):
        return "vgid"
    if isinstance(context, lttngctl.VuidContextType):
        return "vuid"
    if isinstance(context, lttngctl.VpidContextType):
        return "vpid"
    if isinstance(context, lttngctl.JavaApplicationContextType):
        # Application-specific contexts are named `$app.RETRIEVER:FIELD`.
        return "$app.{retriever}:{field}".format(
            retriever=context.retriever_name, field=context.field_name
        )

    # The placeholder is named, so the value must be passed by keyword;
    # passing it positionally raised KeyError instead of Unsupported.
    raise Unsupported(
        "Context `{context_name}` is not supported by the LTTng client".format(
            context_name=type(context).__name__
        )
    )
class _Channel(lttngctl.Channel):
    """
    Channel of a recording session that is controlled through the `lttng`
    command line client.
    """

    def __init__(
        self,
        client: "LTTngClient",
        name: str,
        domain: lttngctl.TracingDomain,
        session: "_Session",
    ):
        self._client: "LTTngClient" = client
        self._name: str = name
        self._domain: lttngctl.TracingDomain = domain
        self._session: "_Session" = session

    def add_context(self, context_type: lttngctl.ContextType) -> None:
        """
        Add a context field of type `context_type` to this channel.

        Raises `Unsupported` if the domain or the context type cannot be
        expressed with the client.
        """
        domain_option_name = _get_domain_option_name(self.domain)
        context_type_name = _get_context_type_name(context_type)
        # Explicitly target this channel of this session. Without those
        # options, the client acts on the current session and on every
        # channel of the domain.
        self._client._run_cmd(
            "add-context --{domain_option_name} --session {session_name} --channel {channel_name} --type {context_type_name}".format(
                domain_option_name=domain_option_name,
                session_name=shlex.quote(self._session.name),
                channel_name=shlex.quote(self.name),
                context_type_name=context_type_name,
            )
        )

    def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
        """
        Enable, in this channel, the events matched by `rule`.

        Only tracepoint event rules are supported; `Unsupported` is raised
        for any other rule type or for an unknown log level rule type.
        """
        if not isinstance(rule, lttngctl.TracepointEventRule):
            raise Unsupported(
                "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
                    event_rule_type=type(rule).__name__
                )
            )

        client_args = [
            "enable-event --session {session_name} --channel {channel_name}".format(
                session_name=self._session.name, channel_name=self.name
            )
        ]

        domain_option_name = (
            "userspace"
            if isinstance(rule, lttngctl.UserTracepointEventRule)
            else "kernel"
        )
        client_args.append("--" + domain_option_name)

        # No name pattern means "every event of the domain".
        if rule.name_pattern:
            client_args.append(shlex.quote(rule.name_pattern))
        else:
            client_args.append("--all")

        if rule.filter_expression:
            # A filter must be introduced by `--filter` and kept as a single
            # argument: the command string is later tokenized with
            # shlex.split(), which would otherwise break the expression into
            # several positional arguments.
            client_args.append("--filter")
            client_args.append(shlex.quote(rule.filter_expression))

        if rule.log_level_rule:
            if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
                log_level_option = "--loglevel"
            elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
                log_level_option = "--loglevel-only"
            else:
                raise Unsupported(
                    "Unsupported log level rule type `{log_level_rule_type}`".format(
                        log_level_rule_type=type(rule.log_level_rule).__name__
                    )
                )
            client_args.append(
                "{option} {log_level}".format(
                    option=log_level_option, log_level=rule.log_level_rule.level
                )
            )

        if rule.name_pattern_exclusions:
            # The client expects a single comma-separated list of patterns.
            client_args.append("--exclude")
            client_args.append(shlex.quote(",".join(rule.name_pattern_exclusions)))

        self._client._run_cmd(" ".join(client_args))

    @property
    def name(self) -> str:
        """Name of the channel."""
        return self._name

    @property
    def domain(self) -> lttngctl.TracingDomain:
        """Tracing domain of the channel."""
        return self._domain
147 class _ProcessAttribute(enum
.Enum
):
149 VPID
= (enum
.auto(),)
151 VUID
= (enum
.auto(),)
153 VGID
= (enum
.auto(),)
def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
    """
    Return the `lttng track`/`lttng untrack` option name (without the leading
    `--`) that corresponds to the process attribute `attribute`.

    Raises `KeyError` if `attribute` is not a known process attribute.
    """
    option_names = {
        _ProcessAttribute.PID: "pid",
        _ProcessAttribute.VPID: "vpid",
        _ProcessAttribute.UID: "uid",
        _ProcessAttribute.VUID: "vuid",
        _ProcessAttribute.GID: "gid",
        _ProcessAttribute.VGID: "vgid",
    }
    return option_names[attribute]
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
    """
    Tracker of one process attribute (e.g. PID, VUID) for a given domain of
    a session, implemented with the `lttng track` and `lttng untrack`
    commands.
    """

    def __init__(
        self,
        client: "LTTngClient",
        attribute: _ProcessAttribute,
        domain: lttngctl.TracingDomain,
        session: "_Session",
    ):
        self._client: "LTTngClient" = client
        self._tracked_attribute: _ProcessAttribute = attribute
        self._domain: lttngctl.TracingDomain = domain
        self._session: "_Session" = session
        # NOTE(review): strings are accepted for PID/VPID only. The client
        # accepts names for user and group IDs, so this condition may be
        # inverted -- confirm before changing, callers may rely on it.
        self._allowed_value_types: list[type] = (
            [int, str]
            if attribute in (_ProcessAttribute.PID, _ProcessAttribute.VPID)
            else [int]
        )

    def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
        """
        Run the client command `cmd_name` (`track` or `untrack`) for `value`.

        Raises `TypeError` if the exact type of `value` is not allowed for
        the tracked attribute.
        """
        # Exact type match on purpose: subclasses such as `bool` are rejected.
        if type(value) not in self._allowed_value_types:
            raise TypeError(
                "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
                    value_type=type(value).__name__,
                    attribute_name=self._tracked_attribute.name,
                )
            )

        process_attribute_option_name = _get_process_attribute_option_name(
            self._tracked_attribute
        )
        domain_name = _get_domain_option_name(self._domain)
        self._client._run_cmd(
            "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
                cmd_name=cmd_name,
                session_name=self._session.name,
                domain_name=domain_name,
                tracked_attribute_name=process_attribute_option_name,
                # Quoted so that a string value survives the shlex.split()
                # tokenization as a single argument.
                value=shlex.quote(str(value)),
            )
        )

    def track(self, value: Union[int, str]) -> None:
        """Add `value` to the set of tracked values of the attribute."""
        self._call_client("track", value)

    def untrack(self, value: Union[int, str]) -> None:
        """Remove `value` from the set of tracked values of the attribute."""
        self._call_client("untrack", value)
class _Session(lttngctl.Session):
    """Recording session that is controlled through the `lttng` client."""

    def __init__(
        self,
        client: "LTTngClient",
        name: str,
        output: Optional[Type[lttngctl.SessionOutputLocation]],
    ):
        self._client: "LTTngClient" = client
        self._name: str = name
        self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output

    @property
    def name(self) -> str:
        """Name of the session."""
        return self._name

    def add_channel(
        self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
    ) -> lttngctl.Channel:
        """
        Create a channel in the tracing domain `domain` of this session.

        A name is generated when `channel_name` is not provided. Raises
        `Unsupported` if the domain is not supported by the client.
        """
        # Honour the caller-provided name; it used to be overwritten
        # unconditionally by a generated one.
        channel_name = (
            channel_name if channel_name else lttngctl.Channel._generate_name()
        )
        domain_option_name = _get_domain_option_name(domain)
        # Name the session explicitly: the client otherwise acts on whatever
        # the "current" session happens to be.
        self._client._run_cmd(
            "enable-channel --session {session_name} --{domain_name} {channel_name}".format(
                session_name=shlex.quote(self.name),
                domain_name=domain_option_name,
                channel_name=shlex.quote(channel_name),
            )
        )
        return _Channel(self._client, channel_name, domain, self)

    def add_context(self, context_type: lttngctl.ContextType) -> None:
        """Session-wide contexts are not implemented: this does nothing."""
        pass

    @property
    def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
        """Output location given at creation (`None` means no output)."""
        return self._output

    def start(self) -> None:
        """Start the session."""
        self._client._run_cmd("start {session_name}".format(session_name=self.name))

    def stop(self) -> None:
        """Stop the session."""
        self._client._run_cmd("stop {session_name}".format(session_name=self.name))

    def destroy(self) -> None:
        """Destroy the session."""
        self._client._run_cmd("destroy {session_name}".format(session_name=self.name))

    def _make_tracker(
        self, attribute: _ProcessAttribute, domain: lttngctl.TracingDomain
    ) -> _ProcessAttributeTracker:
        # Shared by all the tracker properties below, which only differ by
        # the tracked attribute and the tracing domain.
        return _ProcessAttributeTracker(self._client, attribute, domain, self)

    @property
    def kernel_pid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
        """Tracker of the process IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.PID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vpid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
        """Tracker of the virtual process IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vpid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
        """Tracker of the virtual process IDs in the user space domain."""
        return self._make_tracker(_ProcessAttribute.VPID, lttngctl.TracingDomain.User)  # type: ignore

    @property
    def kernel_gid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
        """Tracker of the group IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.GID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vgid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
        """Tracker of the virtual group IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vgid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
        """Tracker of the virtual group IDs in the user space domain."""
        return self._make_tracker(_ProcessAttribute.VGID, lttngctl.TracingDomain.User)  # type: ignore

    @property
    def kernel_uid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
        """Tracker of the user IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.UID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vuid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
        """Tracker of the virtual user IDs in the kernel domain."""
        return self._make_tracker(_ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vuid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
        """Tracker of the virtual user IDs in the user space domain."""
        return self._make_tracker(_ProcessAttribute.VUID, lttngctl.TracingDomain.User)  # type: ignore
class LTTngClientError(lttngctl.ControlException):
    """
    Raised when an invocation of the `lttng` client fails.

    Keeps the arguments of the failed command and the output it produced.
    """

    def __init__(self, command_args: str, error_output: str):
        # Initialise the base exception so that printing or logging the
        # error shows something useful instead of an empty message.
        # Assumes the base class accepts a single message string, like the
        # sibling `Unsupported` exception is given -- TODO confirm.
        super().__init__(
            "`lttng {command_args}` failed: {error_output}".format(
                command_args=command_args, error_output=error_output
            )
        )
        self._command_args: str = command_args
        self._output: str = error_output
class LTTngClient(logger._Logger, lttngctl.Controller):
    """
    Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
    """

    def __init__(
        self,
        test_environment: environment._Environment,
        log: Optional[Callable[[str], None]],
    ):
        logger._Logger.__init__(self, log)
        self._environment: environment._Environment = test_environment

    def _run_cmd(self, command_args: str) -> None:
        """
        Invoke the `lttng` client with a set of arguments. The command is
        executed in the context of the client's test environment.

        `command_args` is tokenized with shlex.split(): arguments containing
        white space must be quoted by the caller. Raises `LTTngClientError`,
        after logging the client's output, if the client exits with a
        non-zero status.
        """
        args: list[str] = [str(self._environment.lttng_client_path)]
        args.extend(shlex.split(command_args))

        self._log("lttng {command_args}".format(command_args=command_args))

        # Point the client to the environment's LTTNG_HOME while inheriting
        # the rest of the current environment.
        client_env: dict[str, str] = os.environ.copy()
        client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)

        # The context manager closes the pipe and reaps the child even if
        # communicate() is interrupted. stderr is merged into stdout.
        with subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=client_env
        ) as client_process:
            out = client_process.communicate()[0]

        if client_process.returncode != 0:
            # Never let an undecodable byte hide the actual client failure.
            decoded_output = out.decode("utf-8", errors="replace")
            for error_line in decoded_output.splitlines():
                self._log(error_line)
            raise LTTngClientError(command_args, decoded_output)

    def create_session(
        self,
        name: Optional[str] = None,
        output: Optional[lttngctl.SessionOutputLocation] = None,
    ) -> lttngctl.Session:
        """
        Create a recording session and return an object to control it.

        A name is generated when `name` is not provided. `output` must be a
        local output location or `None` (no output); `TypeError` is raised
        for any other kind of output location.
        """
        name = name if name else lttngctl.Session._generate_name()

        if isinstance(output, lttngctl.LocalSessionOutputLocation):
            # Quoted so that a path containing white space remains a single
            # argument once the command string is tokenized.
            output_option = "--output {output_path}".format(
                output_path=shlex.quote(str(output.path))
            )
        elif output is None:
            output_option = "--no-output"
        else:
            raise TypeError("LTTngClient only supports local or no output")

        self._run_cmd(
            "create {session_name} {output_option}".format(
                session_name=name, output_option=output_option
            )
        )
        return _Session(self, name, output)