Commit | Line | Data |
---|---|---|
a58c490f JG |
1 | /* |
2 | * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * This library is free software; you can redistribute it and/or modify it | |
5 | * under the terms of the GNU Lesser General Public License, version 2.1 only, | |
6 | * as published by the Free Software Foundation. | |
7 | * | |
8 | * This library is distributed in the hope that it will be useful, but WITHOUT | |
9 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
10 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License | |
11 | * for more details. | |
12 | * | |
13 | * You should have received a copy of the GNU Lesser General Public License | |
14 | * along with this library; if not, write to the Free Software Foundation, | |
15 | * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
16 | */ | |
17 | ||
18 | #include <lttng/notification/notification-internal.h> | |
19 | #include <lttng/notification/channel-internal.h> | |
20 | #include <lttng/condition/condition-internal.h> | |
21 | #include <lttng/endpoint.h> | |
22 | #include <common/defaults.h> | |
23 | #include <common/error.h> | |
24 | #include <common/dynamic-buffer.h> | |
25 | #include <common/utils.h> | |
26 | #include <common/defaults.h> | |
27 | #include <assert.h> | |
28 | #include "lttng-ctl-helper.h" | |
29 | ||
/*
 * Forward declaration: handshake() is defined further down in this file but
 * is already needed by lttng_notification_channel_create().
 */
static int handshake(struct lttng_notification_channel *channel);
32 | ||
33 | /* | |
34 | * Populates the reception buffer with the next complete message. | |
35 | * The caller must acquire the client's lock. | |
36 | */ | |
37 | static | |
38 | int receive_message(struct lttng_notification_channel *channel) | |
39 | { | |
40 | ssize_t ret; | |
41 | struct lttng_notification_channel_message msg; | |
42 | ||
43 | ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0); | |
44 | if (ret) { | |
45 | goto error; | |
46 | } | |
47 | ||
48 | ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); | |
49 | if (ret <= 0) { | |
50 | ret = -1; | |
51 | goto error; | |
52 | } | |
53 | ||
54 | if (msg.size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) { | |
55 | ret = -1; | |
56 | goto error; | |
57 | } | |
58 | ||
59 | /* Add message header at buffer's start. */ | |
60 | ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg, | |
61 | sizeof(msg)); | |
62 | if (ret) { | |
63 | goto error; | |
64 | } | |
65 | ||
66 | /* Reserve space for the payload. */ | |
67 | ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, | |
68 | channel->reception_buffer.size + msg.size); | |
69 | if (ret) { | |
70 | goto error; | |
71 | } | |
72 | ||
73 | /* Receive message payload. */ | |
74 | ret = lttcomm_recv_unix_sock(channel->socket, | |
75 | channel->reception_buffer.data + sizeof(msg), msg.size); | |
76 | if (ret < (ssize_t) msg.size) { | |
77 | ret = -1; | |
78 | goto error; | |
79 | } | |
80 | ret = 0; | |
81 | end: | |
82 | return ret; | |
83 | error: | |
84 | lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0); | |
85 | goto end; | |
86 | } | |
87 | ||
88 | static | |
89 | enum lttng_notification_channel_message_type get_current_message_type( | |
90 | struct lttng_notification_channel *channel) | |
91 | { | |
92 | struct lttng_notification_channel_message *msg; | |
93 | ||
94 | assert(channel->reception_buffer.size >= sizeof(*msg)); | |
95 | ||
96 | msg = (struct lttng_notification_channel_message *) | |
97 | channel->reception_buffer.data; | |
98 | return (enum lttng_notification_channel_message_type) msg->type; | |
99 | } | |
100 | ||
101 | static | |
102 | struct lttng_notification *create_notification_from_current_message( | |
103 | struct lttng_notification_channel *channel) | |
104 | { | |
105 | ssize_t ret; | |
106 | struct lttng_notification *notification = NULL; | |
107 | struct lttng_buffer_view view; | |
108 | ||
109 | if (channel->reception_buffer.size <= | |
110 | sizeof(struct lttng_notification_channel_message)) { | |
111 | goto end; | |
112 | } | |
113 | ||
114 | view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer, | |
115 | sizeof(struct lttng_notification_channel_message), -1); | |
116 | ||
117 | ret = lttng_notification_create_from_buffer(&view, ¬ification); | |
118 | if (ret != channel->reception_buffer.size - | |
119 | sizeof(struct lttng_notification_channel_message)) { | |
120 | lttng_notification_destroy(notification); | |
121 | notification = NULL; | |
122 | goto end; | |
123 | } | |
124 | end: | |
125 | return notification; | |
126 | } | |
127 | ||
128 | struct lttng_notification_channel *lttng_notification_channel_create( | |
129 | struct lttng_endpoint *endpoint) | |
130 | { | |
131 | int fd, ret; | |
132 | bool is_in_tracing_group = false, is_root = false; | |
133 | char *sock_path = NULL; | |
134 | struct lttng_notification_channel *channel = NULL; | |
135 | ||
136 | if (!endpoint || | |
137 | endpoint != lttng_session_daemon_notification_endpoint) { | |
138 | goto end; | |
139 | } | |
140 | ||
141 | sock_path = zmalloc(LTTNG_PATH_MAX); | |
142 | if (!sock_path) { | |
143 | goto end; | |
144 | } | |
145 | ||
146 | channel = zmalloc(sizeof(struct lttng_notification_channel)); | |
147 | if (!channel) { | |
148 | goto end; | |
149 | } | |
150 | channel->socket = -1; | |
151 | pthread_mutex_init(&channel->lock, NULL); | |
152 | lttng_dynamic_buffer_init(&channel->reception_buffer); | |
153 | CDS_INIT_LIST_HEAD(&channel->pending_notifications.list); | |
154 | ||
155 | is_root = (getuid() == 0); | |
156 | if (!is_root) { | |
157 | is_in_tracing_group = lttng_check_tracing_group(); | |
158 | } | |
159 | ||
160 | if (is_root || is_in_tracing_group) { | |
161 | lttng_ctl_copy_string(sock_path, | |
162 | DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, | |
163 | LTTNG_PATH_MAX); | |
164 | ret = lttcomm_connect_unix_sock(sock_path); | |
165 | if (ret >= 0) { | |
166 | fd = ret; | |
167 | goto set_fd; | |
168 | } | |
169 | } | |
170 | ||
171 | /* Fallback to local session daemon. */ | |
172 | ret = snprintf(sock_path, LTTNG_PATH_MAX, | |
173 | DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK, | |
174 | utils_get_home_dir()); | |
175 | if (ret < 0 || ret >= LTTNG_PATH_MAX) { | |
176 | goto error; | |
177 | } | |
178 | ||
179 | ret = lttcomm_connect_unix_sock(sock_path); | |
180 | if (ret < 0) { | |
181 | goto error; | |
182 | } | |
183 | fd = ret; | |
184 | ||
185 | set_fd: | |
186 | channel->socket = fd; | |
187 | ||
188 | ret = handshake(channel); | |
189 | if (ret) { | |
190 | goto error; | |
191 | } | |
192 | end: | |
193 | free(sock_path); | |
194 | return channel; | |
195 | error: | |
196 | lttng_notification_channel_destroy(channel); | |
197 | channel = NULL; | |
198 | goto end; | |
199 | } | |
200 | ||
201 | enum lttng_notification_channel_status | |
202 | lttng_notification_channel_get_next_notification( | |
203 | struct lttng_notification_channel *channel, | |
204 | struct lttng_notification **_notification) | |
205 | { | |
206 | int ret; | |
207 | struct lttng_notification *notification = NULL; | |
208 | enum lttng_notification_channel_status status = | |
209 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
210 | ||
211 | if (!channel || !_notification) { | |
212 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
213 | goto end; | |
214 | } | |
215 | ||
216 | if (channel->pending_notifications.count) { | |
217 | struct pending_notification *pending_notification; | |
218 | ||
219 | assert(!cds_list_empty(&channel->pending_notifications.list)); | |
220 | ||
221 | /* Deliver one of the pending notifications. */ | |
222 | pending_notification = cds_list_first_entry( | |
223 | &channel->pending_notifications.list, | |
224 | struct pending_notification, | |
225 | node); | |
226 | notification = pending_notification->notification; | |
227 | if (!notification) { | |
228 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED; | |
229 | } | |
230 | cds_list_del(&pending_notification->node); | |
231 | channel->pending_notifications.count--; | |
232 | free(pending_notification); | |
233 | goto end; | |
234 | } | |
235 | ||
236 | pthread_mutex_lock(&channel->lock); | |
237 | ||
238 | ret = receive_message(channel); | |
239 | if (ret) { | |
240 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
241 | goto end_unlock; | |
242 | } | |
243 | ||
244 | switch (get_current_message_type(channel)) { | |
245 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: | |
246 | notification = create_notification_from_current_message( | |
247 | channel); | |
248 | if (!notification) { | |
249 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
250 | goto end_unlock; | |
251 | } | |
252 | break; | |
253 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: | |
254 | /* No payload to consume. */ | |
255 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED; | |
256 | break; | |
257 | default: | |
258 | /* Protocol error. */ | |
259 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
260 | goto end_unlock; | |
261 | } | |
262 | ||
263 | end_unlock: | |
264 | pthread_mutex_unlock(&channel->lock); | |
265 | end: | |
266 | if (_notification) { | |
267 | *_notification = notification; | |
268 | } | |
269 | return status; | |
270 | } | |
271 | ||
272 | static | |
273 | int enqueue_dropped_notification( | |
274 | struct lttng_notification_channel *channel) | |
275 | { | |
276 | int ret = 0; | |
277 | struct pending_notification *pending_notification; | |
278 | struct cds_list_head *last_element = | |
279 | channel->pending_notifications.list.prev; | |
280 | ||
281 | pending_notification = caa_container_of(last_element, | |
282 | struct pending_notification, node); | |
283 | if (!pending_notification->notification) { | |
284 | /* | |
285 | * The last enqueued notification indicates dropped | |
286 | * notifications; there is nothing to do as we group | |
287 | * dropped notifications together. | |
288 | */ | |
289 | goto end; | |
290 | } | |
291 | ||
292 | if (channel->pending_notifications.count >= | |
293 | DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT && | |
294 | pending_notification->notification) { | |
295 | /* | |
296 | * Discard the last enqueued notification to indicate | |
297 | * that notifications were dropped at this point. | |
298 | */ | |
299 | lttng_notification_destroy( | |
300 | pending_notification->notification); | |
301 | pending_notification->notification = NULL; | |
302 | goto end; | |
303 | } | |
304 | ||
305 | pending_notification = zmalloc(sizeof(*pending_notification)); | |
306 | if (!pending_notification) { | |
307 | ret = -1; | |
308 | goto end; | |
309 | } | |
310 | CDS_INIT_LIST_HEAD(&pending_notification->node); | |
311 | cds_list_add(&pending_notification->node, | |
312 | &channel->pending_notifications.list); | |
313 | channel->pending_notifications.count++; | |
314 | end: | |
315 | return ret; | |
316 | } | |
317 | ||
318 | static | |
319 | int enqueue_notification_from_current_message( | |
320 | struct lttng_notification_channel *channel) | |
321 | { | |
322 | int ret = 0; | |
323 | struct lttng_notification *notification; | |
324 | struct pending_notification *pending_notification; | |
325 | ||
326 | if (channel->pending_notifications.count >= | |
327 | DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT) { | |
328 | /* Drop the notification. */ | |
329 | ret = enqueue_dropped_notification(channel); | |
330 | goto end; | |
331 | } | |
332 | ||
333 | pending_notification = zmalloc(sizeof(*pending_notification)); | |
334 | if (!pending_notification) { | |
335 | ret = -1; | |
336 | goto error; | |
337 | } | |
338 | CDS_INIT_LIST_HEAD(&pending_notification->node); | |
339 | ||
340 | notification = create_notification_from_current_message(channel); | |
341 | if (!notification) { | |
342 | ret = -1; | |
343 | goto error; | |
344 | } | |
345 | ||
346 | pending_notification->notification = notification; | |
347 | cds_list_add(&pending_notification->node, | |
348 | &channel->pending_notifications.list); | |
349 | channel->pending_notifications.count++; | |
350 | end: | |
351 | return ret; | |
352 | error: | |
353 | free(pending_notification); | |
354 | goto end; | |
355 | } | |
356 | ||
357 | static | |
358 | int receive_command_reply(struct lttng_notification_channel *channel, | |
359 | enum lttng_notification_channel_status *status) | |
360 | { | |
361 | int ret; | |
362 | struct lttng_notification_channel_command_reply *reply; | |
363 | ||
364 | while (true) { | |
365 | enum lttng_notification_channel_message_type msg_type; | |
366 | ||
367 | ret = receive_message(channel); | |
368 | if (ret) { | |
369 | goto end; | |
370 | } | |
371 | ||
372 | msg_type = get_current_message_type(channel); | |
373 | switch (msg_type) { | |
374 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY: | |
375 | goto exit_loop; | |
376 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: | |
377 | ret = enqueue_notification_from_current_message( | |
378 | channel); | |
379 | if (ret) { | |
380 | goto end; | |
381 | } | |
382 | break; | |
383 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: | |
384 | ret = enqueue_dropped_notification(channel); | |
385 | if (ret) { | |
386 | goto end; | |
387 | } | |
388 | break; | |
389 | case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE: | |
390 | { | |
391 | struct lttng_notification_channel_command_handshake *handshake; | |
392 | ||
393 | handshake = (struct lttng_notification_channel_command_handshake *) | |
394 | (channel->reception_buffer.data + | |
395 | sizeof(struct lttng_notification_channel_message)); | |
396 | channel->version.major = handshake->major; | |
397 | channel->version.minor = handshake->minor; | |
398 | channel->version.set = true; | |
399 | break; | |
400 | } | |
401 | default: | |
402 | ret = -1; | |
403 | goto end; | |
404 | } | |
405 | } | |
406 | ||
407 | exit_loop: | |
408 | if (channel->reception_buffer.size < | |
409 | (sizeof(struct lttng_notification_channel_message) + | |
410 | sizeof(*reply))) { | |
411 | /* Invalid message received. */ | |
412 | ret = -1; | |
413 | goto end; | |
414 | } | |
415 | ||
416 | reply = (struct lttng_notification_channel_command_reply *) | |
417 | (channel->reception_buffer.data + | |
418 | sizeof(struct lttng_notification_channel_message)); | |
419 | *status = (enum lttng_notification_channel_status) reply->status; | |
420 | end: | |
421 | return ret; | |
422 | } | |
423 | ||
424 | static | |
425 | int handshake(struct lttng_notification_channel *channel) | |
426 | { | |
427 | ssize_t ret; | |
428 | enum lttng_notification_channel_status status = | |
429 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
430 | struct lttng_notification_channel_command_handshake handshake = { | |
431 | .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR, | |
432 | .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR, | |
433 | }; | |
434 | struct lttng_notification_channel_message msg_header = { | |
435 | .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE, | |
436 | .size = sizeof(handshake), | |
437 | }; | |
438 | char send_buffer[sizeof(msg_header) + sizeof(handshake)]; | |
439 | ||
440 | memcpy(send_buffer, &msg_header, sizeof(msg_header)); | |
441 | memcpy(send_buffer + sizeof(msg_header), &handshake, sizeof(handshake)); | |
442 | ||
443 | pthread_mutex_lock(&channel->lock); | |
444 | ||
445 | ret = lttcomm_send_unix_sock(channel->socket, send_buffer, | |
446 | sizeof(send_buffer)); | |
447 | if (ret < 0) { | |
448 | goto end_unlock; | |
449 | } | |
450 | ||
451 | /* Receive handshake info from the sessiond. */ | |
452 | ret = receive_command_reply(channel, &status); | |
453 | if (ret < 0) { | |
454 | goto end_unlock; | |
455 | } | |
456 | ||
457 | if (!channel->version.set) { | |
458 | ret = -1; | |
459 | goto end_unlock; | |
460 | } | |
461 | ||
462 | if (channel->version.major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) { | |
463 | ret = -1; | |
464 | goto end_unlock; | |
465 | } | |
466 | ||
467 | end_unlock: | |
468 | pthread_mutex_unlock(&channel->lock); | |
469 | return ret; | |
470 | } | |
471 | ||
472 | static | |
473 | enum lttng_notification_channel_status send_condition_command( | |
474 | struct lttng_notification_channel *channel, | |
475 | enum lttng_notification_channel_message_type type, | |
476 | const struct lttng_condition *condition) | |
477 | { | |
478 | int socket; | |
479 | ssize_t command_size, ret; | |
480 | enum lttng_notification_channel_status status = | |
481 | LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; | |
482 | char *command_buffer = NULL; | |
483 | struct lttng_notification_channel_message cmd_message = { | |
484 | .type = type, | |
485 | }; | |
486 | ||
487 | if (!channel) { | |
488 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
489 | goto end; | |
490 | } | |
491 | ||
492 | assert(type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE || | |
493 | type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE); | |
494 | ||
495 | pthread_mutex_lock(&channel->lock); | |
496 | socket = channel->socket; | |
497 | if (!lttng_condition_validate(condition)) { | |
498 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
499 | goto end_unlock; | |
500 | } | |
501 | ||
502 | ret = lttng_condition_serialize(condition, NULL); | |
503 | if (ret < 0) { | |
504 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; | |
505 | goto end_unlock; | |
506 | } | |
507 | assert(ret < UINT32_MAX); | |
508 | cmd_message.size = (uint32_t) ret; | |
509 | command_size = ret + sizeof( | |
510 | struct lttng_notification_channel_message); | |
511 | command_buffer = zmalloc(command_size); | |
512 | if (!command_buffer) { | |
513 | goto end_unlock; | |
514 | } | |
515 | ||
516 | memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); | |
517 | ret = lttng_condition_serialize(condition, | |
518 | command_buffer + sizeof(cmd_message)); | |
519 | if (ret < 0) { | |
520 | goto end_unlock; | |
521 | } | |
522 | ||
523 | ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); | |
524 | if (ret < 0) { | |
525 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
526 | goto end_unlock; | |
527 | } | |
528 | ||
529 | ret = receive_command_reply(channel, &status); | |
530 | if (ret < 0) { | |
531 | status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; | |
532 | goto end_unlock; | |
533 | } | |
534 | end_unlock: | |
535 | pthread_mutex_unlock(&channel->lock); | |
536 | end: | |
537 | free(command_buffer); | |
538 | return status; | |
539 | } | |
540 | ||
541 | enum lttng_notification_channel_status lttng_notification_channel_subscribe( | |
542 | struct lttng_notification_channel *channel, | |
543 | const struct lttng_condition *condition) | |
544 | { | |
545 | return send_condition_command(channel, | |
546 | LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE, | |
547 | condition); | |
548 | } | |
549 | ||
550 | enum lttng_notification_channel_status lttng_notification_channel_unsubscribe( | |
551 | struct lttng_notification_channel *channel, | |
552 | const struct lttng_condition *condition) | |
553 | { | |
554 | return send_condition_command(channel, | |
555 | LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE, | |
556 | condition); | |
557 | } | |
558 | ||
559 | void lttng_notification_channel_destroy( | |
560 | struct lttng_notification_channel *channel) | |
561 | { | |
562 | if (!channel) { | |
563 | return; | |
564 | } | |
565 | ||
566 | if (channel->socket >= 0) { | |
567 | (void) lttcomm_close_unix_sock(channel->socket); | |
568 | } | |
569 | pthread_mutex_destroy(&channel->lock); | |
570 | lttng_dynamic_buffer_reset(&channel->reception_buffer); | |
571 | free(channel); | |
572 | } |