+class _LiveViewer:
+ """
+ Create a babeltrace2 live viewer.
+ """
+
+ def __init__(
+ self,
+ environment, # type: Environment
+ session, # type: str
+ hostname=None, # type: Optional[str]
+ ):
+ self._environment = environment
+ self._session = session
+ self._hostname = hostname
+ if self._hostname is None:
+ self._hostname = socket.gethostname()
+ self._events = []
+
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._live_iterator = bt2.TraceCollectionMessageIterator(
+ bt2.ComponentSpec(
+ ctf_live_cc,
+ {
+ "inputs": [
+ "net://localhost:{}/host/{}/{}".format(
+ environment.lttng_relayd_live_port,
+ self._hostname,
+ session,
+ )
+ ],
+ "session-not-found-action": "end",
+ },
+ )
+ )
+
+ try:
+ # Cause the connection to be initiated since tests
+ # tend to wait for a viewer to be connected before proceeding.
+ msg = next(self._live_iterator)
+ self._events.append(msg)
+ except bt2.TryAgain:
+ pass
+
+ @property
+ def output(self):
+ return self._events
+
+ @property
+ def messages(self):
+ return [x for x in self._events if type(x) is bt2._EventMessageConst]
+
+ def _drain(self, retry=False):
+ while True:
+ try:
+ for msg in self._live_iterator:
+ self._events.append(msg)
+ break
+ except bt2.TryAgain as e:
+ if retry:
+ time.sleep(0.01)
+ continue
+ else:
+ break
+
+ def wait_until_connected(self, timeout=0):
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._environment._log(
+ "Checking for connected clients at 'net://localhost:{}'".format(
+ self._environment.lttng_relayd_live_port
+ )
+ )
+ query_executor = bt2.QueryExecutor(
+ ctf_live_cc,
+ "sessions",
+ params={
+ "url": "net://localhost:{}".format(
+ self._environment.lttng_relayd_live_port
+ )
+ },
+ )
+ connected = False
+ started = time.time()
+ while not connected:
+ try:
+ if timeout != 0 and (time.time() - started) > timeout:
+ raise RuntimeError(
+ "Timed out waiting for connected clients on session '{}' after {}s".format(
+ self._session, time.time() - started
+ )
+ )
+ query_result = query_executor.query()
+ except bt2._Error:
+ time.sleep(0.01)
+ continue
+ for live_session in query_result:
+ if (
+ live_session["session-name"] == self._session
+ and live_session["client-count"] >= 1
+ ):
+ connected = True
+ self._environment._log(
+ "Session '{}' has {} connected clients".format(
+ live_session["session-name"], live_session["client-count"]
+ )
+ )
+ break
+ return connected
+
+ def wait(self):
+ if self._live_iterator:
+ self._drain(retry=True)
+ del self._live_iterator
+ self._live_iterator = None
+
+ def __del__(self):
+ pass
+
+
+class _WaitTraceTestApplication: