Introduce LTTNG_UST_APP_PATH environment variable
[lttng-ust.git] / src / python-lttngust / lttngust / agent.py
CommitLineData
de4dee04
PP
1# -*- coding: utf-8 -*-
2#
c0c0989a 3# SPDX-License-Identifier: LGPL-2.1-only
de4dee04 4#
c0c0989a
MJ
5# Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com>
6# Copyright (C) 2014 David Goulet <dgoulet@efficios.com>
de4dee04
PP
7
8from __future__ import unicode_literals
9from __future__ import print_function
10from __future__ import division
11import lttngust.debug as dbg
12import lttngust.loghandler
e7bf4968 13import lttngust.compat
de4dee04
PP
14import lttngust.cmd
15from io import open
16import threading
17import logging
18import socket
19import time
20import sys
21import os
22
23
24try:
25 # Python 2
26 import Queue as queue
27except ImportError:
28 # Python 3
29 import queue
30
31
# Identification sent to the session daemon by _TcpClient._register()
# together with the process ID (presumably the agent domain ID followed by
# the major/minor agent protocol version -- confirm against lttngust.cmd).
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
35
36
def _get_env_value_ms(key, default_s):
    """Return a duration in seconds read from environment variable `key`.

    The variable holds an integer number of milliseconds. When it is unset,
    `default_s` seconds are used. When it is set to something that is not an
    integer, or to a negative value, a warning is printed and `default_s` is
    returned instead.
    """
    try:
        # When `key` is unset, the default is expressed in ms too so both
        # paths go through the same conversion.
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError):
        # Not an integer (e.g. "", "1.5", "abc"): flag it as invalid.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
49
50
# Registration wait timeout and reconnection delay, in seconds. Both are
# configurable through the environment, expressed there in milliseconds.
_REG_TIMEOUT, _RETRY_REG_DELAY = (
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5),
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3),
)
53
54
class _TcpClient(object):
    """Agent client talking to one session daemon over TCP.

    run() never returns: it connects to the session daemon, registers, then
    serves the daemon's commands. On any exception (connection closed,
    network error, invalid data) the socket is cleaned up and the whole
    sequence is retried after _RETRY_REG_DELAY seconds.
    """

    def __init__(self, name, host, port, reg_queue):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            # Bare `raise` keeps the original traceback.
            raise

        self._root_logger = logging.getLogger()
        # NOTSET on the root logger lets records of every level reach its
        # handlers (presumably so our handler sees all of them).
        self._root_logger.setLevel(logging.NOTSET)
        # Number of "enable" commands not balanced by a "disable"; our
        # handler is attached to the root logger while this is positive.
        self._ref_count = 0
        self._sessiond_sock = None
        # Receives True once on "registration done", then set to None.
        self._reg_queue = reg_queue
        # Server command class -> handler. A handler returns the reply to
        # send back, or None to send nothing.
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        # Prefix a debug message with this client's name.
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands forever, reconnecting on error."""
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except (Exception) as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exactly(self, size, interruptible=False):
        """Receive exactly `size` bytes from the session daemon socket.

        A single recv() on a stream socket may return fewer bytes than
        requested, so this keeps reading until `size` bytes are collected.

        If `interruptible` is true, a socket timeout raised before any byte
        of the message arrived propagates to the caller. Once part of the
        message has been read, timeouts are retried so that the byte stream
        never gets out of sync with the command boundaries.

        Raises IOError if the peer closes the connection first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            try:
                chunk = self._sessiond_sock.recv(remaining)
            except socket.timeout:
                if interruptible and remaining == size:
                    raise

                continue

            if not chunk:
                # recv() returns an empty string on orderly shutdown (EOF).
                raise IOError('connection closed by session daemon')

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        # A timeout while waiting for a new header is harmless (the caller
        # simply waits again), hence interruptible=True.
        data = self._recv_exactly(lttngust.cmd._SERVER_CMD_HEADER_SIZE,
                                  interruptible=True)
        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive one complete server command (header + payload).

        Returns the parsed command, or None when the header or command could
        not be decoded (the caller then replies with INVALID_CMD).
        """
        server_cmd_header = self._recv_server_cmd_header()

        # NOTE(review): kept defensively in case the header parser can
        # return None -- confirm against lttngust.cmd.
        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))

        # The payload is read in full (b'' when data_size is 0).
        data = self._recv_exactly(server_cmd_header.data_size)

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        # Notify _init_threads() once; no reply is sent for this command.
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        dbg._pdebug(self._debug('got "list" server command'))
        # Snapshot the names: in Python 3 keys() is a live view, and other
        # application threads may create loggers (mutating loggerDict)
        # while the reply iterates over it.
        names = list(logging.Logger.manager.loggerDict.keys())
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        # Dispatch on the command's exact type; undecodable or unknown
        # commands get an INVALID_CMD reply.
        cmd_reply = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        elif type(server_cmd) in self._server_cmd_handlers:
            cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
        else:
            dbg._pdebug(self._debug('unknown server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        # Serve commands until an exception (e.g. EOF) propagates to run().
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no KA and we could
                # wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if any.

        close() is attempted even when shutdown() fails (e.g. the socket
        never connected), so the file descriptor is never leaked.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except (socket.error, OSError):
            # Not connected, or already reset by the peer.
            pass

        try:
            sock.close()
        except (socket.error, OSError):
            pass

    def _connect_to_sessiond(self):
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue timeouts.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        # Send the registration command: domain, PID and protocol version.
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
251
252
def _get_port_from_file(path):
    """Read a session daemon agent TCP port from the first line of `path`.

    Returns the port as an int, or None when the file cannot be opened or
    read, its first line is not an integer, or the value is outside the
    valid TCP port range 1-65535.
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when parsing the line fails.
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # Missing/unreadable file, or non-integer content.
        return None

    # Both bounds must hold; `or` here would accept any integer.
    if 0 < r_port <= 65535:
        return r_port

    return None
268
c0f6fb05
MD
def _get_ust_app_path():
    # Value of $LTTNG_UST_APP_PATH, or None when the variable is unset
    # (an empty value is returned as-is).
    app_path = os.environ.get('LTTNG_UST_APP_PATH')
    return app_path
de4dee04
PP
271
def _get_user_home_path():
    # The user's home directory, unless $LTTNG_HOME is set: when present
    # (even empty) it takes precedence over $HOME.
    home_dir = os.path.expanduser('~')
    return os.environ.get('LTTNG_HOME', home_dir)
de4dee04
PP
275
276
# Session daemons are always reached on the local host.
_SESSIOND_HOST = '127.0.0.1'

# Flipped to True by _init_threads() so that it only runs once.
_initialized = False
279
280
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: build a TCP client for `port` and run it.

    `reg_queue` is handed to the client, which puts True on it once its
    session daemon confirms the registration. The client's run() loops
    forever, so this normally does not return.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)
    dbg._pdebug('starting client "{}"'.format(name))
    tcp_client.run()
286
287
def _start_client_thread(name, port, reg_queue):
    # Start one daemon thread (named `name`) running a client for `port`.
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the agent client threads and wait for their registration.

    Runs at most once per process (guarded by the module-level
    `_initialized` flag).

    When $LTTNG_UST_APP_PATH is set, only the session daemon whose port is
    read from `$LTTNG_UST_APP_PATH/agent.port` is used. Otherwise the system
    (/var/run/lttng/agent.port) and per-user (`.lttng/agent.port` under
    $LTTNG_HOME, or the home directory) session daemons are used.

    Blocks for at most _REG_TIMEOUT seconds waiting for every started client
    to receive "registration done". On timeout, client threads keep trying
    to connect and register in the background.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True

    # Every port starts as None: each branch below discovers only some of
    # them, while the thread-start loop reads all three.
    ust_app_port = None
    sys_port = None
    user_port = None
    ust_app_path = _get_ust_app_path()

    # The LTTNG_UST_APP_PATH environment variable disables connections
    # to the global and per-user session daemons.
    if ust_app_path is not None:
        ust_app_port_file = os.path.join(ust_app_path, 'agent.port')
        ust_app_port = _get_port_from_file(ust_app_port_file)
        dbg._pdebug('ust_app session daemon port: {}'.format(ust_app_port))
    else:
        sys_port = _get_port_from_file('/var/run/lttng/agent.port')
        user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
        user_port = _get_port_from_file(user_port_file)
        dbg._pdebug('system session daemon port: {}'.format(sys_port))
        dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    reg_queue = queue.Queue()
    reg_expecting = 0

    try:
        for name, port in (('ust_app', ust_app_port),
                           ('system', sys_port),
                           ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception as e:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads: {}'.format(e))
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                              cur_timeout))
            t1 = lttngust.compat._clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = lttngust.compat._clock()
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            break

        # Spend the remaining budget on the next registration.
        cur_timeout -= (t2 - t1)

        if cur_timeout <= 0:
            # timeout
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
405
406
# Importing this module starts the agent: spawn the client threads and wait
# (bounded by _REG_TIMEOUT) for the session daemon(s) to confirm registration.
_init_threads()
This page took 0.046292 seconds and 4 git commands to generate.