Fix: make dist: python agent file handling
[lttng-ust.git] / liblttng-ust-python-agent / lttng_agent.py.in
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
4 #
5 # This library is free software; you can redistribute it and/or modify it under
6 # the terms of the GNU Lesser General Public License as published by the Free
7 # Software Foundation; version 2.1 of the License.
8 #
9 # This library is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 # details.
13 #
14 # You should have received a copy of the GNU Lesser General Public License
15 # along with this library; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 from __future__ import unicode_literals
19
20 import ctypes
21 import errno
22 import logging
23 import os
24 import sys
25 import threading
26 import struct
27 import select
28
29 from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
30 from socket import *
31 from time import sleep
32
# Public names exported by ``from lttng_agent import *``. Every entry must
# name an attribute that really exists in this module, otherwise the star
# import fails. str() keeps the entry a native string even though this file
# enables unicode_literals.
__all__ = [str("LTTngAgent")]
__author__ = "David Goulet <dgoulet@efficios.com>"
35
class LTTngAgent():
    """
    LTTng agent python code. An LTTng agent spawns two client threads: one
    for the session daemon of the current user and one for the root session
    daemon. Each thread registers to its daemon and handles the tracing
    commands it receives.

    This class needs to be instantiated once; when the constructor returns,
    either both clients released the registration semaphore or the timeout
    (SEM_TIMEOUT seconds) expired.
    """

    SESSIOND_ADDR = "127.0.0.1"
    SEM_COUNT = 2
    # Timeout for the semaphore in seconds.
    SEM_TIMEOUT = 5
    # Delay in seconds between two attempts at taking the semaphore.
    SEM_WAIT_PERIOD = 0.2

    def __init__(self):
        # Session daemon register semaphore. Each client takes one unit when
        # it is created, so the count is zero once both clients exist.
        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT)

        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_user.start()

        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_root.log_handler.is_root = True
        self.client_root.start()

        # Wait until the semaphore has been taken back SEM_COUNT times (one
        # release per client) or until the timeout budget is used up.
        acquired = 0
        timeout = LTTngAgent.SEM_TIMEOUT
        while acquired < LTTngAgent.SEM_COUNT and timeout > 0:
            if self.register_sem.acquire(False):
                acquired += 1
            else:
                sleep(LTTngAgent.SEM_WAIT_PERIOD)
                timeout -= LTTngAgent.SEM_WAIT_PERIOD

    def __del__(self):
        self.destroy()

    def destroy(self):
        """
        Stop both client threads and wait for them to finish.

        Safe to call more than once, and safe on an object whose constructor
        failed before both clients were created (this method is also invoked
        from __del__, where raising would only produce noise).
        """
        for attr in ("client_user", "client_root"):
            client = getattr(self, attr, None)
            if client is None:
                continue
            # Forget the client first so a second destroy() is a no-op.
            setattr(self, attr, None)
            client.destroy()
            client.join()
86
class LTTngCmdError(RuntimeError):
    """
    Error raised when a command coming from the session daemon cannot be
    processed. The numeric code given at construction is kept so it can be
    retrieved later with get_code().
    """

    def __init__(self, code):
        message = 'LTTng command error: code {}'.format(code)
        super().__init__(message)
        self._code = code

    def get_code(self):
        """
        Return the code this error was created with.
        """
        return self._code
99
class LTTngUnknownCmdError(RuntimeError):
    """
    Error raised when a received command type does not match any known
    command.
    """
102
class LTTngLoggingHandler(logging.Handler):
    """
    Handler for the Python logging API that forwards every record it
    receives to the LTTng-UST Python agent library as a tracepoint.
    """

    def __init__(self):
        logging.Handler.__init__(self, level=logging.NOTSET)

        # Refcount tracking how many events have been enabled. This value
        # above 0 means that the handler is attached to the root logger.
        self.refcount = 0

        # Dict of enabled events. We track them so we know if it's ok to
        # disable the received event.
        self.enabled_events = {}

        # Am I root ?
        self.is_root = False

        # Using the logging formatter to extract the asctime only.
        self.log_fmt = logging.Formatter("%(asctime)s")
        self.setFormatter(self.log_fmt)

        # ctypes lib for lttng-ust. Stays None when the library cannot be
        # loaded so that emit() can skip tracing instead of failing on a
        # missing attribute inside the application's logging call.
        self.lttng_ust = None
        try:
            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
        except OSError:
            print("Unable to find libust for Python.")

    def emit(self, record):
        """
        Fire the LTTng UST tracepoint with the given record.

        Does nothing when the tracing library could not be loaded. Any
        failure while tracing is reported through handleError(), the
        logging module's convention for handlers, instead of propagating
        into the code that issued the log call.
        """
        if self.lttng_ust is None:
            return

        try:
            asctime = self.format(record)

            self.lttng_ust.py_tracepoint(asctime.encode(),
                    record.getMessage().encode(), record.name.encode(),
                    record.funcName.encode(), record.lineno, record.levelno,
                    record.thread, record.threadName.encode())
        except Exception:
            self.handleError(record)

    def enable_event(self, name):
        """
        Enable an event name. The first enabled event attaches this handler
        to the root logger.
        """
        # Don't update the refcount if the event has been enabled prior.
        if name in self.enabled_events:
            return

        root_logger = logging.getLogger()
        # First thing first, we need to set the root logger to the loglevel
        # NOTSET so we can catch everything. The default is 30.
        root_logger.setLevel(logging.NOTSET)

        self.refcount += 1
        if self.refcount == 1:
            root_logger.addHandler(self)

        self.enabled_events[name] = True

    def disable_event(self, name):
        """
        Disable an event name. When the last enabled event is disabled, this
        handler is detached from the root logger.
        """
        if name not in self.enabled_events:
            # Event was not enabled prior, do nothing.
            return

        self.refcount -= 1
        if self.refcount == 0:
            # Nothing left to trace: detach our handler from the root logger.
            logging.getLogger().removeHandler(self)
        del self.enabled_events[name]

    def list_logger(self):
        """
        Return a list of the logger names known to the logging module.

        A copy is returned rather than a live view of the logging manager's
        dictionary, so the caller can iterate it while other threads create
        new loggers.
        """
        return list(logging.Logger.manager.loggerDict)
187
class LTTngSessiondCmd():
    """
    Base class of the session daemon commands. It holds the protocol
    constants and declares the execute() entry point that every concrete
    command has to provide.
    """

    # Agent protocol: command identifiers.
    CMD_LIST = 1
    CMD_ENABLE = 2
    CMD_DISABLE = 3
    CMD_REG_DONE = 4

    # Agent protocol: return codes.
    CODE_SUCCESS = 1
    CODE_INVALID_CMD = 2

    # Value of the Python logger LTTng domain, taken from lttng/domain.h.
    DOMAIN = 5

    # Agent protocol: version.
    MAJOR_VERSION = 1
    MINOR_VERSION = 0

    def execute(self):
        """
        Command interface entry point. Subclasses must override it; the
        base implementation always raises NotImplementedError.
        """
        raise NotImplementedError()
215
class LTTngCommandReply():
    """
    Result of a command execution: the optional payload to send back to the
    session daemon, and whether anything should be sent back at all.
    """

    def __init__(self, payload=None, reply=True):
        # Whether a response has to be sent for the command.
        self.reply = reply
        # Data appended after the return code, or None when there is none.
        self.payload = payload
225
class LTTngCommandEnable(LTTngSessiondCmd):
    """
    Handle the enable event command from the session daemon.

    Payload layout (big endian):
        uint32 loglevel
        uint32 loglevel_type
        bytes  event name, possibly padded with NULL bytes
    """

    def __init__(self, log_handler, data):
        """
        Parse the command payload.

        Raises LTTngCmdError(CODE_INVALID_CMD) when the payload is empty,
        too short to hold the two integers, or when the event name cannot
        be decoded.
        """
        self.log_handler = log_handler
        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
        name_offset = 8

        data_size = len(data)
        if data_size < name_offset:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            self.loglevel, self.loglevel_type, raw_name = \
                    struct.unpack('>II%us' % (data_size - name_offset), bytes(data))
            # Remove trailing NULL bytes from name.
            self.name = raw_name.decode().rstrip('\x00')
        except (struct.error, UnicodeDecodeError):
            # A name that is not valid text is a malformed command, not a
            # reason to take the whole client thread down.
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        """
        Enable the event on the logging handler and return a plain reply.
        """
        self.log_handler.enable_event(self.name)
        return LTTngCommandReply()
251
class LTTngCommandDisable(LTTngSessiondCmd):
    """
    Handle the disable event command from the session daemon. The payload
    is the event name, possibly padded with NULL bytes.
    """

    def __init__(self, log_handler, data):
        """
        Parse the command payload.

        Raises LTTngCmdError(CODE_INVALID_CMD) when the payload is empty or
        when the event name cannot be decoded.
        """
        self.log_handler = log_handler

        data_size = len(data)
        if data_size == 0:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            raw_name = struct.unpack('>%us' % (data_size), bytes(data))[0]
            # Remove trailing NULL bytes from name.
            self.name = raw_name.decode().rstrip('\x00')
        except (struct.error, UnicodeDecodeError):
            # A name that is not valid text is a malformed command, not a
            # reason to take the whole client thread down.
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        """
        Disable the event on the logging handler and return a plain reply.
        """
        self.log_handler.disable_event(self.name)
        return LTTngCommandReply()
274
class LTTngCommandRegDone(LTTngSessiondCmd):
    """
    Handle the register done command, which the session daemon sends after
    a successful registration. Executing it releases the semaphore received
    at construction so the agent can return to its caller.
    """

    def __init__(self, sem):
        self.sem = sem

    def execute(self):
        """
        Release the semaphore; the returned reply asks for no response.
        """
        self.sem.release()
        no_response = LTTngCommandReply(reply=False)
        return no_response
288
class LTTngCommandList(LTTngSessiondCmd):
    """
    Handle the list command from the session daemon: reply with the names
    of the loggers reported by the logging handler.
    """

    def __init__(self, log_handler):
        self.log_handler = log_handler

    def execute(self):
        """
        Build the list reply.

        Payload layout (big endian):
            uint32 data_size  total size of the names, NULL bytes included
            uint32 nb_event   number of logger names
            names             each one followed by a NULL byte
        """
        # Take a snapshot of the names and encode them once. Sizes must be
        # computed on the encoded bytes: a name with non-ASCII characters is
        # longer encoded than its character count, and sizing it by
        # characters would truncate it.
        encoded_names = [name.encode()
                         for name in list(self.log_handler.list_logger())]

        # First, pack nb_event that must precede the data.
        logger_data = bytearray(struct.pack('>I', len(encoded_names)))

        data_size = 0
        for raw_name in encoded_names:
            # Name plus the NULL byte at the end of the name.
            data_size += len(raw_name) + 1
            logger_data += raw_name
            logger_data += b'\x00'

        # uint32_t data_size followed by nb_event and the names.
        data = struct.pack('>I', data_size) + bytes(logger_data)
        return LTTngCommandReply(payload=data)
317
class LTTngTCPClient(threading.Thread):
    """
    TCP client thread that registers to a session daemon and then serves
    the commands received from it, reconnecting after WAIT_TIME seconds
    whenever the connection cannot be made or is lost.
    """

    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")

    # The time in seconds this client should wait before trying again to
    # register back to the session daemon.
    WAIT_TIME = 3

    # Upper bound, in bytes, of a single recv() call. Keeps one read small
    # even when the announced payload size is very large.
    RECV_CHUNK_SIZE = 4096

    def __init__(self, host, sem):
        """
        host -- address of the session daemon; the port is read from the
                agent port file at connection time.
        sem  -- registration semaphore. One unit is taken here and released
                again on CMD_REG_DONE or on a failed connection attempt.
        """
        threading.Thread.__init__(self)
        # Daemon thread: a process that never calls destroy() must still be
        # able to exit, which a non-daemon thread blocked in poll() prevents.
        self.daemon = True

        # Which host to connect to. The port is fetched dynamically.
        self.sessiond_host = host

        # The session daemon register done semaphore. Needs to be released
        # when receiving a CMD_REG_DONE command.
        self.register_sem = sem
        self.register_sem.acquire()

        # Indicate that we have to quit thus stop the main loop.
        self.quit_flag = False
        # Quit pipe. The thread polls on it to know when to quit.
        self.quit_pipe = os.pipe()
        # Set once destroy() has written to and closed the pipe write end.
        self._quit_sent = False

        # Socket on which we communicate with the session daemon.
        self.sessiond_sock = None
        # LTTng Logging Handler
        self.log_handler = LTTngLoggingHandler()

    def cleanup_socket(self, epfd=None):
        """
        Unregister the session daemon socket from epfd (when given), shut it
        down and close it. Each step is attempted even if the previous one
        failed, so the socket is always closed.
        """
        sock = self.sessiond_sock
        if sock is None:
            return
        self.sessiond_sock = None

        if epfd is not None:
            try:
                epfd.unregister(sock)
            except (select.error, IOError):
                # The socket may never have been registered.
                pass

        try:
            sock.shutdown(SHUT_RDWR)
        except (select.error, IOError):
            # The socket may never have been connected.
            pass

        try:
            sock.close()
        except (select.error, IOError):
            # Cleanup failed, we can't do anything much...
            pass

    def destroy(self):
        """
        Ask the thread to stop: set the quit flag and wake the poll loop up
        by writing to the quit pipe. Calling it again does nothing more.
        """
        self.quit_flag = True
        if self._quit_sent:
            # The write end is already closed; its descriptor number may
            # have been reused for something else, so never touch it again.
            return
        self._quit_sent = True

        write_fd = self.quit_pipe[1]
        try:
            os.write(write_fd, b"42")
        except OSError:
            pass
        try:
            os.close(write_fd)
        except OSError:
            pass

    def register(self):
        """
        Register to session daemon using the previously connected socket of
        the class.

        Command ABI (big endian):
            uint32 domain
            uint32 pid
            uint32 major version
            uint32 minor version
        """
        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(),
                LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
        # sendall(): send() is allowed to transmit only part of the buffer.
        self.sessiond_sock.sendall(data)

    def run(self):
        """
        Thread body: connect and register to the session daemon, then poll
        on that socket for commands until asked to quit.
        """
        epfd = epoll()
        epfd.register(self.quit_pipe[0], EPOLLIN)

        try:
            # Main loop to handle session daemon command and disconnection.
            while not self.quit_flag:
                try:
                    # First, connect to the session daemon.
                    self.connect_sessiond()

                    # Register to session daemon after a successful
                    # connection.
                    self.register()
                    # Add registered socket to poll set.
                    epfd.register(self.sessiond_sock,
                            EPOLLIN | EPOLLERR | EPOLLHUP)

                    self.quit_flag = self.wait_cmd(epfd)
                except IOError:
                    # Whatever happens here, we have to close down
                    # everything and retry to connect to the session daemon
                    # since either the socket is closed or invalid data was
                    # sent.
                    self.cleanup_socket(epfd)
                    # Release the registration semaphore so that whoever is
                    # waiting on it is not held up by this client.
                    self.register_sem.release()
                    sleep(LTTngTCPClient.WAIT_TIME)
        finally:
            self.cleanup_socket(epfd)
            os.close(self.quit_pipe[0])
            epfd.close()

    def _recv_exact(self, sock, size):
        """
        Read exactly size bytes from sock and return them as a bytearray.
        Raises IOError(ESHUTDOWN) if the peer closes the connection first.
        """
        received = bytearray()
        while len(received) < size:
            wanted = min(size - len(received), LTTngTCPClient.RECV_CHUNK_SIZE)
            chunk = sock.recv(wanted)
            if not chunk:
                raise IOError(errno.ESHUTDOWN)
            received += chunk
        return received

    def recv_header(self, sock):
        """
        Receive the command header from the given socket and return it as a
        (data_size, cmd, cmd_version) tuple.

        Header ABI (big endian):
            uint64 data_size
            uint32 cmd
            uint32 cmd_version

        Raises IOError(ESHUTDOWN) if the connection is closed before the
        whole header has been read.
        """
        s_pack = struct.Struct('>QII')
        # A single recv() may return fewer bytes than asked for, so read
        # until the header is complete.
        pack_data = self._recv_exact(sock, s_pack.size)
        return s_pack.unpack(bytes(pack_data))

    def create_command(self, cmd_type, version, data):
        """
        Return the right command object using the given command type. The
        command version is unused since there is only one for now.

        Raises LTTngUnknownCmdError for an unknown command type; the command
        constructors may raise LTTngCmdError on a malformed payload.
        """
        cmd_dict = {
            LTTngSessiondCmd.CMD_LIST:
                lambda: LTTngCommandList(self.log_handler),
            LTTngSessiondCmd.CMD_ENABLE:
                lambda: LTTngCommandEnable(self.log_handler, data),
            LTTngSessiondCmd.CMD_DISABLE:
                lambda: LTTngCommandDisable(self.log_handler, data),
            LTTngSessiondCmd.CMD_REG_DONE:
                lambda: LTTngCommandRegDone(self.register_sem),
        }

        if cmd_type not in cmd_dict:
            raise LTTngUnknownCmdError()
        return cmd_dict[cmd_type]()

    def pack_code(self, code):
        """
        Return code packed as a big endian uint32.
        """
        return struct.pack('>I', code)

    def handle_command(self, data, cmd_type, cmd_version):
        """
        Handle the given command type with the received payload and send
        the result back to the session daemon: a return code, followed by
        the command's payload if it produced one.
        """
        payload = bytearray()
        # Errors are always answered; only a successfully executed command
        # can ask for no response. Setting the default here also keeps the
        # error paths from reading a reply object that was never created.
        send_reply = True

        try:
            cmd = self.create_command(cmd_type, cmd_version, data)
            cmd_reply = cmd.execute()
            send_reply = cmd_reply.reply
            # Set success code in data
            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
            if cmd_reply.payload is not None:
                payload += cmd_reply.payload
        except LTTngCmdError as e:
            # Set error code in payload
            payload += self.pack_code(e.get_code())
        except LTTngUnknownCmdError:
            # Set error code in payload
            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)

        # Send response only if asked for.
        if send_reply:
            self.sessiond_sock.sendall(payload)

    def wait_cmd(self, epfd):
        """
        Poll on epfd and handle every command received on the session
        daemon socket.

        Returns True when the quit pipe becomes readable. Raises IOError
        when polling fails or when the socket reports an error, a hang-up
        or an unexpected event.
        """
        while True:
            try:
                # Poll on socket for command.
                events = epfd.poll()
            except select.error as e:
                # Re-raise with the same (errno, message) arguments; the
                # exception object has no portable "message" attribute.
                raise IOError(*e.args)

            for fileno, event in events:
                if fileno == self.quit_pipe[0]:
                    return True
                elif event & (EPOLLERR | EPOLLHUP):
                    raise IOError(errno.ESHUTDOWN)
                elif event & EPOLLIN:
                    data_size, cmd, cmd_version = \
                            self.recv_header(self.sessiond_sock)
                    if data_size:
                        data = self._recv_exact(self.sessiond_sock, data_size)
                    else:
                        data = bytearray()

                    self.handle_command(data, cmd, cmd_version)
                else:
                    raise IOError(errno.ESHUTDOWN)

    def get_port_from_file(self, path):
        """
        Open the session daemon agent port file and return the port it
        contains. 0 is returned when the file cannot be read, does not start
        with an integer, or holds a value outside 1-65535; 0 tells the
        caller not to try to connect.
        """
        try:
            with open(path, "r") as port_file:
                r_port = int(port_file.readline())
        except (IOError, ValueError):
            return 0

        if 0 < r_port <= 65535:
            return r_port
        return 0

    def connect_sessiond(self):
        """
        Connect sessiond_sock to the running session daemon using the port
        file (the system one for the root client, the user one otherwise).

        Raises IOError(ECONNREFUSED) when no valid port is available; the
        connection attempt itself can raise an IOError as well, so the
        caller must catch it.
        """
        if self.log_handler.is_root:
            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
        else:
            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)

        # No session daemon available
        if port == 0:
            raise IOError(errno.ECONNREFUSED)

        # Create session daemon TCP socket
        if self.sessiond_sock is None:
            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)

        self.sessiond_sock.connect((self.sessiond_host, port))
This page took 0.039885 seconds and 4 git commands to generate.