-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
-#
-# This library is free software; you can redistribute it and/or modify it under
-# the terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this library; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-from __future__ import unicode_literals
-
-import ctypes
-import errno
-import logging
-import os
-import sys
-import threading
-import struct
-import select
-
-from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
-from socket import *
-from time import sleep
-
-__all__ = ["lttng-agent"]
-__author__ = "David Goulet <dgoulet@efficios.com>"
-
-class LTTngAgent():
- """
- LTTng agent python code. A LTTng Agent is responsible to spawn two threads,
- the current UID and root session daemon. Those two threads register to the
- right daemon and handle the tracing.
-
- This class needs to be instantiate once and once the init returns, tracing
- is ready to happen.
- """
-
- SESSIOND_ADDR = "127.0.0.1"
- SEM_COUNT = 2
- # Timeout for the sempahore in seconds.
- SEM_TIMEOUT = 5
- SEM_WAIT_PERIOD = 0.2
-
- def __init__(self):
- # Session daemon register semaphore.
- self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT);
-
- self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
- self.client_user.start()
-
- self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
- self.client_root.log_handler.is_root = True
- self.client_root.start()
-
- acquire = 0
- timeout = LTTngAgent.SEM_TIMEOUT
- while True:
- # Quit if timeout has reached 0 or below.
- if acquire == LTTngAgent.SEM_COUNT or timeout <= 0:
- break;
-
- # Acquire semaphore for *user* thread.
- if not self.register_sem.acquire(False):
- sleep(LTTngAgent.SEM_WAIT_PERIOD)
- timeout -= LTTngAgent.SEM_WAIT_PERIOD
- else:
- acquire += 1
-
- def __del__(self):
- self.destroy()
-
- def destroy(self):
- self.client_user.destroy()
- self.client_user.join()
-
- self.client_root.destroy()
- self.client_root.join()
-
-class LTTngCmdError(RuntimeError):
- """
- Command error thrown if an error is encountered in a command from the
- session daemon.
- """
-
- def __init__(self, code):
- super().__init__('LTTng command error: code {}'.format(code))
- self._code = code
-
- def get_code(self):
- return self._code
-
-class LTTngUnknownCmdError(RuntimeError):
- pass
-
-class LTTngLoggingHandler(logging.Handler):
- """
- Class handler for the Python logging API.
- """
-
- def __init__(self):
- logging.Handler.__init__(self, level = logging.NOTSET)
-
- # Refcount tracking how many events have been enabled. This value above
- # 0 means that the handler is attached to the root logger.
- self.refcount = 0
-
- # Dict of enabled event. We track them so we know if it's ok to disable
- # the received event.
- self.enabled_events = {}
-
- # Am I root ?
- self.is_root = False
-
- # Using the logging formatter to extract the asctime only.
- self.log_fmt = logging.Formatter("%(asctime)s")
- self.setFormatter(self.log_fmt)
-
- # ctypes lib for lttng-ust
- try:
- self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
- except OSError as e:
- print("Unable to find libust for Python.")
-
- def emit(self, record):
- """
- Fire LTTng UST tracepoint with the given record.
- """
- asctime = self.format(record)
-
- self.lttng_ust.py_tracepoint(asctime.encode(),
- record.getMessage().encode(), record.name.encode(),
- record.funcName.encode(), record.lineno, record.levelno,
- record.thread, record.threadName.encode())
-
- def enable_event(self, name):
- """
- Enable an event name which will ultimately add an handler to the root
- logger if none is present.
- """
- # Don't update the refcount if the event has been enabled prior.
- if name in self.enabled_events:
- return
-
- # Get the root logger and attach our handler.
- root_logger = logging.getLogger()
- # First thing first, we need to set the root logger to the loglevel
- # NOTSET so we can catch everything. The default is 30.
- root_logger.setLevel(logging.NOTSET)
-
- self.refcount += 1
- if self.refcount == 1:
- root_logger.addHandler(self)
-
- self.enabled_events[name] = True
-
- def disable_event(self, name):
- """
- Disable an event name which will ultimately add an handler to the root
- logger if none is present.
- """
-
- if name not in self.enabled_events:
- # Event was not enabled prior, do nothing.
- return
-
- # Get the root logger and attach our handler.
- root_logger = logging.getLogger()
-
- self.refcount -= 1
- if self.refcount == 0:
- root_logger.removeHandler(self)
- del self.enabled_events[name]
-
- def list_logger(self):
- """
- Return a list of logger name.
- """
- return logging.Logger.manager.loggerDict.keys()
-
-class LTTngSessiondCmd():
- """
- Class handling session daemon command.
- """
-
- # Command values from the agent protocol
- CMD_LIST = 1
- CMD_ENABLE = 2
- CMD_DISABLE = 3
- CMD_REG_DONE = 4
-
- # Return code
- CODE_SUCCESS = 1
- CODE_INVALID_CMD = 2
-
- # Python Logger LTTng domain value taken from lttng/domain.h
- DOMAIN = 5
-
- # Protocol version
- MAJOR_VERSION = 1
- MINOR_VERSION = 0
-
- def execute(self):
- """
- This is part of the command interface. Must be implemented.
- """
- raise NotImplementedError
-
-class LTTngCommandReply():
- """
- Object that contains the information that should be replied to the session
- daemon after a command execution.
- """
-
- def __init__(self, payload = None, reply = True):
- self.payload = payload
- self.reply = reply
-
-class LTTngCommandEnable(LTTngSessiondCmd):
- """
- Handle the enable event command from the session daemon.
- """
-
- def __init__(self, log_handler, data):
- self.log_handler = log_handler
- # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
- name_offset = 8;
-
- data_size = len(data)
- if data_size == 0:
- raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
- try:
- self.loglevel, self.loglevel_type, self.name = \
- struct.unpack('>II%us' % (data_size - name_offset), data)
- # Remove trailing NULL bytes from name.
- self.name = self.name.decode().rstrip('\x00')
- except struct.error:
- raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
- def execute(self):
- self.log_handler.enable_event(self.name)
- return LTTngCommandReply()
-
-class LTTngCommandDisable(LTTngSessiondCmd):
- """
- Handle the disable event command from the session daemon.
- """
-
- def __init__(self, log_handler, data):
- self.log_handler = log_handler
-
- data_size = len(data)
- if data_size == 0:
- raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
- try:
- self.name = struct.unpack('>%us' % (data_size), data)[0]
- # Remove trailing NULL bytes from name.
- self.name = self.name.decode().rstrip('\x00')
- except struct.error:
- raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)
-
- def execute(self):
- self.log_handler.disable_event(self.name)
- return LTTngCommandReply()
-
-class LTTngCommandRegDone(LTTngSessiondCmd):
- """
- Handle register done command. This is sent back after a successful
- registration from the session daemon. We basically release the given
- semaphore so the agent can return to the caller.
- """
-
- def __init__(self, sem):
- self.sem = sem
-
- def execute(self):
- self.sem.release()
- return LTTngCommandReply(reply = False)
-
-class LTTngCommandList(LTTngSessiondCmd):
- """
- Handle the list command from the session daemon on the given socket.
- """
-
- def __init__(self, log_handler):
- self.log_handler = log_handler
-
- def execute(self):
- data_size = 0
- data = logger_data = bytearray()
-
- loggers = self.log_handler.list_logger()
- # First, pack nb_event that must preceed the data.
- logger_data += struct.pack('>I', len(loggers))
-
- # Populate payload with logger name.
- for logger in loggers:
- # Increment data size plus the NULL byte at the end of the name.
- data_size += len(logger) + 1
- # Pack logger name and NULL byte.
- logger_data += struct.pack('>%usB' % (len(logger)), \
- bytes(bytearray(str.encode(logger))), 0)
-
- # Pack uint32_t data_size followed by nb event (number of logger)
- data = struct.pack('>I', data_size)
- data += logger_data
- return LTTngCommandReply(payload = data)
-
-class LTTngTCPClient(threading.Thread):
- """
- TCP client that register and receives command from the session daemon.
- """
-
- SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
- USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")
-
- # The time in seconds this client should wait before trying again to
- # register back to the session daemon.
- WAIT_TIME = 3
-
- def __init__(self, host, sem):
- threading.Thread.__init__(self)
-
- # Which host to connect to. The port is fetch dynamically.
- self.sessiond_host = host
-
- # The session daemon register done semaphore. Needs to be released when
- # receiving a CMD_REG_DONE command.
- self.register_sem = sem
- self.register_sem.acquire()
-
- # Indicate that we have to quit thus stop the main loop.
- self.quit_flag = False
- # Quit pipe. The thread poll on it to know when to quit.
- self.quit_pipe = os.pipe()
-
- # Socket on which we communicate with the session daemon.
- self.sessiond_sock = None
- # LTTng Logging Handler
- self.log_handler = LTTngLoggingHandler()
-
- def cleanup_socket(self, epfd = None):
- # Ease our life a bit.
- sock = self.sessiond_sock
- if not sock:
- return
-
- try:
- if epfd is not None:
- epfd.unregister(sock)
- sock.shutdown(SHUT_RDWR)
- sock.close()
- except select.error:
- # Cleanup fail, we can't do anything much...
- pass
- except IOError:
- pass
-
- self.sessiond_sock = None
-
- def destroy(self):
- self.quit_flag = True
- try:
- fp = os.fdopen(self.quit_pipe[1], 'w')
- fp.write("42")
- fp.close()
- except OSError as e:
- pass
-
- def register(self):
- """
- Register to session daemon using the previously connected socket of the
- class.
-
- Command ABI:
- uint32 domain
- uint32 pid
- """
- data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \
- LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
- self.sessiond_sock.send(data)
-
- def run(self):
- """
- Start the TCP client thread by registering to the session daemon and polling
- on that socket for commands.
- """
-
- epfd = epoll()
- epfd.register(self.quit_pipe[0], EPOLLIN)
-
- # Main loop to handle session daemon command and disconnection.
- while not self.quit_flag:
- try:
- # First, connect to the session daemon.
- self.connect_sessiond()
-
- # Register to session daemon after a successful connection.
- self.register()
- # Add registered socket to poll set.
- epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)
-
- self.quit_flag = self.wait_cmd(epfd)
- except IOError as e:
- # Whatever happens here, we have to close down everything and
- # retry to connect to the session daemon since either the
- # socket is closed or invalid data was sent.
- self.cleanup_socket(epfd)
- self.register_sem.release()
- sleep(LTTngTCPClient.WAIT_TIME)
- continue
-
- self.cleanup_socket(epfd)
- os.close(self.quit_pipe[0])
- epfd.close()
-
- def recv_header(self, sock):
- """
- Receive the command header from the given socket. Set the internal
- state of this object with the header data.
-
- Header ABI is defined like this:
- uint64 data_size
- uint32 cmd
- uint32 cmd_version
- """
- s_pack = struct.Struct('>QII')
-
- pack_data = sock.recv(s_pack.size)
- data_received = len(pack_data)
- if data_received == 0:
- raise IOError(errno.ESHUTDOWN)
-
- try:
- return s_pack.unpack(pack_data)
- except struct.error:
- raise IOError(errno.EINVAL)
-
- def create_command(self, cmd_type, version, data):
- """
- Return the right command object using the given command type. The
- command version is unused since we only have once for now.
- """
-
- cmd_dict = {
- LTTngSessiondCmd.CMD_LIST: \
- lambda: LTTngCommandList(self.log_handler),
- LTTngSessiondCmd.CMD_ENABLE: \
- lambda: LTTngCommandEnable(self.log_handler, data),
- LTTngSessiondCmd.CMD_DISABLE: \
- lambda: LTTngCommandDisable(self.log_handler, data),
- LTTngSessiondCmd.CMD_REG_DONE: \
- lambda: LTTngCommandRegDone(self.register_sem),
- }
-
- if cmd_type in cmd_dict:
- return cmd_dict[cmd_type]()
- else:
- raise LTTngUnknownCmdError()
-
- def pack_code(self, code):
- return struct.pack('>I', code)
-
- def handle_command(self, data, cmd_type, cmd_version):
- """
- Handle the given command type with the received payload. This function
- sends back data to the session daemon using to the return value of the
- command.
- """
- payload = bytearray()
-
- try:
- cmd = self.create_command(cmd_type, cmd_version, data)
- cmd_reply = cmd.execute()
- # Set success code in data
- payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
- if cmd_reply.payload is not None:
- payload += cmd_reply.payload
- except LTTngCmdError as e:
- # Set error code in payload
- payload += self.pack_code(e.get_code())
- except LTTngUnknownCmdError:
- # Set error code in payload
- payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
-
- # Send response only if asked for.
- if cmd_reply.reply:
- self.sessiond_sock.send(payload)
-
- def wait_cmd(self, epfd):
- """
- """
-
- while True:
- try:
- # Poll on socket for command.
- events = epfd.poll()
- except select.error as e:
- raise IOError(e.errno, e.message)
-
- for fileno, event in events:
- if fileno == self.quit_pipe[0]:
- return True
- elif event & (EPOLLERR | EPOLLHUP):
- raise IOError(errno.ESHUTDOWN)
- elif event & EPOLLIN:
- data = bytearray()
-
- data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
- if data_size:
- data += self.sessiond_sock.recv(data_size)
-
- self.handle_command(data, cmd, cmd_version)
- else:
- raise IOError(errno.ESHUTDOWN)
-
- def get_port_from_file(self, path):
- """
- Open the session daemon agent port file and returns the value. If none
- found, 0 is returned.
- """
-
- # By default, the port is set to 0 so if we can not find the agent port
- # file simply don't try to connect. A value set to 0 indicates that.
- port = 0
-
- try:
- f = open(path, "r")
- r_port = int(f.readline())
- if r_port > 0 or r_port <= 65535:
- port = r_port
- f.close()
- except IOError as e:
- pass
- except ValueError as e:
- pass
-
- return port
-
- def connect_sessiond(self):
- """
- Connect sessiond_sock to running session daemon using the port file.
- """
- # Create session daemon TCP socket
- if not self.sessiond_sock:
- self.sessiond_sock = socket(AF_INET, SOCK_STREAM)
-
- if self.log_handler.is_root:
- port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
- else:
- port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)
-
- # No session daemon available
- if port == 0:
- raise IOError(errno.ECONNREFUSED)
-
- # Can raise an IOError so caller must catch it.
- self.sessiond_sock.connect((self.sessiond_host, port))