X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=python-lttngust%2Flttngust%2Fagent.py;fp=python-lttngust%2Flttngust%2Fagent.py;h=0000000000000000000000000000000000000000;hb=9d4c8b2d907edb9ebc9bfde55602598e7ba0832e;hp=8658372819c5e5cd60e6412ac2e26db3a62e6a56;hpb=6ba6fd60507f8e045bdc4f1be14e9d99c6a15f7f;p=lttng-ust.git diff --git a/python-lttngust/lttngust/agent.py b/python-lttngust/lttngust/agent.py deleted file mode 100644 index 86583728..00000000 --- a/python-lttngust/lttngust/agent.py +++ /dev/null @@ -1,385 +0,0 @@ -# -*- coding: utf-8 -*- -# -# SPDX-License-Identifier: LGPL-2.1-only -# -# Copyright (C) 2015 Philippe Proulx -# Copyright (C) 2014 David Goulet - -from __future__ import unicode_literals -from __future__ import print_function -from __future__ import division -import lttngust.debug as dbg -import lttngust.loghandler -import lttngust.compat -import lttngust.cmd -from io import open -import threading -import logging -import socket -import time -import sys -import os - - -try: - # Python 2 - import Queue as queue -except ImportError: - # Python 3 - import queue - - -_PROTO_DOMAIN = 5 -_PROTO_MAJOR = 2 -_PROTO_MINOR = 0 - - -def _get_env_value_ms(key, default_s): - try: - val = int(os.getenv(key, default_s * 1000)) / 1000 - except: - val = -1 - - if val < 0: - fmt = 'invalid ${} value; {} seconds will be used' - dbg._pwarning(fmt.format(key, default_s)) - val = default_s - - return val - - -_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5) -_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3) - - -class _TcpClient(object): - def __init__(self, name, host, port, reg_queue): - super(self.__class__, self).__init__() - self._name = name - self._host = host - self._port = port - - try: - self._log_handler = lttngust.loghandler._Handler() - except (OSError) as e: - dbg._pwarning('cannot load library: {}'.format(e)) - raise e - - self._root_logger = logging.getLogger() - self._root_logger.setLevel(logging.NOTSET) - self._ref_count = 0 - self._sessiond_sock = None - self._reg_queue = reg_queue - self._server_cmd_handlers = { - lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done, - lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable, - lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable, - lttngust.cmd._ServerCmdList: self._handle_server_cmd_list, - } - - def _debug(self, msg): - return 'client "{}": {}'.format(self._name, msg) - - def run(self): - while True: - try: - # connect to the session daemon - dbg._pdebug(self._debug('connecting to session daemon')) - self._connect_to_sessiond() - - # register to the session daemon after a successful connection - dbg._pdebug(self._debug('registering to session daemon')) - self._register() - - # wait for commands from the session daemon - self._wait_server_cmd() - except (Exception) as e: - # Whatever happens here, we have to close the socket and - # retry to connect to the session daemon since either - # the socket was closed, a network timeout occured, or - # invalid data was received. - dbg._pdebug(self._debug('got exception: {}'.format(e))) - self._cleanup_socket() - dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY))) - time.sleep(_RETRY_REG_DELAY) - - def _recv_server_cmd_header(self): - data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE) - - if not data: - dbg._pdebug(self._debug('received empty server command header')) - return None - - assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE) - dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data)))) - - return lttngust.cmd._server_cmd_header_from_data(data) - - def _recv_server_cmd(self): - server_cmd_header = self._recv_server_cmd_header() - - if server_cmd_header is None: - return None - - dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size))) - dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id))) - dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version))) - data = bytes() - - if server_cmd_header.data_size > 0: - data = self._sessiond_sock.recv(server_cmd_header.data_size) - assert(len(data) == server_cmd_header.data_size) - - return lttngust.cmd._server_cmd_from_data(server_cmd_header, data) - - def _send_cmd_reply(self, cmd_reply): - data = cmd_reply.get_data() - dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data)))) - self._sessiond_sock.sendall(data) - - def _handle_server_cmd_reg_done(self, server_cmd): - dbg._pdebug(self._debug('got "registration done" server command')) - - if self._reg_queue is not None: - dbg._pdebug(self._debug('notifying _init_threads()')) - - try: - self._reg_queue.put(True) - except (Exception) as e: - # read side could be closed by now; ignore it - pass - - self._reg_queue = None - - def _handle_server_cmd_enable(self, server_cmd): - dbg._pdebug(self._debug('got "enable" server command')) - self._ref_count += 1 - - if self._ref_count == 1: - dbg._pdebug(self._debug('adding our handler to the root logger')) - self._root_logger.addHandler(self._log_handler) - - dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) - - return lttngust.cmd._ClientCmdReplyEnable() - - def _handle_server_cmd_disable(self, server_cmd): - dbg._pdebug(self._debug('got "disable" server command')) - self._ref_count -= 1 - - if self._ref_count < 0: - # disable command could be sent again when a session is destroyed - self._ref_count = 0 - - if self._ref_count == 0: - dbg._pdebug(self._debug('removing our handler from the root logger')) - self._root_logger.removeHandler(self._log_handler) - - dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) - - return lttngust.cmd._ClientCmdReplyDisable() - - def _handle_server_cmd_list(self, server_cmd): - dbg._pdebug(self._debug('got "list" server command')) - names = logging.Logger.manager.loggerDict.keys() - dbg._pdebug(self._debug('found {} loggers'.format(len(names)))) - cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names) - - return cmd_reply - - def _handle_server_cmd(self, server_cmd): - cmd_reply = None - - if server_cmd is None: - dbg._pdebug(self._debug('bad server command')) - status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD - cmd_reply = lttngust.cmd._ClientCmdReply(status) - elif type(server_cmd) in self._server_cmd_handlers: - cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd) - else: - dbg._pdebug(self._debug('unknown server command')) - status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD - cmd_reply = lttngust.cmd._ClientCmdReply(status) - - if cmd_reply is not None: - self._send_cmd_reply(cmd_reply) - - def _wait_server_cmd(self): - while True: - try: - server_cmd = self._recv_server_cmd() - except socket.timeout: - # simply retry here; the protocol has no KA and we could - # wait for hours - continue - - self._handle_server_cmd(server_cmd) - - def _cleanup_socket(self): - try: - self._sessiond_sock.shutdown(socket.SHUT_RDWR) - self._sessiond_sock.close() - except: - pass - - self._sessiond_sock = None - - def _connect_to_sessiond(self): - # create session daemon TCP socket - if self._sessiond_sock is None: - self._sessiond_sock = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - - # Use str(self._host) here. Since this host could be a string - # literal, and since we're importing __future__.unicode_literals, - # we want to make sure the host is a native string in Python 2. - # This avoids an indirect module import (unicode module to - # decode the unicode string, eventually imported by the - # socket module if needed), which is not allowed in a thread - # directly created by a module in Python 2 (our case). - # - # tl;dr: Do NOT remove str() here, or this call in Python 2 - # _will_ block on an interpreter's mutex until the waiting - # register queue timeouts. - self._sessiond_sock.connect((str(self._host), self._port)) - - def _register(self): - cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(), - _PROTO_MAJOR, _PROTO_MINOR) - data = cmd.get_data() - self._sessiond_sock.sendall(data) - - -def _get_port_from_file(path): - port = None - dbg._pdebug('reading port from file "{}"'.format(path)) - - try: - f = open(path) - r_port = int(f.readline()) - f.close() - - if r_port > 0 or r_port <= 65535: - port = r_port - except: - pass - - return port - - -def _get_user_home_path(): - # $LTTNG_HOME overrides $HOME if it exists - return os.getenv('LTTNG_HOME', os.path.expanduser('~')) - - -_initialized = False -_SESSIOND_HOST = '127.0.0.1' - - -def _client_thread_target(name, port, reg_queue): - dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port)) - client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue) - dbg._pdebug('starting client "{}"'.format(name)) - client.run() - - -def _init_threads(): - global _initialized - - dbg._pdebug('entering') - - if _initialized: - dbg._pdebug('agent is already initialized') - return - - # This makes sure that the appropriate modules for encoding and - # decoding strings/bytes are imported now, since no import should - # happen within a thread at import time (our case). - 'lttng'.encode().decode() - - _initialized = True - sys_port = _get_port_from_file('/var/run/lttng/agent.port') - user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port') - user_port = _get_port_from_file(user_port_file) - reg_queue = queue.Queue() - reg_expecting = 0 - - dbg._pdebug('system session daemon port: {}'.format(sys_port)) - dbg._pdebug('user session daemon port: {}'.format(user_port)) - - if sys_port == user_port and sys_port is not None: - # The two session daemon ports are the same. This is not normal. - # Connect to only one. - dbg._pdebug('both user and system session daemon have the same port') - sys_port = None - - try: - if sys_port is not None: - dbg._pdebug('creating system client thread') - t = threading.Thread(target=_client_thread_target, - args=('system', sys_port, reg_queue)) - t.name = 'system' - t.daemon = True - t.start() - dbg._pdebug('created and started system client thread') - reg_expecting += 1 - - if user_port is not None: - dbg._pdebug('creating user client thread') - t = threading.Thread(target=_client_thread_target, - args=('user', user_port, reg_queue)) - t.name = 'user' - t.daemon = True - t.start() - dbg._pdebug('created and started user client thread') - reg_expecting += 1 - except: - # cannot create threads for some reason; stop this initialization - dbg._pwarning('cannot create client threads') - return - - if reg_expecting == 0: - # early exit: looks like there's not even one valid port - dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)') - return - - cur_timeout = _REG_TIMEOUT - - # We block here to make sure the agent is properly registered to - # the session daemon. If we timeout, the client threads will still - # continue to try to connect and register to the session daemon, - # but there is no guarantee that all following logging statements - # will make it to LTTng-UST. - # - # When a client thread receives a "registration done" confirmation - # from the session daemon it's connected to, it puts True in - # reg_queue. - while True: - try: - dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting, - cur_timeout)) - t1 = lttngust.compat._clock() - reg_queue.get(timeout=cur_timeout) - t2 = lttngust.compat._clock() - reg_expecting -= 1 - dbg._pdebug('unblocked') - - if reg_expecting == 0: - # done! - dbg._pdebug('successfully registered to session daemon(s)') - break - - cur_timeout -= (t2 - t1) - - if cur_timeout <= 0: - # timeout - dbg._pdebug('ran out of time') - break - except queue.Empty: - dbg._pdebug('ran out of time') - break - - dbg._pdebug('leaving') - - -_init_threads()