Fix: python lttngust agent fails when LTTNG_UST_APP_PATH is not set
[lttng-ust.git] / src / python-lttngust / lttngust / agent.py
1 # -*- coding: utf-8 -*-
2 #
3 # SPDX-License-Identifier: LGPL-2.1-only
4 #
5 # Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com>
6 # Copyright (C) 2014 David Goulet <dgoulet@efficios.com>
7
8 from __future__ import unicode_literals
9 from __future__ import print_function
10 from __future__ import division
11 import lttngust.debug as dbg
12 import lttngust.loghandler
13 import lttngust.compat
14 import lttngust.cmd
15 from io import open
16 import threading
17 import logging
18 import socket
19 import time
20 import sys
21 import os
22
23
24 try:
25 # Python 2
26 import Queue as queue
27 except ImportError:
28 # Python 3
29 import queue
30
31
# Values sent to the session daemon in the registration command
# built by _TcpClient._register().
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
35
36
def _get_env_value_ms(key, default_s):
    """Return a duration, in seconds, read from a millisecond variable.

    key: name of the environment variable; its value is expected to be
        an integer number of milliseconds.
    default_s: duration, in seconds, returned when the variable is not
        set, or when its value is not a valid non-negative integer.

    A warning is printed with dbg._pwarning() when the value is invalid
    and the default is used instead.
    """
    try:
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError, OverflowError):
        # Not an integer (TypeError/ValueError), or an integer too large
        # to be divided into a float (OverflowError). Only conversion
        # errors are caught so that KeyboardInterrupt and SystemExit
        # are never swallowed here.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
49
50
# Durations, in seconds, read once at import time from environment
# variables expressed in milliseconds (defaults: 5 s and 3 s).
# _REG_TIMEOUT is the initial time budget of the registration wait in
# _init_threads(); _RETRY_REG_DELAY is the pause between two connection
# attempts in _TcpClient.run().
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
53
54
class _TcpClient(object):
    """TCP client of one session daemon agent port.

    The client connects to the session daemon, registers this process,
    then handles the commands sent by the session daemon (registration
    done, enable, disable, list). After any error, the socket is closed
    and the whole sequence is retried after _RETRY_REG_DELAY seconds.
    """

    def __init__(self, name, host, port, reg_queue):
        """Create a client; no connection is made until run() is called.

        name: client name, used to prefix the debug messages.
        host: host of the session daemon.
        port: TCP port of the session daemon.
        reg_queue: queue in which True is put when the session daemon
            confirms the registration, or None.

        Raises OSError if the log handler cannot be created.
        """
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class is subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            # bare raise: keep the original traceback
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)

        # number of "enable" commands not yet balanced by a "disable"
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command type -> method handling this command
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return `msg` prefixed with the name of this client."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands forever; never returns."""
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size, mid_message=False):
        """Receive exactly `size` bytes from the session daemon socket.

        A single recv() call may return fewer bytes than requested, so
        the socket is read until `size` bytes are available.

        size: number of bytes to receive.
        mid_message: True if the bytes to receive continue a message
            whose beginning was already received.

        Returns the received bytes, or empty bytes if the connection
        was closed before the first byte of a new message. Raises
        socket.error if the connection is closed in the middle of a
        message. socket.timeout is only propagated when no byte of the
        message was received yet, so that a caller retrying after a
        timeout cannot lose the beginning of a message.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            started = mid_message or bool(chunks)

            try:
                chunk = self._sessiond_sock.recv(remaining)
            except socket.timeout:
                if not started:
                    raise

                continue

            if not chunk:
                if not started:
                    return bytes()

                raise socket.error('connection closed in the middle of a message')

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        Returns None if the session daemon sent no data.
        """
        data = self._recv_exact(lttngust.cmd._SERVER_CMD_HEADER_SIZE)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive a server command (header, then payload if any).

        Returns None if no header could be received; otherwise returns
        the result of lttngust.cmd._server_cmd_from_data().
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = bytes()

        if server_cmd_header.data_size > 0:
            # the header was received: the payload continues the message
            data = self._recv_exact(server_cmd_header.data_size,
                                    mid_message=True)

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Send the data of `cmd_reply` to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Notify the registration queue, once; no reply is returned."""
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only notify for the first confirmation
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Count one more enabling; add our handler on the first one."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Count one less enabling; remove our handler when none is left."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Return a reply listing the names of the known loggers."""
        dbg._pdebug(self._debug('got "list" server command'))

        # Take a snapshot: in Python 3, keys() is a live view of a
        # dictionary that other threads modify when they create loggers.
        names = list(logging.Logger.manager.loggerDict)
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch `server_cmd` to its handler and send the reply, if any.

        An "invalid command" reply is sent when `server_cmd` is None or
        when its exact type has no registered handler.
        """
        handler = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
        else:
            handler = self._server_cmd_handlers.get(type(server_cmd))

            if handler is None:
                dbg._pdebug(self._debug('unknown server command'))

        if handler is None:
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        else:
            cmd_reply = handler(server_cmd)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an exception occurs."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no KA and we could
                # wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if any."""
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # e.g. the socket was never connected; it must still be closed
            pass

        try:
            sock.close()
        except socket.error:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect it to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue timeouts.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command of this process to the session daemon."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
251
252
def _parse_port(text):
    """Return the TCP port number written in `text`, or None if invalid.

    A valid port is an integer in the range 1 to 65535.
    """
    try:
        port = int(text)
    except (TypeError, ValueError):
        return None

    # both bounds must hold ("or" would accept any integer)
    if 0 < port <= 65535:
        return port

    return None


def _get_port_from_file(path):
    """Return the TCP port written on the first line of the file `path`.

    Returns None if the file cannot be read or if its first line is not
    a valid port number (1 to 65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the context manager closes the file even if reading fails
        with open(path) as f:
            first_line = f.readline()
    except (IOError, OSError, ValueError):
        # missing or unreadable file, or content which cannot be decoded
        return None

    return _parse_port(first_line)
268
def _get_ust_app_path():
    """Return the first path of $LTTNG_UST_APP_PATH, or None if it is not set.

    A warning is printed when the variable contains the ':' separator,
    since only the part before the first ':' is used.
    """
    env_value = os.getenv('LTTNG_UST_APP_PATH')

    if env_value is not None:
        first_path, separator, _ = env_value.partition(':')

        if separator:
            dbg._pwarning("':' separator in LTTNG_UST_APP_PATH, only the first path will be used")

        return first_path

    return None
277
def _get_user_home_path():
    """Return $LTTNG_HOME if it is set, else the home directory of the user."""
    lttng_home = os.getenv('LTTNG_HOME')

    if lttng_home is None:
        # no override: use what '~' expands to
        return os.path.expanduser('~')

    return lttng_home
281
282
# Set to True by _init_threads() so that the agent is initialized only once.
_initialized = False
# Host given to every _TcpClient created by _client_thread_target().
_SESSIOND_HOST = '127.0.0.1'
285
286
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: create the client named `name` and run it.

    name: client name.
    port: TCP port of the session daemon to connect to.
    reg_queue: queue handed to the client.
    """
    creation_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creation_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    start_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(start_msg)
    tcp_client.run()
292
293
def _init_threads():
    """Start the client threads and wait for their registration.

    Does nothing if the agent is already initialized. Otherwise, reads
    the session daemon port(s) from the agent port file(s), starts one
    daemon thread per port found, then blocks until every started
    client is registered or until _REG_TIMEOUT seconds have elapsed.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True

    # The LTTNG_UST_APP_PATH environment variable disables connections
    # to the global and per-user session daemons.
    #
    # Read it only once: each call can print a warning.
    ust_app_path = _get_ust_app_path()

    if ust_app_path is not None:
        ust_app_port_file = os.path.join(ust_app_path, 'agent.port')
        ust_app_port = _get_port_from_file(ust_app_port_file)
        sys_port = None
        user_port = None
        dbg._pdebug('ust_app session daemon port: {}'.format(ust_app_port))
    else:
        ust_app_port = None
        sys_port = _get_port_from_file('/var/run/lttng/agent.port')
        user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
        user_port = _get_port_from_file(user_port_file)
        dbg._pdebug('system session daemon port: {}'.format(sys_port))
        dbg._pdebug('user session daemon port: {}'.format(user_port))

    reg_queue = queue.Queue()
    reg_expecting = 0

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    # (client/thread name, port) of each possible client, in start order
    clients = (
        ('ust_app', ust_app_port),
        ('system', sys_port),
        ('user', user_port),
    )

    try:
        for name, port in clients:
            if port is None:
                continue

            dbg._pdebug('creating {} client thread'.format(name))
            t = threading.Thread(target=_client_thread_target,
                                 args=(name, port, reg_queue))
            t.name = name
            t.daemon = True
            t.start()
            dbg._pdebug('created and started {} client thread'.format(name))
            reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                                cur_timeout))
            t1 = lttngust.compat._clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = lttngust.compat._clock()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            # the time budget is shared by all the expected registrations
            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
412
413
# Initialize the agent as soon as this module is imported.
_init_threads()
This page took 0.038673 seconds and 4 git commands to generate.