# -*- coding: utf-8 -*-
# SPDX-License-Identifier: LGPL-2.1-only
# Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com>
# Copyright (C) 2014 David Goulet <dgoulet@efficios.com>
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division

import logging
import os
import socket
import threading
import time

try:
    # Python 2
    import Queue as queue
except ImportError:
    # Python 3
    import queue

import lttngust.debug as dbg
import lttngust.loghandler
import lttngust.compat
import lttngust.cmd
37 def _get_env_value_ms(key
, default_s
):
39 val
= int(os
.getenv(key
, default_s
* 1000)) / 1000
44 fmt
= 'invalid ${} value; {} seconds will be used'
45 dbg
._pwarning
(fmt
.format(key
, default_s
))
# Seconds to wait for session daemon registration during agent
# initialization, and seconds to sleep between two connection attempts.
# Both may be overridden by environment variables given in milliseconds.
_REG_TIMEOUT, _RETRY_REG_DELAY = (
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5),
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3),
)
55 class _TcpClient(object):
56 def __init__(self
, name
, host
, port
, reg_queue
):
57 super(self
.__class
__, self
).__init
__()
63 self
._log
_handler
= lttngust
.loghandler
._Handler
()
64 except (OSError) as e
:
65 dbg
._pwarning
('cannot load library: {}'.format(e
))
68 self
._root
_logger
= logging
.getLogger()
69 self
._root
_logger
.setLevel(logging
.NOTSET
)
71 self
._sessiond
_sock
= None
72 self
._reg
_queue
= reg_queue
73 self
._server
_cmd
_handlers
= {
74 lttngust
.cmd
._ServerCmdRegistrationDone
: self
._handle
_server
_cmd
_reg
_done
,
75 lttngust
.cmd
._ServerCmdEnable
: self
._handle
_server
_cmd
_enable
,
76 lttngust
.cmd
._ServerCmdDisable
: self
._handle
_server
_cmd
_disable
,
77 lttngust
.cmd
._ServerCmdList
: self
._handle
_server
_cmd
_list
,
80 def _debug(self
, msg
):
81 return 'client "{}": {}'.format(self
._name
, msg
)
86 # connect to the session daemon
87 dbg
._pdebug
(self
._debug
('connecting to session daemon'))
88 self
._connect
_to
_sessiond
()
90 # register to the session daemon after a successful connection
91 dbg
._pdebug
(self
._debug
('registering to session daemon'))
94 # wait for commands from the session daemon
95 self
._wait
_server
_cmd
()
96 except (Exception) as e
:
97 # Whatever happens here, we have to close the socket and
98 # retry to connect to the session daemon since either
99 # the socket was closed, a network timeout occurred, or
100 # invalid data was received.
101 dbg
._pdebug
(self
._debug
('got exception: {}'.format(e
)))
102 self
._cleanup
_socket
()
103 dbg
._pdebug
(self
._debug
('sleeping for {} s'.format(_RETRY_REG_DELAY
)))
104 time
.sleep(_RETRY_REG_DELAY
)
106 def _recv_server_cmd_header(self
):
107 data
= self
._sessiond
_sock
.recv(lttngust
.cmd
._SERVER
_CMD
_HEADER
_SIZE
)
110 dbg
._pdebug
(self
._debug
('received empty server command header'))
113 assert(len(data
) == lttngust
.cmd
._SERVER
_CMD
_HEADER
_SIZE
)
114 dbg
._pdebug
(self
._debug
('received server command header ({} bytes)'.format(len(data
))))
116 return lttngust
.cmd
._server
_cmd
_header
_from
_data
(data
)
118 def _recv_server_cmd(self
):
119 server_cmd_header
= self
._recv
_server
_cmd
_header
()
121 if server_cmd_header
is None:
124 dbg
._pdebug
(self
._debug
('server command header: data size: {} bytes'.format(server_cmd_header
.data_size
)))
125 dbg
._pdebug
(self
._debug
('server command header: command ID: {}'.format(server_cmd_header
.cmd_id
)))
126 dbg
._pdebug
(self
._debug
('server command header: command version: {}'.format(server_cmd_header
.cmd_version
)))
129 if server_cmd_header
.data_size
> 0:
130 data
= self
._sessiond
_sock
.recv(server_cmd_header
.data_size
)
131 assert(len(data
) == server_cmd_header
.data_size
)
133 return lttngust
.cmd
._server
_cmd
_from
_data
(server_cmd_header
, data
)
135 def _send_cmd_reply(self
, cmd_reply
):
136 data
= cmd_reply
.get_data()
137 dbg
._pdebug
(self
._debug
('sending command reply ({} bytes)'.format(len(data
))))
138 self
._sessiond
_sock
.sendall(data
)
140 def _handle_server_cmd_reg_done(self
, server_cmd
):
141 dbg
._pdebug
(self
._debug
('got "registration done" server command'))
143 if self
._reg
_queue
is not None:
144 dbg
._pdebug
(self
._debug
('notifying _init_threads()'))
147 self
._reg
_queue
.put(True)
148 except (Exception) as e
:
149 # read side could be closed by now; ignore it
152 self
._reg
_queue
= None
154 def _handle_server_cmd_enable(self
, server_cmd
):
155 dbg
._pdebug
(self
._debug
('got "enable" server command'))
158 if self
._ref
_count
== 1:
159 dbg
._pdebug
(self
._debug
('adding our handler to the root logger'))
160 self
._root
_logger
.addHandler(self
._log
_handler
)
162 dbg
._pdebug
(self
._debug
('ref count is {}'.format(self
._ref
_count
)))
164 return lttngust
.cmd
._ClientCmdReplyEnable
()
166 def _handle_server_cmd_disable(self
, server_cmd
):
167 dbg
._pdebug
(self
._debug
('got "disable" server command'))
170 if self
._ref
_count
< 0:
171 # disable command could be sent again when a session is destroyed
174 if self
._ref
_count
== 0:
175 dbg
._pdebug
(self
._debug
('removing our handler from the root logger'))
176 self
._root
_logger
.removeHandler(self
._log
_handler
)
178 dbg
._pdebug
(self
._debug
('ref count is {}'.format(self
._ref
_count
)))
180 return lttngust
.cmd
._ClientCmdReplyDisable
()
182 def _handle_server_cmd_list(self
, server_cmd
):
183 dbg
._pdebug
(self
._debug
('got "list" server command'))
184 names
= logging
.Logger
.manager
.loggerDict
.keys()
185 dbg
._pdebug
(self
._debug
('found {} loggers'.format(len(names
))))
186 cmd_reply
= lttngust
.cmd
._ClientCmdReplyList
(names
=names
)
190 def _handle_server_cmd(self
, server_cmd
):
193 if server_cmd
is None:
194 dbg
._pdebug
(self
._debug
('bad server command'))
195 status
= lttngust
.cmd
._CLIENT
_CMD
_REPLY
_STATUS
_INVALID
_CMD
196 cmd_reply
= lttngust
.cmd
._ClientCmdReply
(status
)
197 elif type(server_cmd
) in self
._server
_cmd
_handlers
:
198 cmd_reply
= self
._server
_cmd
_handlers
[type(server_cmd
)](server_cmd
)
200 dbg
._pdebug
(self
._debug
('unknown server command'))
201 status
= lttngust
.cmd
._CLIENT
_CMD
_REPLY
_STATUS
_INVALID
_CMD
202 cmd_reply
= lttngust
.cmd
._ClientCmdReply
(status
)
204 if cmd_reply
is not None:
205 self
._send
_cmd
_reply
(cmd_reply
)
207 def _wait_server_cmd(self
):
210 server_cmd
= self
._recv
_server
_cmd
()
211 except socket
.timeout
:
212 # simply retry here; the protocol has no KA and we could
216 self
._handle
_server
_cmd
(server_cmd
)
218 def _cleanup_socket(self
):
220 self
._sessiond
_sock
.shutdown(socket
.SHUT_RDWR
)
221 self
._sessiond
_sock
.close()
225 self
._sessiond
_sock
= None
227 def _connect_to_sessiond(self
):
228 # create session daemon TCP socket
229 if self
._sessiond
_sock
is None:
230 self
._sessiond
_sock
= socket
.socket(socket
.AF_INET
,
233 # Use str(self._host) here. Since this host could be a string
234 # literal, and since we're importing __future__.unicode_literals,
235 # we want to make sure the host is a native string in Python 2.
236 # This avoids an indirect module import (unicode module to
237 # decode the unicode string, eventually imported by the
238 # socket module if needed), which is not allowed in a thread
239 # directly created by a module in Python 2 (our case).
241 # tl;dr: Do NOT remove str() here, or this call in Python 2
242 # _will_ block on an interpreter's mutex until the waiting
243 # register queue timeouts.
244 self
._sessiond
_sock
.connect((str(self
._host
), self
._port
))
247 cmd
= lttngust
.cmd
._ClientRegisterCmd
(_PROTO_DOMAIN
, os
.getpid(),
248 _PROTO_MAJOR
, _PROTO_MINOR
)
249 data
= cmd
.get_data()
250 self
._sessiond
_sock
.sendall(data
)
253 def _get_port_from_file(path
):
255 dbg
._pdebug
('reading port from file "{}"'.format(path
))
259 r_port
= int(f
.readline())
262 if r_port
> 0 or r_port
<= 65535:
269 def _get_ust_app_path():
270 paths
= os
.getenv('LTTNG_UST_APP_PATH')
273 paths
= paths
.split(':')
275 dbg
._pwarning
("':' separator in LTTNG_UST_APP_PATH, only the first path will be used")
278 def _get_user_home_path():
279 # $LTTNG_HOME overrides $HOME if it exists
280 return os
.getenv('LTTNG_HOME', os
.path
.expanduser('~'))
284 _SESSIOND_HOST
= '127.0.0.1'
def _client_thread_target(name, port, reg_queue):
    """Client thread entry point.

    Creates a _TcpClient named `name` for the session daemon listening
    on _SESSIOND_HOST:`port`, then hands control to its run() loop.
    `reg_queue` is forwarded to the client for registration notices.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)
    dbg._pdebug('starting client "{}"'.format(name))
    client.run()
# Set to True by the first _init_threads() call so that later calls are
# no-ops.
_initialized = False


def _init_threads():
    """Start one client thread per discovered session daemon.

    Agent ports are read from port files: only the one under
    $LTTNG_UST_APP_PATH when that variable is set, otherwise the
    system-wide (/var/run/lttng) and per-user ($LTTNG_HOME or ~, in
    .lttng) ones. Client threads are daemon threads. This function then
    blocks until every started client has registered, or until about
    _REG_TIMEOUT seconds have elapsed. Only the first call has an effect.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True

    ust_app_port = None
    sys_port = None
    user_port = None

    # The LTTNG_UST_APP_PATH environment variables disables connections
    # to the global and per-user session daemons. Query it only once:
    # _get_ust_app_path() may print a warning.
    ust_app_path = _get_ust_app_path()

    if ust_app_path is not None:
        ust_app_port_file = os.path.join(ust_app_path, 'agent.port')
        ust_app_port = _get_port_from_file(ust_app_port_file)
        dbg._pdebug('ust_app session daemon port: {}'.format(ust_app_port))
    else:
        sys_port = _get_port_from_file('/var/run/lttng/agent.port')
        user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
        user_port = _get_port_from_file(user_port_file)
        dbg._pdebug('system session daemon port: {}'.format(sys_port))
        dbg._pdebug('user session daemon port: {}'.format(user_port))

    reg_queue = queue.Queue()
    reg_expecting = 0

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('ust_app', ust_app_port),
                           ('system', sys_port),
                           ('user', user_port)):
            if port is None:
                continue

            dbg._pdebug('creating {} client thread'.format(name))
            t = threading.Thread(target=_client_thread_target,
                                 args=(name, port, reg_queue))
            t.name = name
            # do not keep the application alive because of the agent
            t.daemon = True
            t.start()
            dbg._pdebug('created and started {} client thread'.format(name))
            reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue; each item received here accounts for one client.
    while True:
        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                            cur_timeout))
        t1 = lttngust.compat._clock()

        try:
            reg_queue.get(timeout=cur_timeout)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

        t2 = lttngust.compat._clock()
        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            break

        # the remaining clients share what is left of the timeout
        cur_timeout -= (t2 - t1)

        if cur_timeout <= 0:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')