Extend API and remove lttng_uri from lttng.h
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26
27 #include <common/common.h>
28 #include <common/defaults.h>
29 #include <common/uri.h>
30
31 #include "consumer.h"
32
33 /*
34 * From a consumer_data structure, allocate and add a consumer socket to the
35 * consumer output.
36 *
37 * Return 0 on success, else negative value on error
38 */
39 int consumer_create_socket(struct consumer_data *data,
40 struct consumer_output *output)
41 {
42 int ret = 0;
43 struct consumer_socket *socket;
44
45 assert(data);
46
47 if (output == NULL || data->cmd_sock < 0) {
48 /*
49 * Not an error. Possible there is simply not spawned consumer or it's
50 * disabled for the tracing session asking the socket.
51 */
52 goto error;
53 }
54
55 rcu_read_lock();
56 socket = consumer_find_socket(data->cmd_sock, output);
57 rcu_read_unlock();
58 if (socket == NULL) {
59 socket = consumer_allocate_socket(data->cmd_sock);
60 if (socket == NULL) {
61 ret = -1;
62 goto error;
63 }
64
65 socket->lock = &data->lock;
66 rcu_read_lock();
67 consumer_add_socket(socket, output);
68 rcu_read_unlock();
69 }
70
71 DBG3("Consumer socket created (fd: %d) and added to output",
72 data->cmd_sock);
73
74 error:
75 return ret;
76 }
77
78 /*
79 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
80 * be acquired before calling this function and across use of the
81 * returned consumer_socket.
82 */
83 struct consumer_socket *consumer_find_socket(int key,
84 struct consumer_output *consumer)
85 {
86 struct lttng_ht_iter iter;
87 struct lttng_ht_node_ulong *node;
88 struct consumer_socket *socket = NULL;
89
90 /* Negative keys are lookup failures */
91 if (key < 0 || consumer == NULL) {
92 return NULL;
93 }
94
95 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
96 &iter);
97 node = lttng_ht_iter_get_node_ulong(&iter);
98 if (node != NULL) {
99 socket = caa_container_of(node, struct consumer_socket, node);
100 }
101
102 return socket;
103 }
104
105 /*
106 * Allocate a new consumer_socket and return the pointer.
107 */
108 struct consumer_socket *consumer_allocate_socket(int fd)
109 {
110 struct consumer_socket *socket = NULL;
111
112 socket = zmalloc(sizeof(struct consumer_socket));
113 if (socket == NULL) {
114 PERROR("zmalloc consumer socket");
115 goto error;
116 }
117
118 socket->fd = fd;
119 lttng_ht_node_init_ulong(&socket->node, fd);
120
121 error:
122 return socket;
123 }
124
125 /*
126 * Add consumer socket to consumer output object. Read side lock must be
127 * acquired before calling this function.
128 */
129 void consumer_add_socket(struct consumer_socket *sock,
130 struct consumer_output *consumer)
131 {
132 assert(sock);
133 assert(consumer);
134
135 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
136 }
137
138 /*
139 * Delte consumer socket to consumer output object. Read side lock must be
140 * acquired before calling this function.
141 */
142 void consumer_del_socket(struct consumer_socket *sock,
143 struct consumer_output *consumer)
144 {
145 int ret;
146 struct lttng_ht_iter iter;
147
148 assert(sock);
149 assert(consumer);
150
151 iter.iter.node = &sock->node.node;
152 ret = lttng_ht_del(consumer->socks, &iter);
153 assert(!ret);
154 }
155
156 /*
157 * RCU destroy call function.
158 */
159 static void destroy_socket_rcu(struct rcu_head *head)
160 {
161 struct lttng_ht_node_ulong *node =
162 caa_container_of(head, struct lttng_ht_node_ulong, head);
163 struct consumer_socket *socket =
164 caa_container_of(node, struct consumer_socket, node);
165
166 free(socket);
167 }
168
169 /*
170 * Destroy and free socket pointer in a call RCU. Read side lock must be
171 * acquired before calling this function.
172 */
173 void consumer_destroy_socket(struct consumer_socket *sock)
174 {
175 assert(sock);
176
177 /*
178 * We DO NOT close the file descriptor here since it is global to the
179 * session daemon and is closed only if the consumer dies.
180 */
181
182 call_rcu(&sock->node.head, destroy_socket_rcu);
183 }
184
185 /*
186 * Allocate and assign data to a consumer_output object.
187 *
188 * Return pointer to structure.
189 */
190 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
191 {
192 struct consumer_output *output = NULL;
193
194 output = zmalloc(sizeof(struct consumer_output));
195 if (output == NULL) {
196 PERROR("zmalloc consumer_output");
197 goto error;
198 }
199
200 /* By default, consumer output is enabled */
201 output->enabled = 1;
202 output->type = type;
203 output->net_seq_index = -1;
204
205 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
206
207 error:
208 return output;
209 }
210
211 /*
212 * Delete the consumer_output object from the list and free the ptr.
213 */
214 void consumer_destroy_output(struct consumer_output *obj)
215 {
216 if (obj == NULL) {
217 return;
218 }
219
220 if (obj->socks) {
221 struct lttng_ht_iter iter;
222 struct consumer_socket *socket;
223
224 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
225 consumer_destroy_socket(socket);
226 }
227 }
228
229 free(obj);
230 }
231
232 /*
233 * Copy consumer output and returned the newly allocated copy.
234 */
235 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
236 {
237 struct lttng_ht_iter iter;
238 struct consumer_socket *socket, *copy_sock;
239 struct consumer_output *output;
240
241 assert(obj);
242
243 output = consumer_create_output(obj->type);
244 if (output == NULL) {
245 goto error;
246 }
247
248 memcpy(output, obj, sizeof(struct consumer_output));
249
250 /* Copy sockets */
251 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
252
253 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
254 /* Create new socket object. */
255 copy_sock = consumer_allocate_socket(socket->fd);
256 if (copy_sock == NULL) {
257 goto malloc_error;
258 }
259
260 copy_sock->lock = socket->lock;
261 consumer_add_socket(copy_sock, output);
262 }
263
264 error:
265 return output;
266
267 malloc_error:
268 consumer_destroy_output(output);
269 return NULL;
270 }
271
272 /*
273 * Set network URI to the consumer output object.
274 *
275 * Return 0 on success. Negative value on error.
276 */
277 int consumer_set_network_uri(struct consumer_output *obj,
278 struct lttng_uri *uri)
279 {
280 int ret;
281 char tmp_path[PATH_MAX];
282 char hostname[HOST_NAME_MAX];
283 struct lttng_uri *dst_uri = NULL;
284
285 /* Code flow error safety net. */
286 assert(obj);
287 assert(uri);
288
289 switch (uri->stype) {
290 case LTTNG_STREAM_CONTROL:
291 dst_uri = &obj->dst.net.control;
292 obj->dst.net.control_isset = 1;
293 if (uri->port == 0) {
294 /* Assign default port. */
295 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
296 }
297 break;
298 case LTTNG_STREAM_DATA:
299 dst_uri = &obj->dst.net.data;
300 obj->dst.net.data_isset = 1;
301 if (uri->port == 0) {
302 /* Assign default port. */
303 uri->port = DEFAULT_NETWORK_DATA_PORT;
304 }
305 break;
306 default:
307 ERR("Set network uri type unknown %d", uri->stype);
308 goto error;
309 }
310
311 ret = uri_compare(dst_uri, uri);
312 if (!ret) {
313 /* Same URI, don't touch it and return success. */
314 DBG3("URI network compare are the same");
315 goto end;
316 }
317
318 /* URIs were not equal, replacing it. */
319 memset(dst_uri, 0, sizeof(struct lttng_uri));
320 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
321 obj->type = CONSUMER_DST_NET;
322
323 /* Handle subdir and add hostname in front. */
324 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
325 /* Get hostname to append it in the pathname */
326 ret = gethostname(hostname, sizeof(hostname));
327 if (ret < 0) {
328 PERROR("gethostname. Fallback on default localhost");
329 strncpy(hostname, "localhost", sizeof(hostname));
330 }
331 hostname[sizeof(hostname) - 1] = '\0';
332
333 /* Setup consumer subdir if none present in the control URI */
334 if (strlen(dst_uri->subdir) == 0) {
335 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
336 hostname, obj->subdir);
337 } else {
338 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
339 hostname, dst_uri->subdir);
340 }
341 if (ret < 0) {
342 PERROR("snprintf set consumer uri subdir");
343 goto error;
344 }
345
346 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
347 DBG3("Consumer set network uri subdir path %s", tmp_path);
348 }
349
350 end:
351 return 0;
352
353 error:
354 return -1;
355 }
356
357 /*
358 * Send file descriptor to consumer via sock.
359 */
360 int consumer_send_fds(int sock, int *fds, size_t nb_fd)
361 {
362 int ret;
363
364 assert(fds);
365 assert(nb_fd > 0);
366
367 ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
368 if (ret < 0) {
369 PERROR("send consumer fds");
370 goto error;
371 }
372
373 error:
374 return ret;
375 }
376
377 /*
378 * Consumer send channel communication message structure to consumer.
379 */
380 int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
381 {
382 int ret;
383
384 assert(msg);
385 assert(sock >= 0);
386
387 ret = lttcomm_send_unix_sock(sock, msg,
388 sizeof(struct lttcomm_consumer_msg));
389 if (ret < 0) {
390 PERROR("send consumer channel");
391 goto error;
392 }
393
394 error:
395 return ret;
396 }
397
398 /*
399 * Init channel communication message structure.
400 */
401 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
402 enum lttng_consumer_command cmd,
403 int channel_key,
404 uint64_t max_sb_size,
405 uint64_t mmap_len,
406 const char *name)
407 {
408 assert(msg);
409
410 /* TODO: Args validation */
411
412 /* Zeroed structure */
413 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
414
415 /* Send channel */
416 msg->cmd_type = cmd;
417 msg->u.channel.channel_key = channel_key;
418 msg->u.channel.max_sb_size = max_sb_size;
419 msg->u.channel.mmap_len = mmap_len;
420 }
421
422 /*
423 * Init stream communication message structure.
424 */
425 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
426 enum lttng_consumer_command cmd,
427 int channel_key,
428 int stream_key,
429 uint32_t state,
430 enum lttng_event_output output,
431 uint64_t mmap_len,
432 uid_t uid,
433 gid_t gid,
434 int net_index,
435 unsigned int metadata_flag,
436 const char *name,
437 const char *pathname)
438 {
439 assert(msg);
440
441 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
442
443 /* TODO: Args validation */
444
445 msg->cmd_type = cmd;
446 msg->u.stream.channel_key = channel_key;
447 msg->u.stream.stream_key = stream_key;
448 msg->u.stream.state = state;
449 msg->u.stream.output = output;
450 msg->u.stream.mmap_len = mmap_len;
451 msg->u.stream.uid = uid;
452 msg->u.stream.gid = gid;
453 msg->u.stream.net_index = net_index;
454 msg->u.stream.metadata_flag = metadata_flag;
455 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
456 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
457 strncpy(msg->u.stream.path_name, pathname,
458 sizeof(msg->u.stream.path_name));
459 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
460 }
461
462 /*
463 * Send stream communication structure to the consumer.
464 */
465 int consumer_send_stream(int sock, struct consumer_output *dst,
466 struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
467 {
468 int ret;
469
470 assert(msg);
471 assert(dst);
472
473 switch (dst->type) {
474 case CONSUMER_DST_NET:
475 /* Consumer should send the stream on the network. */
476 msg->u.stream.net_index = dst->net_seq_index;
477 break;
478 case CONSUMER_DST_LOCAL:
479 /* Add stream file name to stream path */
480 strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
481 strncat(msg->u.stream.path_name, msg->u.stream.name,
482 sizeof(msg->u.stream.path_name));
483 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
484 /* Indicate that the stream is NOT network */
485 msg->u.stream.net_index = -1;
486 break;
487 default:
488 ERR("Consumer unknown output type (%d)", dst->type);
489 ret = -1;
490 goto error;
491 }
492
493 /* Send on socket */
494 ret = lttcomm_send_unix_sock(sock, msg,
495 sizeof(struct lttcomm_consumer_msg));
496 if (ret < 0) {
497 PERROR("send consumer stream");
498 goto error;
499 }
500
501 ret = consumer_send_fds(sock, fds, nb_fd);
502 if (ret < 0) {
503 goto error;
504 }
505
506 error:
507 return ret;
508 }
509
510 /*
511 * Send relayd socket to consumer associated with a session name.
512 *
513 * On success return positive value. On error, negative value.
514 */
515 int consumer_send_relayd_socket(int consumer_sock,
516 struct lttcomm_sock *sock, struct consumer_output *consumer,
517 enum lttng_stream_type type)
518 {
519 int ret;
520 struct lttcomm_consumer_msg msg;
521
522 /* Code flow error. Safety net. */
523 assert(sock);
524 assert(consumer);
525
526 /* Bail out if consumer is disabled */
527 if (!consumer->enabled) {
528 ret = LTTCOMM_OK;
529 goto error;
530 }
531
532 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
533 /*
534 * Assign network consumer output index using the temporary consumer since
535 * this call should only be made from within a set_consumer_uri() function
536 * call in the session daemon.
537 */
538 msg.u.relayd_sock.net_index = consumer->net_seq_index;
539 msg.u.relayd_sock.type = type;
540 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
541
542 DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
543 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
544 if (ret < 0) {
545 PERROR("send consumer relayd socket info");
546 goto error;
547 }
548
549 DBG3("Sending relayd socket file descriptor to consumer");
550 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
551 if (ret < 0) {
552 goto error;
553 }
554
555 DBG2("Consumer relayd socket sent");
556
557 error:
558 return ret;
559 }
560
561 /*
562 * Send destroy relayd command to consumer.
563 *
564 * On success return positive value. On error, negative value.
565 */
566 int consumer_send_destroy_relayd(struct consumer_socket *sock,
567 struct consumer_output *consumer)
568 {
569 int ret;
570 struct lttcomm_consumer_msg msg;
571
572 assert(consumer);
573 assert(sock);
574
575 DBG2("Sending destroy relayd command to consumer...");
576
577 /* Bail out if consumer is disabled */
578 if (!consumer->enabled) {
579 ret = LTTCOMM_OK;
580 DBG3("Consumer is disabled");
581 goto error;
582 }
583
584 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
585 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
586
587 pthread_mutex_lock(sock->lock);
588 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
589 pthread_mutex_unlock(sock->lock);
590 if (ret < 0) {
591 PERROR("send consumer destroy relayd command");
592 goto error;
593 }
594
595 DBG2("Consumer send destroy relayd command done");
596
597 error:
598 return ret;
599 }
This page took 0.04223 seconds and 5 git commands to generate.