Add Python agent support
[lttng-ust.git] / liblttng-ust-python-agent / lttng_agent.py.in
diff --git a/liblttng-ust-python-agent/lttng_agent.py.in b/liblttng-ust-python-agent/lttng_agent.py.in
new file mode 100644 (file)
index 0000000..9e8cf61
--- /dev/null
@@ -0,0 +1,567 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
+#
+# This library is free software; you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+
+from __future__ import unicode_literals
+
+import ctypes
+import errno
+import logging
+import os
+import sys
+import threading
+import struct
+import select
+
+from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
+from socket import *
+from time import sleep
+
+# Public API of this module. Every entry must name an object defined in this
+# file, otherwise "from lttng_agent import *" raises AttributeError.
+__all__ = ["LTTngAgent"]
+__author__ = "David Goulet <dgoulet@efficios.com>"
+
+class LTTngAgent():
+    """
+    LTTng agent Python code. An LTTng agent spawns two client threads: one for
+    the session daemon of the current user and one for the root session
+    daemon. Each thread registers to its daemon and handles the tracing
+    commands it receives.
+
+    This class needs to be instantiated once; when the constructor returns,
+    either both registrations were signaled or SEM_TIMEOUT seconds elapsed.
+    """
+
+    SESSIOND_ADDR = "127.0.0.1"
+    SEM_COUNT = 2
+    # Timeout for the semaphore in seconds.
+    SEM_TIMEOUT = 5
+    SEM_WAIT_PERIOD = 0.2
+
+    def __init__(self):
+        # Define everything destroy() reads before doing any real work so that
+        # __del__ stays safe even if one of the calls below raises.
+        self.client_user = None
+        self.client_root = None
+        self._destroyed = False
+
+        # Session daemon register semaphore.
+        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT)
+
+        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
+        self.client_user.start()
+
+        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
+        self.client_root.log_handler.is_root = True
+        self.client_root.start()
+
+        # Each client releases the semaphore once its registration attempt is
+        # over. Poll without blocking until SEM_COUNT releases were seen or
+        # the timeout has reached 0 or below.
+        acquired = 0
+        timeout = LTTngAgent.SEM_TIMEOUT
+        while acquired < LTTngAgent.SEM_COUNT and timeout > 0:
+            if self.register_sem.acquire(False):
+                acquired += 1
+            else:
+                sleep(LTTngAgent.SEM_WAIT_PERIOD)
+                timeout -= LTTngAgent.SEM_WAIT_PERIOD
+
+    def __del__(self):
+        self.destroy()
+
+    def destroy(self):
+        """
+        Ask both client threads to quit and wait for them. Calling this more
+        than once, or on a partially constructed agent, is a no-op.
+        """
+        if getattr(self, "_destroyed", False):
+            return
+        self._destroyed = True
+
+        for attr in ("client_user", "client_root"):
+            client = getattr(self, attr, None)
+            if client is None:
+                continue
+
+            client.destroy()
+            # join() raises RuntimeError on a thread that was never started;
+            # ident is only set once the thread has been started.
+            if client.ident is not None:
+                client.join()
+
+class LTTngCmdError(RuntimeError):
+    """
+    Command error raised when an error is encountered in a command from the
+    session daemon. It carries the return code to send back.
+    """
+
+    def __init__(self, code):
+        # Explicit base class call: the zero-argument super() form only
+        # exists on Python 3.
+        RuntimeError.__init__(self,
+                'LTTng command error: code {}'.format(code))
+        self._code = code
+
+    def get_code(self):
+        """
+        Return the code given at construction time.
+        """
+        return self._code
+
+class LTTngUnknownCmdError(RuntimeError):
+    """
+    Raised when a received command type matches no known command.
+    """
+
+class LTTngLoggingHandler(logging.Handler):
+    """
+    Handler for the Python logging API that fires an LTTng-UST tracepoint for
+    every record it receives.
+    """
+
+    def __init__(self):
+        logging.Handler.__init__(self, level = logging.NOTSET)
+
+        # Refcount tracking how many events have been enabled. This value above
+        # 0 means that the handler is attached to the root logger.
+        self.refcount = 0
+
+        # Dict of enabled events. We track them so we know if it's ok to
+        # disable the received event.
+        self.enabled_events = {}
+
+        # Am I root ?
+        self.is_root = False
+
+        # Using the logging formatter to extract the asctime only.
+        self.log_fmt = logging.Formatter("%(asctime)s")
+        self.setFormatter(self.log_fmt)
+
+        # ctypes lib for lttng-ust. The attribute is always defined so that
+        # emit() can tell a missing library apart instead of raising
+        # AttributeError on every log statement of the application.
+        self.lttng_ust = None
+        try:
+            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
+        except OSError as e:
+            print("Unable to find libust for Python.")
+
+    def emit(self, record):
+        """
+        Fire LTTng UST tracepoint with the given record. Does nothing when the
+        tracepoint library could not be loaded.
+        """
+        if self.lttng_ust is None:
+            return
+
+        # Format the time directly: Formatter.format() would append the
+        # traceback text to the result for records carrying exc_info.
+        asctime = self.log_fmt.formatTime(record, self.log_fmt.datefmt)
+
+        # funcName, thread and threadName can be None on a record (thread
+        # information is optional in the logging module); substitute neutral
+        # values rather than failing on None.
+        func_name = record.funcName or ""
+        thread_name = record.threadName or ""
+        thread_id = record.thread or 0
+
+        self.lttng_ust.py_tracepoint(asctime.encode(),
+                record.getMessage().encode(), record.name.encode(),
+                func_name.encode(), record.lineno, record.levelno,
+                thread_id, thread_name.encode())
+
+    def enable_event(self, name):
+        """
+        Enable an event name which will ultimately add this handler to the
+        root logger if it is not attached yet.
+        """
+        # Don't update the refcount if the event has been enabled prior.
+        if name in self.enabled_events:
+            return
+
+        # Get the root logger and attach our handler.
+        root_logger = logging.getLogger()
+        # First thing first, we need to set the root logger to the loglevel
+        # NOTSET so we can catch everything. The default is 30.
+        root_logger.setLevel(logging.NOTSET)
+
+        self.refcount += 1
+        if self.refcount == 1:
+            root_logger.addHandler(self)
+
+        self.enabled_events[name] = True
+
+    def disable_event(self, name):
+        """
+        Disable an event name which will ultimately remove this handler from
+        the root logger once no enabled event is left.
+        """
+        if name not in self.enabled_events:
+            # Event was not enabled prior, do nothing.
+            return
+
+        self.refcount -= 1
+        if self.refcount == 0:
+            # Last enabled event is gone, detach from the root logger.
+            logging.getLogger().removeHandler(self)
+        del self.enabled_events[name]
+
+    def list_logger(self):
+        """
+        Return a list of logger names.
+        """
+        # Return a snapshot and not a live view of the dict: the application
+        # may create loggers while the caller iterates over the result.
+        return list(logging.Logger.manager.loggerDict)
+
+class LTTngSessiondCmd():
+    """
+    Base class of the session daemon commands. It holds the constants of the
+    agent protocol and declares the interface each command implements.
+    """
+
+    # Protocol version
+    MAJOR_VERSION = 1
+    MINOR_VERSION = 0
+
+    # Python Logger LTTng domain value taken from lttng/domain.h
+    DOMAIN = 5
+
+    # Command values from the agent protocol
+    CMD_LIST = 1
+    CMD_ENABLE = 2
+    CMD_DISABLE = 3
+    CMD_REG_DONE = 4
+
+    # Return codes sent back to the session daemon
+    CODE_SUCCESS = 1
+    CODE_INVALID_CMD = 2
+
+    def execute(self):
+        """
+        Command entry point. Every command class must override it.
+        """
+        raise NotImplementedError
+
+class LTTngCommandReply():
+    """
+    Result of a command execution: the optional payload to send back to the
+    session daemon and whether anything should be sent back at all.
+    """
+
+    def __init__(self, payload = None, reply = True):
+        # False when the session daemon expects no answer for the command.
+        self.reply = reply
+        # Data following the return code in the answer, None when empty.
+        self.payload = payload
+
+class LTTngCommandEnable(LTTngSessiondCmd):
+    """
+    Handle the enable event command from the session daemon.
+
+    The payload is two big-endian uint32 values (loglevel and loglevel type)
+    followed by the event name padded with NULL bytes.
+    """
+
+    def __init__(self, log_handler, data):
+        """
+        Parse the command payload.
+
+        Raises LTTngCmdError with CODE_INVALID_CMD when the payload is too
+        short, malformed or when the name can not be decoded.
+        """
+        self.log_handler = log_handler
+        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
+        name_offset = 8
+
+        data_size = len(data)
+        if data_size < name_offset:
+            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+        try:
+            self.loglevel, self.loglevel_type, raw_name = \
+                    struct.unpack('>II%us' % (data_size - name_offset), data)
+            # Remove trailing NULL bytes from name.
+            self.name = raw_name.decode().rstrip('\x00')
+        except (struct.error, UnicodeDecodeError):
+            # A name that is not valid text is an invalid command, not a
+            # reason to let the exception escape and kill the client thread.
+            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+    def execute(self):
+        self.log_handler.enable_event(self.name)
+        return LTTngCommandReply()
+
+class LTTngCommandDisable(LTTngSessiondCmd):
+    """
+    Handle the disable event command from the session daemon.
+
+    The payload is the event name padded with NULL bytes.
+    """
+
+    def __init__(self, log_handler, data):
+        """
+        Parse the command payload.
+
+        Raises LTTngCmdError with CODE_INVALID_CMD when the payload is empty,
+        malformed or when the name can not be decoded.
+        """
+        self.log_handler = log_handler
+
+        data_size = len(data)
+        if data_size == 0:
+            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+        try:
+            raw_name = struct.unpack('>%us' % (data_size), data)[0]
+            # Remove trailing NULL bytes from name.
+            self.name = raw_name.decode().rstrip('\x00')
+        except (struct.error, UnicodeDecodeError):
+            # A name that is not valid text is an invalid command, not a
+            # reason to let the exception escape and kill the client thread.
+            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
+
+    def execute(self):
+        self.log_handler.disable_event(self.name)
+        return LTTngCommandReply()
+
+class LTTngCommandRegDone(LTTngSessiondCmd):
+    """
+    Handle the register done command, which the session daemon sends after a
+    successful registration. Executing it releases the semaphore given at
+    construction time so the agent can return to the caller.
+    """
+
+    def __init__(self, sem):
+        self.sem = sem
+
+    def execute(self):
+        self.sem.release()
+        # Nothing is sent back to the session daemon for this command.
+        no_reply = LTTngCommandReply(reply = False)
+        return no_reply
+
+class LTTngCommandList(LTTngSessiondCmd):
+    """
+    Handle the list command from the session daemon.
+    """
+
+    def __init__(self, log_handler):
+        self.log_handler = log_handler
+
+    def execute(self):
+        """
+        Build the reply payload listing every known logger name.
+
+        Payload layout (big-endian):
+            uint32 data_size (size of all names, NULL bytes included)
+            uint32 nb_event (number of logger names)
+            NULL terminated logger names
+        """
+        # Take a snapshot of the names and encode them first: sizes on the
+        # wire are byte counts, which differ from character counts for names
+        # that are not plain ASCII.
+        encoded_names = [name.encode()
+                for name in list(self.log_handler.list_logger())]
+
+        data_size = 0
+        # First, pack nb_event that must precede the names.
+        logger_data = bytearray(struct.pack('>I', len(encoded_names)))
+
+        # Populate payload with logger names.
+        for encoded in encoded_names:
+            # Increment data size plus the NULL byte at the end of the name.
+            data_size += len(encoded) + 1
+            # Append logger name and NULL byte.
+            logger_data += encoded + b'\x00'
+
+        # Pack uint32_t data_size followed by nb event (number of logger)
+        data = struct.pack('>I', data_size) + bytes(logger_data)
+        return LTTngCommandReply(payload = data)
+
+class LTTngTCPClient(threading.Thread):
+    """
+    TCP client thread that registers to the session daemon and handles the
+    commands received from it.
+    """
+
+    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
+    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")
+
+    # The time in seconds this client should wait before trying again to
+    # register back to the session daemon.
+    WAIT_TIME = 3
+
+    # Upper bound in bytes of a single recv() call when reading from the
+    # session daemon socket.
+    RECV_CHUNK_SIZE = 65536
+
+    def __init__(self, host, sem):
+        threading.Thread.__init__(self)
+
+        # Which host to connect to. The port is fetched dynamically.
+        self.sessiond_host = host
+
+        # The session daemon register done semaphore. Needs to be released when
+        # receiving a CMD_REG_DONE command.
+        self.register_sem = sem
+        self.register_sem.acquire()
+
+        # Indicate that we have to quit thus stop the main loop.
+        self.quit_flag = False
+        # Quit pipe. The thread polls on it to know when to quit.
+        self.quit_pipe = os.pipe()
+        # Set once the write end of the quit pipe is closed so that destroy()
+        # never touches a file descriptor it no longer owns.
+        self.quit_pipe_closed = False
+
+        # Socket on which we communicate with the session daemon.
+        self.sessiond_sock = None
+        # LTTng Logging Handler
+        self.log_handler = LTTngLoggingHandler()
+
+    def cleanup_socket(self, epfd = None):
+        """
+        Unregister the session daemon socket from the given epoll object, if
+        any, then shut it down and close it. Failures are ignored.
+        """
+        # Ease our life a bit.
+        sock = self.sessiond_sock
+        if not sock:
+            return
+
+        # Every step is attempted on its own: unregistering fails for a socket
+        # that never made it into the poll set and that must not prevent the
+        # socket from being closed.
+        steps = [lambda: sock.shutdown(SHUT_RDWR), sock.close]
+        if epfd is not None:
+            steps.insert(0, lambda: epfd.unregister(sock))
+
+        for step in steps:
+            try:
+                step()
+            except (select.error, IOError):
+                # Cleanup fail, we can't do anything much...
+                pass
+
+        self.sessiond_sock = None
+
+    def destroy(self):
+        """
+        Ask the thread to quit by raising the quit flag and writing to the
+        quit pipe. Calling this more than once is harmless.
+        """
+        self.quit_flag = True
+        if self.quit_pipe_closed:
+            return
+        self.quit_pipe_closed = True
+
+        write_fd = self.quit_pipe[1]
+        try:
+            os.write(write_fd, b"42")
+        except OSError as e:
+            pass
+
+        try:
+            os.close(write_fd)
+        except OSError as e:
+            pass
+
+    def register(self):
+        """
+        Register to session daemon using the previously connected socket of the
+        class.
+
+        Command ABI:
+            uint32 domain
+            uint32 pid
+            uint32 major version
+            uint32 minor version
+        """
+        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \
+                LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
+        # sendall() keeps going until everything is written; send() may stop
+        # after a partial write.
+        self.sessiond_sock.sendall(data)
+
+    def run(self):
+        """
+        Start the TCP client thread by registering to the session daemon and polling
+        on that socket for commands.
+        """
+
+        epfd = epoll()
+        try:
+            epfd.register(self.quit_pipe[0], EPOLLIN)
+
+            # Main loop to handle session daemon command and disconnection.
+            while not self.quit_flag:
+                try:
+                    # First, connect to the session daemon.
+                    self.connect_sessiond()
+
+                    # Register to session daemon after a successful connection.
+                    self.register()
+                    # Add registered socket to poll set.
+                    epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)
+
+                    self.quit_flag = self.wait_cmd(epfd)
+                except IOError as e:
+                    # Whatever happens here, we have to close down everything
+                    # and retry to connect to the session daemon since either
+                    # the socket is closed or invalid data was sent.
+                    self.cleanup_socket(epfd)
+                    self.register_sem.release()
+                    # No point in waiting when we were asked to quit.
+                    if not self.quit_flag:
+                        sleep(LTTngTCPClient.WAIT_TIME)
+        finally:
+            # Release the descriptors even if an unexpected exception ends the
+            # thread.
+            self.cleanup_socket(epfd)
+            os.close(self.quit_pipe[0])
+            epfd.close()
+
+    def _recv_exact(self, sock, size):
+        """
+        Receive exactly size bytes from the given socket and return them as a
+        bytearray. Raise IOError if the peer closes the connection before
+        that.
+        """
+        received = bytearray()
+        while len(received) < size:
+            # recv() may return less than requested, keep reading.
+            chunk = sock.recv(min(size - len(received),
+                    LTTngTCPClient.RECV_CHUNK_SIZE))
+            if not chunk:
+                raise IOError(errno.ESHUTDOWN)
+            received += chunk
+
+        return received
+
+    def recv_header(self, sock):
+        """
+        Receive the command header from the given socket and return it as a
+        (data_size, cmd, cmd_version) tuple.
+
+        Header ABI is defined like this:
+            uint64 data_size
+            uint32 cmd
+            uint32 cmd_version
+
+        Raise IOError if the connection is closed or the header is invalid.
+        """
+        s_pack = struct.Struct('>QII')
+
+        pack_data = self._recv_exact(sock, s_pack.size)
+
+        try:
+            return s_pack.unpack(bytes(pack_data))
+        except struct.error:
+            raise IOError(errno.EINVAL)
+
+    def create_command(self, cmd_type, version, data):
+        """
+        Return the right command object using the given command type. The
+        command version is unused since we only have one for now.
+
+        Raise LTTngUnknownCmdError for an unknown command type.
+        """
+
+        cmd_dict = {
+            LTTngSessiondCmd.CMD_LIST: \
+                    lambda: LTTngCommandList(self.log_handler),
+            LTTngSessiondCmd.CMD_ENABLE: \
+                    lambda: LTTngCommandEnable(self.log_handler, data),
+            LTTngSessiondCmd.CMD_DISABLE: \
+                    lambda: LTTngCommandDisable(self.log_handler, data),
+            LTTngSessiondCmd.CMD_REG_DONE: \
+                    lambda: LTTngCommandRegDone(self.register_sem),
+        }
+
+        if cmd_type not in cmd_dict:
+            raise LTTngUnknownCmdError()
+
+        return cmd_dict[cmd_type]()
+
+    def pack_code(self, code):
+        """
+        Return the given return code packed as a big-endian uint32.
+        """
+        return struct.pack('>I', code)
+
+    def handle_command(self, data, cmd_type, cmd_version):
+        """
+        Handle the given command type with the received payload. This function
+        sends back data to the session daemon according to the return value of
+        the command.
+        """
+        payload = bytearray()
+        # An error code is always sent back; only a command that executed
+        # successfully can ask for no reply at all.
+        send_reply = True
+
+        try:
+            cmd = self.create_command(cmd_type, cmd_version, data)
+            cmd_reply = cmd.execute()
+        except LTTngCmdError as e:
+            # Set error code in payload
+            payload += self.pack_code(e.get_code())
+        except LTTngUnknownCmdError:
+            # Set error code in payload
+            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
+        else:
+            send_reply = cmd_reply.reply
+            # Set success code in data
+            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
+            if cmd_reply.payload is not None:
+                payload += cmd_reply.payload
+
+        # Send response only if asked for.
+        if send_reply:
+            self.sessiond_sock.sendall(payload)
+
+    def wait_cmd(self, epfd):
+        """
+        Poll on the given epoll object and handle every command received on
+        the session daemon socket.
+
+        Return True once the quit pipe becomes readable. Raise IOError on
+        socket error, hang up or invalid data.
+        """
+
+        while True:
+            try:
+                # Poll on socket for command.
+                events = epfd.poll()
+            except select.error as e:
+                # Forward the original arguments; the exception has no
+                # "message" attribute on Python 3.
+                raise IOError(*e.args)
+
+            for fileno, event in events:
+                if fileno == self.quit_pipe[0]:
+                    return True
+                elif event & (EPOLLERR | EPOLLHUP):
+                    raise IOError(errno.ESHUTDOWN)
+                elif event & EPOLLIN:
+                    data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
+
+                    data = bytearray()
+                    if data_size:
+                        data = self._recv_exact(self.sessiond_sock, data_size)
+
+                    self.handle_command(data, cmd, cmd_version)
+                else:
+                    raise IOError(errno.ESHUTDOWN)
+
+    def get_port_from_file(self, path):
+        """
+        Open the session daemon agent port file and return the value. If the
+        file can not be read or holds no valid TCP port, 0 is returned.
+        """
+
+        # By default, the port is set to 0 so if we can not find the agent port
+        # file simply don't try to connect. A value set to 0 indicates that.
+        port = 0
+
+        try:
+            # The with statement closes the file even when int() fails.
+            with open(path, "r") as f:
+                r_port = int(f.readline())
+        except (IOError, ValueError):
+            return port
+
+        if 0 < r_port <= 65535:
+            port = r_port
+
+        return port
+
+    def connect_sessiond(self):
+        """
+        Connect sessiond_sock to running session daemon using the port file.
+
+        Raise IOError if no port is available or the connection fails.
+        """
+        if self.log_handler.is_root:
+            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
+        else:
+            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)
+
+        # No session daemon available
+        if port == 0:
+            raise IOError(errno.ECONNREFUSED)
+
+        # Create session daemon TCP socket
+        if not self.sessiond_sock:
+            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)
+
+        # Can raise an IOError so caller must catch it.
+        self.sessiond_sock.connect((self.sessiond_host, port))
This page took 0.028341 seconds and 4 git commands to generate.