bin: compile lttng-consumerd as a C++
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
CommitLineData
a58c490f 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
a58c490f 3 *
ab5be9fa 4 * SPDX-License-Identifier: LGPL-2.1-only
a58c490f 5 *
a58c490f
JG
6 */
7
8#include <lttng/notification/notification-internal.h>
9#include <lttng/notification/channel-internal.h>
10#include <lttng/condition/condition-internal.h>
11#include <lttng/endpoint.h>
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/dynamic-buffer.h>
15#include <common/utils.h>
16#include <common/defaults.h>
882093ee
JR
17#include <common/payload.h>
18#include <common/payload-view.h>
19#include <common/unix.h>
a58c490f 20#include "lttng-ctl-helper.h"
d977a743 21#include <common/compat/poll.h>
a58c490f
JG
22
23static
24int handshake(struct lttng_notification_channel *channel);
25
26/*
27 * Populates the reception buffer with the next complete message.
1d757b1c 28 * The caller must acquire the channel's lock.
a58c490f
JG
29 */
30static
31int receive_message(struct lttng_notification_channel *channel)
32{
33 ssize_t ret;
34 struct lttng_notification_channel_message msg;
35
882093ee 36 lttng_payload_clear(&channel->reception_payload);
a58c490f
JG
37
38 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
39 if (ret <= 0) {
40 ret = -1;
41 goto error;
42 }
43
44 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
45 ret = -1;
46 goto error;
47 }
48
49 /* Add message header at buffer's start. */
882093ee 50 ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
a58c490f
JG
51 sizeof(msg));
52 if (ret) {
53 goto error;
54 }
55
56 /* Reserve space for the payload. */
882093ee
JR
57 ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
58 channel->reception_payload.buffer.size + msg.size);
a58c490f
JG
59 if (ret) {
60 goto error;
61 }
62
63 /* Receive message payload. */
64 ret = lttcomm_recv_unix_sock(channel->socket,
882093ee 65 channel->reception_payload.buffer.data + sizeof(msg), msg.size);
a58c490f
JG
66 if (ret < (ssize_t) msg.size) {
67 ret = -1;
68 goto error;
69 }
882093ee
JR
70
71 /* Receive message fds. */
72 if (msg.fds != 0) {
73 ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
74 msg.fds, &channel->reception_payload);
75 if (ret < sizeof(int) * msg.fds) {
76 ret = -1;
77 goto error;
78 }
79 }
a58c490f
JG
80 ret = 0;
81end:
82 return ret;
83error:
882093ee 84 lttng_payload_clear(&channel->reception_payload);
a58c490f
JG
85 goto end;
86}
87
88static
89enum lttng_notification_channel_message_type get_current_message_type(
90 struct lttng_notification_channel *channel)
91{
92 struct lttng_notification_channel_message *msg;
93
a0377dfe 94 LTTNG_ASSERT(channel->reception_payload.buffer.size >= sizeof(*msg));
a58c490f
JG
95
96 msg = (struct lttng_notification_channel_message *)
882093ee 97 channel->reception_payload.buffer.data;
a58c490f
JG
98 return (enum lttng_notification_channel_message_type) msg->type;
99}
100
101static
102struct lttng_notification *create_notification_from_current_message(
103 struct lttng_notification_channel *channel)
104{
105 ssize_t ret;
106 struct lttng_notification *notification = NULL;
a58c490f 107
882093ee 108 if (channel->reception_payload.buffer.size <=
a58c490f
JG
109 sizeof(struct lttng_notification_channel_message)) {
110 goto end;
111 }
112
c0a66c84 113 {
882093ee
JR
114 struct lttng_payload_view view = lttng_payload_view_from_payload(
115 &channel->reception_payload,
c0a66c84
JG
116 sizeof(struct lttng_notification_channel_message),
117 -1);
118
119 ret = lttng_notification_create_from_payload(
120 &view, &notification);
121 }
a58c490f 122
882093ee 123 if (ret != channel->reception_payload.buffer.size -
a58c490f
JG
124 sizeof(struct lttng_notification_channel_message)) {
125 lttng_notification_destroy(notification);
126 notification = NULL;
127 goto end;
128 }
129end:
130 return notification;
131}
132
133struct lttng_notification_channel *lttng_notification_channel_create(
134 struct lttng_endpoint *endpoint)
135{
136 int fd, ret;
137 bool is_in_tracing_group = false, is_root = false;
138 char *sock_path = NULL;
139 struct lttng_notification_channel *channel = NULL;
140
141 if (!endpoint ||
142 endpoint != lttng_session_daemon_notification_endpoint) {
143 goto end;
144 }
145
146 sock_path = zmalloc(LTTNG_PATH_MAX);
147 if (!sock_path) {
148 goto end;
149 }
150
151 channel = zmalloc(sizeof(struct lttng_notification_channel));
152 if (!channel) {
153 goto end;
154 }
155 channel->socket = -1;
156 pthread_mutex_init(&channel->lock, NULL);
882093ee 157 lttng_payload_init(&channel->reception_payload);
a58c490f
JG
158 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
159
160 is_root = (getuid() == 0);
161 if (!is_root) {
162 is_in_tracing_group = lttng_check_tracing_group();
163 }
164
165 if (is_root || is_in_tracing_group) {
e1b624d0 166 ret = lttng_strncpy(sock_path,
a58c490f
JG
167 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
168 LTTNG_PATH_MAX);
e1b624d0
JG
169 if (ret) {
170 ret = -LTTNG_ERR_INVALID;
171 goto error;
172 }
173
a58c490f
JG
174 ret = lttcomm_connect_unix_sock(sock_path);
175 if (ret >= 0) {
176 fd = ret;
177 goto set_fd;
178 }
179 }
180
181 /* Fallback to local session daemon. */
182 ret = snprintf(sock_path, LTTNG_PATH_MAX,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
184 utils_get_home_dir());
185 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
186 goto error;
187 }
188
189 ret = lttcomm_connect_unix_sock(sock_path);
190 if (ret < 0) {
191 goto error;
192 }
193 fd = ret;
194
195set_fd:
196 channel->socket = fd;
197
198 ret = handshake(channel);
199 if (ret) {
200 goto error;
201 }
202end:
203 free(sock_path);
204 return channel;
205error:
206 lttng_notification_channel_destroy(channel);
207 channel = NULL;
208 goto end;
209}
210
211enum lttng_notification_channel_status
212lttng_notification_channel_get_next_notification(
213 struct lttng_notification_channel *channel,
214 struct lttng_notification **_notification)
215{
216 int ret;
217 struct lttng_notification *notification = NULL;
218 enum lttng_notification_channel_status status =
219 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
d977a743 220 struct lttng_poll_event events;
a58c490f
JG
221
222 if (!channel || !_notification) {
223 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
224 goto end;
225 }
226
94a61469
JG
227 pthread_mutex_lock(&channel->lock);
228
a58c490f
JG
229 if (channel->pending_notifications.count) {
230 struct pending_notification *pending_notification;
231
a0377dfe 232 LTTNG_ASSERT(!cds_list_empty(&channel->pending_notifications.list));
a58c490f
JG
233
234 /* Deliver one of the pending notifications. */
235 pending_notification = cds_list_first_entry(
236 &channel->pending_notifications.list,
237 struct pending_notification,
238 node);
239 notification = pending_notification->notification;
240 if (!notification) {
241 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
242 }
243 cds_list_del(&pending_notification->node);
244 channel->pending_notifications.count--;
245 free(pending_notification);
94a61469 246 goto end_unlock;
f83bcd90
JG
247 }
248
249 /*
d977a743
MD
250 * Block on interruptible epoll/poll() instead of the message reception
251 * itself as the recvmsg() wrappers always restart on EINTR. We choose
252 * to wait using interruptible epoll/poll() in order to:
f83bcd90
JG
253 * 1) Return if a signal occurs,
254 * 2) Not deal with partially received messages.
255 *
256 * The drawback to this approach is that we assume that messages
257 * are complete/well formed. If a message is shorter than its
258 * announced length, receive_message() will block on recvmsg()
259 * and never return (even if a signal is received).
260 */
d977a743
MD
261 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
262 if (ret < 0) {
263 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
264 goto end_unlock;
265 }
266 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
267 if (ret < 0) {
268 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
269 goto end_clean_poll;
270 }
271 ret = lttng_poll_wait_interruptible(&events, -1);
272 if (ret <= 0) {
273 status = (ret == -1 && errno == EINTR) ?
f83bcd90
JG
274 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
275 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 276 goto end_clean_poll;
a58c490f
JG
277 }
278
a58c490f
JG
279 ret = receive_message(channel);
280 if (ret) {
281 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 282 goto end_clean_poll;
a58c490f
JG
283 }
284
285 switch (get_current_message_type(channel)) {
286 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
287 notification = create_notification_from_current_message(
288 channel);
289 if (!notification) {
290 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 291 goto end_clean_poll;
a58c490f
JG
292 }
293 break;
294 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
295 /* No payload to consume. */
296 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
297 break;
298 default:
299 /* Protocol error. */
300 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 301 goto end_clean_poll;
a58c490f
JG
302 }
303
d977a743
MD
304end_clean_poll:
305 lttng_poll_clean(&events);
a58c490f
JG
306end_unlock:
307 pthread_mutex_unlock(&channel->lock);
a57a7f22 308 *_notification = notification;
a58c490f 309end:
a58c490f
JG
310 return status;
311}
312
313static
314int enqueue_dropped_notification(
315 struct lttng_notification_channel *channel)
316{
317 int ret = 0;
318 struct pending_notification *pending_notification;
319 struct cds_list_head *last_element =
320 channel->pending_notifications.list.prev;
321
322 pending_notification = caa_container_of(last_element,
323 struct pending_notification, node);
324 if (!pending_notification->notification) {
325 /*
326 * The last enqueued notification indicates dropped
327 * notifications; there is nothing to do as we group
328 * dropped notifications together.
329 */
330 goto end;
331 }
332
333 if (channel->pending_notifications.count >=
334 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
335 pending_notification->notification) {
336 /*
337 * Discard the last enqueued notification to indicate
338 * that notifications were dropped at this point.
339 */
340 lttng_notification_destroy(
341 pending_notification->notification);
342 pending_notification->notification = NULL;
343 goto end;
344 }
345
346 pending_notification = zmalloc(sizeof(*pending_notification));
347 if (!pending_notification) {
348 ret = -1;
349 goto end;
350 }
351 CDS_INIT_LIST_HEAD(&pending_notification->node);
352 cds_list_add(&pending_notification->node,
353 &channel->pending_notifications.list);
354 channel->pending_notifications.count++;
355end:
356 return ret;
357}
358
359static
360int enqueue_notification_from_current_message(
361 struct lttng_notification_channel *channel)
362{
363 int ret = 0;
364 struct lttng_notification *notification;
365 struct pending_notification *pending_notification;
366
367 if (channel->pending_notifications.count >=
368 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
369 /* Drop the notification. */
370 ret = enqueue_dropped_notification(channel);
371 goto end;
372 }
373
374 pending_notification = zmalloc(sizeof(*pending_notification));
375 if (!pending_notification) {
376 ret = -1;
377 goto error;
378 }
379 CDS_INIT_LIST_HEAD(&pending_notification->node);
380
381 notification = create_notification_from_current_message(channel);
382 if (!notification) {
383 ret = -1;
384 goto error;
385 }
386
387 pending_notification->notification = notification;
388 cds_list_add(&pending_notification->node,
389 &channel->pending_notifications.list);
390 channel->pending_notifications.count++;
391end:
392 return ret;
393error:
394 free(pending_notification);
395 goto end;
396}
397
1d757b1c
JG
398enum lttng_notification_channel_status
399lttng_notification_channel_has_pending_notification(
400 struct lttng_notification_channel *channel,
401 bool *_notification_pending)
402{
403 int ret;
404 enum lttng_notification_channel_status status =
405 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
d977a743 406 struct lttng_poll_event events;
1d757b1c
JG
407
408 if (!channel || !_notification_pending) {
409 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
410 goto end;
411 }
412
413 pthread_mutex_lock(&channel->lock);
414
415 if (channel->pending_notifications.count) {
416 *_notification_pending = true;
417 goto end_unlock;
418 }
419
420 if (channel->socket < 0) {
421 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
422 goto end_unlock;
423 }
424
425 /*
426 * Check, without blocking, if data is available on the channel's
427 * socket. If there is data available, it is safe to read (blocking)
428 * on the socket for a message from the session daemon.
429 *
430 * Since all commands wait for the session daemon's reply before
431 * releasing the channel's lock, the protocol only allows for
432 * notifications and "notification dropped" messages to come
433 * through. If we receive a different message type, it is
434 * considered a protocol error.
435 *
436 * Note that this function is not guaranteed not to block. This
437 * will block until our peer (the session daemon) has sent a complete
438 * message if we see data available on the socket. If the peer does
439 * not respect the protocol, this may block indefinitely.
440 */
d977a743
MD
441 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
442 if (ret < 0) {
443 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
444 goto end_unlock;
445 }
446 ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
447 if (ret < 0) {
448 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
449 goto end_clean_poll;
450 }
451 /* timeout = 0: return immediately. */
452 ret = lttng_poll_wait_interruptible(&events, 0);
1d757b1c
JG
453 if (ret == 0) {
454 /* No data available. */
455 *_notification_pending = false;
d977a743 456 goto end_clean_poll;
1d757b1c
JG
457 } else if (ret < 0) {
458 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 459 goto end_clean_poll;
1d757b1c
JG
460 }
461
462 /* Data available on socket. */
463 ret = receive_message(channel);
464 if (ret) {
465 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 466 goto end_clean_poll;
1d757b1c
JG
467 }
468
469 switch (get_current_message_type(channel)) {
470 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
471 ret = enqueue_notification_from_current_message(channel);
472 if (ret) {
d977a743 473 goto end_clean_poll;
1d757b1c
JG
474 }
475 *_notification_pending = true;
476 break;
477 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
478 ret = enqueue_dropped_notification(channel);
479 if (ret) {
d977a743 480 goto end_clean_poll;
1d757b1c
JG
481 }
482 *_notification_pending = true;
483 break;
484 default:
485 /* Protocol error. */
486 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
d977a743 487 goto end_clean_poll;
1d757b1c
JG
488 }
489
d977a743
MD
490end_clean_poll:
491 lttng_poll_clean(&events);
1d757b1c
JG
492end_unlock:
493 pthread_mutex_unlock(&channel->lock);
494end:
495 return status;
496}
497
a58c490f
JG
498static
499int receive_command_reply(struct lttng_notification_channel *channel,
500 enum lttng_notification_channel_status *status)
501{
502 int ret;
503 struct lttng_notification_channel_command_reply *reply;
504
505 while (true) {
506 enum lttng_notification_channel_message_type msg_type;
507
508 ret = receive_message(channel);
509 if (ret) {
510 goto end;
511 }
512
513 msg_type = get_current_message_type(channel);
514 switch (msg_type) {
515 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
516 goto exit_loop;
517 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
518 ret = enqueue_notification_from_current_message(
519 channel);
520 if (ret) {
521 goto end;
522 }
523 break;
524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
525 ret = enqueue_dropped_notification(channel);
526 if (ret) {
527 goto end;
528 }
529 break;
530 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
531 {
532 struct lttng_notification_channel_command_handshake *handshake;
533
534 handshake = (struct lttng_notification_channel_command_handshake *)
882093ee 535 (channel->reception_payload.buffer.data +
a58c490f
JG
536 sizeof(struct lttng_notification_channel_message));
537 channel->version.major = handshake->major;
538 channel->version.minor = handshake->minor;
539 channel->version.set = true;
540 break;
541 }
542 default:
543 ret = -1;
544 goto end;
545 }
546 }
547
548exit_loop:
882093ee 549 if (channel->reception_payload.buffer.size <
a58c490f
JG
550 (sizeof(struct lttng_notification_channel_message) +
551 sizeof(*reply))) {
552 /* Invalid message received. */
553 ret = -1;
554 goto end;
555 }
556
557 reply = (struct lttng_notification_channel_command_reply *)
882093ee 558 (channel->reception_payload.buffer.data +
a58c490f
JG
559 sizeof(struct lttng_notification_channel_message));
560 *status = (enum lttng_notification_channel_status) reply->status;
561end:
562 return ret;
563}
564
565static
566int handshake(struct lttng_notification_channel *channel)
567{
568 ssize_t ret;
569 enum lttng_notification_channel_status status =
570 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
571 struct lttng_notification_channel_command_handshake handshake = {
572 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
573 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
574 };
575 struct lttng_notification_channel_message msg_header = {
576 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
577 .size = sizeof(handshake),
578 };
579 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
580
581 memcpy(send_buffer, &msg_header, sizeof(msg_header));
582 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
583
584 pthread_mutex_lock(&channel->lock);
585
01ea340e 586 ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
a58c490f
JG
587 sizeof(send_buffer));
588 if (ret < 0) {
589 goto end_unlock;
590 }
591
592 /* Receive handshake info from the sessiond. */
593 ret = receive_command_reply(channel, &status);
594 if (ret < 0) {
595 goto end_unlock;
596 }
597
598 if (!channel->version.set) {
599 ret = -1;
600 goto end_unlock;
601 }
602
603 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
604 ret = -1;
605 goto end_unlock;
606 }
607
608end_unlock:
609 pthread_mutex_unlock(&channel->lock);
610 return ret;
611}
612
613static
614enum lttng_notification_channel_status send_condition_command(
615 struct lttng_notification_channel *channel,
616 enum lttng_notification_channel_message_type type,
617 const struct lttng_condition *condition)
618{
619 int socket;
3647288f 620 ssize_t ret;
a58c490f
JG
621 enum lttng_notification_channel_status status =
622 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
c0a66c84 623 struct lttng_payload payload;
3647288f
JG
624 struct lttng_notification_channel_message cmd_header = {
625 .type = (int8_t) type,
a58c490f
JG
626 };
627
c0a66c84 628 lttng_payload_init(&payload);
3647288f 629
a58c490f
JG
630 if (!channel) {
631 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
632 goto end;
633 }
634
a0377dfe 635 LTTNG_ASSERT(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
a58c490f
JG
636 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
637
638 pthread_mutex_lock(&channel->lock);
639 socket = channel->socket;
882093ee 640
a58c490f
JG
641 if (!lttng_condition_validate(condition)) {
642 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
643 goto end_unlock;
644 }
645
c0a66c84 646 ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
3647288f
JG
647 sizeof(cmd_header));
648 if (ret) {
649 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
a58c490f
JG
650 goto end_unlock;
651 }
652
c0a66c84 653 ret = lttng_condition_serialize(condition, &payload);
3647288f
JG
654 if (ret) {
655 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
a58c490f
JG
656 goto end_unlock;
657 }
658
3647288f 659 /* Update payload length. */
c0a66c84
JG
660 ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
661 (uint32_t) (payload.buffer.size - sizeof(cmd_header));
3647288f 662
882093ee
JR
663 {
664 struct lttng_payload_view pv =
665 lttng_payload_view_from_payload(
666 &payload, 0, -1);
667 const int fd_count =
668 lttng_payload_view_get_fd_handle_count(&pv);
669
670 /* Update fd count. */
671 ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
672 (uint32_t) fd_count;
673
674 ret = lttcomm_send_unix_sock(
675 socket, pv.buffer.data, pv.buffer.size);
676 if (ret < 0) {
677 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
678 goto end_unlock;
679 }
680
681 /* Pass fds if present. */
682 if (fd_count > 0) {
683 ret = lttcomm_send_payload_view_fds_unix_sock(socket,
684 &pv);
685 if (ret < 0) {
686 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
687 goto end_unlock;
688 }
689 }
a58c490f
JG
690 }
691
692 ret = receive_command_reply(channel, &status);
693 if (ret < 0) {
694 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
695 goto end_unlock;
696 }
697end_unlock:
698 pthread_mutex_unlock(&channel->lock);
699end:
c0a66c84 700 lttng_payload_reset(&payload);
a58c490f
JG
701 return status;
702}
703
704enum lttng_notification_channel_status lttng_notification_channel_subscribe(
705 struct lttng_notification_channel *channel,
706 const struct lttng_condition *condition)
707{
708 return send_condition_command(channel,
709 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
710 condition);
711}
712
713enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
714 struct lttng_notification_channel *channel,
715 const struct lttng_condition *condition)
716{
717 return send_condition_command(channel,
718 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
719 condition);
720}
721
722void lttng_notification_channel_destroy(
723 struct lttng_notification_channel *channel)
724{
725 if (!channel) {
726 return;
727 }
728
729 if (channel->socket >= 0) {
730 (void) lttcomm_close_unix_sock(channel->socket);
731 }
732 pthread_mutex_destroy(&channel->lock);
882093ee 733 lttng_payload_reset(&channel->reception_payload);
a58c490f
JG
734 free(channel);
735}
This page took 0.064173 seconds and 4 git commands to generate.