Change-Id: I8974e9955dd4200b50b954697a03f428e4474c89
Signed-off-by: Kienan Stewart <kstewart@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
15 files changed:
# This error will be raised is something goes wrong
class LTTngError(Exception):
# This error will be raised is something goes wrong
class LTTngError(Exception):
- def __init__(self, value):
- self.value = value
- def __str__(self):
- return repr(self.value)
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
-#Setting up the domain to use
+# Setting up the domain to use
dom = Domain()
dom.type = DOMAIN_KERNEL
dom = Domain()
dom.type = DOMAIN_KERNEL
-#Setting up a channel to use
+# Setting up a channel to use
channel = Channel()
channel.name = "mychan"
channel.attr.overwrite = 0
channel = Channel()
channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.read_timer_interval = 200
channel.attr.output = EVENT_SPLICE
channel.attr.read_timer_interval = 200
channel.attr.output = EVENT_SPLICE
-#Setting up some events that will be used
+# Setting up some events that will be used
event = Event()
event.type = EVENT_TRACEPOINT
event.loglevel_type = EVENT_LOGLEVEL_ALL
event = Event()
event.type = EVENT_TRACEPOINT
event.loglevel_type = EVENT_LOGLEVEL_ALL
sched_process_free.loglevel_type = EVENT_LOGLEVEL_ALL
sched_process_free.loglevel_type = EVENT_LOGLEVEL_ALL
-#Creating a new session
-res = create("test","/lttng-traces/test")
-if res<0:
- raise LTTngError(strerror(res))
+# Creating a new session
+res = create("test", "/lttng-traces/test")
+if res < 0:
+ raise LTTngError(strerror(res))
han = None
han = Handle("test", dom)
if han is None:
han = None
han = Handle("test", dom)
if han is None:
- raise LTTngError("Handle not created")
+ raise LTTngError("Handle not created")
-#Enabling the kernel channel
+# Enabling the kernel channel
res = enable_channel(han, channel)
res = enable_channel(han, channel)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Enabling some events in given channel
-#To enable all events in default channel, use
-#enable_event(han, event, None)
+# Enabling some events in given channel
+# To enable all events in default channel, use
+# enable_event(han, event, None)
res = enable_event(han, sched_switch, channel.name)
res = enable_event(han, sched_switch, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = enable_event(han, sched_process_exit, channel.name)
res = enable_event(han, sched_process_exit, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = enable_event(han, sched_process_free, channel.name)
res = enable_event(han, sched_process_free, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = disable_event(han, sched_switch.name, channel.name)
res = disable_event(han, sched_switch.name, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Getting a list of the channels
+# Getting a list of the channels
l = list_channels(han)
if type(l) is int:
l = list_channels(han)
if type(l) is int:
- raise LTTngError(strerror(l))
+ raise LTTngError(strerror(l))
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = disable_channel(han, channel.name)
res = disable_channel(han, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
+# Destroying the session
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
import tempfile
from lttng import *
import tempfile
from lttng import *
-class TestLttngPythonModule (unittest.TestCase):
+
+class TestLttngPythonModule(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
r = destroy("test_kernel_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
r = destroy("test_kernel_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
def test_kernel_event(self):
dom = Domain()
def test_kernel_event(self):
dom = Domain()
dom.buf_type = BUFFER_GLOBAL
channel = Channel()
dom.buf_type = BUFFER_GLOBAL
channel = Channel()
+ channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
han = Handle("test_kernel_event", dom)
han = Handle("test_kernel_event", dom)
r = create("test_kernel_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = create("test_kernel_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling channel tests
+ # Enabling channel tests
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
+ # Enabling events tests
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling events tests
+ # Disabling events tests
r = disable_event(han, sched_switch.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, sched_process_free.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, sched_switch.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, sched_process_free.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Renabling events tests
+ # Renabling events tests
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = start("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = start("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
r = stop("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
- r=disable_channel(han, channel.name)
+ r = disable_channel(han, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
self.assertGreaterEqual(r, 0, strerror(r))
- r=destroy("test_kernel_event")
+ r = destroy("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
self.assertGreaterEqual(r, 0, strerror(r))
def test_ust_all_events(self):
dom = Domain()
dom.type = DOMAIN_UST
def test_ust_all_events(self):
dom = Domain()
dom.type = DOMAIN_UST
r = destroy("test_ust_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
r = destroy("test_ust_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
def test_ust_event(self):
dom = Domain()
def test_ust_event(self):
dom = Domain()
dom.buf_type = BUFFER_PER_UID
channel = Channel()
dom.buf_type = BUFFER_PER_UID
channel = Channel()
+ channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
han = Handle("test_ust_event", dom)
han = Handle("test_ust_event", dom)
r = create("test_ust_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = create("test_ust_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling channel tests
+ # Enabling channel tests
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
+ # Enabling events tests
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling events tests
+ # Disabling events tests
r = disable_event(han, ev1.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, ev3.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, ev1.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, ev3.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Renabling events tests
+ # Renabling events tests
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling channel and destroy
- r=disable_channel(han, channel.name)
+ # Disabling channel and destroy
+ r = disable_channel(han, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
self.assertGreaterEqual(r, 0, strerror(r))
- r=destroy("test_ust_event")
+ r = destroy("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
self.assertGreaterEqual(r, 0, strerror(r))
def test_other_functions(self):
dom = Domain()
def test_other_functions(self):
dom = Domain()
+ dom.type = DOMAIN_KERNEL
dom.buf_type = BUFFER_GLOBAL
dom.buf_type = BUFFER_GLOBAL
- event=Event()
- event.type=EVENT_TRACEPOINT
- event.loglevel_type=EVENT_LOGLEVEL_ALL
+ event = Event()
+ event.type = EVENT_TRACEPOINT
+ event.loglevel_type = EVENT_LOGLEVEL_ALL
- ctx.type=EVENT_CONTEXT_PID
+ ctx.type = EVENT_CONTEXT_PID
chattr = ChannelAttr()
chattr.overwrite = 0
chattr = ChannelAttr()
chattr.overwrite = 0
chattr.read_timer_interval = 200
chattr.output = EVENT_SPLICE
chattr.read_timer_interval = 200
chattr.output = EVENT_SPLICE
- han = Handle("test_otherf" , dom)
+ han = Handle("test_otherf", dom)
r = create("test_otherf", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = create("test_otherf", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, event, None)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, event, None)
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, "sched_switch", "channel0")
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, "sched_switch", "channel0")
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, "sched_wakeup", None)
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, "sched_wakeup", None)
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, None, None)
self.assertGreaterEqual(r, 0, strerror(r))
r = add_context(han, ctx, None, None)
self.assertGreaterEqual(r, 0, strerror(r))
channel_set_default_attr(dom, chattr)
channel_set_default_attr(None, None)
channel_set_default_attr(dom, chattr)
channel_set_default_attr(None, None)
r = session_daemon_alive()
self.assertTrue(r == 1 or r == 0, strerror(r))
r = session_daemon_alive()
self.assertTrue(r == 1 or r == 0, strerror(r))
r = set_tracing_group("testing")
self.assertGreaterEqual(r, 0, strerror(r))
r = set_tracing_group("testing")
self.assertGreaterEqual(r, 0, strerror(r))
r = start("test_otherf")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = start("test_otherf")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
suite.addTest(TestLttngPythonModule("test_ust_all_events"))
return suite
suite.addTest(TestLttngPythonModule("test_ust_all_events"))
return suite
def kernel_suite():
suite = unittest.TestSuite()
suite.addTest(TestLttngPythonModule("test_kernel_event"))
def kernel_suite():
suite = unittest.TestSuite()
suite.addTest(TestLttngPythonModule("test_kernel_event"))
suite.addTest(TestLttngPythonModule("test_other_functions"))
return suite
suite.addTest(TestLttngPythonModule("test_other_functions"))
return suite
-if __name__ == '__main__':
+
+if __name__ == "__main__":
destroy("test_kernel_event")
destroy("test_kernel_all_events")
destroy("test_ust_all_events")
destroy("test_kernel_event")
destroy("test_kernel_all_events")
destroy("test_ust_all_events")
import bt2
except ImportError:
# quick fix for debian-based distros
import bt2
except ImportError:
# quick fix for debian-based distros
- sys.path.append("/usr/local/lib/python%d.%d/site-packages" %
- (sys.version_info.major, sys.version_info.minor))
+ sys.path.append(
+ "/usr/local/lib/python%d.%d/site-packages"
+ % (sys.version_info.major, sys.version_info.minor)
+ )
import bt2
NSEC_PER_SEC = 1000000000
import bt2
NSEC_PER_SEC = 1000000000
class TraceParser:
def __init__(self, trace_msg_iter, pid):
self.trace = trace_msg_iter
class TraceParser:
def __init__(self, trace_msg_iter, pid):
self.trace = trace_msg_iter
# Each test classes checks the payload of different events. Each of
# those checks are stored in a event_name specific dictionnary in this
# data structure.
# Each test classes checks the payload of different events. Each of
# those checks are stored in a event_name specific dictionnary in this
# data structure.
- self.expect = defaultdict(lambda : defaultdict(int))
+ self.expect = defaultdict(lambda: defaultdict(int))
# This dictionnary holds the value recorded in the trace that are
# tested. Its content is use to print the values that caused a test to
# This dictionnary holds the value recorded in the trace that are
# tested. Its content is use to print the values that caused a test to
self.recorded_values = {}
def ns_to_hour_nsec(self, ns):
self.recorded_values = {}
def ns_to_hour_nsec(self, ns):
- d = time.localtime(ns/NSEC_PER_SEC)
- return "%02d:%02d:%02d.%09d" % (d.tm_hour, d.tm_min, d.tm_sec,
- ns % NSEC_PER_SEC)
+ d = time.localtime(ns / NSEC_PER_SEC)
+ return "%02d:%02d:%02d.%09d" % (
+ d.tm_hour,
+ d.tm_min,
+ d.tm_sec,
+ ns % NSEC_PER_SEC,
+ )
def parse(self):
# iterate over all the events
def parse(self):
# iterate over all the events
continue
method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
continue
method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
class WorkingCases(TraceParser):
def __init__(self, trace, validation_args):
class WorkingCases(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
# Values expected in the trace
- self.epoll_wait_fd = validation_args['epoll_wait_fd']
- self.epoll_pwait_fd = validation_args['epoll_pwait_fd']
+ self.epoll_wait_fd = validation_args["epoll_wait_fd"]
+ self.epoll_pwait_fd = validation_args["epoll_pwait_fd"]
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
exceptfd_127 = event["exceptfds"][127]
# check that the FD 1023 is actually set in the readfds
exceptfd_127 = event["exceptfds"][127]
# check that the FD 1023 is actually set in the readfds
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and overflow == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and overflow == 0
+ ):
self.expect["select_entry"]["select_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
self.expect["select_entry"]["select_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
readfd_127 = event["readfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
readfd_127 = event["readfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp == 0
+ ):
self.expect["select_exit"]["select_out_fd1023"] = 1
# Save values of local variables to print in case of test failure
self.expect["select_exit"]["select_out_fd1023"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
# the raw value matches the events bit field.
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_in_nfds1"] = 1
# Save values of local variables to print in case of test failure
self.expect["poll_entry"]["poll_in_nfds1"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
# the raw value matches the events bit field.
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x1 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x1
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_exit"]["poll_out_nfds1"] = 1
# Save values of local variables to print in case of test failure
self.expect["poll_exit"]["poll_out_nfds1"] = 1
# Save values of local variables to print in case of test failure
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
- if (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd) and 'EPOLL_CTL_ADD' in op_enum.labels and fd == 0 and \
- _event["data_union"]["fd"] == 0 and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd)
+ and "EPOLL_CTL_ADD" in op_enum.labels
+ and fd == 0
+ and _event["data_union"]["fd"] == 0
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_in_add"] = 1
# Save values of local variables to print in case of test failure
self.expect["epoll_ctl_entry"]["epoll_ctl_in_add"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_pwait_exit"] = locals()
self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_pwait_exit"] = locals()
class WorkingCasesTimeout(TraceParser):
def __init__(self, trace, validation_args):
class WorkingCasesTimeout(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp != 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp != 0
+ ):
self.expect["select_entry"]["select_timeout_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
self.expect["select_entry"]["select_timeout_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
# field matches the value of POLLIN
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
# field matches the value of POLLIN
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and \
- fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_timeout_in"] = 1
# Save values of local variables to print in case of test failure
self.expect["poll_entry"]["poll_timeout_in"] = 1
# Save values of local variables to print in case of test failure
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
- if 'EPOLL_CTL_ADD' in op_enum.labels and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ "EPOLL_CTL_ADD" in op_enum.labels
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
# Save values of local variables to print in case of test failure
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
# Save values of local variables to print in case of test failure
class PselectInvalidFd(TraceParser):
def __init__(self, trace, validation_args):
class PselectInvalidFd(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
class PpollBig(TraceParser):
def __init__(self, trace, validation_args):
class PpollBig(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
if nfds == 2047 and fds_length == 512 and overflow == 1:
fd_0 = event["fds"][0]
fd_511 = event["fds"][511]
if nfds == 2047 and fds_length == 512 and overflow == 1:
fd_0 = event["fds"][0]
fd_511 = event["fds"][511]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0 and \
- fd_511["events"]["POLLIN"] == 1 and \
- fd_511["events"]["POLLPRI"] == 1:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ and fd_511["events"]["POLLIN"] == 1
+ and fd_511["events"]["POLLPRI"] == 1
+ ):
self.expect["poll_entry"]["big_poll_in"] = 1
# Save values of local variables to print in case of test failure
self.expect["poll_entry"]["big_poll_in"] = 1
# Save values of local variables to print in case of test failure
# test of big list of FDs and the behaviour of the overflow
if ret == 2047 and nfds == 2047 and fds_length == 512 and overflow == 1:
# test of big list of FDs and the behaviour of the overflow
if ret == 2047 and nfds == 2047 and fds_length == 512 and overflow == 1:
- fd_0 = event["fds"][0]
- fd_511 = event["fds"][511]
- if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
- self.expect["poll_exit"]["big_poll_out"] = 1
+ fd_0 = event["fds"][0]
+ fd_511 = event["fds"][511]
+ if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
+ self.expect["poll_exit"]["big_poll_out"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
class PpollFdsBufferOverflow(TraceParser):
def __init__(self, trace, validation_args):
class PpollFdsBufferOverflow(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
class PselectInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
class PselectInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
class PpollFdsULongMax(TraceParser):
def __init__(self, trace, validation_args):
class PpollFdsULongMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
# Save values of local variables to print in case of test failure
self.recorded_values["poll_entry"] = locals()
# Save values of local variables to print in case of test failure
self.recorded_values["poll_entry"] = locals()
def poll_exit(self, event):
ret = event["ret"]
nfds = event["nfds"]
def poll_exit(self, event):
ret = event["ret"]
nfds = event["nfds"]
class EpollPwaitInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
class EpollPwaitInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
class EpollPwaitIntMax(TraceParser):
def __init__(self, trace, validation_args):
class EpollPwaitIntMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
if __name__ == "__main__":
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Trace parser')
- parser.add_argument('path', metavar="<path/to/trace>", help='Trace path')
- parser.add_argument('-t', '--test', type=str, help='Test to validate')
- parser.add_argument('-o', '--validation-file', type=str, help='Validation file path')
+ parser = argparse.ArgumentParser(description="Trace parser")
+ parser.add_argument("path", metavar="<path/to/trace>", help="Trace path")
+ parser.add_argument("-t", "--test", type=str, help="Test to validate")
+ parser.add_argument(
+ "-o", "--validation-file", type=str, help="Validation file path"
+ )
args = parser.parse_args()
if not args.test:
args = parser.parse_args()
if not args.test:
try:
test_validation_args = json.load(f)
except Exception as e:
try:
test_validation_args = json.load(f)
except Exception as e:
- print('Failed to parse validation file: ' + str(e))
+ print("Failed to parse validation file: " + str(e))
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("""No sessiond running. Please make sure you are running this test
+ bail(
+ """No sessiond running. Please make sure you are running this test
with the "run" shell script and verify that the lttng tools are
with the "run" shell script and verify that the lttng tools are
- properly installed.""")
+ properly installed."""
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
start_session(session_info)
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
start_session(session_info)
-test_process = subprocess.Popen(test_path + "prog.strip", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+test_process = subprocess.Popen(
+ test_path + "prog.strip", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+)
-print_test_result(test_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ test_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for statedump events in the resulting trace
try:
current_test += 1
stop_session(session_info)
# Check for statedump events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
start_event_found = False
bin_info_event_found = False
start_event_found = False
bin_info_event_found = False
for event_line in babeltrace_process.stdout:
# Let babeltrace finish to get the return code
for event_line in babeltrace_process.stdout:
# Let babeltrace finish to get the return code
- if start_event_found and bin_info_event_found and build_id_event_found and \
- debug_link_event_found and end_event_found:
+ if (
+ start_event_found
+ and bin_info_event_found
+ and build_id_event_found
+ and debug_link_event_found
+ and end_event_found
+ ):
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_statedump:start.*", event_line) is not None:
start_event_found = True
elif re.search(r".*lttng_ust_statedump:bin_info.*", event_line) is not None:
if re.search(r".*lttng_ust_statedump:start.*", event_line) is not None:
start_event_found = True
elif re.search(r".*lttng_ust_statedump:bin_info.*", event_line) is not None:
babeltrace_process.wait()
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
-print_test_result(start_event_found, current_test, "lttng_ust_statedump:start event found in resulting trace")
+print_test_result(
+ start_event_found,
+ current_test,
+ "lttng_ust_statedump:start event found in resulting trace",
+)
-print_test_result(bin_info_event_found, current_test, "lttng_ust_statedump:bin_info event found in resulting trace")
+print_test_result(
+ bin_info_event_found,
+ current_test,
+ "lttng_ust_statedump:bin_info event found in resulting trace",
+)
-print_test_result(build_id_event_found, current_test, "lttng_ust_statedump:build_id event found in resulting trace")
+print_test_result(
+ build_id_event_found,
+ current_test,
+ "lttng_ust_statedump:build_id event found in resulting trace",
+)
-print_test_result(debug_link_event_found, current_test, "lttng_ust_statedump:debug_link event found in resulting trace")
+print_test_result(
+ debug_link_event_found,
+ current_test,
+ "lttng_ust_statedump:debug_link event found in resulting trace",
+)
-print_test_result(end_event_found, current_test, "lttng_ust_statedump:end event found in resulting trace")
+print_test_result(
+ end_event_found,
+ current_test,
+ "lttng_ust_statedump:end event found in resulting trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
daemon_pid = None
daemon_process = subprocess.Popen(test_path + "daemon", stdout=subprocess.PIPE)
for line in daemon_process.stdout:
daemon_pid = None
daemon_process = subprocess.Popen(test_path + "daemon", stdout=subprocess.PIPE)
for line in daemon_process.stdout:
- name, pid = line.decode('utf-8').split()
+ name, pid = line.decode("utf-8").split()
if name == "child_pid":
daemon_pid = int(pid)
if name == "parent_pid":
if name == "child_pid":
daemon_pid = int(pid)
if name == "parent_pid":
daemon_process_return_code = daemon_process.wait()
if parent_pid is None or daemon_pid is None:
daemon_process_return_code = daemon_process.wait()
if parent_pid is None or daemon_pid is None:
- bail("Unexpected output received from daemon test executable." + str(daemon_process_output))
-
-print_test_result(daemon_process_return_code == 0, current_test, "Successful call to daemon() and normal exit")
+ bail(
+ "Unexpected output received from daemon test executable."
+ + str(daemon_process_output)
+ )
+
+print_test_result(
+ daemon_process_return_code == 0,
+ current_test,
+ "Successful call to daemon() and normal exit",
+)
current_test += 1
if daemon_process_return_code != 0:
current_test += 1
if daemon_process_return_code != 0:
stop_session(session_info)
try:
stop_session(session_info)
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
after_daemon_event_pid = -1
for event_line in babeltrace_process.stdout:
after_daemon_event_pid = -1
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r"before_daemon", event_line) is not None:
if before_daemon_event_found:
if re.search(r"before_daemon", event_line) is not None:
if before_daemon_event_found:
- bail("Multiple instances of the before_daemon event found. Please make sure only one instance of this test is runnning.")
+ bail(
+ "Multiple instances of the before_daemon event found. Please make sure only one instance of this test is runnning."
+ )
before_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
before_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
if re.search(r"after_daemon", event_line) is not None:
if after_daemon_event_found:
if re.search(r"after_daemon", event_line) is not None:
if after_daemon_event_found:
- bail("Multiple instances of the after_daemon event found. Please make sure only one instance of this test is runnning.")
+ bail(
+ "Multiple instances of the after_daemon event found. Please make sure only one instance of this test is runnning."
+ )
after_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
after_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
after_daemon_event_pid = int(match.group(0))
babeltrace_process.wait()
after_daemon_event_pid = int(match.group(0))
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(before_daemon_event_found, current_test, "before_daemon event found in resulting trace")
+print_test_result(
+ before_daemon_event_found,
+ current_test,
+ "before_daemon event found in resulting trace",
+)
-print_test_result(before_daemon_event_pid == parent_pid, current_test, "Parent pid reported in trace is correct")
+print_test_result(
+ before_daemon_event_pid == parent_pid,
+ current_test,
+ "Parent pid reported in trace is correct",
+)
-print_test_result(before_daemon_event_found, current_test, "after_daemon event found in resulting trace")
+print_test_result(
+ before_daemon_event_found,
+ current_test,
+ "after_daemon event found in resulting trace",
+)
-print_test_result(after_daemon_event_pid == daemon_pid, current_test, "Daemon pid reported in trace is correct")
+print_test_result(
+ after_daemon_event_pid == daemon_pid,
+ current_test,
+ "Daemon pid reported in trace is correct",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_exitfast*")
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_exitfast*")
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
-exit_fast_process = subprocess.Popen(test_path + "exit-fast", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+exit_fast_process = subprocess.Popen(
+ test_path + "exit-fast",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
-print_test_result(exit_fast_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ exit_fast_process.returncode == 0, current_test, "Test application exited normally"
+)
-exit_fast_process = subprocess.Popen([test_path + "exit-fast", "suicide"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+exit_fast_process = subprocess.Popen(
+ [test_path + "exit-fast", "suicide"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
exit_fast_process.wait()
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
exit_fast_process.wait()
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_lines.append(event_line)
babeltrace_process.wait()
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(len(event_lines) == 2, current_test, "Correct number of events found in resulting trace")
+print_test_result(
+ len(event_lines) == 2,
+ current_test,
+ "Correct number of events found in resulting trace",
+)
current_test += 1
if len(event_lines) != 2:
current_test += 1
if len(event_lines) != 2:
- bail("Unexpected number of events found in resulting trace (" + session_info.trace_path + ")." )
+ bail(
+ "Unexpected number of events found in resulting trace ("
+ + session_info.trace_path
+ + ")."
+ )
match = re.search(r".*message = \"(.*)\"", event_lines[0])
match = re.search(r".*message = \"(.*)\"", event_lines[0])
-print_test_result(match is not None and match.group(1) == normal_exit_message, current_test,\
- "Tracepoint message generated during normal exit run is present in trace and has the expected value")
+print_test_result(
+ match is not None and match.group(1) == normal_exit_message,
+ current_test,
+ "Tracepoint message generated during normal exit run is present in trace and has the expected value",
+)
current_test += 1
match = re.search(r".*message = \"(.*)\"", event_lines[1])
current_test += 1
match = re.search(r".*message = \"(.*)\"", event_lines[1])
-print_test_result(match is not None and match.group(1) == suicide_exit_message, current_test,\
- "Tracepoint message generated during suicide run is present in trace and has the expected value")
+print_test_result(
+ match is not None and match.group(1) == suicide_exit_message,
+ current_test,
+ "Tracepoint message generated during suicide run is present in trace and has the expected value",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_fork*")
start_session(session_info)
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_fork*")
start_session(session_info)
-fork_process = subprocess.Popen([test_path + "fork", test_path + "fork2"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+fork_process = subprocess.Popen(
+ [test_path + "fork", test_path + "fork2"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+)
parent_pid = -1
child_pid = -1
for line in fork_process.stdout:
parent_pid = -1
child_pid = -1
for line in fork_process.stdout:
- line = line.decode('utf-8').replace("\n", "")
+ line = line.decode("utf-8").replace("\n", "")
match = re.search(r"child_pid (\d+)", line)
if match:
child_pid = match.group(1)
match = re.search(r"child_pid (\d+)", line)
if match:
child_pid = match.group(1)
-print_test_result(fork_process.returncode == 0, current_test, "Fork test application exited normally")
+print_test_result(
+ fork_process.returncode == 0, current_test, "Fork test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
current_test += 1
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
- if re.search(r"warning", event_line) is not None or re.search(r"error", event_line) is not None:
- print( "# " + event_line )
+ event_line = event_line.decode("utf-8").replace("\n", "")
+ if (
+ re.search(r"warning", event_line) is not None
+ or re.search(r"error", event_line) is not None
+ ):
+ print("# " + event_line)
else:
event_lines.append(event_line)
babeltrace_process.wait()
else:
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
current_test += 1
if babeltrace_process.returncode != 0:
continue
if re.search(r"before_fork", event_line):
continue
if re.search(r"before_fork", event_line):
- event_before_fork = (event_pid == parent_pid)
+ event_before_fork = event_pid == parent_pid
if re.search(r"after_fork_parent", event_line):
if re.search(r"after_fork_parent", event_line):
- event_after_fork_parent = (event_pid == parent_pid)
+ event_after_fork_parent = event_pid == parent_pid
if re.search(r"after_fork_child", event_line):
if re.search(r"after_fork_child", event_line):
- event_after_fork_child = (event_pid == child_pid)
+ event_after_fork_child = event_pid == child_pid
if re.search(r"after_exec", event_line):
if re.search(r"after_exec", event_line):
- event_after_exec = (event_pid == child_pid)
+ event_after_exec = event_pid == child_pid
-print_test_result(event_before_fork, current_test, "before_fork event logged by parent process found in trace")
+print_test_result(
+ event_before_fork,
+ current_test,
+ "before_fork event logged by parent process found in trace",
+)
-print_test_result(event_after_fork_parent, current_test, "after_fork_parent event logged by parent process found in trace")
+print_test_result(
+ event_after_fork_parent,
+ current_test,
+ "after_fork_parent event logged by parent process found in trace",
+)
-print_test_result(event_after_fork_child, current_test, "after_fork_child event logged by child process found in trace")
+print_test_result(
+ event_after_fork_child,
+ current_test,
+ "after_fork_child event logged by child process found in trace",
+)
-print_test_result(event_after_exec, current_test, "after_exec event logged by child process found in trace")
+print_test_result(
+ event_after_exec,
+ current_test,
+ "after_exec event logged by child process found in trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "lttng_ust_libc*")
start_session(session_info)
session_info = create_session()
enable_ust_tracepoint_event(session_info, "lttng_ust_libc*")
start_session(session_info)
-malloc_process = subprocess.Popen(test_path + "prog", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+malloc_process = subprocess.Popen(
+ test_path + "prog", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+)
-print_test_result(malloc_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ malloc_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for malloc events in the resulting trace
try:
current_test += 1
stop_session(session_info)
# Check for malloc events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
malloc_event_found = False
free_event_found = False
malloc_event_found = False
free_event_found = False
if malloc_event_found and free_event_found:
continue
if malloc_event_found and free_event_found:
continue
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_libc:malloc.*", event_line) is not None:
malloc_event_found = True
if re.search(r".*lttng_ust_libc:malloc.*", event_line) is not None:
malloc_event_found = True
babeltrace_process.wait()
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
-print_test_result(malloc_event_found, current_test, "lttng_ust_libc:malloc event found in resulting trace")
+print_test_result(
+ malloc_event_found,
+ current_test,
+ "lttng_ust_libc:malloc event found in resulting trace",
+)
-print_test_result(free_event_found, current_test, "lttng_ust_libc:free event found in resulting trace")
+print_test_result(
+ free_event_found, current_test, "lttng_ust_libc:free event found in resulting trace"
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
match = re.search(r".*_seqfield1_length = (\d+)", event_line)
if match is None or int(match.group(1)) != 4:
return False
match = re.search(r".*_seqfield1_length = (\d+)", event_line)
if match is None or int(match.group(1)) != 4:
return False
- match = re.search(r".*seqfield1 = \[ \[0\] = (\d+), \[1\] = (\d+), \[2\] = (\d+), \[3\] = (\d+) \]", event_line)
- if match is None or int(match.group(1)) != 116 or int(match.group(2)) != 101 or int(match.group(3)) != 115 or int(match.group(4)) != 116:
+ match = re.search(
+ r".*seqfield1 = \[ \[0\] = (\d+), \[1\] = (\d+), \[2\] = (\d+), \[3\] = (\d+) \]",
+ event_line,
+ )
+ if (
+ match is None
+ or int(match.group(1)) != 116
+ or int(match.group(2)) != 101
+ or int(match.group(3)) != 115
+ or int(match.group(4)) != 116
+ ):
- match = re.search(r".*arrfield1 = \[ \[0\] = (\d), \[1\] = (\d), \[2\] = (\d) \]", event_line)
- if match is None or int(match.group(1)) != 1 or int(match.group(2)) != 2 or int(match.group(3)) != 3:
+ match = re.search(
+ r".*arrfield1 = \[ \[0\] = (\d), \[1\] = (\d), \[2\] = (\d) \]", event_line
+ )
+ if (
+ match is None
+ or int(match.group(1)) != 1
+ or int(match.group(2)) != 2
+ or int(match.group(3)) != 3
+ ):
return False
match = re.search(r".*arrfield2 = \"([a-z]*)\"", event_line)
if match is None or match.group(1) != "test":
return False
match = re.search(r".*arrfield2 = \"([a-z]*)\"", event_line)
if match is None or match.group(1) != "test":
NR_TESTS = 0
DYNAMIC_TEST_ENABLED = False
NR_TESTS = 0
DYNAMIC_TEST_ENABLED = False
DYNAMIC_TEST_ENABLED = True
# Only enable tests that were compiled successfully
DYNAMIC_TEST_ENABLED = True
# Only enable tests that were compiled successfully
-test_executables = [executable for executable in test_executables if os.path.exists(executable)]
+test_executables = [
+ executable for executable in test_executables if os.path.exists(executable)
+]
NR_TESTS += len(test_executables) * 10
NR_TESTS += len(test_executables) * 10
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
if DYNAMIC_TEST_ENABLED:
session_info = create_session()
if DYNAMIC_TEST_ENABLED:
session_info = create_session()
start_session(session_info)
# Dry run, no events should be logged
start_session(session_info)
# Dry run, no events should be logged
- demo_process = subprocess.Popen(test_path + "demo", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ demo_process = subprocess.Popen(
+ test_path + "demo", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
demo_process.wait()
stop_session(session_info)
demo_process.wait()
stop_session(session_info)
- print_test_result(demo_process.returncode == 0, current_test,\
- "Running application dynamically linked to providers, no preload")
+ print_test_result(
+ demo_process.returncode == 0,
+ current_test,
+ "Running application dynamically linked to providers, no preload",
+ )
current_test += 1
trace_path = os.path.join(session_info.trace_path, "ust", "uid")
current_test += 1
trace_path = os.path.join(session_info.trace_path, "ust", "uid")
- print_test_result(not os.path.exists(trace_path), current_test,\
- "No events logged when running demo application without preloading providers")
+ print_test_result(
+ not os.path.exists(trace_path),
+ current_test,
+ "No events logged when running demo application without preloading providers",
+ )
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
enable_ust_tracepoint_event(session_info, "ust_tests_demo*")
start_session(session_info)
enable_ust_tracepoint_event(session_info, "ust_tests_demo*")
start_session(session_info)
- demo_process = subprocess.Popen(executable, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ demo_process = subprocess.Popen(
+ executable, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
demo_process.wait()
stop_session(session_info)
trace_found = os.path.exists(session_info.trace_path)
demo_process.wait()
stop_session(session_info)
trace_found = os.path.exists(session_info.trace_path)
- print_test_result(trace_found, current_test,\
- "{0}, resulting trace found".format(executable_name))
+ print_test_result(
+ trace_found, current_test, "{0}, resulting trace found".format(executable_name)
+ )
current_test += 1
if not trace_found:
current_test += 1
if not trace_found:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(
+ BABELTRACE_BIN
+ )
+ )
# We should find 8 events in the resulting trace
event_entries = []
for event_line in babeltrace_process.stdout:
# We should find 8 events in the resulting trace
event_entries = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_entries.append(event_line)
if len(event_entries) != 8:
event_entries.append(event_line)
if len(event_entries) != 8:
- bail("{0}, wrong number of events found in resulting trace.".format(executable_name))
+ bail(
+ "{0}, wrong number of events found in resulting trace.".format(
+ executable_name
+ )
+ )
shutil.rmtree(session_info.tmp_directory)
shutil.rmtree(session_info.tmp_directory)
- print_test_result(len(event_entries) == 8, current_test,\
- "{0}, total number of events logged is correct".format(executable_name))
+ print_test_result(
+ len(event_entries) == 8,
+ current_test,
+ "{0}, total number of events logged is correct".format(executable_name),
+ )
current_test += 1
# Check each loop event
match = re.search(r".*ust_tests_demo:starting.*value = (\d+) ", event_entries[0])
current_test += 1
# Check each loop event
match = re.search(r".*ust_tests_demo:starting.*value = (\d+) ", event_entries[0])
- print_test_result(match is not None and (int(match.group(1)) == 123), current_test,\
- "{0}, ust_tests_demo:starting event found in trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 123),
+ current_test,
+ "{0}, ust_tests_demo:starting event found in trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
current_test += 1
for i in range(5):
current_test += 1
for i in range(5):
- print_test_result(check_ust_test_demo2_event(event_entries[i+1], i), current_test,\
- "{0}, ust_tests_demo2:loop event found in trace and arguments are correct, iteration ".format(executable_name)\
- + str(i + 1))
+ print_test_result(
+ check_ust_test_demo2_event(event_entries[i + 1], i),
+ current_test,
+ "{0}, ust_tests_demo2:loop event found in trace and arguments are correct, iteration ".format(
+ executable_name
+ )
+ + str(i + 1),
+ )
current_test += 1
match = re.search(r".*ust_tests_demo:done.*value = (\d+)", event_entries[6])
current_test += 1
match = re.search(r".*ust_tests_demo:done.*value = (\d+)", event_entries[6])
- print_test_result(match is not None and (int(match.group(1)) == 456), current_test,\
- "{0}, ust_tests_demo:done event found in resulting trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 456),
+ current_test,
+ "{0}, ust_tests_demo:done event found in resulting trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
current_test += 1
match = re.search(r".*ust_tests_demo3:done.*value = (\d+)", event_entries[7])
current_test += 1
match = re.search(r".*ust_tests_demo3:done.*value = (\d+)", event_entries[7])
- print_test_result(match is not None and (int(match.group(1)) == 42), current_test,\
- "{0}, ust_tests_demo3:done event found in resulting trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 42),
+ current_test,
+ "{0}, ust_tests_demo3:done event found in resulting trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_td*")
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_td*")
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
-td_process = subprocess.Popen(test_path + "type-declarations", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+td_process = subprocess.Popen(
+ test_path + "type-declarations",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
-print_test_result(td_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ td_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check event fields using type declarations are present
try:
current_test += 1
stop_session(session_info)
# Check event fields using type declarations are present
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_lines.append(event_line)
babeltrace_process.wait()
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(len(event_lines) == 5, current_test, "Correct number of events found in resulting trace")
+print_test_result(
+ len(event_lines) == 5,
+ current_test,
+ "Correct number of events found in resulting trace",
+)
current_test += 1
if len(event_lines) != 5:
current_test += 1
if len(event_lines) != 5:
- bail("Unexpected number of events found in resulting trace (" + session_info.trace_path + ")." )
-
-match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" :.*enumfield_third = .*:.*", event_lines[0])
-print_test_result(match is not None and match.group(1) == "tptest", current_test,\
- "First tracepoint is present")
+ bail(
+ "Unexpected number of events found in resulting trace ("
+ + session_info.trace_path
+ + ")."
+ )
+
+match = re.search(
+ r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" :.*enumfield_third = .*:.*",
+ event_lines[0],
+)
+print_test_result(
+ match is not None and match.group(1) == "tptest",
+ current_test,
+ "First tracepoint is present",
+)
-print_test_result(match is not None and match.group(2) == "zero", current_test,\
- "First tracepoint's enum value maps to zero")
+print_test_result(
+ match is not None and match.group(2) == "zero",
+ current_test,
+ "First tracepoint's enum value maps to zero",
+)
-print_test_result(match is not None and match.group(3) == "one", current_test,\
- "First tracepoint's second enum value maps to one")
+print_test_result(
+ match is not None and match.group(3) == "one",
+ current_test,
+ "First tracepoint's second enum value maps to one",
+)
current_test += 1
match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*", event_lines[1])
current_test += 1
match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*", event_lines[1])
-print_test_result(match is not None and match.group(1) == "tptest_bis", current_test,\
- "Second tracepoint is present")
+print_test_result(
+ match is not None and match.group(1) == "tptest_bis",
+ current_test,
+ "Second tracepoint is present",
+)
-print_test_result(match is not None and match.group(2) == "zero", current_test,\
- "Second tracepoint's enum value maps to zero")
+print_test_result(
+ match is not None and match.group(2) == "zero",
+ current_test,
+ "Second tracepoint's enum value maps to zero",
+)
-match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" .*", event_lines[2])
+match = re.search(
+ r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" .*",
+ event_lines[2],
+)
-print_test_result(match is not None and match.group(2) == "one", current_test,\
- "Third tracepoint's enum value maps to one")
+print_test_result(
+ match is not None and match.group(2) == "one",
+ current_test,
+ "Third tracepoint's enum value maps to one",
+)
-print_test_result('{ zero = ( "zero" : container = 0 ), two = ( "two" : container = 2 ), three = ( "three" : container = 3 ), fifteen = ( "ten_to_twenty" : container = 15 ), twenty_one = ( "twenty_one" : container = 21 ) }' in event_lines[4],
- current_test, 'Auto-incrementing enum values are correct')
+print_test_result(
+ '{ zero = ( "zero" : container = 0 ), two = ( "two" : container = 2 ), three = ( "three" : container = 3 ), fifteen = ( "ten_to_twenty" : container = 15 ), twenty_one = ( "twenty_one" : container = 21 ) }'
+ in event_lines[4],
+ current_test,
+ "Auto-incrementing enum values are correct",
+)
shutil.rmtree(session_info.tmp_directory)
shutil.rmtree(session_info.tmp_directory)
-have_dlmopen = (os.environ.get('LTTNG_TOOLS_HAVE_DLMOPEN') == '1')
+have_dlmopen = os.environ.get("LTTNG_TOOLS_HAVE_DLMOPEN") == "1"
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("""No sessiond running. Please make sure you are running this test
+ bail(
+ """No sessiond running. Please make sure you are running this test
with the "run" shell script and verify that the lttng tools are
with the "run" shell script and verify that the lttng tools are
- properly installed.""")
+ properly installed."""
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
test_env = os.environ.copy()
test_env["LD_PRELOAD"] = test_env.get("LD_PRELOAD", "") + ":liblttng-ust-dl.so"
test_env["LD_LIBRARY_PATH"] = test_env.get("LD_LIBRARY_PATH", "") + ":" + test_path
test_env = os.environ.copy()
test_env["LD_PRELOAD"] = test_env.get("LD_PRELOAD", "") + ":liblttng-ust-dl.so"
test_env["LD_LIBRARY_PATH"] = test_env.get("LD_LIBRARY_PATH", "") + ":" + test_path
-test_process = subprocess.Popen(test_path + "prog",
- stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
- env=test_env)
+test_process = subprocess.Popen(
+ test_path + "prog",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
-print_test_result(test_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ test_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for dl events in the resulting trace
try:
current_test += 1
stop_session(session_info)
# Check for dl events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
dlopen_event_found = 0
dlmopen_event_found = 0
dlopen_event_found = 0
dlmopen_event_found = 0
for event_line in babeltrace_process.stdout:
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_dl:dlopen.*", event_line) is not None:
dlopen_event_found += 1
elif re.search(r".*lttng_ust_dl:dlmopen.*", event_line) is not None:
if re.search(r".*lttng_ust_dl:dlopen.*", event_line) is not None:
dlopen_event_found += 1
elif re.search(r".*lttng_ust_dl:dlmopen.*", event_line) is not None:
babeltrace_process.wait()
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
-print_test_result(dlopen_event_found > 0, current_test, "lttng_ust_dl:dlopen event found in resulting trace")
+print_test_result(
+ dlopen_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlopen event found in resulting trace",
+)
current_test += 1
if have_dlmopen:
current_test += 1
if have_dlmopen:
- print_test_result(dlmopen_event_found > 0, current_test, "lttng_ust_dl:dlmopen event found in resulting trace")
+ print_test_result(
+ dlmopen_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlmopen event found in resulting trace",
+ )
- skip_test(current_test, 'dlmopen() is not available')
+ skip_test(current_test, "dlmopen() is not available")
-print_test_result(build_id_event_found > 0, current_test, "lttng_ust_dl:build_id event found in resulting trace")
+print_test_result(
+ build_id_event_found > 0,
+ current_test,
+ "lttng_ust_dl:build_id event found in resulting trace",
+)
-print_test_result(debug_link_event_found > 0, current_test, "lttng_ust_dl:debug_link event found in resulting trace")
+print_test_result(
+ debug_link_event_found > 0,
+ current_test,
+ "lttng_ust_dl:debug_link event found in resulting trace",
+)
-print_test_result(dlclose_event_found > 0, current_test, "lttng_ust_dl:dlclose event found in resulting trace")
+print_test_result(
+ dlclose_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlclose event found in resulting trace",
+)
-print_test_result(load_event_found > 0, current_test, "lttng_ust_lib:load event found in resulting trace")
+print_test_result(
+ load_event_found > 0,
+ current_test,
+ "lttng_ust_lib:load event found in resulting trace",
+)
-print_test_result(load_build_id_event_found > 0, current_test, "lttng_ust_lib:build_id event found in resulting trace")
+print_test_result(
+ load_build_id_event_found > 0,
+ current_test,
+ "lttng_ust_lib:build_id event found in resulting trace",
+)
-print_test_result(load_debug_link_event_found > 0, current_test, "lttng_ust_lib:debug_link event found in resulting trace")
+print_test_result(
+ load_debug_link_event_found > 0,
+ current_test,
+ "lttng_ust_lib:debug_link event found in resulting trace",
+)
-print_test_result(unload_event_found == 3, current_test, "lttng_ust_lib:unload event found 3 times in resulting trace")
+print_test_result(
+ unload_event_found == 3,
+ current_test,
+ "lttng_ust_lib:unload event found 3 times in resulting trace",
+)
-print_test_result(load_libfoo_found == 1, current_test, "lttng_ust_lib:load libfoo.so event found once in resulting trace")
+print_test_result(
+ load_libfoo_found == 1,
+ current_test,
+ "lttng_ust_lib:load libfoo.so event found once in resulting trace",
+)
-print_test_result(load_libbar_found == 1, current_test, "lttng_ust_lib:load libbar.so event found once in resulting trace")
+print_test_result(
+ load_libbar_found == 1,
+ current_test,
+ "lttng_ust_lib:load libbar.so event found once in resulting trace",
+)
-print_test_result(load_libzzz_found == 1, current_test, "lttng_ust_lib:load libzzz.so event found once in resulting trace")
+print_test_result(
+ load_libzzz_found == 1,
+ current_test,
+ "lttng_ust_lib:load libzzz.so event found once in resulting trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
session_name=self.name,
domain_name=domain_option_name,
channel_name=channel_name,
session_name=self.name,
domain_name=domain_option_name,
channel_name=channel_name,
- buffer_sharing_policy="--buffers-uid"
- if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
- else "--buffers-pid",
+ buffer_sharing_policy=(
+ "--buffers-uid"
+ if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
+ else "--buffers-pid"
+ ),
)
)
return _Channel(self._client, channel_name, domain, self)
)
)
return _Channel(self._client, channel_name, domain, self)
import subprocess
import re
import subprocess
import re
def addr2line(executable, addr):
"""
def addr2line(executable, addr):
"""
- Uses binutils' addr2line to get function containing a given address
+ Uses binutils' addr2line to get function containing a given address
- cmd += ['-e', executable]
+ cmd += ["-e", executable]
# Expand inlined functions
# Expand inlined functions
- cmd += ['--addresses', addr]
+ cmd += ["--addresses", addr]
status = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
status = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
# - function name
# - source location
if len(addr2line_output) % 3 != 0:
# - function name
# - source location
if len(addr2line_output) % 3 != 0:
- raise Exception('Unexpected addr2line output:\n\t{}'.format('\n\t'.join(addr2line_output)))
+ raise Exception(
+ "Unexpected addr2line output:\n\t{}".format("\n\t".join(addr2line_output))
+ )
function_names = []
for address_line_number in range(0, len(addr2line_output), 3):
function_names = []
for address_line_number in range(0, len(addr2line_output), 3):
def extract_user_func_names(executable, raw_callstack):
"""
def extract_user_func_names(executable, raw_callstack):
"""
- Given a callstack from the Babeltrace CLI output, returns a set
- containing the name of the functions. This assumes that the binary have
- not changed since the execution.
+ Given a callstack from the Babeltrace CLI output, returns a set
+ containing the name of the functions. This assumes that the binary have
+ not changed since the execution.
"""
recorded_callstack = set()
# Remove commas and split on spaces
"""
recorded_callstack = set()
# Remove commas and split on spaces
- for index, addr in enumerate(raw_callstack.replace(',', '').split(' ')):
+ for index, addr in enumerate(raw_callstack.replace(",", "").split(" ")):
# Consider only the elements starting with '0x' which are the
# addresses recorded in the callstack
# Consider only the elements starting with '0x' which are the
# addresses recorded in the callstack
funcs = addr2line(executable, addr)
recorded_callstack.update(funcs)
return recorded_callstack
funcs = addr2line(executable, addr)
recorded_callstack.update(funcs)
return recorded_callstack
def extract_kernel_func_names(raw_callstack):
"""
def extract_kernel_func_names(raw_callstack):
"""
- Given a callstack from the Babeltrace CLI output, returns a set
- containing the name of the functions.
- Uses the /proc/kallsyms procfile to find the symbol associated with an
- address. This function should only be used if the user is root or has
- access to /proc/kallsyms.
+ Given a callstack from the Babeltrace CLI output, returns a set
+ containing the name of the functions.
+ Uses the /proc/kallsyms procfile to find the symbol associated with an
+ address. This function should only be used if the user is root or has
+ access to /proc/kallsyms.
"""
recorded_callstack = set()
"""
recorded_callstack = set()
+ syms = []
+ addresses = []
# We read kallsyms file and save the output
# We read kallsyms file and save the output
- with open('/proc/kallsyms') as kallsyms_f:
+ with open("/proc/kallsyms") as kallsyms_f:
for line in kallsyms_f:
line_tokens = line.split()
addr = line_tokens[0]
symbol = line_tokens[2]
addresses.append(int(addr, 16))
for line in kallsyms_f:
line_tokens = line.split()
addr = line_tokens[0]
symbol = line_tokens[2]
addresses.append(int(addr, 16))
- syms.append({'addr':int(addr, 16), 'symbol':symbol})
+ syms.append({"addr": int(addr, 16), "symbol": symbol})
# Save the address and symbol in a sorted list of tupple
# Save the address and symbol in a sorted list of tupple
- syms = sorted(syms, key=lambda k:k['addr'])
+ syms = sorted(syms, key=lambda k: k["addr"])
# We save the list of addresses in a seperate sorted list to easily bisect
# the closer address of a symbol.
addresses = sorted(addresses)
# Remove commas and split on spaces
# We save the list of addresses in a seperate sorted list to easily bisect
# the closer address of a symbol.
addresses = sorted(addresses)
# Remove commas and split on spaces
- for addr in raw_callstack.replace(',', '').split(' '):
- if '0x' in addr[:2]:
+ for addr in raw_callstack.replace(",", "").split(" "):
+ if "0x" in addr[:2]:
# Search the location of the address in the addresses list and
# deference this location in the syms list to add the associated
# symbol.
loc = bisect.bisect(addresses, int(addr, 16))
# Search the location of the address in the addresses list and
# deference this location in the syms list to add the associated
# symbol.
loc = bisect.bisect(addresses, int(addr, 16))
- recorded_callstack.add(syms[loc-1]['symbol'])
+ recorded_callstack.add(syms[loc - 1]["symbol"])
return recorded_callstack
return recorded_callstack
# Regex capturing the callstack_user and callstack_kernel context
# Regex capturing the callstack_user and callstack_kernel context
-user_cs_rexp='.*callstack_user\ \=\ \[(.*)\]\ .*\}, \{.*\}'
-kernel_cs_rexp='.*callstack_kernel\ \=\ \[(.*)\]\ .*\}, \{.*\}'
+user_cs_rexp = ".*callstack_user\ \=\ \[(.*)\]\ .*\}, \{.*\}"
+kernel_cs_rexp = ".*callstack_kernel\ \=\ \[(.*)\]\ .*\}, \{.*\}"
+
- Reads a line from stdin and expect it to be a wellformed Babeltrace CLI
- output containing containing a callstack context of the domain passed
- as argument.
+ Reads a line from stdin and expect it to be a wellformed Babeltrace CLI
+ output containing containing a callstack context of the domain passed
+ as argument.
"""
expected_callstack = set()
recorded_callstack = set()
"""
expected_callstack = set()
recorded_callstack = set()
if len(sys.argv) <= 2:
print(sys.argv)
if len(sys.argv) <= 2:
print(sys.argv)
- raise ValueError('USAGE: ./{} (--kernel|--user EXE) FUNC-NAMES'.format(sys.argv[0]))
+ raise ValueError(
+ "USAGE: ./{} (--kernel|--user EXE) FUNC-NAMES".format(sys.argv[0])
+ )
# If the `--user` option is passed, save the next argument as the path
# to the executable
# If the `--user` option is passed, save the next argument as the path
# to the executable
- argc=1
- executable=None
- if sys.argv[argc] in '--kernel':
+ argc = 1
+ executable = None
+ if sys.argv[argc] in "--kernel":
- cs_type='kernel'
- elif sys.argv[argc] in '--user':
+ cs_type = "kernel"
+ elif sys.argv[argc] in "--user":
- cs_type='user'
- argc+=1
+ cs_type = "user"
+ argc += 1
executable = sys.argv[argc]
else:
executable = sys.argv[argc]
else:
- raise Exception('Unknown domain')
+ raise Exception("Unknown domain")
# Extract the function names that are expected to be found call stack of
# the current events
# Extract the function names that are expected to be found call stack of
# the current events
# If there is no match, exit with error
if m is None:
# If there is no match, exit with error
if m is None:
- raise re.error('Callstack not found in event line')
+ raise re.error("Callstack not found in event line")
else:
raw_callstack = str(m.group(1))
else:
raw_callstack = str(m.group(1))
- if cs_type in 'user':
- recorded_callstack=extract_user_func_names(executable, raw_callstack)
- elif cs_type in 'kernel':
- recorded_callstack=extract_kernel_func_names(raw_callstack)
+ if cs_type in "user":
+ recorded_callstack = extract_user_func_names(executable, raw_callstack)
+ elif cs_type in "kernel":
+ recorded_callstack = extract_kernel_func_names(raw_callstack)
- raise Exception('Unknown domain')
+ raise Exception("Unknown domain")
# Verify that all expected function are present in the callstack
for e in expected_callstack:
if e not in recorded_callstack:
# Verify that all expected function are present in the callstack
for e in expected_callstack:
if e not in recorded_callstack:
- raise Exception('Expected function name not found in recorded callstack')
+ raise Exception("Expected function name not found in recorded callstack")
-if __name__ == '__main__':
+
+if __name__ == "__main__":
_last_time = _get_time_ns()
_last_time = _get_time_ns()
-BABELTRACE_BIN="babeltrace2"
+BABELTRACE_BIN = "babeltrace2"
+
class SessionInfo:
def __init__(self, handle, session_name, tmp_directory, channel_name):
class SessionInfo:
def __init__(self, handle, session_name, tmp_directory, channel_name):
self.trace_path = tmp_directory + "/" + session_name
self.channel_name = channel_name
self.trace_path = tmp_directory + "/" + session_name
self.channel_name = channel_name
-def bail(diag, session_info = None):
+
+def bail(diag, session_info=None):
print("Bail out!")
print("#", diag)
print("Bail out!")
print("#", diag)
shutil.rmtree(session_info.tmp_directory)
exit(-1)
shutil.rmtree(session_info.tmp_directory)
exit(-1)
def print_automatic_test_timing():
global _time_tests
global _last_time
def print_automatic_test_timing():
global _time_tests
global _last_time
print(" ---\n duration_ms: {:02f}\n ...".format(duration_ns / 1000000))
_last_time = _get_time_ns()
print(" ---\n duration_ms: {:02f}\n ...".format(duration_ns / 1000000))
_last_time = _get_time_ns()
def print_test_result(result, number, description):
result_string = None
if result is True:
def print_test_result(result, number, description):
result_string = None
if result is True:
print(result_string)
print_automatic_test_timing()
print(result_string)
print_automatic_test_timing()
def skip_test(number, description):
def skip_test(number, description):
- print('ok {} # skip {}'.format(number, description))
+ print("ok {} # skip {}".format(number, description))
print_automatic_test_timing()
print_automatic_test_timing()
def enable_ust_tracepoint_event(session_info, event_name):
event = Event()
event.name = event_name
def enable_ust_tracepoint_event(session_info, event_name):
event = Event()
event.name = event_name
if res < 0:
bail("Failed to enable userspace event " + event_name, session_info)
if res < 0:
bail("Failed to enable userspace event " + event_name, session_info)
def create_session():
dom = Domain()
dom.type = DOMAIN_UST
def create_session():
dom = Domain()
dom.type = DOMAIN_UST
bail("Failed to enable channel " + channel.name, session_info)
return session_info
bail("Failed to enable channel " + channel.name, session_info)
return session_info
def start_session(session_info):
start(session_info.name)
def start_session(session_info):
start(session_info.name)
-def stop_session(session_info, bailing = False):
+
+def stop_session(session_info, bailing=False):
# Workaround lttng-ctl outputing directly to stdout by spawning a subprocess.
lttng_binary_path = os.path.dirname(os.path.abspath(__file__)) + "/"
for i in range(3):
lttng_binary_path = os.path.dirname(lttng_binary_path)
lttng_binary_path = lttng_binary_path + "/src/bin/lttng/lttng"
# Workaround lttng-ctl outputing directly to stdout by spawning a subprocess.
lttng_binary_path = os.path.dirname(os.path.abspath(__file__)) + "/"
for i in range(3):
lttng_binary_path = os.path.dirname(lttng_binary_path)
lttng_binary_path = lttng_binary_path + "/src/bin/lttng/lttng"
- retcode = subprocess.call([lttng_binary_path, "stop", session_info.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ retcode = subprocess.call(
+ [lttng_binary_path, "stop", session_info.name],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
if retcode != 0 and not bailing:
bail("Unable to stop session " + session_info.name, session_info)
destroy(session_info.name)
if retcode != 0 and not bailing:
bail("Unable to stop session " + session_info.name, session_info)
destroy(session_info.name)
-except (ImportError) as e:
- _perror('lttngust package not found: {}'.format(e))
+except ImportError as e:
+ _perror("lttngust package not found: {}".format(e))
- ev1 = logging.getLogger('python-ev-test1');
- ev2 = logging.getLogger('python-ev-test2');
+ ev1 = logging.getLogger("python-ev-test1")
+ ev2 = logging.getLogger("python-ev-test2")
logging.basicConfig()
parser = argparse.ArgumentParser()
logging.basicConfig()
parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--nr-iter', required=True)
- parser.add_argument('-s', '--wait', required=True)
- parser.add_argument('-d', '--fire-debug-event', action="store_true")
- parser.add_argument('-e', '--fire-second-event', action="store_true")
- parser.add_argument('-r', '--ready-file')
- parser.add_argument('-g', '--go-file')
+ parser.add_argument("-n", "--nr-iter", required=True)
+ parser.add_argument("-s", "--wait", required=True)
+ parser.add_argument("-d", "--fire-debug-event", action="store_true")
+ parser.add_argument("-e", "--fire-second-event", action="store_true")
+ parser.add_argument("-r", "--ready-file")
+ parser.add_argument("-g", "--go-file")
args = parser.parse_args()
nr_iter = int(args.nr_iter)
args = parser.parse_args()
nr_iter = int(args.nr_iter)
go_file = args.go_file
if ready_file is not None and os.path.exists(ready_file):
go_file = args.go_file
if ready_file is not None and os.path.exists(ready_file):
- raise ValueError('Ready file already exist')
+ raise ValueError("Ready file already exist")
if go_file is not None and os.path.exists(go_file):
if go_file is not None and os.path.exists(go_file):
- raise ValueError('Go file already exist. Review synchronization')
+ raise ValueError("Go file already exist. Review synchronization")
if (ready_file is None) != (go_file is None):
if (ready_file is None) != (go_file is None):
- raise ValueError('--go-file and --ready-file need each others, review'
- 'synchronization')
-
+ raise ValueError(
+ "--go-file and --ready-file need each others, review" "synchronization"
+ )
# Inform that we are ready, if necessary
if ready_file is not None:
# Inform that we are ready, if necessary
if ready_file is not None:
- open(ready_file, 'a').close()
+ open(ready_file, "a").close()
# Wait for go, if necessary
while go_file is not None and not os.path.exists(go_file):
time.sleep(0.5)
for i in range(nr_iter):
# Wait for go, if necessary
while go_file is not None and not os.path.exists(go_file):
time.sleep(0.5)
for i in range(nr_iter):
- ev1.info('{} fired [INFO]'.format(ev1.name))
+ ev1.info("{} fired [INFO]".format(ev1.name))
- ev1.debug('{} fired [DEBUG]'.format(ev1.name))
+ ev1.debug("{} fired [DEBUG]".format(ev1.name))
time.sleep(wait_time)
if fire_second_ev:
time.sleep(wait_time)
if fire_second_ev:
- ev2.info('{} fired [INFO]'.format(ev2.name))
+ ev2.info("{} fired [INFO]".format(ev2.name))
if ready_file is not None:
try:
if ready_file is not None:
try:
-if __name__ == '__main__':
+if __name__ == "__main__":