Move LTTng-UST buffer ownership from application to consumer
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26
27 #include <common/common.h>
28 #include <common/defaults.h>
29 #include <common/uri.h>
30
31 #include "consumer.h"
32
33 /*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40 int consumer_recv_status_reply(struct consumer_socket *sock)
41 {
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
48 if (ret <= 0) {
49 if (ret == 0) {
50 /* Orderly shutdown. Don't return 0 which means success. */
51 ret = -1;
52 }
53 /* The above call will print a PERROR on error. */
54 DBG("Fail to receive status reply on sock %d", sock->fd);
55 goto end;
56 }
57
58 if (reply.ret_code == LTTNG_OK) {
59 /* All good. */
60 ret = 0;
61 } else {
62 ret = -reply.ret_code;
63 DBG("Consumer ret code %d", ret);
64 }
65
66 end:
67 return ret;
68 }
69
70 /*
71 * Once the ASK_CHANNEL command is sent to the consumer, the channel
72 * information are sent back. This call receives that data and populates key
73 * and stream_count.
74 *
75 * On success return 0 and both key and stream_count are set. On error, a
76 * negative value is sent back and both parameters are untouched.
77 */
78 int consumer_recv_status_channel(struct consumer_socket *sock,
79 unsigned long *key, unsigned int *stream_count)
80 {
81 int ret;
82 struct lttcomm_consumer_status_channel reply;
83
84 assert(sock);
85 assert(stream_count);
86 assert(key);
87
88 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
89 if (ret <= 0) {
90 if (ret == 0) {
91 /* Orderly shutdown. Don't return 0 which means success. */
92 ret = -1;
93 }
94 /* The above call will print a PERROR on error. */
95 DBG("Fail to receive status reply on sock %d", sock->fd);
96 goto end;
97 }
98
99 /* An error is possible so don't touch the key and stream_count. */
100 if (reply.ret_code != LTTNG_OK) {
101 ret = -1;
102 goto end;
103 }
104
105 *key = reply.key;
106 *stream_count = reply.stream_count;
107
108 end:
109 return ret;
110 }
111
112 /*
113 * Send destroy relayd command to consumer.
114 *
115 * On success return positive value. On error, negative value.
116 */
117 int consumer_send_destroy_relayd(struct consumer_socket *sock,
118 struct consumer_output *consumer)
119 {
120 int ret;
121 struct lttcomm_consumer_msg msg;
122
123 assert(consumer);
124 assert(sock);
125
126 DBG2("Sending destroy relayd command to consumer sock %d", sock->fd);
127
128 /* Bail out if consumer is disabled */
129 if (!consumer->enabled) {
130 ret = LTTNG_OK;
131 DBG3("Consumer is disabled");
132 goto error;
133 }
134
135 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
136 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
137
138 pthread_mutex_lock(sock->lock);
139 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
140 if (ret < 0) {
141 /* Indicate that the consumer is probably closing at this point. */
142 DBG("send consumer destroy relayd command");
143 goto error_send;
144 }
145
146 /* Don't check the return value. The caller will do it. */
147 ret = consumer_recv_status_reply(sock);
148
149 DBG2("Consumer send destroy relayd command done");
150
151 error_send:
152 pthread_mutex_unlock(sock->lock);
153 error:
154 return ret;
155 }
156
157 /*
158 * For each consumer socket in the consumer output object, send a destroy
159 * relayd command.
160 */
161 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
162 {
163 struct lttng_ht_iter iter;
164 struct consumer_socket *socket;
165
166 assert(consumer);
167
168 /* Destroy any relayd connection */
169 if (consumer && consumer->type == CONSUMER_DST_NET) {
170 rcu_read_lock();
171 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
172 node.node) {
173 int ret;
174
175 /* Send destroy relayd command */
176 ret = consumer_send_destroy_relayd(socket, consumer);
177 if (ret < 0) {
178 DBG("Unable to send destroy relayd command to consumer");
179 /* Continue since we MUST delete everything at this point. */
180 }
181 }
182 rcu_read_unlock();
183 }
184 }
185
186 /*
187 * From a consumer_data structure, allocate and add a consumer socket to the
188 * consumer output.
189 *
190 * Return 0 on success, else negative value on error
191 */
192 int consumer_create_socket(struct consumer_data *data,
193 struct consumer_output *output)
194 {
195 int ret = 0;
196 struct consumer_socket *socket;
197
198 assert(data);
199
200 if (output == NULL || data->cmd_sock < 0) {
201 /*
202 * Not an error. Possible there is simply not spawned consumer or it's
203 * disabled for the tracing session asking the socket.
204 */
205 goto error;
206 }
207
208 rcu_read_lock();
209 socket = consumer_find_socket(data->cmd_sock, output);
210 rcu_read_unlock();
211 if (socket == NULL) {
212 socket = consumer_allocate_socket(data->cmd_sock);
213 if (socket == NULL) {
214 ret = -1;
215 goto error;
216 }
217
218 socket->registered = 0;
219 socket->lock = &data->lock;
220 rcu_read_lock();
221 consumer_add_socket(socket, output);
222 rcu_read_unlock();
223 }
224
225 DBG3("Consumer socket created (fd: %d) and added to output",
226 data->cmd_sock);
227
228 error:
229 return ret;
230 }
231
232 /*
233 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
234 * be acquired before calling this function and across use of the
235 * returned consumer_socket.
236 */
237 struct consumer_socket *consumer_find_socket(int key,
238 struct consumer_output *consumer)
239 {
240 struct lttng_ht_iter iter;
241 struct lttng_ht_node_ulong *node;
242 struct consumer_socket *socket = NULL;
243
244 /* Negative keys are lookup failures */
245 if (key < 0 || consumer == NULL) {
246 return NULL;
247 }
248
249 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
250 &iter);
251 node = lttng_ht_iter_get_node_ulong(&iter);
252 if (node != NULL) {
253 socket = caa_container_of(node, struct consumer_socket, node);
254 }
255
256 return socket;
257 }
258
259 /*
260 * Allocate a new consumer_socket and return the pointer.
261 */
262 struct consumer_socket *consumer_allocate_socket(int fd)
263 {
264 struct consumer_socket *socket = NULL;
265
266 socket = zmalloc(sizeof(struct consumer_socket));
267 if (socket == NULL) {
268 PERROR("zmalloc consumer socket");
269 goto error;
270 }
271
272 socket->fd = fd;
273 lttng_ht_node_init_ulong(&socket->node, fd);
274
275 error:
276 return socket;
277 }
278
279 /*
280 * Add consumer socket to consumer output object. Read side lock must be
281 * acquired before calling this function.
282 */
283 void consumer_add_socket(struct consumer_socket *sock,
284 struct consumer_output *consumer)
285 {
286 assert(sock);
287 assert(consumer);
288
289 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
290 }
291
292 /*
293 * Delte consumer socket to consumer output object. Read side lock must be
294 * acquired before calling this function.
295 */
296 void consumer_del_socket(struct consumer_socket *sock,
297 struct consumer_output *consumer)
298 {
299 int ret;
300 struct lttng_ht_iter iter;
301
302 assert(sock);
303 assert(consumer);
304
305 iter.iter.node = &sock->node.node;
306 ret = lttng_ht_del(consumer->socks, &iter);
307 assert(!ret);
308 }
309
310 /*
311 * RCU destroy call function.
312 */
313 static void destroy_socket_rcu(struct rcu_head *head)
314 {
315 struct lttng_ht_node_ulong *node =
316 caa_container_of(head, struct lttng_ht_node_ulong, head);
317 struct consumer_socket *socket =
318 caa_container_of(node, struct consumer_socket, node);
319
320 free(socket);
321 }
322
323 /*
324 * Destroy and free socket pointer in a call RCU. Read side lock must be
325 * acquired before calling this function.
326 */
327 void consumer_destroy_socket(struct consumer_socket *sock)
328 {
329 assert(sock);
330
331 /*
332 * We DO NOT close the file descriptor here since it is global to the
333 * session daemon and is closed only if the consumer dies or a custom
334 * consumer was registered,
335 */
336 if (sock->registered) {
337 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
338 lttcomm_close_unix_sock(sock->fd);
339 }
340
341 call_rcu(&sock->node.head, destroy_socket_rcu);
342 }
343
344 /*
345 * Allocate and assign data to a consumer_output object.
346 *
347 * Return pointer to structure.
348 */
349 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
350 {
351 struct consumer_output *output = NULL;
352
353 output = zmalloc(sizeof(struct consumer_output));
354 if (output == NULL) {
355 PERROR("zmalloc consumer_output");
356 goto error;
357 }
358
359 /* By default, consumer output is enabled */
360 output->enabled = 1;
361 output->type = type;
362 output->net_seq_index = -1;
363
364 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
365
366 error:
367 return output;
368 }
369
370 /*
371 * Delete the consumer_output object from the list and free the ptr.
372 */
373 void consumer_destroy_output(struct consumer_output *obj)
374 {
375 if (obj == NULL) {
376 return;
377 }
378
379 if (obj->socks) {
380 struct lttng_ht_iter iter;
381 struct consumer_socket *socket;
382
383 rcu_read_lock();
384 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
385 consumer_del_socket(socket, obj);
386 consumer_destroy_socket(socket);
387 }
388 rcu_read_unlock();
389
390 /* Finally destroy HT */
391 lttng_ht_destroy(obj->socks);
392 }
393
394 free(obj);
395 }
396
397 /*
398 * Copy consumer output and returned the newly allocated copy.
399 */
400 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
401 {
402 struct lttng_ht *tmp_ht_ptr;
403 struct lttng_ht_iter iter;
404 struct consumer_socket *socket, *copy_sock;
405 struct consumer_output *output;
406
407 assert(obj);
408
409 output = consumer_create_output(obj->type);
410 if (output == NULL) {
411 goto error;
412 }
413 /* Avoid losing the HT reference after the memcpy() */
414 tmp_ht_ptr = output->socks;
415
416 memcpy(output, obj, sizeof(struct consumer_output));
417
418 /* Putting back the HT pointer and start copying socket(s). */
419 output->socks = tmp_ht_ptr;
420
421 rcu_read_lock();
422 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
423 /* Create new socket object. */
424 copy_sock = consumer_allocate_socket(socket->fd);
425 if (copy_sock == NULL) {
426 rcu_read_unlock();
427 goto malloc_error;
428 }
429
430 copy_sock->registered = socket->registered;
431 copy_sock->lock = socket->lock;
432 consumer_add_socket(copy_sock, output);
433 }
434 rcu_read_unlock();
435
436 error:
437 return output;
438
439 malloc_error:
440 consumer_destroy_output(output);
441 return NULL;
442 }
443
444 /*
445 * Set network URI to the consumer output object.
446 *
447 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
448 * error.
449 */
450 int consumer_set_network_uri(struct consumer_output *obj,
451 struct lttng_uri *uri)
452 {
453 int ret;
454 char tmp_path[PATH_MAX];
455 char hostname[HOST_NAME_MAX];
456 struct lttng_uri *dst_uri = NULL;
457
458 /* Code flow error safety net. */
459 assert(obj);
460 assert(uri);
461
462 switch (uri->stype) {
463 case LTTNG_STREAM_CONTROL:
464 dst_uri = &obj->dst.net.control;
465 obj->dst.net.control_isset = 1;
466 if (uri->port == 0) {
467 /* Assign default port. */
468 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
469 }
470 DBG3("Consumer control URI set with port %d", uri->port);
471 break;
472 case LTTNG_STREAM_DATA:
473 dst_uri = &obj->dst.net.data;
474 obj->dst.net.data_isset = 1;
475 if (uri->port == 0) {
476 /* Assign default port. */
477 uri->port = DEFAULT_NETWORK_DATA_PORT;
478 }
479 DBG3("Consumer data URI set with port %d", uri->port);
480 break;
481 default:
482 ERR("Set network uri type unknown %d", uri->stype);
483 goto error;
484 }
485
486 ret = uri_compare(dst_uri, uri);
487 if (!ret) {
488 /* Same URI, don't touch it and return success. */
489 DBG3("URI network compare are the same");
490 goto equal;
491 }
492
493 /* URIs were not equal, replacing it. */
494 memset(dst_uri, 0, sizeof(struct lttng_uri));
495 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
496 obj->type = CONSUMER_DST_NET;
497
498 /* Handle subdir and add hostname in front. */
499 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
500 /* Get hostname to append it in the pathname */
501 ret = gethostname(hostname, sizeof(hostname));
502 if (ret < 0) {
503 PERROR("gethostname. Fallback on default localhost");
504 strncpy(hostname, "localhost", sizeof(hostname));
505 }
506 hostname[sizeof(hostname) - 1] = '\0';
507
508 /* Setup consumer subdir if none present in the control URI */
509 if (strlen(dst_uri->subdir) == 0) {
510 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
511 hostname, obj->subdir);
512 } else {
513 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
514 hostname, dst_uri->subdir);
515 }
516 if (ret < 0) {
517 PERROR("snprintf set consumer uri subdir");
518 goto error;
519 }
520
521 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
522 DBG3("Consumer set network uri subdir path %s", tmp_path);
523 }
524
525 return 0;
526 equal:
527 return 1;
528 error:
529 return -1;
530 }
531
532 /*
533 * Send file descriptor to consumer via sock.
534 */
535 int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
536 {
537 int ret;
538
539 assert(fds);
540 assert(sock);
541 assert(nb_fd > 0);
542
543 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
544 if (ret < 0) {
545 /* The above call will print a PERROR on error. */
546 DBG("Error when sending consumer fds on sock %d", sock->fd);
547 goto error;
548 }
549
550 ret = consumer_recv_status_reply(sock);
551
552 error:
553 return ret;
554 }
555
556 /*
557 * Consumer send communication message structure to consumer.
558 */
559 int consumer_send_msg(struct consumer_socket *sock,
560 struct lttcomm_consumer_msg *msg)
561 {
562 int ret;
563
564 assert(msg);
565 assert(sock);
566 assert(sock->fd >= 0);
567
568 ret = lttcomm_send_unix_sock(sock->fd, msg,
569 sizeof(struct lttcomm_consumer_msg));
570 if (ret < 0) {
571 /* The above call will print a PERROR on error. */
572 DBG("Error when sending consumer channel on sock %d", sock->fd);
573 goto error;
574 }
575
576 ret = consumer_recv_status_reply(sock);
577
578 error:
579 return ret;
580 }
581
582 /*
583 * Consumer send channel communication message structure to consumer.
584 */
585 int consumer_send_channel(struct consumer_socket *sock,
586 struct lttcomm_consumer_msg *msg)
587 {
588 int ret;
589
590 assert(msg);
591 assert(sock);
592 assert(sock->fd >= 0);
593
594 ret = lttcomm_send_unix_sock(sock->fd, msg,
595 sizeof(struct lttcomm_consumer_msg));
596 if (ret < 0) {
597 /* The above call will print a PERROR on error. */
598 DBG("Error when sending consumer channel on sock %d", sock->fd);
599 goto error;
600 }
601
602 ret = consumer_recv_status_reply(sock);
603
604 error:
605 return ret;
606 }
607
608 /*
609 * Populate the given consumer msg structure with the ask_channel command
610 * information.
611 */
612 void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
613 uint64_t subbuf_size,
614 uint64_t num_subbuf,
615 int overwrite,
616 unsigned int switch_timer_interval,
617 unsigned int read_timer_interval,
618 int output,
619 int type,
620 uint64_t session_id,
621 const char *pathname,
622 const char *name,
623 uid_t uid,
624 gid_t gid,
625 int relayd_id,
626 unsigned long key,
627 unsigned char *uuid)
628 {
629 assert(msg);
630
631 /* Zeroed structure */
632 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
633
634 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
635 msg->u.ask_channel.subbuf_size = subbuf_size;
636 msg->u.ask_channel.num_subbuf = num_subbuf ;
637 msg->u.ask_channel.overwrite = overwrite;
638 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
639 msg->u.ask_channel.read_timer_interval = read_timer_interval;
640 msg->u.ask_channel.output = output;
641 msg->u.ask_channel.type = type;
642 msg->u.ask_channel.session_id = session_id;
643 msg->u.ask_channel.uid = uid;
644 msg->u.ask_channel.gid = gid;
645 msg->u.ask_channel.relayd_id = relayd_id;
646 msg->u.ask_channel.key = key;
647
648 memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
649
650 strncpy(msg->u.ask_channel.pathname, pathname,
651 sizeof(msg->u.ask_channel.pathname));
652 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
653
654 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
655 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
656 }
657
658 /*
659 * Init channel communication message structure.
660 */
661 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
662 enum lttng_consumer_command cmd,
663 int channel_key,
664 uint64_t session_id,
665 const char *pathname,
666 uid_t uid,
667 gid_t gid,
668 int relayd_id,
669 const char *name,
670 unsigned int nb_init_streams,
671 enum lttng_event_output output,
672 int type)
673 {
674 assert(msg);
675
676 /* Zeroed structure */
677 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
678
679 /* Send channel */
680 msg->cmd_type = cmd;
681 msg->u.channel.channel_key = channel_key;
682 msg->u.channel.session_id = session_id;
683 msg->u.channel.uid = uid;
684 msg->u.channel.gid = gid;
685 msg->u.channel.relayd_id = relayd_id;
686 msg->u.channel.nb_init_streams = nb_init_streams;
687 msg->u.channel.output = output;
688 msg->u.channel.type = type;
689
690 strncpy(msg->u.channel.pathname, pathname,
691 sizeof(msg->u.channel.pathname));
692 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
693
694 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
695 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
696 }
697
698 /*
699 * Init stream communication message structure.
700 */
701 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
702 enum lttng_consumer_command cmd,
703 int channel_key,
704 int stream_key,
705 int cpu)
706 {
707 assert(msg);
708
709 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
710
711 msg->cmd_type = cmd;
712 msg->u.stream.channel_key = channel_key;
713 msg->u.stream.stream_key = stream_key;
714 msg->u.stream.cpu = cpu;
715 }
716
717 /*
718 * Send stream communication structure to the consumer.
719 */
720 int consumer_send_stream(struct consumer_socket *sock,
721 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
722 int *fds, size_t nb_fd)
723 {
724 int ret;
725
726 assert(msg);
727 assert(dst);
728 assert(sock);
729 assert(fds);
730
731 /* Send on socket */
732 ret = lttcomm_send_unix_sock(sock->fd, msg,
733 sizeof(struct lttcomm_consumer_msg));
734 if (ret < 0) {
735 /* The above call will print a PERROR on error. */
736 DBG("Error when sending consumer stream on sock %d", sock->fd);
737 goto error;
738 }
739
740 ret = consumer_recv_status_reply(sock);
741 if (ret < 0) {
742 goto error;
743 }
744
745 ret = consumer_send_fds(sock, fds, nb_fd);
746 if (ret < 0) {
747 goto error;
748 }
749
750 error:
751 return ret;
752 }
753
754 /*
755 * Send relayd socket to consumer associated with a session name.
756 *
757 * On success return positive value. On error, negative value.
758 */
759 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
760 struct lttcomm_sock *sock, struct consumer_output *consumer,
761 enum lttng_stream_type type, unsigned int session_id)
762 {
763 int ret;
764 struct lttcomm_consumer_msg msg;
765
766 /* Code flow error. Safety net. */
767 assert(sock);
768 assert(consumer);
769 assert(consumer_sock);
770
771 /* Bail out if consumer is disabled */
772 if (!consumer->enabled) {
773 ret = LTTNG_OK;
774 goto error;
775 }
776
777 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
778 /*
779 * Assign network consumer output index using the temporary consumer since
780 * this call should only be made from within a set_consumer_uri() function
781 * call in the session daemon.
782 */
783 msg.u.relayd_sock.net_index = consumer->net_seq_index;
784 msg.u.relayd_sock.type = type;
785 msg.u.relayd_sock.session_id = session_id;
786 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
787
788 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
789 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
790 if (ret < 0) {
791 /* The above call will print a PERROR on error. */
792 DBG("Error when sending relayd sockets on sock %d", sock->fd);
793 goto error;
794 }
795
796 ret = consumer_recv_status_reply(consumer_sock);
797 if (ret < 0) {
798 goto error;
799 }
800
801 DBG3("Sending relayd socket file descriptor to consumer");
802 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
803 if (ret < 0) {
804 goto error;
805 }
806
807 DBG2("Consumer relayd socket sent");
808
809 error:
810 return ret;
811 }
812
813 /*
814 * Set consumer subdirectory using the session name and a generated datetime if
815 * needed. This is appended to the current subdirectory.
816 */
817 int consumer_set_subdir(struct consumer_output *consumer,
818 const char *session_name)
819 {
820 int ret = 0;
821 unsigned int have_default_name = 0;
822 char datetime[16], tmp_path[PATH_MAX];
823 time_t rawtime;
824 struct tm *timeinfo;
825
826 assert(consumer);
827 assert(session_name);
828
829 memset(tmp_path, 0, sizeof(tmp_path));
830
831 /* Flag if we have a default session. */
832 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
833 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
834 have_default_name = 1;
835 } else {
836 /* Get date and time for session path */
837 time(&rawtime);
838 timeinfo = localtime(&rawtime);
839 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
840 }
841
842 if (have_default_name) {
843 ret = snprintf(tmp_path, sizeof(tmp_path),
844 "%s/%s", consumer->subdir, session_name);
845 } else {
846 ret = snprintf(tmp_path, sizeof(tmp_path),
847 "%s/%s-%s/", consumer->subdir, session_name, datetime);
848 }
849 if (ret < 0) {
850 PERROR("snprintf session name date");
851 goto error;
852 }
853
854 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
855 DBG2("Consumer subdir set to %s", consumer->subdir);
856
857 error:
858 return ret;
859 }
860
861 /*
862 * Ask the consumer if the data is ready to read (NOT pending) for the specific
863 * session id.
864 *
865 * This function has a different behavior with the consumer i.e. that it waits
866 * for a reply from the consumer if yes or no the data is pending.
867 */
868 int consumer_is_data_pending(unsigned int id,
869 struct consumer_output *consumer)
870 {
871 int ret;
872 int32_t ret_code = 0; /* Default is that the data is NOT pending */
873 struct consumer_socket *socket;
874 struct lttng_ht_iter iter;
875 struct lttcomm_consumer_msg msg;
876
877 assert(consumer);
878
879 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
880
881 msg.u.data_pending.session_id = (uint64_t) id;
882
883 DBG3("Consumer data pending for id %u", id);
884
885 /* Send command for each consumer */
886 rcu_read_lock();
887 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
888 node.node) {
889 /* Code flow error */
890 assert(socket->fd >= 0);
891
892 pthread_mutex_lock(socket->lock);
893
894 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
895 if (ret < 0) {
896 /* The above call will print a PERROR on error. */
897 DBG("Error on consumer is data pending on sock %d", socket->fd);
898 pthread_mutex_unlock(socket->lock);
899 goto error_unlock;
900 }
901
902 /*
903 * No need for a recv reply status because the answer to the command is
904 * the reply status message.
905 */
906
907 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
908 if (ret <= 0) {
909 if (ret == 0) {
910 /* Orderly shutdown. Don't return 0 which means success. */
911 ret = -1;
912 }
913 /* The above call will print a PERROR on error. */
914 DBG("Error on recv consumer is data pending on sock %d", socket->fd);
915 pthread_mutex_unlock(socket->lock);
916 goto error_unlock;
917 }
918
919 pthread_mutex_unlock(socket->lock);
920
921 if (ret_code == 1) {
922 break;
923 }
924 }
925 rcu_read_unlock();
926
927 DBG("Consumer data is %s pending for session id %u",
928 ret_code == 1 ? "" : "NOT", id);
929 return ret_code;
930
931 error_unlock:
932 rcu_read_unlock();
933 return -1;
934 }
This page took 0.048732 seconds and 4 git commands to generate.