75a911f2af3ec7080085c648792d23393f82ad1c
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License, version 2.1 only,
6 * as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
11 * for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
27 #include <assert.h>
28 #include "lttng-ctl-helper.h"
29
30 static
31 int handshake(struct lttng_notification_channel *channel);
32
33 /*
34 * Populates the reception buffer with the next complete message.
35 * The caller must acquire the client's lock.
36 */
37 static
38 int receive_message(struct lttng_notification_channel *channel)
39 {
40 ssize_t ret;
41 struct lttng_notification_channel_message msg;
42
43 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
44 if (ret) {
45 goto error;
46 }
47
48 ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
49 if (ret <= 0) {
50 ret = -1;
51 goto error;
52 }
53
54 if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
55 ret = -1;
56 goto error;
57 }
58
59 /* Add message header at buffer's start. */
60 ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
61 sizeof(msg));
62 if (ret) {
63 goto error;
64 }
65
66 /* Reserve space for the payload. */
67 ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
68 channel->reception_buffer.size + msg.size);
69 if (ret) {
70 goto error;
71 }
72
73 /* Receive message payload. */
74 ret = lttcomm_recv_unix_sock(channel->socket,
75 channel->reception_buffer.data + sizeof(msg), msg.size);
76 if (ret < (ssize_t) msg.size) {
77 ret = -1;
78 goto error;
79 }
80 ret = 0;
81 end:
82 return ret;
83 error:
84 lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
85 goto end;
86 }
87
88 static
89 enum lttng_notification_channel_message_type get_current_message_type(
90 struct lttng_notification_channel *channel)
91 {
92 struct lttng_notification_channel_message *msg;
93
94 assert(channel->reception_buffer.size >= sizeof(*msg));
95
96 msg = (struct lttng_notification_channel_message *)
97 channel->reception_buffer.data;
98 return (enum lttng_notification_channel_message_type) msg->type;
99 }
100
101 static
102 struct lttng_notification *create_notification_from_current_message(
103 struct lttng_notification_channel *channel)
104 {
105 ssize_t ret;
106 struct lttng_notification *notification = NULL;
107 struct lttng_buffer_view view;
108
109 if (channel->reception_buffer.size <=
110 sizeof(struct lttng_notification_channel_message)) {
111 goto end;
112 }
113
114 view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
115 sizeof(struct lttng_notification_channel_message), -1);
116
117 ret = lttng_notification_create_from_buffer(&view, &notification);
118 if (ret != channel->reception_buffer.size -
119 sizeof(struct lttng_notification_channel_message)) {
120 lttng_notification_destroy(notification);
121 notification = NULL;
122 goto end;
123 }
124 end:
125 return notification;
126 }
127
128 struct lttng_notification_channel *lttng_notification_channel_create(
129 struct lttng_endpoint *endpoint)
130 {
131 int fd, ret;
132 bool is_in_tracing_group = false, is_root = false;
133 char *sock_path = NULL;
134 struct lttng_notification_channel *channel = NULL;
135
136 if (!endpoint ||
137 endpoint != lttng_session_daemon_notification_endpoint) {
138 goto end;
139 }
140
141 sock_path = zmalloc(LTTNG_PATH_MAX);
142 if (!sock_path) {
143 goto end;
144 }
145
146 channel = zmalloc(sizeof(struct lttng_notification_channel));
147 if (!channel) {
148 goto end;
149 }
150 channel->socket = -1;
151 pthread_mutex_init(&channel->lock, NULL);
152 lttng_dynamic_buffer_init(&channel->reception_buffer);
153 CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
154
155 is_root = (getuid() == 0);
156 if (!is_root) {
157 is_in_tracing_group = lttng_check_tracing_group();
158 }
159
160 if (is_root || is_in_tracing_group) {
161 lttng_ctl_copy_string(sock_path,
162 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK,
163 LTTNG_PATH_MAX);
164 ret = lttcomm_connect_unix_sock(sock_path);
165 if (ret >= 0) {
166 fd = ret;
167 goto set_fd;
168 }
169 }
170
171 /* Fallback to local session daemon. */
172 ret = snprintf(sock_path, LTTNG_PATH_MAX,
173 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
174 utils_get_home_dir());
175 if (ret < 0 || ret >= LTTNG_PATH_MAX) {
176 goto error;
177 }
178
179 ret = lttcomm_connect_unix_sock(sock_path);
180 if (ret < 0) {
181 goto error;
182 }
183 fd = ret;
184
185 set_fd:
186 channel->socket = fd;
187
188 ret = handshake(channel);
189 if (ret) {
190 goto error;
191 }
192 end:
193 free(sock_path);
194 return channel;
195 error:
196 lttng_notification_channel_destroy(channel);
197 channel = NULL;
198 goto end;
199 }
200
201 enum lttng_notification_channel_status
202 lttng_notification_channel_get_next_notification(
203 struct lttng_notification_channel *channel,
204 struct lttng_notification **_notification)
205 {
206 int ret;
207 struct lttng_notification *notification = NULL;
208 enum lttng_notification_channel_status status =
209 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
210
211 if (!channel || !_notification) {
212 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
213 goto end;
214 }
215
216 if (channel->pending_notifications.count) {
217 struct pending_notification *pending_notification;
218
219 assert(!cds_list_empty(&channel->pending_notifications.list));
220
221 /* Deliver one of the pending notifications. */
222 pending_notification = cds_list_first_entry(
223 &channel->pending_notifications.list,
224 struct pending_notification,
225 node);
226 notification = pending_notification->notification;
227 if (!notification) {
228 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
229 }
230 cds_list_del(&pending_notification->node);
231 channel->pending_notifications.count--;
232 free(pending_notification);
233 goto end;
234 }
235
236 pthread_mutex_lock(&channel->lock);
237
238 ret = receive_message(channel);
239 if (ret) {
240 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
241 goto end_unlock;
242 }
243
244 switch (get_current_message_type(channel)) {
245 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
246 notification = create_notification_from_current_message(
247 channel);
248 if (!notification) {
249 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
250 goto end_unlock;
251 }
252 break;
253 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
254 /* No payload to consume. */
255 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED;
256 break;
257 default:
258 /* Protocol error. */
259 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
260 goto end_unlock;
261 }
262
263 end_unlock:
264 pthread_mutex_unlock(&channel->lock);
265 end:
266 if (_notification) {
267 *_notification = notification;
268 }
269 return status;
270 }
271
272 static
273 int enqueue_dropped_notification(
274 struct lttng_notification_channel *channel)
275 {
276 int ret = 0;
277 struct pending_notification *pending_notification;
278 struct cds_list_head *last_element =
279 channel->pending_notifications.list.prev;
280
281 pending_notification = caa_container_of(last_element,
282 struct pending_notification, node);
283 if (!pending_notification->notification) {
284 /*
285 * The last enqueued notification indicates dropped
286 * notifications; there is nothing to do as we group
287 * dropped notifications together.
288 */
289 goto end;
290 }
291
292 if (channel->pending_notifications.count >=
293 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT &&
294 pending_notification->notification) {
295 /*
296 * Discard the last enqueued notification to indicate
297 * that notifications were dropped at this point.
298 */
299 lttng_notification_destroy(
300 pending_notification->notification);
301 pending_notification->notification = NULL;
302 goto end;
303 }
304
305 pending_notification = zmalloc(sizeof(*pending_notification));
306 if (!pending_notification) {
307 ret = -1;
308 goto end;
309 }
310 CDS_INIT_LIST_HEAD(&pending_notification->node);
311 cds_list_add(&pending_notification->node,
312 &channel->pending_notifications.list);
313 channel->pending_notifications.count++;
314 end:
315 return ret;
316 }
317
318 static
319 int enqueue_notification_from_current_message(
320 struct lttng_notification_channel *channel)
321 {
322 int ret = 0;
323 struct lttng_notification *notification;
324 struct pending_notification *pending_notification;
325
326 if (channel->pending_notifications.count >=
327 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) {
328 /* Drop the notification. */
329 ret = enqueue_dropped_notification(channel);
330 goto end;
331 }
332
333 pending_notification = zmalloc(sizeof(*pending_notification));
334 if (!pending_notification) {
335 ret = -1;
336 goto error;
337 }
338 CDS_INIT_LIST_HEAD(&pending_notification->node);
339
340 notification = create_notification_from_current_message(channel);
341 if (!notification) {
342 ret = -1;
343 goto error;
344 }
345
346 pending_notification->notification = notification;
347 cds_list_add(&pending_notification->node,
348 &channel->pending_notifications.list);
349 channel->pending_notifications.count++;
350 end:
351 return ret;
352 error:
353 free(pending_notification);
354 goto end;
355 }
356
357 static
358 int receive_command_reply(struct lttng_notification_channel *channel,
359 enum lttng_notification_channel_status *status)
360 {
361 int ret;
362 struct lttng_notification_channel_command_reply *reply;
363
364 while (true) {
365 enum lttng_notification_channel_message_type msg_type;
366
367 ret = receive_message(channel);
368 if (ret) {
369 goto end;
370 }
371
372 msg_type = get_current_message_type(channel);
373 switch (msg_type) {
374 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY:
375 goto exit_loop;
376 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
377 ret = enqueue_notification_from_current_message(
378 channel);
379 if (ret) {
380 goto end;
381 }
382 break;
383 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
384 ret = enqueue_dropped_notification(channel);
385 if (ret) {
386 goto end;
387 }
388 break;
389 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
390 {
391 struct lttng_notification_channel_command_handshake *handshake;
392
393 handshake = (struct lttng_notification_channel_command_handshake *)
394 (channel->reception_buffer.data +
395 sizeof(struct lttng_notification_channel_message));
396 channel->version.major = handshake->major;
397 channel->version.minor = handshake->minor;
398 channel->version.set = true;
399 break;
400 }
401 default:
402 ret = -1;
403 goto end;
404 }
405 }
406
407 exit_loop:
408 if (channel->reception_buffer.size <
409 (sizeof(struct lttng_notification_channel_message) +
410 sizeof(*reply))) {
411 /* Invalid message received. */
412 ret = -1;
413 goto end;
414 }
415
416 reply = (struct lttng_notification_channel_command_reply *)
417 (channel->reception_buffer.data +
418 sizeof(struct lttng_notification_channel_message));
419 *status = (enum lttng_notification_channel_status) reply->status;
420 end:
421 return ret;
422 }
423
424 static
425 int handshake(struct lttng_notification_channel *channel)
426 {
427 ssize_t ret;
428 enum lttng_notification_channel_status status =
429 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
430 struct lttng_notification_channel_command_handshake handshake = {
431 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
432 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
433 };
434 struct lttng_notification_channel_message msg_header = {
435 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
436 .size = sizeof(handshake),
437 };
438 char send_buffer[sizeof(msg_header) + sizeof(handshake)];
439
440 memcpy(send_buffer, &msg_header, sizeof(msg_header));
441 memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake));
442
443 pthread_mutex_lock(&channel->lock);
444
445 ret = lttcomm_send_unix_sock(channel->socket, send_buffer,
446 sizeof(send_buffer));
447 if (ret < 0) {
448 goto end_unlock;
449 }
450
451 /* Receive handshake info from the sessiond. */
452 ret = receive_command_reply(channel, &status);
453 if (ret < 0) {
454 goto end_unlock;
455 }
456
457 if (!channel->version.set) {
458 ret = -1;
459 goto end_unlock;
460 }
461
462 if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
463 ret = -1;
464 goto end_unlock;
465 }
466
467 end_unlock:
468 pthread_mutex_unlock(&channel->lock);
469 return ret;
470 }
471
472 static
473 enum lttng_notification_channel_status send_condition_command(
474 struct lttng_notification_channel *channel,
475 enum lttng_notification_channel_message_type type,
476 const struct lttng_condition *condition)
477 {
478 int socket;
479 ssize_t command_size, ret;
480 enum lttng_notification_channel_status status =
481 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
482 char *command_buffer = NULL;
483 struct lttng_notification_channel_message cmd_message = {
484 .type = type,
485 };
486
487 if (!channel) {
488 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
489 goto end;
490 }
491
492 assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE ||
493 type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE);
494
495 pthread_mutex_lock(&channel->lock);
496 socket = channel->socket;
497 if (!lttng_condition_validate(condition)) {
498 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
499 goto end_unlock;
500 }
501
502 ret = lttng_condition_serialize(condition, NULL);
503 if (ret < 0) {
504 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
505 goto end_unlock;
506 }
507 assert(ret < UINT32_MAX);
508 cmd_message.size = (uint32_t) ret;
509 command_size = ret + sizeof(
510 struct lttng_notification_channel_message);
511 command_buffer = zmalloc(command_size);
512 if (!command_buffer) {
513 goto end_unlock;
514 }
515
516 memcpy(command_buffer, &cmd_message, sizeof(cmd_message));
517 ret = lttng_condition_serialize(condition,
518 command_buffer + sizeof(cmd_message));
519 if (ret < 0) {
520 goto end_unlock;
521 }
522
523 ret = lttcomm_send_unix_sock(socket, command_buffer, command_size);
524 if (ret < 0) {
525 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
526 goto end_unlock;
527 }
528
529 ret = receive_command_reply(channel, &status);
530 if (ret < 0) {
531 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
532 goto end_unlock;
533 }
534 end_unlock:
535 pthread_mutex_unlock(&channel->lock);
536 end:
537 free(command_buffer);
538 return status;
539 }
540
541 enum lttng_notification_channel_status lttng_notification_channel_subscribe(
542 struct lttng_notification_channel *channel,
543 const struct lttng_condition *condition)
544 {
545 return send_condition_command(channel,
546 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE,
547 condition);
548 }
549
550 enum lttng_notification_channel_status lttng_notification_channel_unsubscribe(
551 struct lttng_notification_channel *channel,
552 const struct lttng_condition *condition)
553 {
554 return send_condition_command(channel,
555 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE,
556 condition);
557 }
558
559 void lttng_notification_channel_destroy(
560 struct lttng_notification_channel *channel)
561 {
562 if (!channel) {
563 return;
564 }
565
566 if (channel->socket >= 0) {
567 (void) lttcomm_close_unix_sock(channel->socket);
568 }
569 pthread_mutex_destroy(&channel->lock);
570 lttng_dynamic_buffer_reset(&channel->reception_buffer);
571 free(channel);
572 }
This page took 0.038885 seconds and 3 git commands to generate.