Refactor Python agent
[lttng-ust.git] / liblttng-ust-python-agent / lttng_agent.py.in
diff --git a/liblttng-ust-python-agent/lttng_agent.py.in b/liblttng-ust-python-agent/lttng_agent.py.in
deleted file mode 100644 (file)
index 9e8cf61..0000000
+++ /dev/null
@@ -1,567 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
-#
-# This library is free software; you can redistribute it and/or modify it under
-# the terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this library; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
-
-from __future__ import unicode_literals
-
-import ctypes
-import errno
-import logging
-import os
-import sys
-import threading
-import struct
-import select
-
-from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
-from socket import *
-from time import sleep
-
-__all__ = ["lttng-agent"]
-__author__ = "David Goulet <dgoulet@efficios.com>"
-
-class LTTngAgent():
-    """
-    LTTng agent python code. A LTTng Agent is responsible to spawn two threads,
-    the current UID and root session daemon. Those two threads register to the
-    right daemon and handle the tracing.
-
-    This class needs to be instantiate once and once the init returns, tracing
-    is ready to happen.
-    """
-
-    SESSIOND_ADDR = "127.0.0.1"
-    SEM_COUNT = 2
-    # Timeout for the sempahore in seconds.
-    SEM_TIMEOUT = 5
-    SEM_WAIT_PERIOD = 0.2
-
-    def __init__(self):
-        # Session daemon register semaphore.
-        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT);
-
-        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
-        self.client_user.start()
-
-        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
-        self.client_root.log_handler.is_root = True
-        self.client_root.start()
-
-        acquire = 0
-        timeout = LTTngAgent.SEM_TIMEOUT
-        while True:
-            # Quit if timeout has reached 0 or below.
-            if acquire == LTTngAgent.SEM_COUNT or timeout <= 0:
-                break;
-
-            # Acquire semaphore for *user* thread.
-            if not self.register_sem.acquire(False):
-                sleep(LTTngAgent.SEM_WAIT_PERIOD)
-                timeout -= LTTngAgent.SEM_WAIT_PERIOD
-            else:
-                acquire += 1
-
-    def __del__(self):
-        self.destroy()
-
-    def destroy(self):
-        self.client_user.destroy()
-        self.client_user.join()
-
-        self.client_root.destroy()
-        self.client_root.join()
-
-class LTTngCmdError(RuntimeError):
-    """
-    Command error thrown if an error is encountered in a command from the
-    session daemon.
-    """
-
-    def __init__(self, code):
-        super().__init__('LTTng command error: code {}'.format(code))
-        self._code = code
-
-    def get_code(self):
-        return self._code
-
-class LTTngUnknownCmdError(RuntimeError):
-    pass
-
-class LTTngLoggingHandler(logging.Handler):
-    """
-    Class handler for the Python logging API.
-    """
-
-    def __init__(self):
-        logging.Handler.__init__(self, level = logging.NOTSET)
-
-        # Refcount tracking how many events have been enabled. This value above
-        # 0 means that the handler is attached to the root logger.
-        self.refcount = 0
-
-        # Dict of enabled event. We track them so we know if it's ok to disable
-        # the received event.
-        self.enabled_events = {}
-
-        # Am I root ?
-        self.is_root = False
-
-        # Using the logging formatter to extract the asctime only.
-        self.log_fmt = logging.Formatter("%(asctime)s")
-        self.setFormatter(self.log_fmt)
-
-        # ctypes lib for lttng-ust
-        try:
-            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
-        except OSError as e:
-            print("Unable to find libust for Python.")
-
-    def emit(self, record):
-        """
-        Fire LTTng UST tracepoint with the given record.
-        """
-        asctime = self.format(record)
-
-        self.lttng_ust.py_tracepoint(asctime.encode(),
-                record.getMessage().encode(), record.name.encode(),
-                record.funcName.encode(), record.lineno, record.levelno,
-                record.thread, record.threadName.encode())
-
-    def enable_event(self, name):
-        """
-        Enable an event name which will ultimately add an handler to the root
-        logger if none is present.
-        """
-        # Don't update the refcount if the event has been enabled prior.
-        if name in self.enabled_events:
-            return
-
-        # Get the root logger and attach our handler.
-        root_logger = logging.getLogger()
-        # First thing first, we need to set the root logger to the loglevel
-        # NOTSET so we can catch everything. The default is 30.
-        root_logger.setLevel(logging.NOTSET)
-
-        self.refcount += 1
-        if self.refcount == 1:
-            root_logger.addHandler(self)
-
-        self.enabled_events[name] = True
-
-    def disable_event(self, name):
-        """
-        Disable an event name which will ultimately add an handler to the root
-        logger if none is present.
-        """
-
-        if name not in self.enabled_events:
-            # Event was not enabled prior, do nothing.
-            return
-
-        # Get the root logger and attach our handler.
-        root_logger = logging.getLogger()
-
-        self.refcount -= 1
-        if self.refcount == 0:
-            root_logger.removeHandler(self)
-        del self.enabled_events[name]
-
-    def list_logger(self):
-        """
-        Return a list of logger name.
-        """
-        return logging.Logger.manager.loggerDict.keys()
-
-class LTTngSessiondCmd():
-    """
-    Class handling session daemon command.
-    """
-
-    # Command values from the agent protocol
-    CMD_LIST = 1
-    CMD_ENABLE = 2
-    CMD_DISABLE = 3
-    CMD_REG_DONE = 4
-
-    # Return code
-    CODE_SUCCESS = 1
-    CODE_INVALID_CMD = 2
-
-    # Python Logger LTTng domain value taken from lttng/domain.h
-    DOMAIN = 5
-
-    # Protocol version
-    MAJOR_VERSION = 1
-    MINOR_VERSION = 0
-
-    def execute(self):
-        """
-        This is part of the command interface. Must be implemented.
-        """
-        raise NotImplementedError
-
-class LTTngCommandReply():
-    """
-    Object that contains the information that should be replied to the session
-    daemon after a command execution.
-    """
-
-    def __init__(self, payload = None, reply = True):
-        self.payload = payload
-        self.reply = reply
-
-class LTTngCommandEnable(LTTngSessiondCmd):
-    """
-    Handle the enable event command from the session daemon.
-    """
-
-    def __init__(self, log_handler, data):
-        self.log_handler = log_handler
-        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
-        name_offset = 8;
-
-        data_size = len(data)
-        if data_size == 0:
-            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
-        try:
-            self.loglevel, self.loglevel_type, self.name = \
-                    struct.unpack('>II%us' % (data_size - name_offset), data)
-            # Remove trailing NULL bytes from name.
-            self.name = self.name.decode().rstrip('\x00')
-        except struct.error:
-            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
-    def execute(self):
-        self.log_handler.enable_event(self.name)
-        return LTTngCommandReply()
-
-class LTTngCommandDisable(LTTngSessiondCmd):
-    """
-    Handle the disable event command from the session daemon.
-    """
-
-    def __init__(self, log_handler, data):
-        self.log_handler = log_handler
-
-        data_size = len(data)
-        if data_size == 0:
-            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
-        try:
-            self.name = struct.unpack('>%us' % (data_size), data)[0]
-            # Remove trailing NULL bytes from name.
-            self.name = self.name.decode().rstrip('\x00')
-        except struct.error:
-            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
-    def execute(self):
-        self.log_handler.disable_event(self.name)
-        return LTTngCommandReply()
-
-class LTTngCommandRegDone(LTTngSessiondCmd):
-    """
-    Handle register done command. This is sent back after a successful
-    registration from the session daemon. We basically release the given
-    semaphore so the agent can return to the caller.
-    """
-
-    def __init__(self, sem):
-        self.sem = sem
-
-    def execute(self):
-        self.sem.release()
-        return LTTngCommandReply(reply = False)
-
-class LTTngCommandList(LTTngSessiondCmd):
-    """
-    Handle the list command from the session daemon on the given socket.
-    """
-
-    def __init__(self, log_handler):
-        self.log_handler = log_handler
-
-    def execute(self):
-        data_size = 0
-        data = logger_data = bytearray()
-
-        loggers = self.log_handler.list_logger()
-        # First, pack nb_event that must preceed the data.
-        logger_data += struct.pack('>I', len(loggers))
-
-        # Populate payload with logger name.
-        for logger in loggers:
-            # Increment data size plus the NULL byte at the end of the name.
-            data_size += len(logger) + 1
-            # Pack logger name and NULL byte.
-            logger_data += struct.pack('>%usB' % (len(logger)), \
-                    bytes(bytearray(str.encode(logger))), 0)
-
-        # Pack uint32_t data_size followed by nb event (number of logger)
-        data = struct.pack('>I', data_size)
-        data += logger_data
-        return LTTngCommandReply(payload = data)
-
-class LTTngTCPClient(threading.Thread):
-    """
-    TCP client that register and receives command from the session daemon.
-    """
-
-    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
-    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")
-
-    # The time in seconds this client should wait before trying again to
-    # register back to the session daemon.
-    WAIT_TIME = 3
-
-    def __init__(self, host, sem):
-        threading.Thread.__init__(self)
-
-        # Which host to connect to. The port is fetch dynamically.
-        self.sessiond_host = host
-
-        # The session daemon register done semaphore. Needs to be released when
-        # receiving a CMD_REG_DONE command.
-        self.register_sem = sem
-        self.register_sem.acquire()
-
-        # Indicate that we have to quit thus stop the main loop.
-        self.quit_flag = False
-        # Quit pipe. The thread poll on it to know when to quit.
-        self.quit_pipe = os.pipe()
-
-        # Socket on which we communicate with the session daemon.
-        self.sessiond_sock = None
-        # LTTng Logging Handler
-        self.log_handler = LTTngLoggingHandler()
-
-    def cleanup_socket(self, epfd = None):
-        # Ease our life a bit.
-        sock = self.sessiond_sock
-        if not sock:
-            return
-
-        try:
-            if epfd is not None:
-                epfd.unregister(sock)
-            sock.shutdown(SHUT_RDWR)
-            sock.close()
-        except select.error:
-            # Cleanup fail, we can't do anything much...
-            pass
-        except IOError:
-            pass
-
-        self.sessiond_sock = None
-
-    def destroy(self):
-        self.quit_flag = True
-        try:
-            fp = os.fdopen(self.quit_pipe[1], 'w')
-            fp.write("42")
-            fp.close()
-        except OSError as e:
-            pass
-
-    def register(self):
-        """
-        Register to session daemon using the previously connected socket of the
-        class.
-
-        Command ABI:
-            uint32 domain
-            uint32 pid
-        """
-        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \
-                LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
-        self.sessiond_sock.send(data)
-
-    def run(self):
-        """
-        Start the TCP client thread by registering to the session daemon and polling
-        on that socket for commands.
-        """
-
-        epfd = epoll()
-        epfd.register(self.quit_pipe[0], EPOLLIN)
-
-        # Main loop to handle session daemon command and disconnection.
-        while not self.quit_flag:
-            try:
-                # First, connect to the session daemon.
-                self.connect_sessiond()
-
-                # Register to session daemon after a successful connection.
-                self.register()
-                # Add registered socket to poll set.
-                epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)
-
-                self.quit_flag = self.wait_cmd(epfd)
-            except IOError as e:
-                # Whatever happens here, we have to close down everything and
-                # retry to connect to the session daemon since either the
-                # socket is closed or invalid data was sent.
-                self.cleanup_socket(epfd)
-                self.register_sem.release()
-                sleep(LTTngTCPClient.WAIT_TIME)
-                continue
-
-        self.cleanup_socket(epfd)
-        os.close(self.quit_pipe[0])
-        epfd.close()
-
-    def recv_header(self, sock):
-        """
-        Receive the command header from the given socket. Set the internal
-        state of this object with the header data.
-
-        Header ABI is defined like this:
-            uint64 data_size
-            uint32 cmd
-            uint32 cmd_version
-        """
-        s_pack = struct.Struct('>QII')
-
-        pack_data = sock.recv(s_pack.size)
-        data_received = len(pack_data)
-        if data_received == 0:
-            raise IOError(errno.ESHUTDOWN)
-
-        try:
-            return s_pack.unpack(pack_data)
-        except struct.error:
-            raise IOError(errno.EINVAL)
-
-    def create_command(self, cmd_type, version, data):
-        """
-        Return the right command object using the given command type. The
-        command version is unused since we only have once for now.
-        """
-
-        cmd_dict = {
-            LTTngSessiondCmd.CMD_LIST: \
-                    lambda: LTTngCommandList(self.log_handler),
-            LTTngSessiondCmd.CMD_ENABLE: \
-                    lambda: LTTngCommandEnable(self.log_handler, data),
-            LTTngSessiondCmd.CMD_DISABLE: \
-                    lambda: LTTngCommandDisable(self.log_handler, data),
-            LTTngSessiondCmd.CMD_REG_DONE: \
-                    lambda: LTTngCommandRegDone(self.register_sem),
-        }
-
-        if cmd_type in cmd_dict:
-            return cmd_dict[cmd_type]()
-        else:
-            raise LTTngUnknownCmdError()
-
-    def pack_code(self, code):
-        return struct.pack('>I', code)
-
-    def handle_command(self, data, cmd_type, cmd_version):
-        """
-        Handle the given command type with the received payload. This function
-        sends back data to the session daemon using to the return value of the
-        command.
-        """
-        payload = bytearray()
-
-        try:
-            cmd = self.create_command(cmd_type, cmd_version, data)
-            cmd_reply = cmd.execute()
-            # Set success code in data
-            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
-            if cmd_reply.payload is not None:
-                payload += cmd_reply.payload
-        except LTTngCmdError as e:
-            # Set error code in payload
-            payload += self.pack_code(e.get_code())
-        except LTTngUnknownCmdError:
-            # Set error code in payload
-            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
-
-        # Send response only if asked for.
-        if cmd_reply.reply:
-            self.sessiond_sock.send(payload)
-
-    def wait_cmd(self, epfd):
-        """
-        """
-
-        while True:
-            try:
-                # Poll on socket for command.
-                events = epfd.poll()
-            except select.error as e:
-                raise IOError(e.errno, e.message)
-
-            for fileno, event in events:
-                if fileno == self.quit_pipe[0]:
-                    return True
-                elif event & (EPOLLERR | EPOLLHUP):
-                    raise IOError(errno.ESHUTDOWN)
-                elif event & EPOLLIN:
-                    data = bytearray()
-
-                    data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
-                    if data_size:
-                        data += self.sessiond_sock.recv(data_size)
-
-                    self.handle_command(data, cmd, cmd_version)
-                else:
-                    raise IOError(errno.ESHUTDOWN)
-
-    def get_port_from_file(self, path):
-        """
-        Open the session daemon agent port file and returns the value. If none
-        found, 0 is returned.
-        """
-
-        # By default, the port is set to 0 so if we can not find the agent port
-        # file simply don't try to connect. A value set to 0 indicates that.
-        port = 0
-
-        try:
-            f = open(path, "r")
-            r_port = int(f.readline())
-            if r_port > 0 or r_port <= 65535:
-                port = r_port
-            f.close()
-        except IOError as e:
-            pass
-        except ValueError as e:
-            pass
-
-        return port
-
-    def connect_sessiond(self):
-        """
-        Connect sessiond_sock to running session daemon using the port file.
-        """
-        # Create session daemon TCP socket
-        if not self.sessiond_sock:
-            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)
-
-        if self.log_handler.is_root:
-            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
-        else:
-            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)
-
-        # No session daemon available
-        if port == 0:
-            raise IOError(errno.ECONNREFUSED)
-
-        # Can raise an IOError so caller must catch it.
-        self.sessiond_sock.connect((self.sessiond_host, port))
This page took 0.027114 seconds and 4 git commands to generate.