Move LTTng-UST buffer ownership from application to consumer
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
f50f23d9
DG
33/*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40int consumer_recv_status_reply(struct consumer_socket *sock)
41{
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
a6cd2b97
DG
48 if (ret <= 0) {
49 if (ret == 0) {
50 /* Orderly shutdown. Don't return 0 which means success. */
51 ret = -1;
52 }
3448e266
DG
53 /* The above call will print a PERROR on error. */
54 DBG("Fail to receive status reply on sock %d", sock->fd);
f50f23d9
DG
55 goto end;
56 }
57
58 if (reply.ret_code == LTTNG_OK) {
59 /* All good. */
60 ret = 0;
61 } else {
62 ret = -reply.ret_code;
ffe60014 63 DBG("Consumer ret code %d", ret);
f50f23d9
DG
64 }
65
66end:
67 return ret;
68}
69
ffe60014
DG
70/*
71 * Once the ASK_CHANNEL command is sent to the consumer, the channel
72 * information are sent back. This call receives that data and populates key
73 * and stream_count.
74 *
75 * On success return 0 and both key and stream_count are set. On error, a
76 * negative value is sent back and both parameters are untouched.
77 */
78int consumer_recv_status_channel(struct consumer_socket *sock,
79 unsigned long *key, unsigned int *stream_count)
80{
81 int ret;
82 struct lttcomm_consumer_status_channel reply;
83
84 assert(sock);
85 assert(stream_count);
86 assert(key);
87
88 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
89 if (ret <= 0) {
90 if (ret == 0) {
91 /* Orderly shutdown. Don't return 0 which means success. */
92 ret = -1;
93 }
94 /* The above call will print a PERROR on error. */
95 DBG("Fail to receive status reply on sock %d", sock->fd);
96 goto end;
97 }
98
99 /* An error is possible so don't touch the key and stream_count. */
100 if (reply.ret_code != LTTNG_OK) {
101 ret = -1;
102 goto end;
103 }
104
105 *key = reply.key;
106 *stream_count = reply.stream_count;
107
108end:
109 return ret;
110}
111
2f77fc4b
DG
112/*
113 * Send destroy relayd command to consumer.
114 *
115 * On success return positive value. On error, negative value.
116 */
117int consumer_send_destroy_relayd(struct consumer_socket *sock,
118 struct consumer_output *consumer)
119{
120 int ret;
121 struct lttcomm_consumer_msg msg;
122
123 assert(consumer);
124 assert(sock);
125
ffe60014 126 DBG2("Sending destroy relayd command to consumer sock %d", sock->fd);
2f77fc4b
DG
127
128 /* Bail out if consumer is disabled */
129 if (!consumer->enabled) {
f73fabfd 130 ret = LTTNG_OK;
2f77fc4b
DG
131 DBG3("Consumer is disabled");
132 goto error;
133 }
134
135 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
136 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
137
138 pthread_mutex_lock(sock->lock);
139 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
2f77fc4b 140 if (ret < 0) {
c5c45efa
DG
141 /* Indicate that the consumer is probably closing at this point. */
142 DBG("send consumer destroy relayd command");
f50f23d9 143 goto error_send;
2f77fc4b
DG
144 }
145
f50f23d9
DG
146 /* Don't check the return value. The caller will do it. */
147 ret = consumer_recv_status_reply(sock);
148
2f77fc4b
DG
149 DBG2("Consumer send destroy relayd command done");
150
f50f23d9
DG
151error_send:
152 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
153error:
154 return ret;
155}
156
157/*
158 * For each consumer socket in the consumer output object, send a destroy
159 * relayd command.
160 */
161void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
162{
2f77fc4b
DG
163 struct lttng_ht_iter iter;
164 struct consumer_socket *socket;
165
166 assert(consumer);
167
168 /* Destroy any relayd connection */
169 if (consumer && consumer->type == CONSUMER_DST_NET) {
170 rcu_read_lock();
171 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
172 node.node) {
c617c0c6
MD
173 int ret;
174
2f77fc4b
DG
175 /* Send destroy relayd command */
176 ret = consumer_send_destroy_relayd(socket, consumer);
177 if (ret < 0) {
c5c45efa 178 DBG("Unable to send destroy relayd command to consumer");
2f77fc4b
DG
179 /* Continue since we MUST delete everything at this point. */
180 }
181 }
182 rcu_read_unlock();
183 }
184}
185
a4b92340
DG
186/*
187 * From a consumer_data structure, allocate and add a consumer socket to the
188 * consumer output.
189 *
190 * Return 0 on success, else negative value on error
191 */
192int consumer_create_socket(struct consumer_data *data,
193 struct consumer_output *output)
194{
195 int ret = 0;
196 struct consumer_socket *socket;
197
198 assert(data);
199
200 if (output == NULL || data->cmd_sock < 0) {
201 /*
202 * Not an error. Possible there is simply not spawned consumer or it's
203 * disabled for the tracing session asking the socket.
204 */
205 goto error;
206 }
207
208 rcu_read_lock();
209 socket = consumer_find_socket(data->cmd_sock, output);
210 rcu_read_unlock();
211 if (socket == NULL) {
212 socket = consumer_allocate_socket(data->cmd_sock);
213 if (socket == NULL) {
214 ret = -1;
215 goto error;
216 }
217
2f77fc4b 218 socket->registered = 0;
a4b92340
DG
219 socket->lock = &data->lock;
220 rcu_read_lock();
221 consumer_add_socket(socket, output);
222 rcu_read_unlock();
223 }
224
225 DBG3("Consumer socket created (fd: %d) and added to output",
226 data->cmd_sock);
227
228error:
229 return ret;
230}
231
173af62f
DG
232/*
233 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
234 * be acquired before calling this function and across use of the
235 * returned consumer_socket.
236 */
237struct consumer_socket *consumer_find_socket(int key,
238 struct consumer_output *consumer)
239{
240 struct lttng_ht_iter iter;
241 struct lttng_ht_node_ulong *node;
242 struct consumer_socket *socket = NULL;
243
244 /* Negative keys are lookup failures */
a4b92340 245 if (key < 0 || consumer == NULL) {
173af62f
DG
246 return NULL;
247 }
248
249 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
250 &iter);
251 node = lttng_ht_iter_get_node_ulong(&iter);
252 if (node != NULL) {
253 socket = caa_container_of(node, struct consumer_socket, node);
254 }
255
256 return socket;
257}
258
259/*
260 * Allocate a new consumer_socket and return the pointer.
261 */
262struct consumer_socket *consumer_allocate_socket(int fd)
263{
264 struct consumer_socket *socket = NULL;
265
266 socket = zmalloc(sizeof(struct consumer_socket));
267 if (socket == NULL) {
268 PERROR("zmalloc consumer socket");
269 goto error;
270 }
271
272 socket->fd = fd;
273 lttng_ht_node_init_ulong(&socket->node, fd);
274
275error:
276 return socket;
277}
278
279/*
280 * Add consumer socket to consumer output object. Read side lock must be
281 * acquired before calling this function.
282 */
283void consumer_add_socket(struct consumer_socket *sock,
284 struct consumer_output *consumer)
285{
286 assert(sock);
287 assert(consumer);
288
289 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
290}
291
292/*
293 * Delte consumer socket to consumer output object. Read side lock must be
294 * acquired before calling this function.
295 */
296void consumer_del_socket(struct consumer_socket *sock,
297 struct consumer_output *consumer)
298{
299 int ret;
300 struct lttng_ht_iter iter;
301
302 assert(sock);
303 assert(consumer);
304
305 iter.iter.node = &sock->node.node;
306 ret = lttng_ht_del(consumer->socks, &iter);
307 assert(!ret);
308}
309
310/*
311 * RCU destroy call function.
312 */
313static void destroy_socket_rcu(struct rcu_head *head)
314{
315 struct lttng_ht_node_ulong *node =
316 caa_container_of(head, struct lttng_ht_node_ulong, head);
317 struct consumer_socket *socket =
318 caa_container_of(node, struct consumer_socket, node);
319
320 free(socket);
321}
322
323/*
324 * Destroy and free socket pointer in a call RCU. Read side lock must be
325 * acquired before calling this function.
326 */
327void consumer_destroy_socket(struct consumer_socket *sock)
328{
329 assert(sock);
330
331 /*
332 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
333 * session daemon and is closed only if the consumer dies or a custom
334 * consumer was registered,
173af62f 335 */
2f77fc4b
DG
336 if (sock->registered) {
337 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
338 lttcomm_close_unix_sock(sock->fd);
339 }
173af62f
DG
340
341 call_rcu(&sock->node.head, destroy_socket_rcu);
342}
343
00e2e675
DG
344/*
345 * Allocate and assign data to a consumer_output object.
346 *
347 * Return pointer to structure.
348 */
349struct consumer_output *consumer_create_output(enum consumer_dst_type type)
350{
351 struct consumer_output *output = NULL;
352
353 output = zmalloc(sizeof(struct consumer_output));
354 if (output == NULL) {
355 PERROR("zmalloc consumer_output");
356 goto error;
357 }
358
359 /* By default, consumer output is enabled */
360 output->enabled = 1;
361 output->type = type;
362 output->net_seq_index = -1;
173af62f
DG
363
364 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
365
366error:
367 return output;
368}
369
370/*
371 * Delete the consumer_output object from the list and free the ptr.
372 */
373void consumer_destroy_output(struct consumer_output *obj)
374{
375 if (obj == NULL) {
376 return;
377 }
378
173af62f
DG
379 if (obj->socks) {
380 struct lttng_ht_iter iter;
381 struct consumer_socket *socket;
382
2f77fc4b 383 rcu_read_lock();
173af62f 384 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
2f77fc4b 385 consumer_del_socket(socket, obj);
173af62f
DG
386 consumer_destroy_socket(socket);
387 }
2f77fc4b
DG
388 rcu_read_unlock();
389
390 /* Finally destroy HT */
391 lttng_ht_destroy(obj->socks);
00e2e675 392 }
173af62f 393
00e2e675
DG
394 free(obj);
395}
396
397/*
398 * Copy consumer output and returned the newly allocated copy.
399 */
400struct consumer_output *consumer_copy_output(struct consumer_output *obj)
401{
09a90bcd 402 struct lttng_ht *tmp_ht_ptr;
173af62f
DG
403 struct lttng_ht_iter iter;
404 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
405 struct consumer_output *output;
406
407 assert(obj);
408
409 output = consumer_create_output(obj->type);
410 if (output == NULL) {
411 goto error;
412 }
09a90bcd
DG
413 /* Avoid losing the HT reference after the memcpy() */
414 tmp_ht_ptr = output->socks;
00e2e675
DG
415
416 memcpy(output, obj, sizeof(struct consumer_output));
417
09a90bcd
DG
418 /* Putting back the HT pointer and start copying socket(s). */
419 output->socks = tmp_ht_ptr;
173af62f 420
b82c5c4d 421 rcu_read_lock();
173af62f
DG
422 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
423 /* Create new socket object. */
424 copy_sock = consumer_allocate_socket(socket->fd);
425 if (copy_sock == NULL) {
b82c5c4d 426 rcu_read_unlock();
173af62f
DG
427 goto malloc_error;
428 }
429
09a90bcd 430 copy_sock->registered = socket->registered;
173af62f
DG
431 copy_sock->lock = socket->lock;
432 consumer_add_socket(copy_sock, output);
433 }
b82c5c4d 434 rcu_read_unlock();
173af62f 435
00e2e675
DG
436error:
437 return output;
173af62f
DG
438
439malloc_error:
440 consumer_destroy_output(output);
441 return NULL;
00e2e675
DG
442}
443
444/*
445 * Set network URI to the consumer output object.
446 *
ad20f474
DG
447 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
448 * error.
00e2e675
DG
449 */
450int consumer_set_network_uri(struct consumer_output *obj,
451 struct lttng_uri *uri)
452{
453 int ret;
454 char tmp_path[PATH_MAX];
455 char hostname[HOST_NAME_MAX];
456 struct lttng_uri *dst_uri = NULL;
457
458 /* Code flow error safety net. */
459 assert(obj);
460 assert(uri);
461
462 switch (uri->stype) {
463 case LTTNG_STREAM_CONTROL:
464 dst_uri = &obj->dst.net.control;
465 obj->dst.net.control_isset = 1;
466 if (uri->port == 0) {
467 /* Assign default port. */
468 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
469 }
ad20f474 470 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
471 break;
472 case LTTNG_STREAM_DATA:
473 dst_uri = &obj->dst.net.data;
474 obj->dst.net.data_isset = 1;
475 if (uri->port == 0) {
476 /* Assign default port. */
477 uri->port = DEFAULT_NETWORK_DATA_PORT;
478 }
ad20f474 479 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
480 break;
481 default:
482 ERR("Set network uri type unknown %d", uri->stype);
483 goto error;
484 }
485
486 ret = uri_compare(dst_uri, uri);
487 if (!ret) {
488 /* Same URI, don't touch it and return success. */
489 DBG3("URI network compare are the same");
ad20f474 490 goto equal;
00e2e675
DG
491 }
492
493 /* URIs were not equal, replacing it. */
494 memset(dst_uri, 0, sizeof(struct lttng_uri));
495 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
496 obj->type = CONSUMER_DST_NET;
497
498 /* Handle subdir and add hostname in front. */
499 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
500 /* Get hostname to append it in the pathname */
501 ret = gethostname(hostname, sizeof(hostname));
502 if (ret < 0) {
503 PERROR("gethostname. Fallback on default localhost");
504 strncpy(hostname, "localhost", sizeof(hostname));
505 }
506 hostname[sizeof(hostname) - 1] = '\0';
507
508 /* Setup consumer subdir if none present in the control URI */
509 if (strlen(dst_uri->subdir) == 0) {
510 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
511 hostname, obj->subdir);
512 } else {
513 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
514 hostname, dst_uri->subdir);
515 }
516 if (ret < 0) {
517 PERROR("snprintf set consumer uri subdir");
518 goto error;
519 }
520
521 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
522 DBG3("Consumer set network uri subdir path %s", tmp_path);
523 }
524
00e2e675 525 return 0;
ad20f474
DG
526equal:
527 return 1;
00e2e675
DG
528error:
529 return -1;
530}
531
532/*
533 * Send file descriptor to consumer via sock.
534 */
f50f23d9 535int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
00e2e675
DG
536{
537 int ret;
538
539 assert(fds);
f50f23d9 540 assert(sock);
00e2e675
DG
541 assert(nb_fd > 0);
542
f50f23d9 543 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
00e2e675 544 if (ret < 0) {
3448e266
DG
545 /* The above call will print a PERROR on error. */
546 DBG("Error when sending consumer fds on sock %d", sock->fd);
00e2e675
DG
547 goto error;
548 }
549
f50f23d9
DG
550 ret = consumer_recv_status_reply(sock);
551
00e2e675
DG
552error:
553 return ret;
554}
555
ffe60014
DG
556/*
557 * Consumer send communication message structure to consumer.
558 */
559int consumer_send_msg(struct consumer_socket *sock,
560 struct lttcomm_consumer_msg *msg)
561{
562 int ret;
563
564 assert(msg);
565 assert(sock);
566 assert(sock->fd >= 0);
567
568 ret = lttcomm_send_unix_sock(sock->fd, msg,
569 sizeof(struct lttcomm_consumer_msg));
570 if (ret < 0) {
571 /* The above call will print a PERROR on error. */
572 DBG("Error when sending consumer channel on sock %d", sock->fd);
573 goto error;
574 }
575
576 ret = consumer_recv_status_reply(sock);
577
578error:
579 return ret;
580}
581
00e2e675
DG
582/*
583 * Consumer send channel communication message structure to consumer.
584 */
f50f23d9
DG
585int consumer_send_channel(struct consumer_socket *sock,
586 struct lttcomm_consumer_msg *msg)
00e2e675
DG
587{
588 int ret;
589
590 assert(msg);
f50f23d9
DG
591 assert(sock);
592 assert(sock->fd >= 0);
00e2e675 593
f50f23d9 594 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
595 sizeof(struct lttcomm_consumer_msg));
596 if (ret < 0) {
3448e266
DG
597 /* The above call will print a PERROR on error. */
598 DBG("Error when sending consumer channel on sock %d", sock->fd);
00e2e675
DG
599 goto error;
600 }
601
f50f23d9
DG
602 ret = consumer_recv_status_reply(sock);
603
00e2e675
DG
604error:
605 return ret;
606}
607
ffe60014
DG
608/*
609 * Populate the given consumer msg structure with the ask_channel command
610 * information.
611 */
612void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
613 uint64_t subbuf_size,
614 uint64_t num_subbuf,
615 int overwrite,
616 unsigned int switch_timer_interval,
617 unsigned int read_timer_interval,
618 int output,
619 int type,
620 uint64_t session_id,
621 const char *pathname,
622 const char *name,
623 uid_t uid,
624 gid_t gid,
625 int relayd_id,
626 unsigned long key,
627 unsigned char *uuid)
628{
629 assert(msg);
630
631 /* Zeroed structure */
632 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
633
634 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
635 msg->u.ask_channel.subbuf_size = subbuf_size;
636 msg->u.ask_channel.num_subbuf = num_subbuf ;
637 msg->u.ask_channel.overwrite = overwrite;
638 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
639 msg->u.ask_channel.read_timer_interval = read_timer_interval;
640 msg->u.ask_channel.output = output;
641 msg->u.ask_channel.type = type;
642 msg->u.ask_channel.session_id = session_id;
643 msg->u.ask_channel.uid = uid;
644 msg->u.ask_channel.gid = gid;
645 msg->u.ask_channel.relayd_id = relayd_id;
646 msg->u.ask_channel.key = key;
647
648 memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
649
650 strncpy(msg->u.ask_channel.pathname, pathname,
651 sizeof(msg->u.ask_channel.pathname));
652 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
653
654 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
655 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
656}
657
00e2e675
DG
658/*
659 * Init channel communication message structure.
660 */
661void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
662 enum lttng_consumer_command cmd,
663 int channel_key,
ffe60014
DG
664 uint64_t session_id,
665 const char *pathname,
666 uid_t uid,
667 gid_t gid,
668 int relayd_id,
c30aaa51 669 const char *name,
ffe60014
DG
670 unsigned int nb_init_streams,
671 enum lttng_event_output output,
672 int type)
00e2e675
DG
673{
674 assert(msg);
675
00e2e675
DG
676 /* Zeroed structure */
677 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
678
679 /* Send channel */
680 msg->cmd_type = cmd;
681 msg->u.channel.channel_key = channel_key;
ffe60014
DG
682 msg->u.channel.session_id = session_id;
683 msg->u.channel.uid = uid;
684 msg->u.channel.gid = gid;
685 msg->u.channel.relayd_id = relayd_id;
c30aaa51 686 msg->u.channel.nb_init_streams = nb_init_streams;
ffe60014
DG
687 msg->u.channel.output = output;
688 msg->u.channel.type = type;
689
690 strncpy(msg->u.channel.pathname, pathname,
691 sizeof(msg->u.channel.pathname));
692 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
693
694 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
695 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
00e2e675
DG
696}
697
698/*
699 * Init stream communication message structure.
700 */
701void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
702 enum lttng_consumer_command cmd,
703 int channel_key,
704 int stream_key,
ffe60014 705 int cpu)
00e2e675
DG
706{
707 assert(msg);
708
709 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
710
00e2e675
DG
711 msg->cmd_type = cmd;
712 msg->u.stream.channel_key = channel_key;
713 msg->u.stream.stream_key = stream_key;
ffe60014 714 msg->u.stream.cpu = cpu;
00e2e675
DG
715}
716
717/*
718 * Send stream communication structure to the consumer.
719 */
f50f23d9
DG
720int consumer_send_stream(struct consumer_socket *sock,
721 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
722 int *fds, size_t nb_fd)
00e2e675
DG
723{
724 int ret;
725
726 assert(msg);
727 assert(dst);
f50f23d9 728 assert(sock);
ffe60014 729 assert(fds);
00e2e675
DG
730
731 /* Send on socket */
f50f23d9 732 ret = lttcomm_send_unix_sock(sock->fd, msg,
00e2e675
DG
733 sizeof(struct lttcomm_consumer_msg));
734 if (ret < 0) {
3448e266
DG
735 /* The above call will print a PERROR on error. */
736 DBG("Error when sending consumer stream on sock %d", sock->fd);
00e2e675
DG
737 goto error;
738 }
739
f50f23d9
DG
740 ret = consumer_recv_status_reply(sock);
741 if (ret < 0) {
742 goto error;
743 }
744
00e2e675
DG
745 ret = consumer_send_fds(sock, fds, nb_fd);
746 if (ret < 0) {
747 goto error;
748 }
749
750error:
751 return ret;
752}
37278a1e
DG
753
754/*
755 * Send relayd socket to consumer associated with a session name.
756 *
757 * On success return positive value. On error, negative value.
758 */
f50f23d9 759int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
37278a1e 760 struct lttcomm_sock *sock, struct consumer_output *consumer,
46e6455f 761 enum lttng_stream_type type, unsigned int session_id)
37278a1e
DG
762{
763 int ret;
764 struct lttcomm_consumer_msg msg;
765
766 /* Code flow error. Safety net. */
767 assert(sock);
768 assert(consumer);
f50f23d9 769 assert(consumer_sock);
37278a1e
DG
770
771 /* Bail out if consumer is disabled */
772 if (!consumer->enabled) {
f73fabfd 773 ret = LTTNG_OK;
37278a1e
DG
774 goto error;
775 }
776
777 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
778 /*
779 * Assign network consumer output index using the temporary consumer since
780 * this call should only be made from within a set_consumer_uri() function
781 * call in the session daemon.
782 */
783 msg.u.relayd_sock.net_index = consumer->net_seq_index;
784 msg.u.relayd_sock.type = type;
46e6455f 785 msg.u.relayd_sock.session_id = session_id;
37278a1e
DG
786 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
787
f50f23d9
DG
788 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
789 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
37278a1e 790 if (ret < 0) {
3448e266
DG
791 /* The above call will print a PERROR on error. */
792 DBG("Error when sending relayd sockets on sock %d", sock->fd);
37278a1e
DG
793 goto error;
794 }
795
f50f23d9
DG
796 ret = consumer_recv_status_reply(consumer_sock);
797 if (ret < 0) {
798 goto error;
799 }
800
37278a1e
DG
801 DBG3("Sending relayd socket file descriptor to consumer");
802 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
803 if (ret < 0) {
804 goto error;
805 }
806
807 DBG2("Consumer relayd socket sent");
808
809error:
810 return ret;
811}
173af62f
DG
812
813/*
2f77fc4b
DG
814 * Set consumer subdirectory using the session name and a generated datetime if
815 * needed. This is appended to the current subdirectory.
173af62f 816 */
2f77fc4b
DG
817int consumer_set_subdir(struct consumer_output *consumer,
818 const char *session_name)
173af62f 819{
2f77fc4b
DG
820 int ret = 0;
821 unsigned int have_default_name = 0;
822 char datetime[16], tmp_path[PATH_MAX];
823 time_t rawtime;
824 struct tm *timeinfo;
173af62f
DG
825
826 assert(consumer);
2f77fc4b
DG
827 assert(session_name);
828
829 memset(tmp_path, 0, sizeof(tmp_path));
830
831 /* Flag if we have a default session. */
832 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
833 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
834 have_default_name = 1;
835 } else {
836 /* Get date and time for session path */
837 time(&rawtime);
838 timeinfo = localtime(&rawtime);
839 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
840 }
841
2f77fc4b
DG
842 if (have_default_name) {
843 ret = snprintf(tmp_path, sizeof(tmp_path),
844 "%s/%s", consumer->subdir, session_name);
845 } else {
846 ret = snprintf(tmp_path, sizeof(tmp_path),
847 "%s/%s-%s/", consumer->subdir, session_name, datetime);
848 }
173af62f 849 if (ret < 0) {
2f77fc4b 850 PERROR("snprintf session name date");
173af62f
DG
851 goto error;
852 }
853
2f77fc4b
DG
854 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
855 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
856
857error:
858 return ret;
859}
806e2684
DG
860
861/*
6d805429 862 * Ask the consumer if the data is ready to read (NOT pending) for the specific
806e2684
DG
863 * session id.
864 *
865 * This function has a different behavior with the consumer i.e. that it waits
6d805429 866 * for a reply from the consumer if yes or no the data is pending.
806e2684 867 */
6d805429 868int consumer_is_data_pending(unsigned int id,
806e2684
DG
869 struct consumer_output *consumer)
870{
871 int ret;
6d805429 872 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
873 struct consumer_socket *socket;
874 struct lttng_ht_iter iter;
875 struct lttcomm_consumer_msg msg;
876
877 assert(consumer);
878
6d805429 879 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
806e2684 880
6d805429 881 msg.u.data_pending.session_id = (uint64_t) id;
806e2684 882
6d805429 883 DBG3("Consumer data pending for id %u", id);
806e2684 884
c8f59ee5 885 /* Send command for each consumer */
b82c5c4d 886 rcu_read_lock();
806e2684
DG
887 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
888 node.node) {
889 /* Code flow error */
890 assert(socket->fd >= 0);
891
892 pthread_mutex_lock(socket->lock);
893
894 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
895 if (ret < 0) {
3448e266
DG
896 /* The above call will print a PERROR on error. */
897 DBG("Error on consumer is data pending on sock %d", socket->fd);
806e2684 898 pthread_mutex_unlock(socket->lock);
b82c5c4d 899 goto error_unlock;
806e2684
DG
900 }
901
f50f23d9
DG
902 /*
903 * No need for a recv reply status because the answer to the command is
904 * the reply status message.
905 */
906
806e2684 907 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
a6cd2b97
DG
908 if (ret <= 0) {
909 if (ret == 0) {
910 /* Orderly shutdown. Don't return 0 which means success. */
911 ret = -1;
912 }
3448e266
DG
913 /* The above call will print a PERROR on error. */
914 DBG("Error on recv consumer is data pending on sock %d", socket->fd);
806e2684 915 pthread_mutex_unlock(socket->lock);
b82c5c4d 916 goto error_unlock;
806e2684
DG
917 }
918
919 pthread_mutex_unlock(socket->lock);
920
6d805429 921 if (ret_code == 1) {
806e2684
DG
922 break;
923 }
924 }
b82c5c4d 925 rcu_read_unlock();
806e2684 926
3448e266
DG
927 DBG("Consumer data is %s pending for session id %u",
928 ret_code == 1 ? "" : "NOT", id);
806e2684
DG
929 return ret_code;
930
b82c5c4d
DG
931error_unlock:
932 rcu_read_unlock();
806e2684
DG
933 return -1;
934}
This page took 0.065557 seconds and 4 git commands to generate.