Make the consumer sends a ACK after each command
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26
27 #include <common/common.h>
28 #include <common/defaults.h>
29 #include <common/uri.h>
30
31 #include "consumer.h"
32
33 /*
34 * Receive a reply command status message from the consumer. Consumer socket
35 * lock MUST be acquired before calling this function.
36 *
37 * Return 0 on success, -1 on recv error or a negative lttng error code which
38 * was possibly returned by the consumer.
39 */
40 int consumer_recv_status_reply(struct consumer_socket *sock)
41 {
42 int ret;
43 struct lttcomm_consumer_status_msg reply;
44
45 assert(sock);
46
47 ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
48 if (ret < 0) {
49 PERROR("recv consumer status msg");
50 goto end;
51 }
52
53 if (reply.ret_code == LTTNG_OK) {
54 /* All good. */
55 ret = 0;
56 } else {
57 ret = -reply.ret_code;
58 ERR("Consumer ret code %d", reply.ret_code);
59 }
60
61 end:
62 return ret;
63 }
64
65 /*
66 * Send destroy relayd command to consumer.
67 *
68 * On success return positive value. On error, negative value.
69 */
70 int consumer_send_destroy_relayd(struct consumer_socket *sock,
71 struct consumer_output *consumer)
72 {
73 int ret;
74 struct lttcomm_consumer_msg msg;
75
76 assert(consumer);
77 assert(sock);
78
79 DBG2("Sending destroy relayd command to consumer...");
80
81 /* Bail out if consumer is disabled */
82 if (!consumer->enabled) {
83 ret = LTTNG_OK;
84 DBG3("Consumer is disabled");
85 goto error;
86 }
87
88 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
89 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
90
91 pthread_mutex_lock(sock->lock);
92 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
93 if (ret < 0) {
94 PERROR("send consumer destroy relayd command");
95 goto error_send;
96 }
97
98 /* Don't check the return value. The caller will do it. */
99 ret = consumer_recv_status_reply(sock);
100
101 DBG2("Consumer send destroy relayd command done");
102
103 error_send:
104 pthread_mutex_unlock(sock->lock);
105 error:
106 return ret;
107 }
108
109 /*
110 * For each consumer socket in the consumer output object, send a destroy
111 * relayd command.
112 */
113 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
114 {
115 int ret;
116 struct lttng_ht_iter iter;
117 struct consumer_socket *socket;
118
119 assert(consumer);
120
121 /* Destroy any relayd connection */
122 if (consumer && consumer->type == CONSUMER_DST_NET) {
123 rcu_read_lock();
124 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
125 node.node) {
126 /* Send destroy relayd command */
127 ret = consumer_send_destroy_relayd(socket, consumer);
128 if (ret < 0) {
129 ERR("Unable to send destroy relayd command to consumer");
130 /* Continue since we MUST delete everything at this point. */
131 }
132 }
133 rcu_read_unlock();
134 }
135 }
136
137 /*
138 * From a consumer_data structure, allocate and add a consumer socket to the
139 * consumer output.
140 *
141 * Return 0 on success, else negative value on error
142 */
143 int consumer_create_socket(struct consumer_data *data,
144 struct consumer_output *output)
145 {
146 int ret = 0;
147 struct consumer_socket *socket;
148
149 assert(data);
150
151 if (output == NULL || data->cmd_sock < 0) {
152 /*
153 * Not an error. Possible there is simply not spawned consumer or it's
154 * disabled for the tracing session asking the socket.
155 */
156 goto error;
157 }
158
159 rcu_read_lock();
160 socket = consumer_find_socket(data->cmd_sock, output);
161 rcu_read_unlock();
162 if (socket == NULL) {
163 socket = consumer_allocate_socket(data->cmd_sock);
164 if (socket == NULL) {
165 ret = -1;
166 goto error;
167 }
168
169 socket->registered = 0;
170 socket->lock = &data->lock;
171 rcu_read_lock();
172 consumer_add_socket(socket, output);
173 rcu_read_unlock();
174 }
175
176 DBG3("Consumer socket created (fd: %d) and added to output",
177 data->cmd_sock);
178
179 error:
180 return ret;
181 }
182
183 /*
184 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
185 * be acquired before calling this function and across use of the
186 * returned consumer_socket.
187 */
188 struct consumer_socket *consumer_find_socket(int key,
189 struct consumer_output *consumer)
190 {
191 struct lttng_ht_iter iter;
192 struct lttng_ht_node_ulong *node;
193 struct consumer_socket *socket = NULL;
194
195 /* Negative keys are lookup failures */
196 if (key < 0 || consumer == NULL) {
197 return NULL;
198 }
199
200 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
201 &iter);
202 node = lttng_ht_iter_get_node_ulong(&iter);
203 if (node != NULL) {
204 socket = caa_container_of(node, struct consumer_socket, node);
205 }
206
207 return socket;
208 }
209
210 /*
211 * Allocate a new consumer_socket and return the pointer.
212 */
213 struct consumer_socket *consumer_allocate_socket(int fd)
214 {
215 struct consumer_socket *socket = NULL;
216
217 socket = zmalloc(sizeof(struct consumer_socket));
218 if (socket == NULL) {
219 PERROR("zmalloc consumer socket");
220 goto error;
221 }
222
223 socket->fd = fd;
224 lttng_ht_node_init_ulong(&socket->node, fd);
225
226 error:
227 return socket;
228 }
229
230 /*
231 * Add consumer socket to consumer output object. Read side lock must be
232 * acquired before calling this function.
233 */
234 void consumer_add_socket(struct consumer_socket *sock,
235 struct consumer_output *consumer)
236 {
237 assert(sock);
238 assert(consumer);
239
240 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
241 }
242
243 /*
244 * Delte consumer socket to consumer output object. Read side lock must be
245 * acquired before calling this function.
246 */
247 void consumer_del_socket(struct consumer_socket *sock,
248 struct consumer_output *consumer)
249 {
250 int ret;
251 struct lttng_ht_iter iter;
252
253 assert(sock);
254 assert(consumer);
255
256 iter.iter.node = &sock->node.node;
257 ret = lttng_ht_del(consumer->socks, &iter);
258 assert(!ret);
259 }
260
261 /*
262 * RCU destroy call function.
263 */
264 static void destroy_socket_rcu(struct rcu_head *head)
265 {
266 struct lttng_ht_node_ulong *node =
267 caa_container_of(head, struct lttng_ht_node_ulong, head);
268 struct consumer_socket *socket =
269 caa_container_of(node, struct consumer_socket, node);
270
271 free(socket);
272 }
273
274 /*
275 * Destroy and free socket pointer in a call RCU. Read side lock must be
276 * acquired before calling this function.
277 */
278 void consumer_destroy_socket(struct consumer_socket *sock)
279 {
280 assert(sock);
281
282 /*
283 * We DO NOT close the file descriptor here since it is global to the
284 * session daemon and is closed only if the consumer dies or a custom
285 * consumer was registered,
286 */
287 if (sock->registered) {
288 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
289 lttcomm_close_unix_sock(sock->fd);
290 }
291
292 call_rcu(&sock->node.head, destroy_socket_rcu);
293 }
294
295 /*
296 * Allocate and assign data to a consumer_output object.
297 *
298 * Return pointer to structure.
299 */
300 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
301 {
302 struct consumer_output *output = NULL;
303
304 output = zmalloc(sizeof(struct consumer_output));
305 if (output == NULL) {
306 PERROR("zmalloc consumer_output");
307 goto error;
308 }
309
310 /* By default, consumer output is enabled */
311 output->enabled = 1;
312 output->type = type;
313 output->net_seq_index = -1;
314
315 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
316
317 error:
318 return output;
319 }
320
321 /*
322 * Delete the consumer_output object from the list and free the ptr.
323 */
324 void consumer_destroy_output(struct consumer_output *obj)
325 {
326 if (obj == NULL) {
327 return;
328 }
329
330 if (obj->socks) {
331 struct lttng_ht_iter iter;
332 struct consumer_socket *socket;
333
334 rcu_read_lock();
335 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
336 consumer_del_socket(socket, obj);
337 consumer_destroy_socket(socket);
338 }
339 rcu_read_unlock();
340
341 /* Finally destroy HT */
342 lttng_ht_destroy(obj->socks);
343 }
344
345 free(obj);
346 }
347
348 /*
349 * Copy consumer output and returned the newly allocated copy.
350 */
351 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
352 {
353 struct lttng_ht *tmp_ht_ptr;
354 struct lttng_ht_iter iter;
355 struct consumer_socket *socket, *copy_sock;
356 struct consumer_output *output;
357
358 assert(obj);
359
360 output = consumer_create_output(obj->type);
361 if (output == NULL) {
362 goto error;
363 }
364 /* Avoid losing the HT reference after the memcpy() */
365 tmp_ht_ptr = output->socks;
366
367 memcpy(output, obj, sizeof(struct consumer_output));
368
369 /* Putting back the HT pointer and start copying socket(s). */
370 output->socks = tmp_ht_ptr;
371
372 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
373 /* Create new socket object. */
374 copy_sock = consumer_allocate_socket(socket->fd);
375 if (copy_sock == NULL) {
376 goto malloc_error;
377 }
378
379 copy_sock->registered = socket->registered;
380 copy_sock->lock = socket->lock;
381 consumer_add_socket(copy_sock, output);
382 }
383
384 error:
385 return output;
386
387 malloc_error:
388 consumer_destroy_output(output);
389 return NULL;
390 }
391
392 /*
393 * Set network URI to the consumer output object.
394 *
395 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
396 * error.
397 */
398 int consumer_set_network_uri(struct consumer_output *obj,
399 struct lttng_uri *uri)
400 {
401 int ret;
402 char tmp_path[PATH_MAX];
403 char hostname[HOST_NAME_MAX];
404 struct lttng_uri *dst_uri = NULL;
405
406 /* Code flow error safety net. */
407 assert(obj);
408 assert(uri);
409
410 switch (uri->stype) {
411 case LTTNG_STREAM_CONTROL:
412 dst_uri = &obj->dst.net.control;
413 obj->dst.net.control_isset = 1;
414 if (uri->port == 0) {
415 /* Assign default port. */
416 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
417 }
418 DBG3("Consumer control URI set with port %d", uri->port);
419 break;
420 case LTTNG_STREAM_DATA:
421 dst_uri = &obj->dst.net.data;
422 obj->dst.net.data_isset = 1;
423 if (uri->port == 0) {
424 /* Assign default port. */
425 uri->port = DEFAULT_NETWORK_DATA_PORT;
426 }
427 DBG3("Consumer data URI set with port %d", uri->port);
428 break;
429 default:
430 ERR("Set network uri type unknown %d", uri->stype);
431 goto error;
432 }
433
434 ret = uri_compare(dst_uri, uri);
435 if (!ret) {
436 /* Same URI, don't touch it and return success. */
437 DBG3("URI network compare are the same");
438 goto equal;
439 }
440
441 /* URIs were not equal, replacing it. */
442 memset(dst_uri, 0, sizeof(struct lttng_uri));
443 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
444 obj->type = CONSUMER_DST_NET;
445
446 /* Handle subdir and add hostname in front. */
447 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
448 /* Get hostname to append it in the pathname */
449 ret = gethostname(hostname, sizeof(hostname));
450 if (ret < 0) {
451 PERROR("gethostname. Fallback on default localhost");
452 strncpy(hostname, "localhost", sizeof(hostname));
453 }
454 hostname[sizeof(hostname) - 1] = '\0';
455
456 /* Setup consumer subdir if none present in the control URI */
457 if (strlen(dst_uri->subdir) == 0) {
458 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
459 hostname, obj->subdir);
460 } else {
461 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
462 hostname, dst_uri->subdir);
463 }
464 if (ret < 0) {
465 PERROR("snprintf set consumer uri subdir");
466 goto error;
467 }
468
469 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
470 DBG3("Consumer set network uri subdir path %s", tmp_path);
471 }
472
473 return 0;
474 equal:
475 return 1;
476 error:
477 return -1;
478 }
479
480 /*
481 * Send file descriptor to consumer via sock.
482 */
483 int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
484 {
485 int ret;
486
487 assert(fds);
488 assert(sock);
489 assert(nb_fd > 0);
490
491 ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
492 if (ret < 0) {
493 PERROR("send consumer fds");
494 goto error;
495 }
496
497 ret = consumer_recv_status_reply(sock);
498
499 error:
500 return ret;
501 }
502
503 /*
504 * Consumer send channel communication message structure to consumer.
505 */
506 int consumer_send_channel(struct consumer_socket *sock,
507 struct lttcomm_consumer_msg *msg)
508 {
509 int ret;
510
511 assert(msg);
512 assert(sock);
513 assert(sock->fd >= 0);
514
515 ret = lttcomm_send_unix_sock(sock->fd, msg,
516 sizeof(struct lttcomm_consumer_msg));
517 if (ret < 0) {
518 PERROR("send consumer channel");
519 goto error;
520 }
521
522 ret = consumer_recv_status_reply(sock);
523
524 error:
525 return ret;
526 }
527
528 /*
529 * Init channel communication message structure.
530 */
531 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
532 enum lttng_consumer_command cmd,
533 int channel_key,
534 uint64_t max_sb_size,
535 uint64_t mmap_len,
536 const char *name,
537 unsigned int nb_init_streams)
538 {
539 assert(msg);
540
541 /* TODO: Args validation */
542
543 /* Zeroed structure */
544 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
545
546 /* Send channel */
547 msg->cmd_type = cmd;
548 msg->u.channel.channel_key = channel_key;
549 msg->u.channel.max_sb_size = max_sb_size;
550 msg->u.channel.mmap_len = mmap_len;
551 msg->u.channel.nb_init_streams = nb_init_streams;
552 }
553
554 /*
555 * Init stream communication message structure.
556 */
557 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
558 enum lttng_consumer_command cmd,
559 int channel_key,
560 int stream_key,
561 uint32_t state,
562 enum lttng_event_output output,
563 uint64_t mmap_len,
564 uid_t uid,
565 gid_t gid,
566 int net_index,
567 unsigned int metadata_flag,
568 const char *name,
569 const char *pathname,
570 unsigned int session_id)
571 {
572 assert(msg);
573
574 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
575
576 /* TODO: Args validation */
577
578 msg->cmd_type = cmd;
579 msg->u.stream.channel_key = channel_key;
580 msg->u.stream.stream_key = stream_key;
581 msg->u.stream.state = state;
582 msg->u.stream.output = output;
583 msg->u.stream.mmap_len = mmap_len;
584 msg->u.stream.uid = uid;
585 msg->u.stream.gid = gid;
586 msg->u.stream.net_index = net_index;
587 msg->u.stream.metadata_flag = metadata_flag;
588 msg->u.stream.session_id = (uint64_t) session_id;
589 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
590 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
591 strncpy(msg->u.stream.path_name, pathname,
592 sizeof(msg->u.stream.path_name));
593 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
594 }
595
596 /*
597 * Send stream communication structure to the consumer.
598 */
599 int consumer_send_stream(struct consumer_socket *sock,
600 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
601 int *fds, size_t nb_fd)
602 {
603 int ret;
604
605 assert(msg);
606 assert(dst);
607 assert(sock);
608
609 switch (dst->type) {
610 case CONSUMER_DST_NET:
611 /* Consumer should send the stream on the network. */
612 msg->u.stream.net_index = dst->net_seq_index;
613 break;
614 case CONSUMER_DST_LOCAL:
615 /* Add stream file name to stream path */
616 strncat(msg->u.stream.path_name, "/",
617 sizeof(msg->u.stream.path_name) -
618 strlen(msg->u.stream.path_name) - 1);
619 strncat(msg->u.stream.path_name, msg->u.stream.name,
620 sizeof(msg->u.stream.path_name) -
621 strlen(msg->u.stream.path_name) - 1);
622 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
623 /* Indicate that the stream is NOT network */
624 msg->u.stream.net_index = -1;
625 break;
626 default:
627 ERR("Consumer unknown output type (%d)", dst->type);
628 ret = -1;
629 goto error;
630 }
631
632 /* Send on socket */
633 ret = lttcomm_send_unix_sock(sock->fd, msg,
634 sizeof(struct lttcomm_consumer_msg));
635 if (ret < 0) {
636 PERROR("send consumer stream");
637 goto error;
638 }
639
640 ret = consumer_recv_status_reply(sock);
641 if (ret < 0) {
642 goto error;
643 }
644
645 ret = consumer_send_fds(sock, fds, nb_fd);
646 if (ret < 0) {
647 goto error;
648 }
649
650 error:
651 return ret;
652 }
653
654 /*
655 * Send relayd socket to consumer associated with a session name.
656 *
657 * On success return positive value. On error, negative value.
658 */
659 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
660 struct lttcomm_sock *sock, struct consumer_output *consumer,
661 enum lttng_stream_type type)
662 {
663 int ret;
664 struct lttcomm_consumer_msg msg;
665
666 /* Code flow error. Safety net. */
667 assert(sock);
668 assert(consumer);
669 assert(consumer_sock);
670
671 /* Bail out if consumer is disabled */
672 if (!consumer->enabled) {
673 ret = LTTNG_OK;
674 goto error;
675 }
676
677 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
678 /*
679 * Assign network consumer output index using the temporary consumer since
680 * this call should only be made from within a set_consumer_uri() function
681 * call in the session daemon.
682 */
683 msg.u.relayd_sock.net_index = consumer->net_seq_index;
684 msg.u.relayd_sock.type = type;
685 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
686
687 DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
688 ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
689 if (ret < 0) {
690 PERROR("send consumer relayd socket info");
691 goto error;
692 }
693
694 ret = consumer_recv_status_reply(consumer_sock);
695 if (ret < 0) {
696 goto error;
697 }
698
699 DBG3("Sending relayd socket file descriptor to consumer");
700 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
701 if (ret < 0) {
702 goto error;
703 }
704
705 DBG2("Consumer relayd socket sent");
706
707 error:
708 return ret;
709 }
710
711 /*
712 * Set consumer subdirectory using the session name and a generated datetime if
713 * needed. This is appended to the current subdirectory.
714 */
715 int consumer_set_subdir(struct consumer_output *consumer,
716 const char *session_name)
717 {
718 int ret = 0;
719 unsigned int have_default_name = 0;
720 char datetime[16], tmp_path[PATH_MAX];
721 time_t rawtime;
722 struct tm *timeinfo;
723
724 assert(consumer);
725 assert(session_name);
726
727 memset(tmp_path, 0, sizeof(tmp_path));
728
729 /* Flag if we have a default session. */
730 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
731 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
732 have_default_name = 1;
733 } else {
734 /* Get date and time for session path */
735 time(&rawtime);
736 timeinfo = localtime(&rawtime);
737 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
738 }
739
740 if (have_default_name) {
741 ret = snprintf(tmp_path, sizeof(tmp_path),
742 "%s/%s", consumer->subdir, session_name);
743 } else {
744 ret = snprintf(tmp_path, sizeof(tmp_path),
745 "%s/%s-%s/", consumer->subdir, session_name, datetime);
746 }
747 if (ret < 0) {
748 PERROR("snprintf session name date");
749 goto error;
750 }
751
752 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
753 DBG2("Consumer subdir set to %s", consumer->subdir);
754
755 error:
756 return ret;
757 }
758
759 /*
760 * Ask the consumer if the data is ready to read (NOT pending) for the specific
761 * session id.
762 *
763 * This function has a different behavior with the consumer i.e. that it waits
764 * for a reply from the consumer if yes or no the data is pending.
765 */
766 int consumer_is_data_pending(unsigned int id,
767 struct consumer_output *consumer)
768 {
769 int ret;
770 int32_t ret_code = 0; /* Default is that the data is NOT pending */
771 struct consumer_socket *socket;
772 struct lttng_ht_iter iter;
773 struct lttcomm_consumer_msg msg;
774
775 assert(consumer);
776
777 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
778
779 msg.u.data_pending.session_id = (uint64_t) id;
780
781 DBG3("Consumer data pending for id %u", id);
782
783 /* Send command for each consumer */
784 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
785 node.node) {
786 /* Code flow error */
787 assert(socket->fd >= 0);
788
789 pthread_mutex_lock(socket->lock);
790
791 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
792 if (ret < 0) {
793 PERROR("send consumer data pending command");
794 pthread_mutex_unlock(socket->lock);
795 goto error;
796 }
797
798 /*
799 * No need for a recv reply status because the answer to the command is
800 * the reply status message.
801 */
802
803 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
804 if (ret < 0) {
805 PERROR("recv consumer data pending status");
806 pthread_mutex_unlock(socket->lock);
807 goto error;
808 }
809
810 pthread_mutex_unlock(socket->lock);
811
812 if (ret_code == 1) {
813 break;
814 }
815 }
816
817 DBG("Consumer data pending ret %d", ret_code);
818 return ret_code;
819
820 error:
821 return -1;
822 }
This page took 0.045394 seconds and 4 git commands to generate.