from lttng import *
_time_tests = True
-if os.getenv("TAP_AUTOTIME", "1") == "0":
+if os.getenv("LTTNG_TESTS_TAP_AUTOTIME", "1") == "0":
_time_tests = False
_last_time = _get_time_ns()
-BABELTRACE_BIN="babeltrace2"
+BABELTRACE_BIN = "babeltrace2"
+
class SessionInfo:
def __init__(self, handle, session_name, tmp_directory, channel_name):
self.trace_path = tmp_directory + "/" + session_name
self.channel_name = channel_name
-def bail(diag, session_info = None):
+
+def bail(diag, session_info=None):
print("Bail out!")
print("#", diag)
shutil.rmtree(session_info.tmp_directory)
exit(-1)
+
def print_automatic_test_timing():
global _time_tests
global _last_time
print(" ---\n duration_ms: {:02f}\n ...".format(duration_ns / 1000000))
_last_time = _get_time_ns()
+
def print_test_result(result, number, description):
result_string = None
if result is True:
print(result_string)
print_automatic_test_timing()
+
def skip_test(number, description):
- print('ok {} # skip {}'.format(number, description))
+ print("ok {} # skip {}".format(number, description))
print_automatic_test_timing()
+
def enable_ust_tracepoint_event(session_info, event_name):
event = Event()
event.name = event_name
if res < 0:
bail("Failed to enable userspace event " + event_name, session_info)
+
def create_session():
dom = Domain()
dom.type = DOMAIN_UST
bail("Failed to enable channel " + channel.name, session_info)
return session_info
+
def start_session(session_info):
start(session_info.name)
-def stop_session(session_info, bailing = False):
+
+def stop_session(session_info, bailing=False):
# Workaround lttng-ctl outputing directly to stdout by spawning a subprocess.
lttng_binary_path = os.path.dirname(os.path.abspath(__file__)) + "/"
for i in range(3):
lttng_binary_path = os.path.dirname(lttng_binary_path)
lttng_binary_path = lttng_binary_path + "/src/bin/lttng/lttng"
- retcode = subprocess.call([lttng_binary_path, "stop", session_info.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ retcode = subprocess.call(
+ [lttng_binary_path, "stop", session_info.name],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
if retcode != 0 and not bailing:
bail("Unable to stop session " + session_info.name, session_info)
destroy(session_info.name)