X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=liblttng-ust-python-agent%2Flttng_agent.py.in;fp=liblttng-ust-python-agent%2Flttng_agent.py.in;h=0000000000000000000000000000000000000000;hb=de4dee04fa3e008fe1044538f78778a867563aa4;hp=9e8cf61171a388f5e50644efd512647c293afff3;hpb=e72c9d7ead60e3317bd6d1fade995c07021c947b;p=lttng-ust.git diff --git a/liblttng-ust-python-agent/lttng_agent.py.in b/liblttng-ust-python-agent/lttng_agent.py.in deleted file mode 100644 index 9e8cf611..00000000 --- a/liblttng-ust-python-agent/lttng_agent.py.in +++ /dev/null @@ -1,567 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2014 - David Goulet -# -# This library is free software; you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; version 2.1 of the License. -# -# This library is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this library; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -from __future__ import unicode_literals - -import ctypes -import errno -import logging -import os -import sys -import threading -import struct -import select - -from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP -from socket import * -from time import sleep - -__all__ = ["lttng-agent"] -__author__ = "David Goulet " - -class LTTngAgent(): - """ - LTTng agent python code. A LTTng Agent is responsible to spawn two threads, - the current UID and root session daemon. Those two threads register to the - right daemon and handle the tracing. - - This class needs to be instantiate once and once the init returns, tracing - is ready to happen. - """ - - SESSIOND_ADDR = "127.0.0.1" - SEM_COUNT = 2 - # Timeout for the sempahore in seconds. - SEM_TIMEOUT = 5 - SEM_WAIT_PERIOD = 0.2 - - def __init__(self): - # Session daemon register semaphore. - self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT); - - self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem) - self.client_user.start() - - self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem) - self.client_root.log_handler.is_root = True - self.client_root.start() - - acquire = 0 - timeout = LTTngAgent.SEM_TIMEOUT - while True: - # Quit if timeout has reached 0 or below. - if acquire == LTTngAgent.SEM_COUNT or timeout <= 0: - break; - - # Acquire semaphore for *user* thread. - if not self.register_sem.acquire(False): - sleep(LTTngAgent.SEM_WAIT_PERIOD) - timeout -= LTTngAgent.SEM_WAIT_PERIOD - else: - acquire += 1 - - def __del__(self): - self.destroy() - - def destroy(self): - self.client_user.destroy() - self.client_user.join() - - self.client_root.destroy() - self.client_root.join() - -class LTTngCmdError(RuntimeError): - """ - Command error thrown if an error is encountered in a command from the - session daemon. - """ - - def __init__(self, code): - super().__init__('LTTng command error: code {}'.format(code)) - self._code = code - - def get_code(self): - return self._code - -class LTTngUnknownCmdError(RuntimeError): - pass - -class LTTngLoggingHandler(logging.Handler): - """ - Class handler for the Python logging API. - """ - - def __init__(self): - logging.Handler.__init__(self, level = logging.NOTSET) - - # Refcount tracking how many events have been enabled. This value above - # 0 means that the handler is attached to the root logger. - self.refcount = 0 - - # Dict of enabled event. We track them so we know if it's ok to disable - # the received event. - self.enabled_events = {} - - # Am I root ? - self.is_root = False - - # Using the logging formatter to extract the asctime only. - self.log_fmt = logging.Formatter("%(asctime)s") - self.setFormatter(self.log_fmt) - - # ctypes lib for lttng-ust - try: - self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so") - except OSError as e: - print("Unable to find libust for Python.") - - def emit(self, record): - """ - Fire LTTng UST tracepoint with the given record. - """ - asctime = self.format(record) - - self.lttng_ust.py_tracepoint(asctime.encode(), - record.getMessage().encode(), record.name.encode(), - record.funcName.encode(), record.lineno, record.levelno, - record.thread, record.threadName.encode()) - - def enable_event(self, name): - """ - Enable an event name which will ultimately add an handler to the root - logger if none is present. - """ - # Don't update the refcount if the event has been enabled prior. - if name in self.enabled_events: - return - - # Get the root logger and attach our handler. - root_logger = logging.getLogger() - # First thing first, we need to set the root logger to the loglevel - # NOTSET so we can catch everything. The default is 30. - root_logger.setLevel(logging.NOTSET) - - self.refcount += 1 - if self.refcount == 1: - root_logger.addHandler(self) - - self.enabled_events[name] = True - - def disable_event(self, name): - """ - Disable an event name which will ultimately add an handler to the root - logger if none is present. - """ - - if name not in self.enabled_events: - # Event was not enabled prior, do nothing. - return - - # Get the root logger and attach our handler. - root_logger = logging.getLogger() - - self.refcount -= 1 - if self.refcount == 0: - root_logger.removeHandler(self) - del self.enabled_events[name] - - def list_logger(self): - """ - Return a list of logger name. - """ - return logging.Logger.manager.loggerDict.keys() - -class LTTngSessiondCmd(): - """ - Class handling session daemon command. - """ - - # Command values from the agent protocol - CMD_LIST = 1 - CMD_ENABLE = 2 - CMD_DISABLE = 3 - CMD_REG_DONE = 4 - - # Return code - CODE_SUCCESS = 1 - CODE_INVALID_CMD = 2 - - # Python Logger LTTng domain value taken from lttng/domain.h - DOMAIN = 5 - - # Protocol version - MAJOR_VERSION = 1 - MINOR_VERSION = 0 - - def execute(self): - """ - This is part of the command interface. Must be implemented. - """ - raise NotImplementedError - -class LTTngCommandReply(): - """ - Object that contains the information that should be replied to the session - daemon after a command execution. - """ - - def __init__(self, payload = None, reply = True): - self.payload = payload - self.reply = reply - -class LTTngCommandEnable(LTTngSessiondCmd): - """ - Handle the enable event command from the session daemon. - """ - - def __init__(self, log_handler, data): - self.log_handler = log_handler - # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8. - name_offset = 8; - - data_size = len(data) - if data_size == 0: - raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) - - try: - self.loglevel, self.loglevel_type, self.name = \ - struct.unpack('>II%us' % (data_size - name_offset), data) - # Remove trailing NULL bytes from name. - self.name = self.name.decode().rstrip('\x00') - except struct.error: - raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) - - def execute(self): - self.log_handler.enable_event(self.name) - return LTTngCommandReply() - -class LTTngCommandDisable(LTTngSessiondCmd): - """ - Handle the disable event command from the session daemon. - """ - - def __init__(self, log_handler, data): - self.log_handler = log_handler - - data_size = len(data) - if data_size == 0: - raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) - - try: - self.name = struct.unpack('>%us' % (data_size), data)[0] - # Remove trailing NULL bytes from name. - self.name = self.name.decode().rstrip('\x00') - except struct.error: - raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) - - def execute(self): - self.log_handler.disable_event(self.name) - return LTTngCommandReply() - -class LTTngCommandRegDone(LTTngSessiondCmd): - """ - Handle register done command. This is sent back after a successful - registration from the session daemon. We basically release the given - semaphore so the agent can return to the caller. - """ - - def __init__(self, sem): - self.sem = sem - - def execute(self): - self.sem.release() - return LTTngCommandReply(reply = False) - -class LTTngCommandList(LTTngSessiondCmd): - """ - Handle the list command from the session daemon on the given socket. - """ - - def __init__(self, log_handler): - self.log_handler = log_handler - - def execute(self): - data_size = 0 - data = logger_data = bytearray() - - loggers = self.log_handler.list_logger() - # First, pack nb_event that must preceed the data. - logger_data += struct.pack('>I', len(loggers)) - - # Populate payload with logger name. - for logger in loggers: - # Increment data size plus the NULL byte at the end of the name. - data_size += len(logger) + 1 - # Pack logger name and NULL byte. - logger_data += struct.pack('>%usB' % (len(logger)), \ - bytes(bytearray(str.encode(logger))), 0) - - # Pack uint32_t data_size followed by nb event (number of logger) - data = struct.pack('>I', data_size) - data += logger_data - return LTTngCommandReply(payload = data) - -class LTTngTCPClient(threading.Thread): - """ - TCP client that register and receives command from the session daemon. - """ - - SYSTEM_PORT_FILE = "/var/run/lttng/agent.port" - USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port") - - # The time in seconds this client should wait before trying again to - # register back to the session daemon. - WAIT_TIME = 3 - - def __init__(self, host, sem): - threading.Thread.__init__(self) - - # Which host to connect to. The port is fetch dynamically. - self.sessiond_host = host - - # The session daemon register done semaphore. Needs to be released when - # receiving a CMD_REG_DONE command. - self.register_sem = sem - self.register_sem.acquire() - - # Indicate that we have to quit thus stop the main loop. - self.quit_flag = False - # Quit pipe. The thread poll on it to know when to quit. - self.quit_pipe = os.pipe() - - # Socket on which we communicate with the session daemon. - self.sessiond_sock = None - # LTTng Logging Handler - self.log_handler = LTTngLoggingHandler() - - def cleanup_socket(self, epfd = None): - # Ease our life a bit. - sock = self.sessiond_sock - if not sock: - return - - try: - if epfd is not None: - epfd.unregister(sock) - sock.shutdown(SHUT_RDWR) - sock.close() - except select.error: - # Cleanup fail, we can't do anything much... - pass - except IOError: - pass - - self.sessiond_sock = None - - def destroy(self): - self.quit_flag = True - try: - fp = os.fdopen(self.quit_pipe[1], 'w') - fp.write("42") - fp.close() - except OSError as e: - pass - - def register(self): - """ - Register to session daemon using the previously connected socket of the - class. - - Command ABI: - uint32 domain - uint32 pid - """ - data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \ - LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION) - self.sessiond_sock.send(data) - - def run(self): - """ - Start the TCP client thread by registering to the session daemon and polling - on that socket for commands. - """ - - epfd = epoll() - epfd.register(self.quit_pipe[0], EPOLLIN) - - # Main loop to handle session daemon command and disconnection. - while not self.quit_flag: - try: - # First, connect to the session daemon. - self.connect_sessiond() - - # Register to session daemon after a successful connection. - self.register() - # Add registered socket to poll set. - epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP) - - self.quit_flag = self.wait_cmd(epfd) - except IOError as e: - # Whatever happens here, we have to close down everything and - # retry to connect to the session daemon since either the - # socket is closed or invalid data was sent. - self.cleanup_socket(epfd) - self.register_sem.release() - sleep(LTTngTCPClient.WAIT_TIME) - continue - - self.cleanup_socket(epfd) - os.close(self.quit_pipe[0]) - epfd.close() - - def recv_header(self, sock): - """ - Receive the command header from the given socket. Set the internal - state of this object with the header data. - - Header ABI is defined like this: - uint64 data_size - uint32 cmd - uint32 cmd_version - """ - s_pack = struct.Struct('>QII') - - pack_data = sock.recv(s_pack.size) - data_received = len(pack_data) - if data_received == 0: - raise IOError(errno.ESHUTDOWN) - - try: - return s_pack.unpack(pack_data) - except struct.error: - raise IOError(errno.EINVAL) - - def create_command(self, cmd_type, version, data): - """ - Return the right command object using the given command type. The - command version is unused since we only have once for now. - """ - - cmd_dict = { - LTTngSessiondCmd.CMD_LIST: \ - lambda: LTTngCommandList(self.log_handler), - LTTngSessiondCmd.CMD_ENABLE: \ - lambda: LTTngCommandEnable(self.log_handler, data), - LTTngSessiondCmd.CMD_DISABLE: \ - lambda: LTTngCommandDisable(self.log_handler, data), - LTTngSessiondCmd.CMD_REG_DONE: \ - lambda: LTTngCommandRegDone(self.register_sem), - } - - if cmd_type in cmd_dict: - return cmd_dict[cmd_type]() - else: - raise LTTngUnknownCmdError() - - def pack_code(self, code): - return struct.pack('>I', code) - - def handle_command(self, data, cmd_type, cmd_version): - """ - Handle the given command type with the received payload. This function - sends back data to the session daemon using to the return value of the - command. - """ - payload = bytearray() - - try: - cmd = self.create_command(cmd_type, cmd_version, data) - cmd_reply = cmd.execute() - # Set success code in data - payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS) - if cmd_reply.payload is not None: - payload += cmd_reply.payload - except LTTngCmdError as e: - # Set error code in payload - payload += self.pack_code(e.get_code()) - except LTTngUnknownCmdError: - # Set error code in payload - payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD) - - # Send response only if asked for. - if cmd_reply.reply: - self.sessiond_sock.send(payload) - - def wait_cmd(self, epfd): - """ - """ - - while True: - try: - # Poll on socket for command. - events = epfd.poll() - except select.error as e: - raise IOError(e.errno, e.message) - - for fileno, event in events: - if fileno == self.quit_pipe[0]: - return True - elif event & (EPOLLERR | EPOLLHUP): - raise IOError(errno.ESHUTDOWN) - elif event & EPOLLIN: - data = bytearray() - - data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock) - if data_size: - data += self.sessiond_sock.recv(data_size) - - self.handle_command(data, cmd, cmd_version) - else: - raise IOError(errno.ESHUTDOWN) - - def get_port_from_file(self, path): - """ - Open the session daemon agent port file and returns the value. If none - found, 0 is returned. - """ - - # By default, the port is set to 0 so if we can not find the agent port - # file simply don't try to connect. A value set to 0 indicates that. - port = 0 - - try: - f = open(path, "r") - r_port = int(f.readline()) - if r_port > 0 or r_port <= 65535: - port = r_port - f.close() - except IOError as e: - pass - except ValueError as e: - pass - - return port - - def connect_sessiond(self): - """ - Connect sessiond_sock to running session daemon using the port file. - """ - # Create session daemon TCP socket - if not self.sessiond_sock: - self.sessiond_sock = socket(AF_INET, SOCK_STREAM) - - if self.log_handler.is_root: - port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE) - else: - port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE) - - # No session daemon available - if port == 0: - raise IOError(errno.ECONNREFUSED) - - # Can raise an IOError so caller must catch it. - self.sessiond_sock.connect((self.sessiond_host, port))