Document prod/consumer synchro
[lttng-ust.git] / libust / lttng-ust-comm.c
... / ...
CommitLineData
1/*
2 * lttng-ust-comm.c
3 *
4 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
5 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; only
10 * version 2.1 of the License.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22#include <sys/types.h>
23#include <sys/socket.h>
24#include <unistd.h>
25#include <errno.h>
26#include <ust/lttng-ust-abi.h>
27#include <lttng-ust-comm.h>
28#include <ust/usterr-signal-safe.h>
29#include <pthread.h>
30#include <semaphore.h>
31#include <time.h>
32#include <assert.h>
33#include <urcu/uatomic.h>
34
/*
 * Communication thread mutex. Taken while a command is handled and
 * while a listener (re)connects; also held by fork() to deal with
 * removal of threads, and by the exit path.
 */
static pthread_mutex_t lttng_ust_comm_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Non-zero once the ust comm threads have been asked to quit. */
static int lttng_ust_comm_should_quit;

/*
 * The library constructor blocks on this semaphore until one of the
 * following happens, and only then lets the main program start:
 * - the register_done message is received from the sessiond daemon
 *   (this lets the sessiond daemon enable sessions before main
 *   starts);
 * - the sessiond daemon is not reachable;
 * - the timeout expires (ensuring applications are resilient to
 *   session daemon problems).
 */
static sem_t constructor_wait;

/*
 * Number of completions to wait for before posting constructor_wait:
 * one for the global sessiond and one for the local sessiond.
 */
static int sem_count = 2;
59
60/*
61 * Info about socket and associated listener thread.
62 */
63struct sock_info {
64 const char *name;
65 char sock_path[PATH_MAX];
66 int socket;
67 pthread_t ust_listener; /* listener thread */
68 int root_handle;
69};
70
71/* Socket from app (connect) to session daemon (listen) for communication */
72struct sock_info global_apps = {
73 .name = "global",
74 .sock_path = DEFAULT_GLOBAL_APPS_UNIX_SOCK,
75 .socket = -1,
76 .root_handle = -1,
77};
78
79/* TODO: allow global_apps_sock_path override */
80
81struct sock_info local_apps = {
82 .name = "local",
83 .socket = -1,
84 .root_handle = -1,
85};
86
87static
88int setup_local_apps_socket(void)
89{
90 const char *home_dir;
91
92 home_dir = (const char *) getenv("HOME");
93 if (!home_dir)
94 return -ENOENT;
95 snprintf(local_apps.sock_path, PATH_MAX,
96 DEFAULT_HOME_APPS_UNIX_SOCK, home_dir);
97 return 0;
98}
99
100static
101int register_app_to_sessiond(int socket)
102{
103 ssize_t ret;
104 struct {
105 uint32_t major;
106 uint32_t minor;
107 pid_t pid;
108 uid_t uid;
109 } reg_msg;
110
111 reg_msg.major = LTTNG_UST_COMM_VERSION_MAJOR;
112 reg_msg.minor = LTTNG_UST_COMM_VERSION_MINOR;
113 reg_msg.pid = getpid();
114 reg_msg.uid = getuid();
115
116 ret = lttcomm_send_unix_sock(socket, &reg_msg, sizeof(reg_msg));
117 if (ret >= 0 && ret != sizeof(reg_msg))
118 return -EIO;
119 return ret;
120}
121
122static
123int send_reply(int sock, struct lttcomm_ust_reply *lur)
124{
125 ssize_t len;
126
127 len = lttcomm_send_unix_sock(sock, lur, sizeof(*lur));
128 switch (len) {
129 case sizeof(*lur):
130 DBG("message successfully sent");
131 return 0;
132 case -1:
133 if (errno == ECONNRESET) {
134 printf("remote end closed connection\n");
135 return 0;
136 }
137 return -1;
138 default:
139 printf("incorrect message size: %zd\n", len);
140 return -1;
141 }
142}
143
144static
145int handle_register_done(void)
146{
147 int ret;
148
149 ret = uatomic_add_return(&sem_count, -1);
150 if (ret == 0) {
151 ret = sem_post(&constructor_wait);
152 assert(!ret);
153 }
154 return 0;
155}
156
157static
158int handle_message(struct sock_info *sock_info,
159 int sock, struct lttcomm_ust_msg *lum)
160{
161 int ret = 0;
162 const struct objd_ops *ops;
163 struct lttcomm_ust_reply lur;
164
165 pthread_mutex_lock(&lttng_ust_comm_mutex);
166
167 memset(&lur, 0, sizeof(lur));
168
169 if (lttng_ust_comm_should_quit) {
170 ret = -EPERM;
171 goto end;
172 }
173
174 ops = objd_ops(lum->handle);
175 if (!ops) {
176 ret = -ENOENT;
177 goto end;
178 }
179
180 switch (lum->cmd) {
181 case LTTNG_UST_REGISTER_DONE:
182 if (lum->handle == LTTNG_UST_ROOT_HANDLE)
183 ret = handle_register_done();
184 else
185 ret = -EINVAL;
186 break;
187 case LTTNG_UST_RELEASE:
188 if (lum->handle == LTTNG_UST_ROOT_HANDLE)
189 ret = -EPERM;
190 else
191 ret = objd_unref(lum->handle);
192 break;
193 default:
194 if (ops->cmd)
195 ret = ops->cmd(lum->handle, lum->cmd,
196 (unsigned long) &lum->u);
197 else
198 ret = -ENOSYS;
199 break;
200 }
201
202end:
203 lur.handle = lum->handle;
204 lur.cmd = lum->cmd;
205 lur.ret_val = ret;
206 if (ret >= 0) {
207 lur.ret_code = LTTCOMM_OK;
208 } else {
209 lur.ret_code = LTTCOMM_SESSION_FAIL;
210 }
211 ret = send_reply(sock, &lur);
212
213 pthread_mutex_unlock(&lttng_ust_comm_mutex);
214 return ret;
215}
216
217static
218void cleanup_sock_info(struct sock_info *sock_info)
219{
220 int ret;
221
222 if (sock_info->socket != -1) {
223 ret = close(sock_info->socket);
224 if (ret) {
225 ERR("Error closing local apps socket");
226 }
227 sock_info->socket = -1;
228 }
229 if (sock_info->root_handle != -1) {
230 ret = objd_unref(sock_info->root_handle);
231 if (ret) {
232 ERR("Error unref root handle");
233 }
234 sock_info->root_handle = -1;
235 }
236}
237
238/*
239 * This thread does not allocate any resource, except within
240 * handle_message, within mutex protection. This mutex protects against
241 * fork and exit.
242 * The other moment it allocates resources is at socket connexion, which
243 * is also protected by the mutex.
244 */
245static
246void *ust_listener_thread(void *arg)
247{
248 struct sock_info *sock_info = arg;
249 int sock, ret;
250
251 /* Restart trying to connect to the session daemon */
252restart:
253 pthread_mutex_lock(&lttng_ust_comm_mutex);
254
255 if (lttng_ust_comm_should_quit) {
256 pthread_mutex_unlock(&lttng_ust_comm_mutex);
257 goto quit;
258 }
259
260 if (sock_info->socket != -1) {
261 ret = close(sock_info->socket);
262 if (ret) {
263 ERR("Error closing %s apps socket", sock_info->name);
264 }
265 sock_info->socket = -1;
266 }
267
268 /* Check for sessiond availability with pipe TODO */
269
270 /* Register */
271 ret = lttcomm_connect_unix_sock(sock_info->sock_path);
272 if (ret < 0) {
273 ERR("Error connecting to %s apps socket", sock_info->name);
274 /*
275 * If we cannot find the sessiond daemon, don't delay
276 * constructor execution.
277 */
278 ret = handle_register_done();
279 assert(!ret);
280 pthread_mutex_unlock(&lttng_ust_comm_mutex);
281 sleep(5);
282 goto restart;
283 }
284
285 sock_info->socket = sock = ret;
286
287 /*
288 * Create only one root handle per listener thread for the whole
289 * process lifetime.
290 */
291 if (sock_info->root_handle == -1) {
292 ret = lttng_abi_create_root_handle();
293 if (ret) {
294 ERR("Error creating root handle");
295 pthread_mutex_unlock(&lttng_ust_comm_mutex);
296 goto quit;
297 }
298 sock_info->root_handle = ret;
299 }
300
301 ret = register_app_to_sessiond(sock);
302 if (ret < 0) {
303 ERR("Error registering to %s apps socket", sock_info->name);
304 /*
305 * If we cannot register to the sessiond daemon, don't
306 * delay constructor execution.
307 */
308 ret = handle_register_done();
309 assert(!ret);
310 pthread_mutex_unlock(&lttng_ust_comm_mutex);
311 sleep(5);
312 goto restart;
313 }
314 pthread_mutex_unlock(&lttng_ust_comm_mutex);
315
316 for (;;) {
317 ssize_t len;
318 struct lttcomm_ust_msg lum;
319
320 len = lttcomm_recv_unix_sock(sock, &lum, sizeof(lum));
321 switch (len) {
322 case 0: /* orderly shutdown */
323 DBG("%s ltt-sessiond has performed an orderly shutdown\n", sock_info->name);
324 goto end;
325 case sizeof(lum):
326 DBG("message received\n");
327 ret = handle_message(sock_info, sock, &lum);
328 if (ret < 0) {
329 ERR("Error handling message for %s socket", sock_info->name);
330 }
331 continue;
332 case -1:
333 if (errno == ECONNRESET) {
334 ERR("%s remote end closed connection\n", sock_info->name);
335 goto end;
336 }
337 goto end;
338 default:
339 ERR("incorrect message size (%s socket): %zd\n", sock_info->name, len);
340 continue;
341 }
342
343 }
344end:
345 goto restart; /* try to reconnect */
346quit:
347 return NULL;
348}
349
350/*
351 * Return values: -1: don't wait. 0: wait forever. 1: timeout wait.
352 */
353static
354int get_timeout(struct timespec *constructor_timeout)
355{
356 long constructor_delay_ms = LTTNG_UST_DEFAULT_CONSTRUCTOR_TIMEOUT_MS;
357 char *str_delay;
358 int ret;
359
360 str_delay = getenv("UST_REGISTER_TIMEOUT");
361 if (str_delay) {
362 constructor_delay_ms = strtol(str_delay, NULL, 10);
363 }
364
365 switch (constructor_delay_ms) {
366 case -1:/* fall-through */
367 case 0:
368 return constructor_delay_ms;
369 default:
370 break;
371 }
372
373 /*
374 * If we are unable to find the current time, don't wait.
375 */
376 ret = clock_gettime(CLOCK_REALTIME, constructor_timeout);
377 if (ret) {
378 return -1;
379 }
380 constructor_timeout->tv_sec += constructor_delay_ms / 1000UL;
381 constructor_timeout->tv_nsec +=
382 (constructor_delay_ms % 1000UL) * 1000000UL;
383 if (constructor_timeout->tv_nsec >= 1000000000UL) {
384 constructor_timeout->tv_sec++;
385 constructor_timeout->tv_nsec -= 1000000000UL;
386 }
387 return 1;
388}
389
390/*
391 * sessiond monitoring thread: monitor presence of global and per-user
392 * sessiond by polling the application common named pipe.
393 */
394/* TODO */
395
396void __attribute__((constructor)) lttng_ust_comm_init(void)
397{
398 struct timespec constructor_timeout;
399 int timeout_mode;
400 int ret;
401
402 init_usterr();
403
404 timeout_mode = get_timeout(&constructor_timeout);
405
406 ret = sem_init(&constructor_wait, 0, 0);
407 assert(!ret);
408
409 ret = setup_local_apps_socket();
410 if (ret) {
411 ERR("Error setting up to local apps socket");
412 }
413
414 ret = pthread_create(&global_apps.ust_listener, NULL,
415 ust_listener_thread, &global_apps);
416 ret = pthread_create(&local_apps.ust_listener, NULL,
417 ust_listener_thread, &local_apps);
418
419 switch (timeout_mode) {
420 case 1: /* timeout wait */
421 do {
422 ret = sem_timedwait(&constructor_wait,
423 &constructor_timeout);
424 } while (ret < 0 && errno == EINTR);
425 if (ret < 0 && errno == ETIMEDOUT) {
426 ERR("Timed out waiting for ltt-sessiond");
427 } else {
428 assert(!ret);
429 }
430 break;
431 case -1:/* wait forever */
432 do {
433 ret = sem_wait(&constructor_wait);
434 } while (ret < 0 && errno == EINTR);
435 assert(!ret);
436 break;
437 case 0: /* no timeout */
438 break;
439 }
440}
441
442void __attribute__((destructor)) lttng_ust_comm_exit(void)
443{
444 int ret;
445
446 /*
447 * Using pthread_cancel here because:
448 * A) we don't want to hang application teardown.
449 * B) the thread is not allocating any resource.
450 */
451
452 /*
453 * Require the communication thread to quit. Synchronize with
454 * mutexes to ensure it is not in a mutex critical section when
455 * pthread_cancel is later called.
456 */
457 pthread_mutex_lock(&lttng_ust_comm_mutex);
458 lttng_ust_comm_should_quit = 1;
459 pthread_mutex_unlock(&lttng_ust_comm_mutex);
460
461#if 0
462 ret = pthread_cancel(global_apps.ust_listener);
463 if (ret) {
464 ERR("Error cancelling global ust listener thread");
465 }
466#endif //0
467
468 cleanup_sock_info(&global_apps);
469
470 ret = pthread_cancel(local_apps.ust_listener);
471 if (ret) {
472 ERR("Error cancelling local ust listener thread");
473 }
474
475 cleanup_sock_info(&local_apps);
476
477 lttng_ust_abi_exit();
478 ltt_events_exit();
479}
This page took 0.02427 seconds and 4 git commands to generate.