Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (1/2)
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.cpp
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "lttng/action/action.h"
9 #include "lttng/trigger/trigger-internal.hpp"
10 #define _LGPL_SOURCE
11 #include <urcu.h>
12 #include <urcu/rculfhash.h>
13
14 #include <common/defaults.hpp>
15 #include <common/error.hpp>
16 #include <common/futex.hpp>
17 #include <common/unix.hpp>
18 #include <common/dynamic-buffer.hpp>
19 #include <common/hashtable/utils.hpp>
20 #include <common/sessiond-comm/sessiond-comm.hpp>
21 #include <common/macros.hpp>
22 #include <lttng/condition/condition.h>
23 #include <lttng/action/action-internal.hpp>
24 #include <lttng/action/list-internal.hpp>
25 #include <lttng/domain-internal.hpp>
26 #include <lttng/notification/notification-internal.hpp>
27 #include <lttng/condition/condition-internal.hpp>
28 #include <lttng/condition/buffer-usage-internal.hpp>
29 #include <lttng/condition/session-consumed-size-internal.hpp>
30 #include <lttng/condition/session-rotation-internal.hpp>
31 #include <lttng/condition/event-rule-matches-internal.hpp>
32 #include <lttng/domain-internal.hpp>
33 #include <lttng/notification/channel-internal.hpp>
34 #include <lttng/trigger/trigger-internal.hpp>
35 #include <lttng/event-rule/event-rule-internal.hpp>
36
37 #include <time.h>
38 #include <unistd.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41
42 #include "condition-internal.hpp"
43 #include "event-notifier-error-accounting.hpp"
44 #include "notification-thread.hpp"
45 #include "notification-thread-events.hpp"
46 #include "notification-thread-commands.hpp"
47 #include "lttng-sessiond.hpp"
48 #include "kernel.hpp"
49
/* Poll event mask: input readiness plus error and hang-up events. */
#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same mask as CLIENT_POLL_EVENTS_IN, with output readiness added. */
#define CLIENT_POLL_EVENTS_IN_OUT (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP | LPOLLOUT)

/* The tracers currently limit the capture size to PIPE_BUF (4 KiB on Linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
55
/*
 * Kind of object to which a condition is bound; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition's type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	/* The condition is not bound to any object. */
	LTTNG_OBJECT_TYPE_NONE = 1,
	/* The condition is bound to a channel. */
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	/* The condition is bound to a session. */
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
62
63 struct lttng_channel_trigger_list {
64 struct channel_key channel_key;
65 /* List of struct lttng_trigger_list_element. */
66 struct cds_list_head list;
67 /* Node in the channel_triggers_ht */
68 struct cds_lfht_node channel_triggers_ht_node;
69 /* call_rcu delayed reclaim. */
70 struct rcu_head rcu_node;
71 };
72
73 /*
74 * List of triggers applying to a given session.
75 *
76 * See:
77 * - lttng_session_trigger_list_create()
78 * - lttng_session_trigger_list_build()
79 * - lttng_session_trigger_list_destroy()
80 * - lttng_session_trigger_list_add()
81 */
82 struct lttng_session_trigger_list {
83 /*
84 * Not owned by this; points to the session_info structure's
85 * session name.
86 */
87 const char *session_name;
88 /* List of struct lttng_trigger_list_element. */
89 struct cds_list_head list;
90 /* Node in the session_triggers_ht */
91 struct cds_lfht_node session_triggers_ht_node;
92 /*
93 * Weak reference to the notification system's session triggers
94 * hashtable.
95 *
96 * The session trigger list structure structure is owned by
97 * the session's session_info.
98 *
99 * The session_info is kept alive the the channel_infos holding a
100 * reference to it (reference counting). When those channels are
101 * destroyed (at runtime or on teardown), the reference they hold
102 * to the session_info are released. On destruction of session_info,
103 * session_info_destroy() will remove the list of triggers applying
104 * to this session from the notification system's state.
105 *
106 * This implies that the session_triggers_ht must be destroyed
107 * after the channels.
108 */
109 struct cds_lfht *session_triggers_ht;
110 /* Used for delayed RCU reclaim. */
111 struct rcu_head rcu_node;
112 };
113
114 namespace {
115 struct lttng_trigger_list_element {
116 /* No ownership of the trigger object is assumed. */
117 struct lttng_trigger *trigger;
118 struct cds_list_head node;
119 };
120
121 struct lttng_trigger_ht_element {
122 struct lttng_trigger *trigger;
123 struct cds_lfht_node node;
124 struct cds_lfht_node node_by_name_uid;
125 struct cds_list_head client_list_trigger_node;
126 /* call_rcu delayed reclaim. */
127 struct rcu_head rcu_node;
128 };
129
130 struct lttng_condition_list_element {
131 struct lttng_condition *condition;
132 struct cds_list_head node;
133 };
134
135 struct channel_state_sample {
136 struct channel_key key;
137 struct cds_lfht_node channel_state_ht_node;
138 uint64_t highest_usage;
139 uint64_t lowest_usage;
140 uint64_t channel_total_consumed;
141 /* call_rcu delayed reclaim. */
142 struct rcu_head rcu_node;
143 };
144 } /* namespace */
145
146 static unsigned long hash_channel_key(struct channel_key *key);
147 static int evaluate_buffer_condition(const struct lttng_condition *condition,
148 struct lttng_evaluation **evaluation,
149 const struct notification_thread_state *state,
150 const struct channel_state_sample *previous_sample,
151 const struct channel_state_sample *latest_sample,
152 uint64_t previous_session_consumed_total,
153 uint64_t latest_session_consumed_total,
154 struct channel_info *channel_info);
155 static
156 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
157 const struct lttng_evaluation *evaluation,
158 struct notification_client_list *client_list,
159 struct notification_thread_state *state,
160 uid_t channel_uid, gid_t channel_gid);
161
162
163 /* session_info API */
164 static
165 void session_info_destroy(void *_data);
166 static
167 void session_info_get(struct session_info *session_info);
168 static
169 void session_info_put(struct session_info *session_info);
170 static
171 struct session_info *session_info_create(uint64_t id,
172 const char *name,
173 uid_t uid,
174 gid_t gid,
175 struct lttng_session_trigger_list *trigger_list,
176 struct cds_lfht *sessions_ht);
177 static void session_info_add_channel(
178 struct session_info *session_info, struct channel_info *channel_info);
179 static
180 void session_info_remove_channel(struct session_info *session_info,
181 struct channel_info *channel_info);
182
183 /* lttng_session_trigger_list API */
184 static
185 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
186 const char *session_name,
187 struct cds_lfht *session_triggers_ht);
188 static
189 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
190 const struct notification_thread_state *state,
191 const char *session_name);
192 static
193 void lttng_session_trigger_list_destroy(
194 struct lttng_session_trigger_list *list);
195 static
196 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
197 struct lttng_trigger *trigger);
198
199 static
200 int client_handle_transmission_status(
201 struct notification_client *client,
202 enum client_transmission_status transmission_status,
203 struct notification_thread_state *state);
204
205 static
206 int handle_one_event_notifier_notification(
207 struct notification_thread_state *state,
208 int pipe, enum lttng_domain_type domain);
209
210 static
211 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
212
213 static
214 int match_client_socket(struct cds_lfht_node *node, const void *key)
215 {
216 /* This double-cast is intended to supress pointer-to-cast warning. */
217 const int socket = (int) (intptr_t) key;
218 const struct notification_client *client = caa_container_of(node,
219 struct notification_client, client_socket_ht_node);
220
221 return client->socket == socket;
222 }
223
224 static
225 int match_client_id(struct cds_lfht_node *node, const void *key)
226 {
227 /* This double-cast is intended to supress pointer-to-cast warning. */
228 const notification_client_id id = *((notification_client_id *) key);
229 const struct notification_client *client = lttng::utils::container_of(
230 node, &notification_client::client_id_ht_node);
231
232 return client->id == id;
233 }
234
235 static
236 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
237 {
238 struct channel_key *channel_key = (struct channel_key *) key;
239 struct lttng_channel_trigger_list *trigger_list;
240
241 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
242 channel_triggers_ht_node);
243
244 return !!((channel_key->key == trigger_list->channel_key.key) &&
245 (channel_key->domain == trigger_list->channel_key.domain));
246 }
247
248 static
249 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
250 {
251 const char *session_name = (const char *) key;
252 struct lttng_session_trigger_list *trigger_list;
253
254 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
255 session_triggers_ht_node);
256
257 return !!(strcmp(trigger_list->session_name, session_name) == 0);
258 }
259
260 static
261 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
262 {
263 struct channel_key *channel_key = (struct channel_key *) key;
264 struct channel_state_sample *sample;
265
266 sample = caa_container_of(node, struct channel_state_sample,
267 channel_state_ht_node);
268
269 return !!((channel_key->key == sample->key.key) &&
270 (channel_key->domain == sample->key.domain));
271 }
272
273 static
274 int match_channel_info(struct cds_lfht_node *node, const void *key)
275 {
276 struct channel_key *channel_key = (struct channel_key *) key;
277 struct channel_info *channel_info;
278
279 channel_info = caa_container_of(node, struct channel_info,
280 channels_ht_node);
281
282 return !!((channel_key->key == channel_info->key.key) &&
283 (channel_key->domain == channel_info->key.domain));
284 }
285
286 static
287 int match_trigger(struct cds_lfht_node *node, const void *key)
288 {
289 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
290 struct lttng_trigger_ht_element *trigger_ht_element;
291
292 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
293 node);
294
295 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
296 }
297
298 static
299 int match_trigger_token(struct cds_lfht_node *node, const void *key)
300 {
301 const uint64_t *_key = (uint64_t *) key;
302 struct notification_trigger_tokens_ht_element *element;
303
304 element = caa_container_of(node,
305 struct notification_trigger_tokens_ht_element, node);
306 return *_key == element->token;
307 }
308
309 static
310 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
311 {
312 struct lttng_condition *condition_key = (struct lttng_condition *) key;
313 struct notification_client_list *client_list;
314 const struct lttng_condition *condition;
315
316 LTTNG_ASSERT(condition_key);
317
318 client_list = caa_container_of(node, struct notification_client_list,
319 notification_trigger_clients_ht_node);
320 condition = client_list->condition;
321
322 return !!lttng_condition_is_equal(condition_key, condition);
323 }
324
325 static
326 int match_session_info(struct cds_lfht_node *node, const void *key)
327 {
328 const auto session_id = *((uint64_t *) key);
329 const auto *session_info = lttng::utils::container_of(
330 node, &session_info::sessions_ht_node);
331
332 return session_id == session_info->id;
333 }
334
335 static
336 unsigned long hash_session_info_id(uint64_t id)
337 {
338 return hash_key_u64(&id, lttng_ht_seed);
339 }
340
341 static
342 unsigned long hash_session_info(const struct session_info *session_info)
343 {
344 return hash_session_info_id(session_info->id);
345 }
346
347 static
348 struct session_info *get_session_info_by_id(
349 const struct notification_thread_state *state, uint64_t id)
350 {
351 struct cds_lfht_iter iter;
352 struct cds_lfht_node *node;
353 lttng::urcu::read_lock_guard read_lock_guard;
354
355 cds_lfht_lookup(state->sessions_ht,
356 hash_session_info_id(id),
357 match_session_info,
358 &id,
359 &iter);
360 node = cds_lfht_iter_get_node(&iter);
361
362 if (node) {
363 auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node);
364
365 session_info_get(session_info);
366 return session_info;
367 }
368
369 return NULL;
370 }
371
372 static
373 struct session_info *get_session_info_by_name(
374 const struct notification_thread_state *state, const char *name)
375 {
376 uint64_t session_id;
377 const auto found = sample_session_id_by_name(name, &session_id);
378
379 return found ? get_session_info_by_id(state, session_id) : NULL;
380 }
381
382 static
383 const char *notification_command_type_str(
384 enum notification_thread_command_type type)
385 {
386 switch (type) {
387 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
388 return "REGISTER_TRIGGER";
389 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
390 return "UNREGISTER_TRIGGER";
391 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
392 return "ADD_CHANNEL";
393 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
394 return "REMOVE_CHANNEL";
395 case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
396 return "ADD_SESSION";
397 case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
398 return "REMOVE_SESSION";
399 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
400 return "SESSION_ROTATION_ONGOING";
401 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
402 return "SESSION_ROTATION_COMPLETED";
403 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
404 return "ADD_TRACER_EVENT_SOURCE";
405 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
406 return "REMOVE_TRACER_EVENT_SOURCE";
407 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
408 return "LIST_TRIGGERS";
409 case NOTIFICATION_COMMAND_TYPE_GET_TRIGGER:
410 return "GET_TRIGGER";
411 case NOTIFICATION_COMMAND_TYPE_QUIT:
412 return "QUIT";
413 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
414 return "CLIENT_COMMUNICATION_UPDATE";
415 default:
416 abort();
417 }
418 }
419
420 /*
421 * Match trigger based on name and credentials only.
422 * Name duplication is NOT allowed for the same uid.
423 */
424 static
425 int match_trigger_by_name_uid(struct cds_lfht_node *node,
426 const void *key)
427 {
428 bool match = false;
429 const char *element_trigger_name;
430 const char *key_name;
431 enum lttng_trigger_status status;
432 const struct lttng_credentials *key_creds;
433 const struct lttng_credentials *node_creds;
434 const struct lttng_trigger *trigger_key =
435 (const struct lttng_trigger *) key;
436 const struct lttng_trigger_ht_element *trigger_ht_element =
437 caa_container_of(node,
438 struct lttng_trigger_ht_element,
439 node_by_name_uid);
440
441 status = lttng_trigger_get_name(trigger_ht_element->trigger,
442 &element_trigger_name);
443 element_trigger_name = status == LTTNG_TRIGGER_STATUS_OK ?
444 element_trigger_name : NULL;
445
446 status = lttng_trigger_get_name(trigger_key, &key_name);
447 key_name = status == LTTNG_TRIGGER_STATUS_OK ? key_name : NULL;
448
449 /*
450 * Compare the names.
451 * Consider null names as not equal. This is to maintain backwards
452 * compatibility with pre-2.13 anonymous triggers. Multiples anonymous
453 * triggers are allowed for a given user.
454 */
455 if (!element_trigger_name || !key_name) {
456 goto end;
457 }
458
459 if (strcmp(element_trigger_name, key_name) != 0) {
460 goto end;
461 }
462
463 /* Compare the owners' UIDs. */
464 key_creds = lttng_trigger_get_credentials(trigger_key);
465 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
466
467 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
468
469 end:
470 return match;
471 }
472
473 /*
474 * Hash trigger based on name and credentials only.
475 */
476 static
477 unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
478 {
479 unsigned long hash = 0;
480 const struct lttng_credentials *trigger_creds;
481 const char *trigger_name;
482 enum lttng_trigger_status status;
483
484 status = lttng_trigger_get_name(trigger, &trigger_name);
485 if (status == LTTNG_TRIGGER_STATUS_OK) {
486 hash = hash_key_str(trigger_name, lttng_ht_seed);
487 }
488
489 trigger_creds = lttng_trigger_get_credentials(trigger);
490 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
491 lttng_ht_seed);
492
493 return hash;
494 }
495
496 static
497 unsigned long hash_channel_key(struct channel_key *key)
498 {
499 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
500 unsigned long domain_hash = hash_key_ulong(
501 (void *) (unsigned long) key->domain, lttng_ht_seed);
502
503 return key_hash ^ domain_hash;
504 }
505
506 static
507 unsigned long hash_client_socket(int socket)
508 {
509 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
510 }
511
512 static
513 unsigned long hash_client_id(notification_client_id id)
514 {
515 return hash_key_u64(&id, lttng_ht_seed);
516 }
517
518 /*
519 * Get the type of object to which a given condition applies. Bindings let
520 * the notification system evaluate a trigger's condition when a given
521 * object's state is updated.
522 *
523 * For instance, a condition bound to a channel will be evaluated everytime
524 * the channel's state is changed by a channel monitoring sample.
525 */
526 static
527 enum lttng_object_type get_condition_binding_object(
528 const struct lttng_condition *condition)
529 {
530 switch (lttng_condition_get_type(condition)) {
531 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
532 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
533 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
534 return LTTNG_OBJECT_TYPE_CHANNEL;
535 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
536 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
537 return LTTNG_OBJECT_TYPE_SESSION;
538 case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
539 return LTTNG_OBJECT_TYPE_NONE;
540 default:
541 return LTTNG_OBJECT_TYPE_UNKNOWN;
542 }
543 }
544
545 static
546 void free_channel_info_rcu(struct rcu_head *node)
547 {
548 free(lttng::utils::container_of(node, &channel_info::rcu_node));
549 }
550
551 static
552 void channel_info_destroy(struct channel_info *channel_info)
553 {
554 if (!channel_info) {
555 return;
556 }
557
558 if (channel_info->session_info) {
559 session_info_remove_channel(channel_info->session_info,
560 channel_info);
561 session_info_put(channel_info->session_info);
562 }
563 if (channel_info->name) {
564 free(channel_info->name);
565 }
566 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
567 }
568
569 static
570 void free_session_info_rcu(struct rcu_head *node)
571 {
572 free(lttng::utils::container_of(node, &session_info::rcu_node));
573 }
574
575 /* Don't call directly, use the ref-counting mechanism. */
576 static
577 void session_info_destroy(void *_data)
578 {
579 struct session_info *session_info = (struct session_info *) _data;
580 int ret;
581
582 LTTNG_ASSERT(session_info);
583 if (session_info->channel_infos_ht) {
584 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
585 if (ret) {
586 ERR("Failed to destroy channel information hash table");
587 }
588 }
589 lttng_session_trigger_list_destroy(session_info->trigger_list);
590
591 rcu_read_lock();
592 cds_lfht_del(session_info->sessions_ht,
593 &session_info->sessions_ht_node);
594 rcu_read_unlock();
595 free(session_info->name);
596 call_rcu(&session_info->rcu_node, free_session_info_rcu);
597 }
598
599 static
600 void session_info_get(struct session_info *session_info)
601 {
602 if (!session_info) {
603 return;
604 }
605 lttng_ref_get(&session_info->ref);
606 }
607
608 static
609 void session_info_put(struct session_info *session_info)
610 {
611 if (!session_info) {
612 return;
613 }
614 lttng_ref_put(&session_info->ref);
615 }
616
617 static
618 struct session_info *session_info_create(uint64_t id,
619 const char *name,
620 uid_t uid,
621 gid_t gid,
622 struct lttng_session_trigger_list *trigger_list,
623 struct cds_lfht *sessions_ht)
624 {
625 struct session_info *session_info;
626
627 LTTNG_ASSERT(name);
628
629 session_info = zmalloc<struct session_info>();
630 if (!session_info) {
631 goto end;
632 }
633
634 lttng_ref_init(&session_info->ref, session_info_destroy);
635
636 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
637 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
638 if (!session_info->channel_infos_ht) {
639 goto error;
640 }
641
642 cds_lfht_node_init(&session_info->sessions_ht_node);
643 session_info->id = id;
644 session_info->name = strdup(name);
645 if (!session_info->name) {
646 goto error;
647 }
648
649 session_info->uid = uid;
650 session_info->gid = gid;
651 session_info->trigger_list = trigger_list;
652 session_info->sessions_ht = sessions_ht;
653 end:
654 return session_info;
655 error:
656 session_info_put(session_info);
657 return NULL;
658 }
659
660 static
661 void session_info_add_channel(struct session_info *session_info,
662 struct channel_info *channel_info)
663 {
664 rcu_read_lock();
665 cds_lfht_add(session_info->channel_infos_ht,
666 hash_channel_key(&channel_info->key),
667 &channel_info->session_info_channels_ht_node);
668 rcu_read_unlock();
669 }
670
671 static
672 void session_info_remove_channel(struct session_info *session_info,
673 struct channel_info *channel_info)
674 {
675 rcu_read_lock();
676 cds_lfht_del(session_info->channel_infos_ht,
677 &channel_info->session_info_channels_ht_node);
678 rcu_read_unlock();
679 }
680
681 static
682 struct channel_info *channel_info_create(const char *channel_name,
683 struct channel_key *channel_key, uint64_t channel_capacity,
684 struct session_info *session_info)
685 {
686 struct channel_info *channel_info = zmalloc<struct channel_info>();
687
688 if (!channel_info) {
689 goto end;
690 }
691
692 cds_lfht_node_init(&channel_info->channels_ht_node);
693 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
694 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
695 channel_info->capacity = channel_capacity;
696
697 channel_info->name = strdup(channel_name);
698 if (!channel_info->name) {
699 goto error;
700 }
701
702 /*
703 * Set the references between session and channel infos:
704 * - channel_info holds a strong reference to session_info
705 * - session_info holds a weak reference to channel_info
706 */
707 session_info_get(session_info);
708 session_info_add_channel(session_info, channel_info);
709 channel_info->session_info = session_info;
710 end:
711 return channel_info;
712 error:
713 channel_info_destroy(channel_info);
714 return NULL;
715 }
716
717 bool notification_client_list_get(struct notification_client_list *list)
718 {
719 return urcu_ref_get_unless_zero(&list->ref);
720 }
721
722 static
723 void free_notification_client_list_rcu(struct rcu_head *node)
724 {
725 free(caa_container_of(node, struct notification_client_list,
726 rcu_node));
727 }
728
729 static
730 void notification_client_list_release(struct urcu_ref *list_ref)
731 {
732 struct notification_client_list *list =
733 lttng::utils::container_of(list_ref, &notification_client_list::ref);
734 struct notification_client_list_element *client_list_element, *tmp;
735
736 lttng_condition_put(list->condition);
737
738 if (list->notification_trigger_clients_ht) {
739 rcu_read_lock();
740
741 cds_lfht_del(list->notification_trigger_clients_ht,
742 &list->notification_trigger_clients_ht_node);
743 rcu_read_unlock();
744 list->notification_trigger_clients_ht = NULL;
745 }
746 cds_list_for_each_entry_safe(client_list_element, tmp,
747 &list->clients_list, node) {
748 free(client_list_element);
749 }
750
751 LTTNG_ASSERT(cds_list_empty(&list->triggers_list));
752
753 pthread_mutex_destroy(&list->lock);
754 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
755 }
756
757 static
758 bool condition_applies_to_client(const struct lttng_condition *condition,
759 struct notification_client *client)
760 {
761 bool applies = false;
762 struct lttng_condition_list_element *condition_list_element;
763
764 cds_list_for_each_entry(condition_list_element, &client->condition_list,
765 node) {
766 applies = lttng_condition_is_equal(
767 condition_list_element->condition,
768 condition);
769 if (applies) {
770 break;
771 }
772 }
773
774 return applies;
775 }
776
777 static
778 struct notification_client_list *notification_client_list_create(
779 struct notification_thread_state *state,
780 const struct lttng_condition *condition)
781 {
782 struct notification_client *client;
783 struct cds_lfht_iter iter;
784 struct notification_client_list *client_list;
785
786 client_list = zmalloc<notification_client_list>();
787 if (!client_list) {
788 PERROR("Failed to allocate notification client list");
789 goto end;
790 }
791
792 pthread_mutex_init(&client_list->lock, NULL);
793 /*
794 * The trigger that owns the condition has the first reference to this
795 * client list.
796 */
797 urcu_ref_init(&client_list->ref);
798 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
799 CDS_INIT_LIST_HEAD(&client_list->clients_list);
800 CDS_INIT_LIST_HEAD(&client_list->triggers_list);
801
802 /*
803 * Create a copy of the condition so that it's independent of any
804 * trigger. The client list may outlive the trigger object (which owns
805 * the condition) that is used to create it.
806 */
807 client_list->condition = lttng_condition_copy(condition);
808
809 /* Build a list of clients to which this new condition applies. */
810 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
811 client_socket_ht_node) {
812 struct notification_client_list_element *client_list_element;
813
814 if (!condition_applies_to_client(condition, client)) {
815 continue;
816 }
817
818 client_list_element = zmalloc<notification_client_list_element>();
819 if (!client_list_element) {
820 goto error_put_client_list;
821 }
822
823 CDS_INIT_LIST_HEAD(&client_list_element->node);
824 client_list_element->client = client;
825 cds_list_add(&client_list_element->node, &client_list->clients_list);
826 }
827
828 client_list->notification_trigger_clients_ht =
829 state->notification_trigger_clients_ht;
830
831 rcu_read_lock();
832 /*
833 * Add the client list to the global list of client list.
834 */
835 cds_lfht_add_unique(state->notification_trigger_clients_ht,
836 lttng_condition_hash(client_list->condition),
837 match_client_list_condition,
838 client_list->condition,
839 &client_list->notification_trigger_clients_ht_node);
840 rcu_read_unlock();
841 goto end;
842
843 error_put_client_list:
844 notification_client_list_put(client_list);
845 client_list = NULL;
846
847 end:
848 return client_list;
849 }
850
851 void notification_client_list_put(struct notification_client_list *list)
852 {
853 if (!list) {
854 return;
855 }
856 return urcu_ref_put(&list->ref, notification_client_list_release);
857 }
858
859 /* Provides a reference to the returned list. */
860 static
861 struct notification_client_list *get_client_list_from_condition(
862 struct notification_thread_state *state,
863 const struct lttng_condition *condition)
864 {
865 struct cds_lfht_node *node;
866 struct cds_lfht_iter iter;
867 struct notification_client_list *list = NULL;
868
869 rcu_read_lock();
870 cds_lfht_lookup(state->notification_trigger_clients_ht,
871 lttng_condition_hash(condition),
872 match_client_list_condition,
873 condition,
874 &iter);
875 node = cds_lfht_iter_get_node(&iter);
876 if (node) {
877 list = lttng::utils::container_of(node,
878 &notification_client_list::notification_trigger_clients_ht_node);
879 list = notification_client_list_get(list) ? list : NULL;
880 }
881
882 rcu_read_unlock();
883 return list;
884 }
885
886 static
887 int evaluate_channel_condition_for_client(
888 const struct lttng_condition *condition,
889 struct notification_thread_state *state,
890 struct lttng_evaluation **evaluation,
891 uid_t *session_uid, gid_t *session_gid)
892 {
893 int ret;
894 struct cds_lfht_iter iter;
895 struct cds_lfht_node *node;
896 struct channel_info *channel_info = NULL;
897 struct channel_key *channel_key = NULL;
898 struct channel_state_sample *last_sample = NULL;
899 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
900
901 rcu_read_lock();
902
903 /* Find the channel associated with the condition. */
904 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
905 channel_trigger_list, channel_triggers_ht_node) {
906 struct lttng_trigger_list_element *element;
907
908 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
909 const struct lttng_condition *current_condition =
910 lttng_trigger_get_const_condition(
911 element->trigger);
912
913 LTTNG_ASSERT(current_condition);
914 if (!lttng_condition_is_equal(condition,
915 current_condition)) {
916 continue;
917 }
918
919 /* Found the trigger, save the channel key. */
920 channel_key = &channel_trigger_list->channel_key;
921 break;
922 }
923 if (channel_key) {
924 /* The channel key was found stop iteration. */
925 break;
926 }
927 }
928
929 if (!channel_key){
930 /* No channel found; normal exit. */
931 DBG("No known channel associated with newly subscribed-to condition");
932 ret = 0;
933 goto end;
934 }
935
936 /* Fetch channel info for the matching channel. */
937 cds_lfht_lookup(state->channels_ht,
938 hash_channel_key(channel_key),
939 match_channel_info,
940 channel_key,
941 &iter);
942 node = cds_lfht_iter_get_node(&iter);
943 LTTNG_ASSERT(node);
944 channel_info = caa_container_of(node, struct channel_info,
945 channels_ht_node);
946
947 /* Retrieve the channel's last sample, if it exists. */
948 cds_lfht_lookup(state->channel_state_ht,
949 hash_channel_key(channel_key),
950 match_channel_state_sample,
951 channel_key,
952 &iter);
953 node = cds_lfht_iter_get_node(&iter);
954 if (node) {
955 last_sample = caa_container_of(node,
956 struct channel_state_sample,
957 channel_state_ht_node);
958 } else {
959 /* Nothing to evaluate, no sample was ever taken. Normal exit */
960 DBG("No channel sample associated with newly subscribed-to condition");
961 ret = 0;
962 goto end;
963 }
964
965 ret = evaluate_buffer_condition(condition, evaluation, state,
966 NULL, last_sample,
967 0, channel_info->session_info->consumed_data_size,
968 channel_info);
969 if (ret) {
970 WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
971 goto end;
972 }
973
974 *session_uid = channel_info->session_info->uid;
975 *session_gid = channel_info->session_info->gid;
976 end:
977 rcu_read_unlock();
978 return ret;
979 }
980
981 static
982 const char *get_condition_session_name(const struct lttng_condition *condition)
983 {
984 const char *session_name = NULL;
985 enum lttng_condition_status status;
986
987 switch (lttng_condition_get_type(condition)) {
988 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
989 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
990 status = lttng_condition_buffer_usage_get_session_name(
991 condition, &session_name);
992 break;
993 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
994 status = lttng_condition_session_consumed_size_get_session_name(
995 condition, &session_name);
996 break;
997 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
998 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
999 status = lttng_condition_session_rotation_get_session_name(
1000 condition, &session_name);
1001 break;
1002 default:
1003 abort();
1004 }
1005 if (status != LTTNG_CONDITION_STATUS_OK) {
1006 ERR("Failed to retrieve session rotation condition's session name");
1007 goto end;
1008 }
1009 end:
1010 return session_name;
1011 }
1012
1013 static
1014 int evaluate_session_condition_for_client(
1015 const struct lttng_condition *condition,
1016 struct notification_thread_state *state,
1017 struct lttng_evaluation **evaluation,
1018 uid_t *session_uid, gid_t *session_gid)
1019 {
1020 int ret;
1021 const char *session_name;
1022 struct session_info *session_info = NULL;
1023
1024 rcu_read_lock();
1025 session_name = get_condition_session_name(condition);
1026
1027 /* Find the session associated with the condition. */
1028 session_info = get_session_info_by_name(state, session_name);
1029 if (!session_info) {
1030 DBG("Unknown session while evaluating session condition for client: name = `%s`",
1031 session_name);
1032 ret = 0;
1033 goto end;
1034 }
1035
1036 /*
1037 * Evaluation is performed in-line here since only one type of
1038 * session-bound condition is handled for the moment.
1039 */
1040 switch (lttng_condition_get_type(condition)) {
1041 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1042 if (!session_info->rotation.ongoing) {
1043 ret = 0;
1044 goto end_session_put;
1045 }
1046
1047 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1048 session_info->rotation.id);
1049 if (!*evaluation) {
1050 /* Fatal error. */
1051 ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
1052 session_info->name);
1053 ret = -1;
1054 goto end_session_put;
1055 }
1056 ret = 0;
1057 break;
1058 default:
1059 ret = 0;
1060 goto end_session_put;
1061 }
1062
1063 *session_uid = session_info->uid;
1064 *session_gid = session_info->gid;
1065
1066 end_session_put:
1067 session_info_put(session_info);
1068 end:
1069 rcu_read_unlock();
1070 return ret;
1071 }
1072
1073 static
1074 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1075 const struct lttng_condition *condition,
1076 struct notification_client *client,
1077 struct notification_thread_state *state)
1078 {
1079 int ret;
1080 struct lttng_evaluation *evaluation = NULL;
1081 struct notification_client_list client_list = {
1082 .lock = PTHREAD_MUTEX_INITIALIZER,
1083 .ref = {},
1084 .condition = NULL,
1085 .triggers_list = {},
1086 .clients_list = {},
1087 .notification_trigger_clients_ht = NULL,
1088 .notification_trigger_clients_ht_node = {},
1089 .rcu_node = {},
1090 };
1091 struct notification_client_list_element client_list_element = {};
1092 uid_t object_uid = 0;
1093 gid_t object_gid = 0;
1094
1095 LTTNG_ASSERT(trigger);
1096 LTTNG_ASSERT(condition);
1097 LTTNG_ASSERT(client);
1098 LTTNG_ASSERT(state);
1099
1100 switch (get_condition_binding_object(condition)) {
1101 case LTTNG_OBJECT_TYPE_SESSION:
1102 ret = evaluate_session_condition_for_client(condition, state,
1103 &evaluation, &object_uid, &object_gid);
1104 break;
1105 case LTTNG_OBJECT_TYPE_CHANNEL:
1106 ret = evaluate_channel_condition_for_client(condition, state,
1107 &evaluation, &object_uid, &object_gid);
1108 break;
1109 case LTTNG_OBJECT_TYPE_NONE:
1110 DBG("Newly subscribed-to condition not bound to object, nothing to evaluate");
1111 ret = 0;
1112 goto end;
1113 case LTTNG_OBJECT_TYPE_UNKNOWN:
1114 default:
1115 ret = -1;
1116 goto end;
1117 }
1118 if (ret) {
1119 /* Fatal error. */
1120 goto end;
1121 }
1122 if (!evaluation) {
1123 /* Evaluation yielded nothing. Normal exit. */
1124 DBG("Newly subscribed-to condition evaluated to false, nothing to report to client");
1125 ret = 0;
1126 goto end;
1127 }
1128
1129 /*
1130 * Create a temporary client list with the client currently
1131 * subscribing.
1132 */
1133 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1134 CDS_INIT_LIST_HEAD(&client_list.clients_list);
1135
1136 CDS_INIT_LIST_HEAD(&client_list_element.node);
1137 client_list_element.client = client;
1138 cds_list_add(&client_list_element.node, &client_list.clients_list);
1139
1140 /* Send evaluation result to the newly-subscribed client. */
1141 DBG("Newly subscribed-to condition evaluated to true, notifying client");
1142 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1143 state, object_uid, object_gid);
1144
1145 end:
1146 return ret;
1147 }
1148
1149 static
1150 int notification_thread_client_subscribe(struct notification_client *client,
1151 struct lttng_condition *condition,
1152 struct notification_thread_state *state,
1153 enum lttng_notification_channel_status *_status)
1154 {
1155 int ret = 0;
1156 struct notification_client_list *client_list = NULL;
1157 struct lttng_condition_list_element *condition_list_element = NULL;
1158 struct notification_client_list_element *client_list_element = NULL;
1159 struct lttng_trigger_ht_element *trigger_ht_element;
1160 enum lttng_notification_channel_status status =
1161 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1162
1163 /*
1164 * Ensure that the client has not already subscribed to this condition
1165 * before.
1166 */
1167 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1168 if (lttng_condition_is_equal(condition_list_element->condition,
1169 condition)) {
1170 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1171 goto end;
1172 }
1173 }
1174
1175 condition_list_element = zmalloc<lttng_condition_list_element>();
1176 if (!condition_list_element) {
1177 ret = -1;
1178 goto error;
1179 }
1180 client_list_element = zmalloc<notification_client_list_element>();
1181 if (!client_list_element) {
1182 ret = -1;
1183 goto error;
1184 }
1185
1186 /*
1187 * Add the newly-subscribed condition to the client's subscription list.
1188 */
1189 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1190 condition_list_element->condition = condition;
1191 condition = NULL;
1192 cds_list_add(&condition_list_element->node, &client->condition_list);
1193
1194 client_list = get_client_list_from_condition(
1195 state, condition_list_element->condition);
1196 if (!client_list) {
1197 /*
1198 * No notification-emiting trigger registered with this
1199 * condition. We don't evaluate the condition right away
1200 * since this trigger is not registered yet.
1201 */
1202 free(client_list_element);
1203 goto end;
1204 }
1205
1206 /*
1207 * The condition to which the client just subscribed is evaluated
1208 * at this point so that conditions that are already TRUE result
1209 * in a notification being sent out.
1210 *
1211 * Note the iteration on all triggers which share an identical
1212 * `condition` than the one to which the client is registering. This is
1213 * done to ensure that the client receives a distinct notification for
1214 * all triggers that have a `notify` action that have this condition.
1215 */
1216 pthread_mutex_lock(&client_list->lock);
1217 cds_list_for_each_entry(trigger_ht_element,
1218 &client_list->triggers_list, client_list_trigger_node) {
1219 if (evaluate_condition_for_client(trigger_ht_element->trigger, condition_list_element->condition,
1220 client, state)) {
1221 WARN("Evaluation of a condition on client subscription failed, aborting.");
1222 ret = -1;
1223 free(client_list_element);
1224 pthread_mutex_unlock(&client_list->lock);
1225 goto end;
1226 }
1227 }
1228 pthread_mutex_unlock(&client_list->lock);
1229
1230 /*
1231 * Add the client to the list of clients interested in a given trigger
1232 * if a "notification" trigger with a corresponding condition was
1233 * added prior.
1234 */
1235 client_list_element->client = client;
1236 CDS_INIT_LIST_HEAD(&client_list_element->node);
1237
1238 pthread_mutex_lock(&client_list->lock);
1239 cds_list_add(&client_list_element->node, &client_list->clients_list);
1240 pthread_mutex_unlock(&client_list->lock);
1241 end:
1242 if (_status) {
1243 *_status = status;
1244 }
1245 if (client_list) {
1246 notification_client_list_put(client_list);
1247 }
1248 lttng_condition_destroy(condition);
1249 return ret;
1250 error:
1251 free(condition_list_element);
1252 free(client_list_element);
1253 lttng_condition_destroy(condition);
1254 return ret;
1255 }
1256
1257 static
1258 int notification_thread_client_unsubscribe(
1259 struct notification_client *client,
1260 struct lttng_condition *condition,
1261 struct notification_thread_state *state,
1262 enum lttng_notification_channel_status *_status)
1263 {
1264 struct notification_client_list *client_list;
1265 struct lttng_condition_list_element *condition_list_element,
1266 *condition_tmp;
1267 struct notification_client_list_element *client_list_element,
1268 *client_tmp;
1269 bool condition_found = false;
1270 enum lttng_notification_channel_status status =
1271 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1272
1273 /* Remove the condition from the client's condition list. */
1274 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1275 &client->condition_list, node) {
1276 if (!lttng_condition_is_equal(condition_list_element->condition,
1277 condition)) {
1278 continue;
1279 }
1280
1281 cds_list_del(&condition_list_element->node);
1282 /*
1283 * The caller may be iterating on the client's conditions to
1284 * tear down a client's connection. In this case, the condition
1285 * will be destroyed at the end.
1286 */
1287 if (condition != condition_list_element->condition) {
1288 lttng_condition_destroy(
1289 condition_list_element->condition);
1290 }
1291 free(condition_list_element);
1292 condition_found = true;
1293 break;
1294 }
1295
1296 if (!condition_found) {
1297 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1298 goto end;
1299 }
1300
1301 /*
1302 * Remove the client from the list of clients interested the trigger
1303 * matching the condition.
1304 */
1305 client_list = get_client_list_from_condition(state, condition);
1306 if (!client_list) {
1307 goto end;
1308 }
1309
1310 pthread_mutex_lock(&client_list->lock);
1311 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1312 &client_list->clients_list, node) {
1313 if (client_list_element->client->id != client->id) {
1314 continue;
1315 }
1316 cds_list_del(&client_list_element->node);
1317 free(client_list_element);
1318 break;
1319 }
1320 pthread_mutex_unlock(&client_list->lock);
1321 notification_client_list_put(client_list);
1322 client_list = NULL;
1323 end:
1324 lttng_condition_destroy(condition);
1325 if (_status) {
1326 *_status = status;
1327 }
1328 return 0;
1329 }
1330
1331 static
1332 void free_notification_client_rcu(struct rcu_head *node)
1333 {
1334 free(lttng::utils::container_of(node, &notification_client::rcu_node));
1335 }
1336
1337 static
1338 void notification_client_destroy(struct notification_client *client)
1339 {
1340 if (!client) {
1341 return;
1342 }
1343
1344 /*
1345 * The client object is not reachable by other threads, no need to lock
1346 * the client here.
1347 */
1348 if (client->socket >= 0) {
1349 (void) lttcomm_close_unix_sock(client->socket);
1350 client->socket = -1;
1351 }
1352 client->communication.active = false;
1353 lttng_payload_reset(&client->communication.inbound.payload);
1354 lttng_payload_reset(&client->communication.outbound.payload);
1355 pthread_mutex_destroy(&client->lock);
1356 call_rcu(&client->rcu_node, free_notification_client_rcu);
1357 }
1358
1359 /*
1360 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1361 * client pointer).
1362 */
1363 static
1364 struct notification_client *get_client_from_socket(int socket,
1365 struct notification_thread_state *state)
1366 {
1367 struct cds_lfht_iter iter;
1368 struct cds_lfht_node *node;
1369 struct notification_client *client = NULL;
1370
1371 ASSERT_RCU_READ_LOCKED();
1372
1373 cds_lfht_lookup(state->client_socket_ht,
1374 hash_client_socket(socket),
1375 match_client_socket,
1376 (void *) (unsigned long) socket,
1377 &iter);
1378 node = cds_lfht_iter_get_node(&iter);
1379 if (!node) {
1380 goto end;
1381 }
1382
1383 client = caa_container_of(node, struct notification_client,
1384 client_socket_ht_node);
1385 end:
1386 return client;
1387 }
1388
1389 /*
1390 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1391 * client pointer).
1392 */
1393 static
1394 struct notification_client *get_client_from_id(notification_client_id id,
1395 struct notification_thread_state *state)
1396 {
1397 struct cds_lfht_iter iter;
1398 struct cds_lfht_node *node;
1399 struct notification_client *client = NULL;
1400
1401 ASSERT_RCU_READ_LOCKED();
1402
1403 cds_lfht_lookup(state->client_id_ht,
1404 hash_client_id(id),
1405 match_client_id,
1406 &id,
1407 &iter);
1408 node = cds_lfht_iter_get_node(&iter);
1409 if (!node) {
1410 goto end;
1411 }
1412
1413 client = caa_container_of(node, struct notification_client,
1414 client_id_ht_node);
1415 end:
1416 return client;
1417 }
1418
1419 static
1420 bool buffer_usage_condition_applies_to_channel(
1421 const struct lttng_condition *condition,
1422 const struct channel_info *channel_info)
1423 {
1424 enum lttng_condition_status status;
1425 enum lttng_domain_type condition_domain;
1426 const char *condition_session_name = NULL;
1427 const char *condition_channel_name = NULL;
1428
1429 status = lttng_condition_buffer_usage_get_domain_type(condition,
1430 &condition_domain);
1431 LTTNG_ASSERT(status == LTTNG_CONDITION_STATUS_OK);
1432 if (channel_info->key.domain != condition_domain) {
1433 goto fail;
1434 }
1435
1436 status = lttng_condition_buffer_usage_get_session_name(
1437 condition, &condition_session_name);
1438 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1439
1440 status = lttng_condition_buffer_usage_get_channel_name(
1441 condition, &condition_channel_name);
1442 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1443
1444 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1445 goto fail;
1446 }
1447 if (strcmp(channel_info->name, condition_channel_name)) {
1448 goto fail;
1449 }
1450
1451 return true;
1452 fail:
1453 return false;
1454 }
1455
1456 static
1457 bool session_consumed_size_condition_applies_to_channel(
1458 const struct lttng_condition *condition,
1459 const struct channel_info *channel_info)
1460 {
1461 enum lttng_condition_status status;
1462 const char *condition_session_name = NULL;
1463
1464 status = lttng_condition_session_consumed_size_get_session_name(
1465 condition, &condition_session_name);
1466 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1467
1468 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1469 goto fail;
1470 }
1471
1472 return true;
1473 fail:
1474 return false;
1475 }
1476
1477 static
1478 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1479 const struct channel_info *channel_info)
1480 {
1481 const struct lttng_condition *condition;
1482 bool trigger_applies;
1483
1484 condition = lttng_trigger_get_const_condition(trigger);
1485 if (!condition) {
1486 goto fail;
1487 }
1488
1489 switch (lttng_condition_get_type(condition)) {
1490 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1491 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1492 trigger_applies = buffer_usage_condition_applies_to_channel(
1493 condition, channel_info);
1494 break;
1495 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1496 trigger_applies = session_consumed_size_condition_applies_to_channel(
1497 condition, channel_info);
1498 break;
1499 default:
1500 goto fail;
1501 }
1502
1503 return trigger_applies;
1504 fail:
1505 return false;
1506 }
1507
1508 /* Must be called with RCU read lock held. */
1509 static
1510 struct lttng_session_trigger_list *get_session_trigger_list(
1511 struct notification_thread_state *state,
1512 const char *session_name)
1513 {
1514 struct lttng_session_trigger_list *list = NULL;
1515 struct cds_lfht_node *node;
1516 struct cds_lfht_iter iter;
1517
1518 ASSERT_RCU_READ_LOCKED();
1519
1520 cds_lfht_lookup(state->session_triggers_ht,
1521 hash_key_str(session_name, lttng_ht_seed),
1522 match_session_trigger_list,
1523 session_name,
1524 &iter);
1525 node = cds_lfht_iter_get_node(&iter);
1526 if (!node) {
1527 /*
1528 * Not an error, the list of triggers applying to that session
1529 * will be initialized when the session is created.
1530 */
1531 DBG("No trigger list found for session \"%s\" as it is not yet known to the notification system",
1532 session_name);
1533 goto end;
1534 }
1535
1536 list = caa_container_of(node,
1537 struct lttng_session_trigger_list,
1538 session_triggers_ht_node);
1539 end:
1540 return list;
1541 }
1542
1543 /*
1544 * Allocate an empty lttng_session_trigger_list for the session named
1545 * 'session_name'.
1546 *
1547 * No ownership of 'session_name' is assumed by the session trigger list.
1548 * It is the caller's responsability to ensure the session name is alive
1549 * for as long as this list is.
1550 */
1551 static
1552 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1553 const char *session_name,
1554 struct cds_lfht *session_triggers_ht)
1555 {
1556 struct lttng_session_trigger_list *list;
1557
1558 list = zmalloc<lttng_session_trigger_list>();
1559 if (!list) {
1560 goto end;
1561 }
1562 list->session_name = session_name;
1563 CDS_INIT_LIST_HEAD(&list->list);
1564 cds_lfht_node_init(&list->session_triggers_ht_node);
1565 list->session_triggers_ht = session_triggers_ht;
1566
1567 rcu_read_lock();
1568 /* Publish the list through the session_triggers_ht. */
1569 cds_lfht_add(session_triggers_ht,
1570 hash_key_str(session_name, lttng_ht_seed),
1571 &list->session_triggers_ht_node);
1572 rcu_read_unlock();
1573 end:
1574 return list;
1575 }
1576
1577 static
1578 void free_session_trigger_list_rcu(struct rcu_head *node)
1579 {
1580 free(caa_container_of(node, struct lttng_session_trigger_list,
1581 rcu_node));
1582 }
1583
1584 static
1585 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1586 {
1587 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1588
1589 /* Empty the list element by element, and then free the list itself. */
1590 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1591 &list->list, node) {
1592 cds_list_del(&trigger_list_element->node);
1593 free(trigger_list_element);
1594 }
1595 rcu_read_lock();
1596 /* Unpublish the list from the session_triggers_ht. */
1597 cds_lfht_del(list->session_triggers_ht,
1598 &list->session_triggers_ht_node);
1599 rcu_read_unlock();
1600 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1601 }
1602
1603 static
1604 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1605 struct lttng_trigger *trigger)
1606 {
1607 int ret = 0;
1608 struct lttng_trigger_list_element *new_element =
1609 zmalloc<lttng_trigger_list_element>();
1610
1611 if (!new_element) {
1612 ret = -1;
1613 goto end;
1614 }
1615 CDS_INIT_LIST_HEAD(&new_element->node);
1616 new_element->trigger = trigger;
1617 cds_list_add(&new_element->node, &list->list);
1618 end:
1619 return ret;
1620 }
1621
1622 static
1623 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1624 const char *session_name)
1625 {
1626 bool applies = false;
1627 const struct lttng_condition *condition;
1628
1629 condition = lttng_trigger_get_const_condition(trigger);
1630 switch (lttng_condition_get_type(condition)) {
1631 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1632 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1633 {
1634 enum lttng_condition_status condition_status;
1635 const char *condition_session_name;
1636
1637 condition_status = lttng_condition_session_rotation_get_session_name(
1638 condition, &condition_session_name);
1639 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1640 ERR("Failed to retrieve session rotation condition's session name");
1641 goto end;
1642 }
1643
1644 LTTNG_ASSERT(condition_session_name);
1645 applies = !strcmp(condition_session_name, session_name);
1646 break;
1647 }
1648 default:
1649 goto end;
1650 }
1651 end:
1652 return applies;
1653 }
1654
1655 /*
1656 * Allocate and initialize an lttng_session_trigger_list which contains
1657 * all triggers that apply to the session named 'session_name'.
1658 *
1659 * No ownership of 'session_name' is assumed by the session trigger list.
1660 * It is the caller's responsability to ensure the session name is alive
1661 * for as long as this list is.
1662 */
1663 static
1664 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1665 const struct notification_thread_state *state,
1666 const char *session_name)
1667 {
1668 int trigger_count = 0;
1669 struct lttng_session_trigger_list *session_trigger_list = NULL;
1670 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1671 struct cds_lfht_iter iter;
1672
1673 session_trigger_list = lttng_session_trigger_list_create(session_name,
1674 state->session_triggers_ht);
1675
1676 /* Add all triggers applying to the session named 'session_name'. */
1677 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1678 node) {
1679 int ret;
1680
1681 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1682 session_name)) {
1683 continue;
1684 }
1685
1686 ret = lttng_session_trigger_list_add(session_trigger_list,
1687 trigger_ht_element->trigger);
1688 if (ret) {
1689 goto error;
1690 }
1691
1692 trigger_count++;
1693 }
1694
1695 DBG("Found %i triggers that apply to newly created session",
1696 trigger_count);
1697 return session_trigger_list;
1698 error:
1699 lttng_session_trigger_list_destroy(session_trigger_list);
1700 return NULL;
1701 }
1702
1703 static
1704 struct session_info *create_and_publish_session_info(struct notification_thread_state *state,
1705 uint64_t id,
1706 const char *name,
1707 uid_t uid,
1708 gid_t gid)
1709 {
1710 struct session_info *session = NULL;
1711 struct lttng_session_trigger_list *trigger_list;
1712
1713 rcu_read_lock();
1714 trigger_list = lttng_session_trigger_list_build(state, name);
1715 if (!trigger_list) {
1716 goto error;
1717 }
1718
1719 session = session_info_create(id, name, uid, gid, trigger_list,
1720 state->sessions_ht);
1721 if (!session) {
1722 ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1723 name, uid, gid);
1724 lttng_session_trigger_list_destroy(trigger_list);
1725 goto error;
1726 }
1727
1728 /* Transferred ownership to the new session. */
1729 trigger_list = NULL;
1730
1731 if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info,
1732 &id, &session->sessions_ht_node) != &session->sessions_ht_node) {
1733 ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id);
1734 goto error;
1735 }
1736
1737 rcu_read_unlock();
1738 return session;
1739 error:
1740 rcu_read_unlock();
1741 session_info_put(session);
1742 return NULL;
1743 }
1744
1745 static
1746 int handle_notification_thread_command_add_channel(struct notification_thread_state *state,
1747 uint64_t session_id,
1748 const char *channel_name,
1749 enum lttng_domain_type channel_domain,
1750 uint64_t channel_key_int,
1751 uint64_t channel_capacity,
1752 enum lttng_error_code *cmd_result)
1753 {
1754 struct cds_list_head trigger_list;
1755 struct channel_info *new_channel_info = NULL;
1756 struct channel_key channel_key = {
1757 .key = channel_key_int,
1758 .domain = channel_domain,
1759 };
1760 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1761 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1762 int trigger_count = 0;
1763 struct cds_lfht_iter iter;
1764 struct session_info *session_info = NULL;
1765
1766 DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s",
1767 channel_name, session_id, channel_key_int,
1768 lttng_domain_type_str(channel_domain));
1769
1770 CDS_INIT_LIST_HEAD(&trigger_list);
1771
1772 session_info = get_session_info_by_id(state, session_id);
1773 if (!session_info) {
1774 /* Fatal logic error. */
1775 ERR("Failed to find session while adding channel: session id = %" PRIu64,
1776 session_id);
1777 goto error;
1778 }
1779
1780 new_channel_info = channel_info_create(channel_name, &channel_key,
1781 channel_capacity, session_info);
1782 if (!new_channel_info) {
1783 goto error;
1784 }
1785
1786 rcu_read_lock();
1787 /* Build a list of all triggers applying to the new channel. */
1788 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1789 node) {
1790 struct lttng_trigger_list_element *new_element;
1791
1792 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1793 new_channel_info)) {
1794 continue;
1795 }
1796
1797 new_element = zmalloc<lttng_trigger_list_element>();
1798 if (!new_element) {
1799 rcu_read_unlock();
1800 goto error;
1801 }
1802 CDS_INIT_LIST_HEAD(&new_element->node);
1803 new_element->trigger = trigger_ht_element->trigger;
1804 cds_list_add(&new_element->node, &trigger_list);
1805 trigger_count++;
1806 }
1807 rcu_read_unlock();
1808
1809 DBG("Found %i triggers that apply to newly added channel",
1810 trigger_count);
1811 channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
1812 if (!channel_trigger_list) {
1813 goto error;
1814 }
1815 channel_trigger_list->channel_key = new_channel_info->key;
1816 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1817 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1818 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1819
1820 rcu_read_lock();
1821 /* Add channel to the channel_ht which owns the channel_infos. */
1822 cds_lfht_add(state->channels_ht,
1823 hash_channel_key(&new_channel_info->key),
1824 &new_channel_info->channels_ht_node);
1825 /*
1826 * Add the list of triggers associated with this channel to the
1827 * channel_triggers_ht.
1828 */
1829 cds_lfht_add(state->channel_triggers_ht,
1830 hash_channel_key(&new_channel_info->key),
1831 &channel_trigger_list->channel_triggers_ht_node);
1832 rcu_read_unlock();
1833 session_info_put(session_info);
1834 *cmd_result = LTTNG_OK;
1835 return 0;
1836 error:
1837 channel_info_destroy(new_channel_info);
1838 session_info_put(session_info);
1839 return 1;
1840 }
1841
1842 static
1843 int handle_notification_thread_command_add_session(struct notification_thread_state *state,
1844 uint64_t session_id,
1845 const char *session_name,
1846 uid_t session_uid,
1847 gid_t session_gid,
1848 enum lttng_error_code *cmd_result)
1849 {
1850 int ret;
1851
1852 DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
1853 session_name, session_id, session_uid, session_gid);
1854
1855 auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid);
1856 if (!session) {
1857 PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
1858 session_name, session_id, session_uid, session_gid);
1859 ret = -1;
1860 *cmd_result = LTTNG_ERR_NOMEM;
1861 goto end;
1862 }
1863
1864 /*
1865 * Note that the reference to `session` is not released; this reference is
1866 * the "global" reference that is used to allow look-ups. This reference will
1867 * only be released when the session is removed. See
1868 * handle_notification_thread_command_remove_session.
1869 */
1870 ret = 0;
1871 *cmd_result = LTTNG_OK;
1872 end:
1873 return ret;
1874 }
1875
1876 static
1877 int handle_notification_thread_command_remove_session(
1878 struct notification_thread_state *state,
1879 uint64_t session_id,
1880 enum lttng_error_code *cmd_result)
1881 {
1882 int ret;
1883
1884 DBG("Removing session: session id = %" PRIu64, session_id);
1885
1886 auto session = get_session_info_by_id(state, session_id);
1887 if (!session) {
1888 ERR("Failed to remove session: session id = %" PRIu64, session_id);
1889 ret = -1;
1890 *cmd_result = LTTNG_ERR_NO_SESSION;
1891 goto end;
1892 }
1893
1894 /* Release the reference returned by the look-up, and then release the global reference. */
1895 session_info_put(session);
1896 session_info_put(session);
1897 ret = 0;
1898 *cmd_result = LTTNG_OK;
1899 end:
1900 return ret;
1901 }
1902
1903 static
1904 void free_channel_trigger_list_rcu(struct rcu_head *node)
1905 {
1906 free(caa_container_of(node, struct lttng_channel_trigger_list,
1907 rcu_node));
1908 }
1909
1910 static
1911 void free_channel_state_sample_rcu(struct rcu_head *node)
1912 {
1913 free(caa_container_of(node, struct channel_state_sample,
1914 rcu_node));
1915 }
1916
1917 static
1918 int handle_notification_thread_command_remove_channel(
1919 struct notification_thread_state *state,
1920 uint64_t channel_key, enum lttng_domain_type domain,
1921 enum lttng_error_code *cmd_result)
1922 {
1923 struct cds_lfht_node *node;
1924 struct cds_lfht_iter iter;
1925 struct lttng_channel_trigger_list *trigger_list;
1926 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1927 struct channel_key key = { .key = channel_key, .domain = domain };
1928 struct channel_info *channel_info;
1929
1930 DBG("Removing channel key = %" PRIu64 " in %s domain",
1931 channel_key, lttng_domain_type_str(domain));
1932
1933 rcu_read_lock();
1934
1935 cds_lfht_lookup(state->channel_triggers_ht,
1936 hash_channel_key(&key),
1937 match_channel_trigger_list,
1938 &key,
1939 &iter);
1940 node = cds_lfht_iter_get_node(&iter);
1941 /*
1942 * There is a severe internal error if we are being asked to remove a
1943 * channel that doesn't exist.
1944 */
1945 if (!node) {
1946 ERR("Channel being removed is unknown to the notification thread");
1947 goto end;
1948 }
1949
1950 /* Free the list of triggers associated with this channel. */
1951 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1952 channel_triggers_ht_node);
1953 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1954 &trigger_list->list, node) {
1955 cds_list_del(&trigger_list_element->node);
1956 free(trigger_list_element);
1957 }
1958 cds_lfht_del(state->channel_triggers_ht, node);
1959 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1960
1961 /* Free sampled channel state. */
1962 cds_lfht_lookup(state->channel_state_ht,
1963 hash_channel_key(&key),
1964 match_channel_state_sample,
1965 &key,
1966 &iter);
1967 node = cds_lfht_iter_get_node(&iter);
1968 /*
1969 * This is expected to be NULL if the channel is destroyed before we
1970 * received a sample.
1971 */
1972 if (node) {
1973 struct channel_state_sample *sample = caa_container_of(node,
1974 struct channel_state_sample,
1975 channel_state_ht_node);
1976
1977 cds_lfht_del(state->channel_state_ht, node);
1978 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1979 }
1980
1981 /* Remove the channel from the channels_ht and free it. */
1982 cds_lfht_lookup(state->channels_ht,
1983 hash_channel_key(&key),
1984 match_channel_info,
1985 &key,
1986 &iter);
1987 node = cds_lfht_iter_get_node(&iter);
1988 LTTNG_ASSERT(node);
1989 channel_info = caa_container_of(node, struct channel_info,
1990 channels_ht_node);
1991 cds_lfht_del(state->channels_ht, node);
1992 channel_info_destroy(channel_info);
1993 end:
1994 rcu_read_unlock();
1995 *cmd_result = LTTNG_OK;
1996 return 0;
1997 }
1998
1999 static
2000 int handle_notification_thread_command_session_rotation(
2001 struct notification_thread_state *state,
2002 enum notification_thread_command_type cmd_type,
2003 uint64_t session_id,
2004 uint64_t trace_archive_chunk_id,
2005 struct lttng_trace_archive_location *location,
2006 enum lttng_error_code *_cmd_result)
2007 {
2008 int ret = 0;
2009 enum lttng_error_code cmd_result = LTTNG_OK;
2010 struct lttng_session_trigger_list *trigger_list;
2011 struct lttng_trigger_list_element *trigger_list_element;
2012 struct session_info *session_info;
2013 struct lttng_credentials session_creds;
2014
2015 rcu_read_lock();
2016
2017 session_info = get_session_info_by_id(state, session_id);
2018 if (!session_info) {
2019 /* Fatal logic error. */
2020 ERR("Failed to find session while handling rotation state change: session id = %" PRIu64,
2021 session_id);
2022 ret = -1;
2023 cmd_result = LTTNG_ERR_FATAL;
2024 goto end;
2025 }
2026
2027 session_creds = {
2028 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
2029 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
2030 };
2031
2032 session_info->rotation.ongoing =
2033 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
2034 session_info->rotation.id = trace_archive_chunk_id;
2035 trigger_list = get_session_trigger_list(state, session_info->name);
2036 if (!trigger_list) {
2037 DBG("No triggers apply to session: session name = `%s` ",
2038 session_info->name);
2039 goto end;
2040 }
2041
2042 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2043 node) {
2044 const struct lttng_condition *condition;
2045 struct lttng_trigger *trigger;
2046 struct notification_client_list *client_list;
2047 struct lttng_evaluation *evaluation = NULL;
2048 enum lttng_condition_type condition_type;
2049 enum action_executor_status executor_status;
2050
2051 trigger = trigger_list_element->trigger;
2052 condition = lttng_trigger_get_const_condition(trigger);
2053 LTTNG_ASSERT(condition);
2054 condition_type = lttng_condition_get_type(condition);
2055
2056 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
2057 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
2058 continue;
2059 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
2060 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
2061 continue;
2062 }
2063
2064 client_list = get_client_list_from_condition(state, condition);
2065 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
2066 evaluation = lttng_evaluation_session_rotation_ongoing_create(
2067 trace_archive_chunk_id);
2068 } else {
2069 evaluation = lttng_evaluation_session_rotation_completed_create(
2070 trace_archive_chunk_id, location);
2071 }
2072
2073 if (!evaluation) {
2074 /* Internal error */
2075 ret = -1;
2076 cmd_result = LTTNG_ERR_UNK;
2077 goto put_list;
2078 }
2079
2080 /*
2081 * Ownership of `evaluation` transferred to the action executor
2082 * no matter the result.
2083 */
2084 executor_status = action_executor_enqueue_trigger(
2085 state->executor, trigger, evaluation,
2086 &session_creds, client_list);
2087 evaluation = NULL;
2088 switch (executor_status) {
2089 case ACTION_EXECUTOR_STATUS_OK:
2090 break;
2091 case ACTION_EXECUTOR_STATUS_ERROR:
2092 case ACTION_EXECUTOR_STATUS_INVALID:
2093 /*
2094 * TODO Add trigger identification (name/id) when
2095 * it is added to the API.
2096 */
2097 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
2098 ret = -1;
2099 goto put_list;
2100 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2101 /*
2102 * TODO Add trigger identification (name/id) when
2103 * it is added to the API.
2104 *
2105 * Not a fatal error.
2106 */
2107 WARN("No space left when enqueuing action associated with session rotation trigger");
2108 ret = 0;
2109 goto put_list;
2110 default:
2111 abort();
2112 }
2113
2114 put_list:
2115 notification_client_list_put(client_list);
2116 if (caa_unlikely(ret)) {
2117 break;
2118 }
2119 }
2120 end:
2121 session_info_put(session_info);
2122 *_cmd_result = cmd_result;
2123 rcu_read_unlock();
2124 return ret;
2125 }
2126
2127 static
2128 int handle_notification_thread_command_add_tracer_event_source(
2129 struct notification_thread_state *state,
2130 int tracer_event_source_fd,
2131 enum lttng_domain_type domain_type,
2132 enum lttng_error_code *_cmd_result)
2133 {
2134 int ret = 0;
2135 enum lttng_error_code cmd_result = LTTNG_OK;
2136 struct notification_event_tracer_event_source_element *element = NULL;
2137
2138 element = zmalloc<notification_event_tracer_event_source_element>();
2139 if (!element) {
2140 cmd_result = LTTNG_ERR_NOMEM;
2141 ret = -1;
2142 goto end;
2143 }
2144
2145 element->fd = tracer_event_source_fd;
2146 element->domain = domain_type;
2147
2148 cds_list_add(&element->node, &state->tracer_event_sources_list);
2149
2150 DBG3("Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
2151 tracer_event_source_fd,
2152 lttng_domain_type_str(domain_type));
2153
2154 /* Adding the read side pipe to the event poll. */
2155 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
2156 if (ret < 0) {
2157 ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
2158 tracer_event_source_fd,
2159 lttng_domain_type_str(element->domain));
2160 cds_list_del(&element->node);
2161 free(element);
2162 goto end;
2163 }
2164
2165 element->is_fd_in_poll_set = true;
2166
2167 end:
2168 *_cmd_result = cmd_result;
2169 return ret;
2170 }
2171
2172 static
2173 int drain_event_notifier_notification_pipe(
2174 struct notification_thread_state *state,
2175 int pipe, enum lttng_domain_type domain)
2176 {
2177 struct lttng_poll_event events = {};
2178 int ret;
2179
2180 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
2181 if (ret < 0) {
2182 ERR("Error creating lttng_poll_event");
2183 goto end;
2184 }
2185
2186 ret = lttng_poll_add(&events, pipe, LPOLLIN);
2187 if (ret < 0) {
2188 ERR("Error adding fd event notifier notification pipe to lttng_poll_event: fd = %d",
2189 pipe);
2190 goto end;
2191 }
2192
2193 while (true) {
2194 /*
2195 * Continue to consume notifications as long as there are new
2196 * ones coming in. The tracer has been asked to stop producing
2197 * them.
2198 *
2199 * LPOLLIN is explicitly checked since LPOLLHUP is implicitly
2200 * monitored (on Linux, at least) and will be returned when
2201 * the pipe is closed but empty.
2202 */
2203 ret = lttng_poll_wait_interruptible(&events, 0);
2204 if (ret == 0 || (LTTNG_POLL_GETEV(&events, 0) & LPOLLIN) == 0) {
2205 /* No more notification to be read on this pipe. */
2206 ret = 0;
2207 goto end;
2208 } else if (ret < 0) {
2209 PERROR("Failed on lttng_poll_wait_interruptible() call");
2210 ret = -1;
2211 goto end;
2212 }
2213
2214 ret = handle_one_event_notifier_notification(state, pipe, domain);
2215 if (ret) {
2216 ERR("Error consuming an event notifier notification from pipe: fd = %d",
2217 pipe);
2218 }
2219 }
2220 end:
2221 lttng_poll_clean(&events);
2222 return ret;
2223 }
2224
2225 static
2226 struct notification_event_tracer_event_source_element *
2227 find_tracer_event_source_element(struct notification_thread_state *state,
2228 int tracer_event_source_fd)
2229 {
2230 struct notification_event_tracer_event_source_element *source_element;
2231
2232 cds_list_for_each_entry(source_element,
2233 &state->tracer_event_sources_list, node) {
2234 if (source_element->fd == tracer_event_source_fd) {
2235 goto end;
2236 }
2237 }
2238
2239 source_element = NULL;
2240 end:
2241 return source_element;
2242 }
2243
2244 static
2245 int remove_tracer_event_source_from_pollset(
2246 struct notification_thread_state *state,
2247 struct notification_event_tracer_event_source_element *source_element)
2248 {
2249 int ret = 0;
2250
2251 LTTNG_ASSERT(source_element->is_fd_in_poll_set);
2252
2253 DBG3("Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2254 source_element->fd,
2255 lttng_domain_type_str(source_element->domain));
2256
2257 /* Removing the fd from the event poll set. */
2258 ret = lttng_poll_del(&state->events, source_element->fd);
2259 if (ret < 0) {
2260 ERR("Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2261 source_element->fd,
2262 lttng_domain_type_str(source_element->domain));
2263 ret = -1;
2264 goto end;
2265 }
2266
2267 source_element->is_fd_in_poll_set = false;
2268
2269 /*
2270 * Force the notification thread to restart the poll() loop to ensure
2271 * that any events from the removed fd are removed.
2272 */
2273 state->restart_poll = true;
2274
2275 ret = drain_event_notifier_notification_pipe(state, source_element->fd,
2276 source_element->domain);
2277 if (ret) {
2278 ERR("Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s",
2279 source_element->fd,
2280 lttng_domain_type_str(source_element->domain));
2281 ret = -1;
2282 goto end;
2283 }
2284
2285 end:
2286 return ret;
2287 }
2288
2289 int handle_notification_thread_tracer_event_source_died(
2290 struct notification_thread_state *state,
2291 int tracer_event_source_fd)
2292 {
2293 int ret = 0;
2294 struct notification_event_tracer_event_source_element *source_element;
2295
2296 source_element = find_tracer_event_source_element(state,
2297 tracer_event_source_fd);
2298
2299 LTTNG_ASSERT(source_element);
2300
2301 ret = remove_tracer_event_source_from_pollset(state, source_element);
2302 if (ret) {
2303 ERR("Failed to remove dead tracer event source from poll set");
2304 }
2305
2306 return ret;
2307 }
2308
2309 static
2310 int handle_notification_thread_command_remove_tracer_event_source(
2311 struct notification_thread_state *state,
2312 int tracer_event_source_fd,
2313 enum lttng_error_code *_cmd_result)
2314 {
2315 int ret = 0;
2316 enum lttng_error_code cmd_result = LTTNG_OK;
2317 struct notification_event_tracer_event_source_element *source_element = NULL;
2318
2319 source_element = find_tracer_event_source_element(state,
2320 tracer_event_source_fd);
2321
2322 LTTNG_ASSERT(source_element);
2323
2324 /* Remove the tracer source from the list. */
2325 cds_list_del(&source_element->node);
2326
2327 if (!source_element->is_fd_in_poll_set) {
2328 /* Skip the poll set removal. */
2329 goto end;
2330 }
2331
2332 ret = remove_tracer_event_source_from_pollset(state, source_element);
2333 if (ret) {
2334 ERR("Failed to remove tracer event source from poll set");
2335 cmd_result = LTTNG_ERR_FATAL;
2336 }
2337
2338 end:
2339 free(source_element);
2340 *_cmd_result = cmd_result;
2341 return ret;
2342 }
2343
2344 static int handle_notification_thread_command_list_triggers(
2345 struct notification_thread_handle *handle __attribute__((unused)),
2346 struct notification_thread_state *state,
2347 uid_t client_uid,
2348 struct lttng_triggers **triggers,
2349 enum lttng_error_code *_cmd_result)
2350 {
2351 int ret = 0;
2352 enum lttng_error_code cmd_result = LTTNG_OK;
2353 struct cds_lfht_iter iter;
2354 struct lttng_trigger_ht_element *trigger_ht_element;
2355 struct lttng_triggers *local_triggers = NULL;
2356 const struct lttng_credentials *creds;
2357
2358 rcu_read_lock();
2359
2360 local_triggers = lttng_triggers_create();
2361 if (!local_triggers) {
2362 /* Not a fatal error. */
2363 cmd_result = LTTNG_ERR_NOMEM;
2364 goto end;
2365 }
2366
2367 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2368 trigger_ht_element, node) {
2369 /*
2370 * Only return the triggers to which the client has access.
2371 * The root user has visibility over all triggers.
2372 */
2373 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2374 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2375 continue;
2376 }
2377
2378 ret = lttng_triggers_add(local_triggers,
2379 trigger_ht_element->trigger);
2380 if (ret < 0) {
2381 /* Not a fatal error. */
2382 ret = 0;
2383 cmd_result = LTTNG_ERR_NOMEM;
2384 goto end;
2385 }
2386 }
2387
2388 /* Transferring ownership to the caller. */
2389 *triggers = local_triggers;
2390 local_triggers = NULL;
2391
2392 end:
2393 rcu_read_unlock();
2394 lttng_triggers_destroy(local_triggers);
2395 *_cmd_result = cmd_result;
2396 return ret;
2397 }
2398
2399 static inline void get_trigger_info_for_log(const struct lttng_trigger *trigger,
2400 const char **trigger_name,
2401 uid_t *trigger_owner_uid)
2402 {
2403 enum lttng_trigger_status trigger_status;
2404
2405 trigger_status = lttng_trigger_get_name(trigger, trigger_name);
2406 switch (trigger_status) {
2407 case LTTNG_TRIGGER_STATUS_OK:
2408 break;
2409 case LTTNG_TRIGGER_STATUS_UNSET:
2410 *trigger_name = "(anonymous)";
2411 break;
2412 default:
2413 abort();
2414 }
2415
2416 trigger_status = lttng_trigger_get_owner_uid(trigger,
2417 trigger_owner_uid);
2418 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
2419 }
2420
2421 static int handle_notification_thread_command_get_trigger(
2422 struct notification_thread_state *state,
2423 const struct lttng_trigger *trigger,
2424 struct lttng_trigger **registered_trigger,
2425 enum lttng_error_code *_cmd_result)
2426 {
2427 int ret = -1;
2428 struct cds_lfht_iter iter;
2429 struct lttng_trigger_ht_element *trigger_ht_element;
2430 enum lttng_error_code cmd_result = LTTNG_ERR_TRIGGER_NOT_FOUND;
2431 const char *trigger_name;
2432 uid_t trigger_owner_uid;
2433
2434 rcu_read_lock();
2435
2436 cds_lfht_for_each_entry(
2437 state->triggers_ht, &iter, trigger_ht_element, node) {
2438 if (lttng_trigger_is_equal(
2439 trigger, trigger_ht_element->trigger)) {
2440 /* Take one reference on the return trigger. */
2441 *registered_trigger = trigger_ht_element->trigger;
2442 lttng_trigger_get(*registered_trigger);
2443 ret = 0;
2444 cmd_result = LTTNG_OK;
2445 goto end;
2446 }
2447 }
2448
2449 /* Not a fatal error if the trigger is not found. */
2450 get_trigger_info_for_log(trigger, &trigger_name, &trigger_owner_uid);
2451 DBG("Failed to retrieve registered version of trigger: trigger name = '%s', trigger owner uid = %d",
2452 trigger_name, (int) trigger_owner_uid);
2453
2454 ret = 0;
2455
2456 end:
2457 rcu_read_unlock();
2458 *_cmd_result = cmd_result;
2459 return ret;
2460 }
2461
2462 static
2463 bool condition_is_supported(struct lttng_condition *condition)
2464 {
2465 bool is_supported;
2466
2467 switch (lttng_condition_get_type(condition)) {
2468 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2469 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2470 {
2471 int ret;
2472 enum lttng_domain_type domain;
2473
2474 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2475 &domain);
2476 LTTNG_ASSERT(ret == 0);
2477
2478 if (domain != LTTNG_DOMAIN_KERNEL) {
2479 is_supported = true;
2480 goto end;
2481 }
2482
2483 /*
2484 * Older kernel tracers don't expose the API to monitor their
2485 * buffers. Therefore, we reject triggers that require that
2486 * mechanism to be available to be evaluated.
2487 *
2488 * Assume unsupported on error.
2489 */
2490 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2491 break;
2492 }
2493 case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
2494 {
2495 const struct lttng_event_rule *event_rule;
2496 enum lttng_domain_type domain;
2497 const enum lttng_condition_status status =
2498 lttng_condition_event_rule_matches_get_rule(
2499 condition, &event_rule);
2500
2501 LTTNG_ASSERT(status == LTTNG_CONDITION_STATUS_OK);
2502
2503 domain = lttng_event_rule_get_domain_type(event_rule);
2504 if (domain != LTTNG_DOMAIN_KERNEL) {
2505 is_supported = true;
2506 goto end;
2507 }
2508
2509 /*
2510 * Older kernel tracers can't emit notification. Therefore, we
2511 * reject triggers that require that mechanism to be available
2512 * to be evaluated.
2513 *
2514 * Assume unsupported on error.
2515 */
2516 is_supported = kernel_supports_event_notifiers() == 1;
2517 break;
2518 }
2519 default:
2520 is_supported = true;
2521 }
2522 end:
2523 return is_supported;
2524 }
2525
2526 /* Must be called with RCU read lock held. */
2527 static
2528 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2529 struct notification_thread_state *state)
2530 {
2531 int ret = 0;
2532 const struct lttng_condition *condition;
2533 const char *session_name;
2534 struct lttng_session_trigger_list *trigger_list;
2535
2536 ASSERT_RCU_READ_LOCKED();
2537
2538 condition = lttng_trigger_get_const_condition(trigger);
2539 switch (lttng_condition_get_type(condition)) {
2540 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2541 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2542 {
2543 enum lttng_condition_status status;
2544
2545 status = lttng_condition_session_rotation_get_session_name(
2546 condition, &session_name);
2547 if (status != LTTNG_CONDITION_STATUS_OK) {
2548 ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2549 ret = -1;
2550 goto end;
2551 }
2552 break;
2553 }
2554 default:
2555 ret = -1;
2556 goto end;
2557 }
2558
2559 trigger_list = get_session_trigger_list(state, session_name);
2560 if (!trigger_list) {
2561 DBG("Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2562 session_name);
2563 goto end;
2564
2565 }
2566
2567 DBG("Newly registered trigger bound to session \"%s\"",
2568 session_name);
2569 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2570 end:
2571 return ret;
2572 }
2573
2574 /* Must be called with RCU read lock held. */
2575 static
2576 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2577 struct notification_thread_state *state)
2578 {
2579 int ret = 0;
2580 struct cds_lfht_node *node;
2581 struct cds_lfht_iter iter;
2582 struct channel_info *channel;
2583
2584 ASSERT_RCU_READ_LOCKED();
2585
2586 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2587 channels_ht_node) {
2588 struct lttng_trigger_list_element *trigger_list_element;
2589 struct lttng_channel_trigger_list *trigger_list;
2590 struct cds_lfht_iter lookup_iter;
2591
2592 if (!trigger_applies_to_channel(trigger, channel)) {
2593 continue;
2594 }
2595
2596 cds_lfht_lookup(state->channel_triggers_ht,
2597 hash_channel_key(&channel->key),
2598 match_channel_trigger_list,
2599 &channel->key,
2600 &lookup_iter);
2601 node = cds_lfht_iter_get_node(&lookup_iter);
2602 LTTNG_ASSERT(node);
2603 trigger_list = caa_container_of(node,
2604 struct lttng_channel_trigger_list,
2605 channel_triggers_ht_node);
2606
2607 trigger_list_element = zmalloc<lttng_trigger_list_element>();
2608 if (!trigger_list_element) {
2609 ret = -1;
2610 goto end;
2611 }
2612 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2613 trigger_list_element->trigger = trigger;
2614 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2615 DBG("Newly registered trigger bound to channel \"%s\"",
2616 channel->name);
2617 }
2618 end:
2619 return ret;
2620 }
2621
2622 static
2623 bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2624 {
2625 bool is_notify = false;
2626 unsigned int i, count;
2627 enum lttng_action_status action_status;
2628 const struct lttng_action *action =
2629 lttng_trigger_get_const_action(trigger);
2630 enum lttng_action_type action_type;
2631
2632 LTTNG_ASSERT(action);
2633 action_type = lttng_action_get_type(action);
2634 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2635 is_notify = true;
2636 goto end;
2637 } else if (action_type != LTTNG_ACTION_TYPE_LIST) {
2638 goto end;
2639 }
2640
2641 action_status = lttng_action_list_get_count(action, &count);
2642 LTTNG_ASSERT(action_status == LTTNG_ACTION_STATUS_OK);
2643
2644 for (i = 0; i < count; i++) {
2645 const struct lttng_action *inner_action =
2646 lttng_action_list_get_at_index(
2647 action, i);
2648
2649 action_type = lttng_action_get_type(inner_action);
2650 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2651 is_notify = true;
2652 goto end;
2653 }
2654 }
2655
2656 end:
2657 return is_notify;
2658 }
2659
2660 static bool trigger_name_taken(struct notification_thread_state *state,
2661 const struct lttng_trigger *trigger)
2662 {
2663 struct cds_lfht_iter iter;
2664
2665 /*
2666 * No duplicata is allowed in the triggers_by_name_uid_ht.
2667 * The match is done against the trigger name and uid.
2668 */
2669 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2670 hash_trigger_by_name_uid(trigger),
2671 match_trigger_by_name_uid,
2672 trigger,
2673 &iter);
2674 return !!cds_lfht_iter_get_node(&iter);
2675 }
2676
2677 static
2678 enum lttng_error_code generate_trigger_name(
2679 struct notification_thread_state *state,
2680 struct lttng_trigger *trigger, const char **name)
2681 {
2682 enum lttng_error_code ret_code = LTTNG_OK;
2683 bool taken = false;
2684 enum lttng_trigger_status status;
2685
2686 do {
2687 const int ret = lttng_trigger_generate_name(trigger,
2688 state->trigger_id.name_offset++);
2689 if (ret) {
2690 /* The only reason this can fail right now. */
2691 ret_code = LTTNG_ERR_NOMEM;
2692 break;
2693 }
2694
2695 status = lttng_trigger_get_name(trigger, name);
2696 LTTNG_ASSERT(status == LTTNG_TRIGGER_STATUS_OK);
2697
2698 taken = trigger_name_taken(state, trigger);
2699 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2700
2701 return ret_code;
2702 }
2703
2704 static inline
2705 void notif_thread_state_remove_trigger_ht_elem(
2706 struct notification_thread_state *state,
2707 struct lttng_trigger_ht_element *trigger_ht_element)
2708 {
2709 LTTNG_ASSERT(state);
2710 LTTNG_ASSERT(trigger_ht_element);
2711
2712 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2713 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2714 }
2715
2716 static
2717 enum lttng_error_code setup_tracer_notifier(
2718 struct notification_thread_state *state,
2719 struct lttng_trigger *trigger)
2720 {
2721 enum lttng_error_code ret;
2722 enum event_notifier_error_accounting_status error_accounting_status;
2723 struct cds_lfht_node *node;
2724 uint64_t error_counter_index = 0;
2725 struct lttng_condition *condition = lttng_trigger_get_condition(trigger);
2726 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2727
2728 trigger_tokens_ht_element = zmalloc<notification_trigger_tokens_ht_element>();
2729 if (!trigger_tokens_ht_element) {
2730 ret = LTTNG_ERR_NOMEM;
2731 goto end;
2732 }
2733
2734 /* Add trigger token to the trigger_tokens_ht. */
2735 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2736 trigger_tokens_ht_element->token = LTTNG_OPTIONAL_GET(trigger->tracer_token);
2737 trigger_tokens_ht_element->trigger = trigger;
2738
2739 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2740 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2741 match_trigger_token,
2742 &trigger_tokens_ht_element->token,
2743 &trigger_tokens_ht_element->node);
2744 if (node != &trigger_tokens_ht_element->node) {
2745 ret = LTTNG_ERR_TRIGGER_EXISTS;
2746 goto error_free_ht_element;
2747 }
2748
2749 error_accounting_status = event_notifier_error_accounting_register_event_notifier(
2750 trigger, &error_counter_index);
2751 if (error_accounting_status != EVENT_NOTIFIER_ERROR_ACCOUNTING_STATUS_OK) {
2752 if (error_accounting_status == EVENT_NOTIFIER_ERROR_ACCOUNTING_STATUS_NO_INDEX_AVAILABLE) {
2753 DBG("Trigger list error accounting counter full.");
2754 ret = LTTNG_ERR_EVENT_NOTIFIER_ERROR_ACCOUNTING_FULL;
2755 } else {
2756 ERR("Error registering trigger for error accounting");
2757 ret = LTTNG_ERR_EVENT_NOTIFIER_REGISTRATION;
2758 }
2759
2760 goto error_remove_ht_element;
2761 }
2762
2763 lttng_condition_event_rule_matches_set_error_counter_index(
2764 condition, error_counter_index);
2765
2766 ret = LTTNG_OK;
2767 goto end;
2768
2769 error_remove_ht_element:
2770 cds_lfht_del(state->trigger_tokens_ht, &trigger_tokens_ht_element->node);
2771 error_free_ht_element:
2772 free(trigger_tokens_ht_element);
2773 end:
2774 return ret;
2775 }
2776
2777 /*
2778 * FIXME A client's credentials are not checked when registering a trigger.
2779 *
2780 * The effects of this are benign since:
2781 * - The client will succeed in registering the trigger, as it is valid,
2782 * - The trigger will, internally, be bound to the channel/session,
2783 * - The notifications will not be sent since the client's credentials
2784 * are checked against the channel at that moment.
2785 *
2786 * If this function returns a non-zero value, it means something is
2787 * fundamentally broken and the whole subsystem/thread will be torn down.
2788 *
2789 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2790 * error code.
2791 */
2792 static
2793 int handle_notification_thread_command_register_trigger(
2794 struct notification_thread_state *state,
2795 struct lttng_trigger *trigger,
2796 bool is_trigger_anonymous,
2797 enum lttng_error_code *cmd_result)
2798 {
2799 int ret = 0;
2800 struct lttng_condition *condition;
2801 struct notification_client_list *client_list = NULL;
2802 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2803 struct cds_lfht_node *node;
2804 const char* trigger_name;
2805 bool free_trigger = true;
2806 struct lttng_evaluation *evaluation = NULL;
2807 struct lttng_credentials object_creds;
2808 uid_t object_uid;
2809 gid_t object_gid;
2810 enum action_executor_status executor_status;
2811 const uint64_t trigger_tracer_token =
2812 state->trigger_id.next_tracer_token++;
2813
2814 rcu_read_lock();
2815
2816 /* Set the trigger's tracer token. */
2817 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2818
2819 if (!is_trigger_anonymous) {
2820 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2821 LTTNG_TRIGGER_STATUS_UNSET) {
2822 const enum lttng_error_code ret_code =
2823 generate_trigger_name(state, trigger,
2824 &trigger_name);
2825
2826 if (ret_code != LTTNG_OK) {
2827 /* Fatal error. */
2828 ret = -1;
2829 *cmd_result = ret_code;
2830 goto error;
2831 }
2832 } else if (trigger_name_taken(state, trigger)) {
2833 /* Not a fatal error. */
2834 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2835 ret = 0;
2836 goto error;
2837 }
2838 } else {
2839 trigger_name = "(anonymous)";
2840 }
2841
2842 condition = lttng_trigger_get_condition(trigger);
2843 LTTNG_ASSERT(condition);
2844
2845 /* Some conditions require tracers to implement a minimal ABI version. */
2846 if (!condition_is_supported(condition)) {
2847 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2848 goto error;
2849 }
2850
2851 trigger_ht_element = zmalloc<lttng_trigger_ht_element>();
2852 if (!trigger_ht_element) {
2853 ret = -1;
2854 goto error;
2855 }
2856
2857 /* Add trigger to the trigger_ht. */
2858 cds_lfht_node_init(&trigger_ht_element->node);
2859 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
2860 trigger_ht_element->trigger = trigger;
2861
2862 node = cds_lfht_add_unique(state->triggers_ht,
2863 lttng_condition_hash(condition),
2864 match_trigger,
2865 trigger,
2866 &trigger_ht_element->node);
2867 if (node != &trigger_ht_element->node) {
2868 /* Not a fatal error, simply report it to the client. */
2869 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2870 goto error_free_ht_element;
2871 }
2872
2873 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2874 hash_trigger_by_name_uid(trigger),
2875 match_trigger_by_name_uid,
2876 trigger,
2877 &trigger_ht_element->node_by_name_uid);
2878 if (node != &trigger_ht_element->node_by_name_uid) {
2879 /* Internal error: add to triggers_ht should have failed. */
2880 ret = -1;
2881 goto error_free_ht_element;
2882 }
2883
2884 /* From this point consider the trigger registered. */
2885 lttng_trigger_set_as_registered(trigger);
2886
2887 /*
2888 * Some triggers might need a tracer notifier depending on its
2889 * condition and actions.
2890 */
2891 if (lttng_trigger_needs_tracer_notifier(trigger)) {
2892 enum lttng_error_code error_code;
2893
2894 error_code = setup_tracer_notifier(state, trigger);
2895 if (error_code != LTTNG_OK) {
2896 notif_thread_state_remove_trigger_ht_elem(state,
2897 trigger_ht_element);
2898 if (error_code == LTTNG_ERR_NOMEM) {
2899 ret = -1;
2900 } else {
2901 *cmd_result = error_code;
2902 ret = 0;
2903 }
2904
2905 goto error_free_ht_element;
2906 }
2907 }
2908
2909 /*
2910 * The rest only applies to triggers that have a "notify" action.
2911 * It is not skipped as this is the only action type currently
2912 * supported.
2913 */
2914 if (is_trigger_action_notify(trigger)) {
2915 /*
2916 * Find or create the client list of this condition. It may
2917 * already be present if another trigger is already registered
2918 * with the same condition.
2919 */
2920 client_list = get_client_list_from_condition(state, condition);
2921 if (!client_list) {
2922 /*
2923 * No client list for this condition yet. We create new
2924 * one and build it up.
2925 */
2926 client_list = notification_client_list_create(state, condition);
2927 if (!client_list) {
2928 ERR("Error creating notification client list for trigger %s", trigger->name);
2929 goto error_free_ht_element;
2930 }
2931 }
2932
2933 CDS_INIT_LIST_HEAD(&trigger_ht_element->client_list_trigger_node);
2934
2935 pthread_mutex_lock(&client_list->lock);
2936 cds_list_add(&trigger_ht_element->client_list_trigger_node, &client_list->triggers_list);
2937 pthread_mutex_unlock(&client_list->lock);
2938 }
2939
2940 /*
2941 * Ownership of the trigger and of its wrapper was transfered to
2942 * the triggers_ht. Same for token ht element if necessary.
2943 */
2944 trigger_ht_element = NULL;
2945 free_trigger = false;
2946
2947 switch (get_condition_binding_object(condition)) {
2948 case LTTNG_OBJECT_TYPE_SESSION:
2949 /* Add the trigger to the list if it matches a known session. */
2950 ret = bind_trigger_to_matching_session(trigger, state);
2951 if (ret) {
2952 goto error_free_ht_element;
2953 }
2954 break;
2955 case LTTNG_OBJECT_TYPE_CHANNEL:
2956 /*
2957 * Add the trigger to list of triggers bound to the channels
2958 * currently known.
2959 */
2960 ret = bind_trigger_to_matching_channels(trigger, state);
2961 if (ret) {
2962 goto error_free_ht_element;
2963 }
2964 break;
2965 case LTTNG_OBJECT_TYPE_NONE:
2966 break;
2967 default:
2968 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
2969 ret = -1;
2970 goto error_free_ht_element;
2971 }
2972
2973 /*
2974 * The new trigger's condition must be evaluated against the current
2975 * state.
2976 *
2977 * In the case of `notify` action, nothing preventing clients from
2978 * subscribing to a condition before the corresponding trigger is
2979 * registered, we have to evaluate this new condition right away.
2980 *
2981 * At some point, we were waiting for the next "evaluation" (e.g. on
2982 * reception of a channel sample) to evaluate this new condition, but
2983 * that was broken.
2984 *
2985 * The reason it was broken is that waiting for the next sample
2986 * does not allow us to properly handle transitions for edge-triggered
2987 * conditions.
2988 *
2989 * Consider this example: when we handle a new channel sample, we
2990 * evaluate each conditions twice: once with the previous state, and
2991 * again with the newest state. We then use those two results to
2992 * determine whether a state change happened: a condition was false and
2993 * became true. If a state change happened, we have to notify clients.
2994 *
2995 * Now, if a client subscribes to a given notification and registers
2996 * a trigger *after* that subscription, we have to make sure the
2997 * condition is evaluated at this point while considering only the
2998 * current state. Otherwise, the next evaluation cycle may only see
2999 * that the evaluations remain the same (true for samples n-1 and n) and
3000 * the client will never know that the condition has been met.
3001 */
3002 switch (get_condition_binding_object(condition)) {
3003 case LTTNG_OBJECT_TYPE_SESSION:
3004 ret = evaluate_session_condition_for_client(condition, state,
3005 &evaluation, &object_uid,
3006 &object_gid);
3007 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
3008 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
3009 break;
3010 case LTTNG_OBJECT_TYPE_CHANNEL:
3011 ret = evaluate_channel_condition_for_client(condition, state,
3012 &evaluation, &object_uid,
3013 &object_gid);
3014 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
3015 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
3016 break;
3017 case LTTNG_OBJECT_TYPE_NONE:
3018 ret = 0;
3019 break;
3020 case LTTNG_OBJECT_TYPE_UNKNOWN:
3021 default:
3022 ret = -1;
3023 break;
3024 }
3025
3026 if (ret) {
3027 /* Fatal error. */
3028 goto error_free_ht_element;
3029 }
3030
3031 DBG("Newly registered trigger's condition evaluated to %s",
3032 evaluation ? "true" : "false");
3033 if (!evaluation) {
3034 /* Evaluation yielded nothing. Normal exit. */
3035 ret = 0;
3036 goto success;
3037 }
3038
3039 /*
3040 * Ownership of `evaluation` transferred to the action executor
3041 * no matter the result.
3042 */
3043 executor_status = action_executor_enqueue_trigger(state->executor,
3044 trigger, evaluation, &object_creds, client_list);
3045 evaluation = NULL;
3046 switch (executor_status) {
3047 case ACTION_EXECUTOR_STATUS_OK:
3048 break;
3049 case ACTION_EXECUTOR_STATUS_ERROR:
3050 case ACTION_EXECUTOR_STATUS_INVALID:
3051 /*
3052 * TODO Add trigger identification (name/id) when
3053 * it is added to the API.
3054 */
3055 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
3056 ret = -1;
3057 goto error_free_ht_element;
3058 case ACTION_EXECUTOR_STATUS_OVERFLOW:
3059 /*
3060 * TODO Add trigger identification (name/id) when
3061 * it is added to the API.
3062 *
3063 * Not a fatal error.
3064 */
3065 WARN("No space left when enqueuing action associated to newly registered trigger");
3066 ret = 0;
3067 goto success;
3068 default:
3069 abort();
3070 }
3071
3072 success:
3073 *cmd_result = LTTNG_OK;
3074 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
3075 trigger_name, trigger_tracer_token);
3076 goto end;
3077
3078 error_free_ht_element:
3079 if (trigger_ht_element) {
3080 /* Delayed removal due to RCU constraint on delete. */
3081 call_rcu(&trigger_ht_element->rcu_node,
3082 free_lttng_trigger_ht_element_rcu);
3083 }
3084 error:
3085 if (free_trigger) {
3086 /*
3087 * Other objects might have a reference to the trigger, mark it
3088 * as unregistered.
3089 */
3090 lttng_trigger_set_as_unregistered(trigger);
3091 lttng_trigger_destroy(trigger);
3092 }
3093 end:
3094 rcu_read_unlock();
3095 return ret;
3096 }
3097
3098 static
3099 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
3100 {
3101 free(caa_container_of(node, struct lttng_trigger_ht_element,
3102 rcu_node));
3103 }
3104
3105 static
3106 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
3107 {
3108 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
3109 rcu_node));
3110 }
3111
3112 static
3113 void teardown_tracer_notifier(struct notification_thread_state *state,
3114 const struct lttng_trigger *trigger)
3115 {
3116 struct cds_lfht_iter iter;
3117 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
3118
3119 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter,
3120 trigger_tokens_ht_element, node) {
3121
3122 if (!lttng_trigger_is_equal(trigger,
3123 trigger_tokens_ht_element->trigger)) {
3124 continue;
3125 }
3126
3127 event_notifier_error_accounting_unregister_event_notifier(
3128 trigger_tokens_ht_element->trigger);
3129
3130 /* TODO talk to all app and remove it */
3131 DBG("Removed trigger from tokens_ht");
3132 cds_lfht_del(state->trigger_tokens_ht,
3133 &trigger_tokens_ht_element->node);
3134
3135 call_rcu(&trigger_tokens_ht_element->rcu_node,
3136 free_notification_trigger_tokens_ht_element_rcu);
3137
3138 break;
3139 }
3140 }
3141
/*
 * Unregister `trigger` from the notification thread's state.
 *
 * The trigger is looked up in `state->triggers_ht`; when found, it is removed
 * from every channel trigger list, its tracer notifier (if any) is torn down,
 * its client list references are dropped (notify action only) and its hash
 * table element is released through call_rcu().
 *
 * The outcome of the command is written to `_cmd_reply` when that pointer is
 * not NULL: LTTNG_ERR_TRIGGER_NOT_FOUND when the lookup fails, LTTNG_OK
 * otherwise.
 *
 * Always returns 0.
 */
static
int handle_notification_thread_command_unregister_trigger(
		struct notification_thread_state *state,
		const struct lttng_trigger *trigger,
		enum lttng_error_code *_cmd_reply)
{
	struct cds_lfht_iter iter;
	struct cds_lfht_node *triggers_ht_node;
	struct lttng_channel_trigger_list *trigger_list;
	struct notification_client_list *client_list;
	struct lttng_trigger_ht_element *trigger_ht_element = NULL;
	const struct lttng_condition *condition = lttng_trigger_get_const_condition(
			trigger);
	enum lttng_error_code cmd_reply;

	rcu_read_lock();

	/* Locate the registered instance of the trigger. */
	cds_lfht_lookup(state->triggers_ht,
			lttng_condition_hash(condition),
			match_trigger,
			trigger,
			&iter);
	triggers_ht_node = cds_lfht_iter_get_node(&iter);
	if (!triggers_ht_node) {
		cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
		goto end;
	} else {
		cmd_reply = LTTNG_OK;
	}

	trigger_ht_element = caa_container_of(triggers_ht_node,
			struct lttng_trigger_ht_element, node);

	/* Remove trigger from channel_triggers_ht. */
	cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
			channel_triggers_ht_node) {
		struct lttng_trigger_list_element *trigger_element, *tmp;

		cds_list_for_each_entry_safe(trigger_element, tmp,
				&trigger_list->list, node) {
			if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
				continue;
			}

			DBG("Removed trigger from channel_triggers_ht");
			/*
			 * NOTE(review): the element is unlinked but not freed
			 * here -- confirm who owns `trigger_element` after
			 * this point.
			 */
			cds_list_del(&trigger_element->node);
			/* A trigger can only appear once per channel */
			break;
		}
	}

	if (lttng_trigger_needs_tracer_notifier(trigger)) {
		teardown_tracer_notifier(state, trigger);
	}

	if (is_trigger_action_notify(trigger)) {
		/*
		 * Remove and release the client list from
		 * notification_trigger_clients_ht.
		 */
		client_list = get_client_list_from_condition(state, condition);
		LTTNG_ASSERT(client_list);

		/* The list of triggers of a client list is protected by its lock. */
		pthread_mutex_lock(&client_list->lock);
		cds_list_del(&trigger_ht_element->client_list_trigger_node);
		pthread_mutex_unlock(&client_list->lock);

		/* Put new reference and the hashtable's reference. */
		notification_client_list_put(client_list);
		notification_client_list_put(client_list);
		client_list = NULL;
	}

	/* Remove trigger from triggers_ht. */
	notif_thread_state_remove_trigger_ht_elem(state, trigger_ht_element);

	/* Release the ownership of the trigger. */
	lttng_trigger_destroy(trigger_ht_element->trigger);
	/* The hash table element itself is released by the RCU callback. */
	call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
end:
	rcu_read_unlock();
	if (_cmd_reply) {
		*_cmd_reply = cmd_reply;
	}
	return 0;
}
3228
3229 static
3230 int pop_cmd_queue(struct notification_thread_handle *handle,
3231 struct notification_thread_command **cmd)
3232 {
3233 int ret;
3234 uint64_t counter;
3235
3236 pthread_mutex_lock(&handle->cmd_queue.lock);
3237 ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
3238 if (ret != sizeof(counter)) {
3239 ret = -1;
3240 goto error_unlock;
3241 }
3242
3243 *cmd = cds_list_first_entry(&handle->cmd_queue.list,
3244 struct notification_thread_command, cmd_list_node);
3245 cds_list_del(&((*cmd)->cmd_list_node));
3246 ret = 0;
3247
3248 error_unlock:
3249 pthread_mutex_unlock(&handle->cmd_queue.lock);
3250 return ret;
3251 }
3252
3253 /* Returns 0 on success, 1 on exit requested, negative value on error. */
3254 int handle_notification_thread_command(
3255 struct notification_thread_handle *handle,
3256 struct notification_thread_state *state)
3257 {
3258 int ret;
3259 struct notification_thread_command *cmd;
3260
3261 ret = pop_cmd_queue(handle, &cmd);
3262 if (ret) {
3263 goto error;
3264 }
3265
3266 DBG("Received `%s` command",
3267 notification_command_type_str(cmd->type));
3268 switch (cmd->type) {
3269 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
3270 ret = handle_notification_thread_command_register_trigger(state,
3271 cmd->parameters.register_trigger.trigger,
3272 cmd->parameters.register_trigger.is_trigger_anonymous,
3273 &cmd->reply_code);
3274 break;
3275 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
3276 ret = handle_notification_thread_command_unregister_trigger(
3277 state,
3278 cmd->parameters.unregister_trigger.trigger,
3279 &cmd->reply_code);
3280 break;
3281 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
3282 ret = handle_notification_thread_command_add_channel(
3283 state,
3284 cmd->parameters.add_channel.session.id,
3285 cmd->parameters.add_channel.channel.name,
3286 cmd->parameters.add_channel.channel.domain,
3287 cmd->parameters.add_channel.channel.key,
3288 cmd->parameters.add_channel.channel.capacity,
3289 &cmd->reply_code);
3290 break;
3291 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
3292 ret = handle_notification_thread_command_remove_channel(
3293 state, cmd->parameters.remove_channel.key,
3294 cmd->parameters.remove_channel.domain,
3295 &cmd->reply_code);
3296 break;
3297 case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
3298 ret = handle_notification_thread_command_add_session(state,
3299 cmd->parameters.add_session.session_id,
3300 cmd->parameters.add_session.session_name,
3301 cmd->parameters.add_session.session_uid,
3302 cmd->parameters.add_session.session_gid, &cmd->reply_code);
3303 break;
3304 case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
3305 ret = handle_notification_thread_command_remove_session(
3306 state, cmd->parameters.remove_session.session_id, &cmd->reply_code);
3307 break;
3308 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
3309 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
3310 ret = handle_notification_thread_command_session_rotation(
3311 state,
3312 cmd->type,
3313 cmd->parameters.session_rotation.session_id,
3314 cmd->parameters.session_rotation.trace_archive_chunk_id,
3315 cmd->parameters.session_rotation.location,
3316 &cmd->reply_code);
3317 break;
3318 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
3319 ret = handle_notification_thread_command_add_tracer_event_source(
3320 state,
3321 cmd->parameters.tracer_event_source.tracer_event_source_fd,
3322 cmd->parameters.tracer_event_source.domain,
3323 &cmd->reply_code);
3324 break;
3325 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
3326 ret = handle_notification_thread_command_remove_tracer_event_source(
3327 state,
3328 cmd->parameters.tracer_event_source.tracer_event_source_fd,
3329 &cmd->reply_code);
3330 break;
3331 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
3332 {
3333 struct lttng_triggers *triggers = NULL;
3334
3335 ret = handle_notification_thread_command_list_triggers(
3336 handle,
3337 state,
3338 cmd->parameters.list_triggers.uid,
3339 &triggers,
3340 &cmd->reply_code);
3341 cmd->reply.list_triggers.triggers = triggers;
3342 ret = 0;
3343 break;
3344 }
3345 case NOTIFICATION_COMMAND_TYPE_QUIT:
3346 cmd->reply_code = LTTNG_OK;
3347 ret = 1;
3348 goto end;
3349 case NOTIFICATION_COMMAND_TYPE_GET_TRIGGER:
3350 {
3351 struct lttng_trigger *trigger = NULL;
3352
3353 ret = handle_notification_thread_command_get_trigger(state,
3354 cmd->parameters.get_trigger.trigger, &trigger,
3355 &cmd->reply_code);
3356 cmd->reply.get_trigger.trigger = trigger;
3357 break;
3358 }
3359 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
3360 {
3361 const enum client_transmission_status client_status =
3362 cmd->parameters.client_communication_update
3363 .status;
3364 const notification_client_id client_id =
3365 cmd->parameters.client_communication_update.id;
3366 struct notification_client *client;
3367
3368 rcu_read_lock();
3369 client = get_client_from_id(client_id, state);
3370
3371 if (!client) {
3372 /*
3373 * Client error was probably already picked-up by the
3374 * notification thread or it has disconnected
3375 * gracefully while this command was queued.
3376 */
3377 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
3378 client_id);
3379 ret = 0;
3380 } else {
3381 ret = client_handle_transmission_status(
3382 client, client_status, state);
3383 }
3384 rcu_read_unlock();
3385 break;
3386 }
3387 default:
3388 ERR("Unknown internal command received");
3389 goto error_unlock;
3390 }
3391
3392 if (ret) {
3393 goto error_unlock;
3394 }
3395 end:
3396 if (cmd->is_async) {
3397 free(cmd);
3398 cmd = NULL;
3399 } else {
3400 lttng_waiter_wake_up(&cmd->reply_waiter);
3401 }
3402 return ret;
3403 error_unlock:
3404 /* Wake-up and return a fatal error to the calling thread. */
3405 lttng_waiter_wake_up(&cmd->reply_waiter);
3406 cmd->reply_code = LTTNG_ERR_FATAL;
3407 error:
3408 /* Indicate a fatal error to the caller. */
3409 return -1;
3410 }
3411
3412 static
3413 int socket_set_non_blocking(int socket)
3414 {
3415 int ret, flags;
3416
3417 /* Set the pipe as non-blocking. */
3418 ret = fcntl(socket, F_GETFL, 0);
3419 if (ret == -1) {
3420 PERROR("fcntl get socket flags");
3421 goto end;
3422 }
3423 flags = ret;
3424
3425 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
3426 if (ret == -1) {
3427 PERROR("fcntl set O_NONBLOCK socket flag");
3428 goto end;
3429 }
3430 DBG("Client socket (fd = %i) set as non-blocking", socket);
3431 end:
3432 return ret;
3433 }
3434
3435 static
3436 int client_reset_inbound_state(struct notification_client *client)
3437 {
3438 int ret;
3439
3440
3441 lttng_payload_clear(&client->communication.inbound.payload);
3442
3443 client->communication.inbound.bytes_to_receive =
3444 sizeof(struct lttng_notification_channel_message);
3445 client->communication.inbound.msg_type =
3446 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
3447 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3448 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
3449 ret = lttng_dynamic_buffer_set_size(
3450 &client->communication.inbound.payload.buffer,
3451 client->communication.inbound.bytes_to_receive);
3452
3453 return ret;
3454 }
3455
3456 int handle_notification_thread_client_connect(
3457 struct notification_thread_state *state)
3458 {
3459 int ret;
3460 struct notification_client *client;
3461
3462 DBG("Handling new notification channel client connection");
3463
3464 client = zmalloc<notification_client>();
3465 if (!client) {
3466 /* Fatal error. */
3467 ret = -1;
3468 goto error;
3469 }
3470
3471 pthread_mutex_init(&client->lock, NULL);
3472 client->id = state->next_notification_client_id++;
3473 CDS_INIT_LIST_HEAD(&client->condition_list);
3474 lttng_payload_init(&client->communication.inbound.payload);
3475 lttng_payload_init(&client->communication.outbound.payload);
3476 client->communication.inbound.expect_creds = true;
3477
3478 ret = client_reset_inbound_state(client);
3479 if (ret) {
3480 ERR("Failed to reset client communication's inbound state");
3481 ret = 0;
3482 goto error;
3483 }
3484
3485 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3486 if (ret < 0) {
3487 ERR("Failed to accept new notification channel client connection");
3488 ret = 0;
3489 goto error;
3490 }
3491
3492 client->socket = ret;
3493
3494 ret = socket_set_non_blocking(client->socket);
3495 if (ret) {
3496 ERR("Failed to set new notification channel client connection socket as non-blocking");
3497 goto error;
3498 }
3499
3500 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3501 if (ret < 0) {
3502 ERR("Failed to set socket options on new notification channel client socket");
3503 ret = 0;
3504 goto error;
3505 }
3506
3507 client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
3508 ret = lttng_poll_add(&state->events, client->socket,
3509 client->communication.current_poll_events);
3510 if (ret < 0) {
3511 ERR("Failed to add notification channel client socket to poll set");
3512 ret = 0;
3513 goto error;
3514 }
3515 DBG("Added new notification channel client socket (%i) to poll set",
3516 client->socket);
3517
3518 rcu_read_lock();
3519 cds_lfht_add(state->client_socket_ht,
3520 hash_client_socket(client->socket),
3521 &client->client_socket_ht_node);
3522 cds_lfht_add(state->client_id_ht,
3523 hash_client_id(client->id),
3524 &client->client_id_ht_node);
3525 rcu_read_unlock();
3526
3527 return ret;
3528
3529 error:
3530 notification_client_destroy(client);
3531 return ret;
3532 }
3533
3534 /*
3535 * RCU read-lock must be held by the caller.
3536 * Client lock must _not_ be held by the caller.
3537 */
3538 static
3539 int notification_thread_client_disconnect(
3540 struct notification_client *client,
3541 struct notification_thread_state *state)
3542 {
3543 int ret;
3544 struct lttng_condition_list_element *condition_list_element, *tmp;
3545
3546 ASSERT_RCU_READ_LOCKED();
3547
3548 /* Acquire the client lock to disable its communication atomically. */
3549 pthread_mutex_lock(&client->lock);
3550 client->communication.active = false;
3551 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3552 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3553 pthread_mutex_unlock(&client->lock);
3554
3555 ret = lttng_poll_del(&state->events, client->socket);
3556 if (ret) {
3557 ERR("Failed to remove client socket %d from poll set",
3558 client->socket);
3559 }
3560
3561 /* Release all conditions to which the client was subscribed. */
3562 cds_list_for_each_entry_safe(condition_list_element, tmp,
3563 &client->condition_list, node) {
3564 (void) notification_thread_client_unsubscribe(client,
3565 condition_list_element->condition, state, NULL);
3566 }
3567
3568 /*
3569 * Client no longer accessible to other threads (through the
3570 * client lists).
3571 */
3572 notification_client_destroy(client);
3573 return ret;
3574 }
3575
3576 int handle_notification_thread_client_disconnect(
3577 int client_socket, struct notification_thread_state *state)
3578 {
3579 int ret = 0;
3580 struct notification_client *client;
3581
3582 rcu_read_lock();
3583 DBG("Closing client connection (socket fd = %i)",
3584 client_socket);
3585 client = get_client_from_socket(client_socket, state);
3586 if (!client) {
3587 /* Internal state corruption, fatal error. */
3588 ERR("Unable to find client (socket fd = %i)",
3589 client_socket);
3590 ret = -1;
3591 goto end;
3592 }
3593
3594 ret = notification_thread_client_disconnect(client, state);
3595 end:
3596 rcu_read_unlock();
3597 return ret;
3598 }
3599
3600 int handle_notification_thread_client_disconnect_all(
3601 struct notification_thread_state *state)
3602 {
3603 struct cds_lfht_iter iter;
3604 struct notification_client *client;
3605 bool error_encoutered = false;
3606
3607 rcu_read_lock();
3608 DBG("Closing all client connections");
3609 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3610 client_socket_ht_node) {
3611 int ret;
3612
3613 ret = notification_thread_client_disconnect(
3614 client, state);
3615 if (ret) {
3616 error_encoutered = true;
3617 }
3618 }
3619 rcu_read_unlock();
3620 return error_encoutered ? 1 : 0;
3621 }
3622
3623 int handle_notification_thread_trigger_unregister_all(
3624 struct notification_thread_state *state)
3625 {
3626 bool error_occurred = false;
3627 struct cds_lfht_iter iter;
3628 struct lttng_trigger_ht_element *trigger_ht_element;
3629
3630 rcu_read_lock();
3631 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3632 node) {
3633 int ret = handle_notification_thread_command_unregister_trigger(
3634 state, trigger_ht_element->trigger, NULL);
3635 if (ret) {
3636 error_occurred = true;
3637 }
3638 }
3639 rcu_read_unlock();
3640 return error_occurred ? -1 : 0;
3641 }
3642
3643 static
3644 bool client_has_outbound_data_left(
3645 const struct notification_client *client)
3646 {
3647 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3648 &client->communication.outbound.payload, 0, -1);
3649 const bool has_data = pv.buffer.size != 0;
3650 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3651
3652 return has_data || has_fds;
3653 }
3654
3655 static
3656 int client_handle_transmission_status(
3657 struct notification_client *client,
3658 enum client_transmission_status transmission_status,
3659 struct notification_thread_state *state)
3660 {
3661 int ret = 0;
3662
3663 switch (transmission_status) {
3664 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3665 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3666 {
3667 int current_poll_events;
3668 int new_poll_events;
3669 /*
3670 * We want to be notified whenever there is buffer space
3671 * available to send the rest of the payload if we are
3672 * waiting to send data to the client.
3673 *
3674 * The state of the outbound queue being sampled here is
3675 * fine since:
3676 * - it is okay to wake-up "for nothing" in case we see
3677 * that data is left, but another thread succeeds in
3678 * flushing it before us when handling the client "out"
3679 * event. We will simply stop monitoring that event the next
3680 * time it wakes us up and we see no data left to be sent,
3681 * - if another thread fails to flush the entire client
3682 * outgoing queue, it will issue a "communication update"
3683 * command and cause the client's (e)poll mask to be
3684 * re-evaluated.
3685 *
3686 * The situation we seek to avoid would be to disable the
3687 * monitoring of "out" client events indefinitely when there is
3688 * data to be sent, which can't happen because of the
3689 * aforementioned "communication update" mechanism.
3690 */
3691 pthread_mutex_lock(&client->lock);
3692 current_poll_events = client->communication.current_poll_events;
3693 new_poll_events = client_has_outbound_data_left(client) ?
3694 CLIENT_POLL_EVENTS_IN_OUT :
3695 CLIENT_POLL_EVENTS_IN;
3696 client->communication.current_poll_events = new_poll_events;
3697 pthread_mutex_unlock(&client->lock);
3698
3699 /* Update the monitored event set only if it changed. */
3700 if (current_poll_events != new_poll_events) {
3701 ret = lttng_poll_mod(&state->events, client->socket,
3702 new_poll_events);
3703 if (ret) {
3704 goto end;
3705 }
3706 }
3707
3708 break;
3709 }
3710 case CLIENT_TRANSMISSION_STATUS_FAIL:
3711 ret = notification_thread_client_disconnect(client, state);
3712 if (ret) {
3713 goto end;
3714 }
3715 break;
3716 case CLIENT_TRANSMISSION_STATUS_ERROR:
3717 ret = -1;
3718 goto end;
3719 default:
3720 abort();
3721 }
3722 end:
3723 return ret;
3724 }
3725
/*
 * Attempt to send the content of the client's outbound payload (bytes first,
 * then fd handles) over its socket using non-blocking sends.
 *
 * Returns:
 *   - CLIENT_TRANSMISSION_STATUS_COMPLETE when everything was sent; the
 *     outbound payload and the `queued_command_reply` and
 *     `dropped_notification` flags are then cleared,
 *   - CLIENT_TRANSMISSION_STATUS_QUEUED when only part of the bytes, or none
 *     of the fds, could be sent; what is left remains in the outbound payload,
 *   - CLIENT_TRANSMISSION_STATUS_FAIL when the client's communication is
 *     inactive or when a send fails (the communication is then marked as
 *     inactive),
 *   - CLIENT_TRANSMISSION_STATUS_ERROR when the outbound buffer could not be
 *     resized.
 *
 * Client lock must be acquired by caller.
 */
static
enum client_transmission_status client_flush_outgoing_queue(
		struct notification_client *client)
{
	ssize_t ret;
	size_t to_send_count;
	enum client_transmission_status status;
	struct lttng_payload_view pv = lttng_payload_view_from_payload(
			&client->communication.outbound.payload, 0, -1);
	const int fds_to_send_count =
			lttng_payload_view_get_fd_handle_count(&pv);

	ASSERT_LOCKED(client->lock);

	if (!client->communication.active) {
		status = CLIENT_TRANSMISSION_STATUS_FAIL;
		goto end;
	}

	if (pv.buffer.size == 0) {
		/*
		 * If both data and fds are equal to zero, we are in an invalid
		 * state.
		 */
		LTTNG_ASSERT(fds_to_send_count != 0);
		goto send_fds;
	}

	/* Send data. */
	to_send_count = pv.buffer.size;
	DBG("Flushing client (socket fd = %i) outgoing queue",
			client->socket);

	ret = lttcomm_send_unix_sock_non_block(client->socket,
			pv.buffer.data,
			to_send_count);
	if ((ret >= 0 && ret < to_send_count)) {
		DBG("Client (socket fd = %i) outgoing queue could not be completely flushed",
				client->socket);
		/* `ret` is non-negative in this branch. */
		to_send_count -= std::max(ret, (ssize_t) 0);

		/* Move the unsent bytes to the beginning of the outbound buffer. */
		memmove(client->communication.outbound.payload.buffer.data,
				pv.buffer.data +
				pv.buffer.size - to_send_count,
				to_send_count);
		/* Shrink the outbound buffer to what is left to send. */
		ret = lttng_dynamic_buffer_set_size(
				&client->communication.outbound.payload.buffer,
				to_send_count);
		if (ret) {
			goto error;
		}

		status = CLIENT_TRANSMISSION_STATUS_QUEUED;
		goto end;
	} else if (ret < 0) {
		/* Generic error, disable the client's communication. */
		ERR("Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
				client->socket);
		client->communication.active = false;
		status = CLIENT_TRANSMISSION_STATUS_FAIL;
		goto end;
	} else {
		/*
		 * No error and flushed the queue completely.
		 *
		 * The payload buffer size is used later to
		 * check if there is notifications queued. So albeit that the
		 * direct caller knows that the transmission is complete, we
		 * need to set the buffer size to zero.
		 */
		ret = lttng_dynamic_buffer_set_size(
				&client->communication.outbound.payload.buffer, 0);
		if (ret) {
			goto error;
		}
	}

send_fds:
	/* No fds to send, transmission is complete. */
	if (fds_to_send_count == 0) {
		status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
		goto end;
	}

	ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
			client->socket, &pv);
	if (ret < 0) {
		/* Generic error, disable the client's communication. */
		ERR("Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
				client->socket);
		client->communication.active = false;
		status = CLIENT_TRANSMISSION_STATUS_FAIL;
		goto end;
	} else if (ret == 0) {
		/* Nothing could be sent. */
		status = CLIENT_TRANSMISSION_STATUS_QUEUED;
	} else {
		/* Fd passing is an all or nothing kind of thing. */
		status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
		/*
		 * The payload _fd_array count is used later to
		 * check if there is notifications queued. So although the
		 * direct caller knows that the transmission is complete, we
		 * need to clear the _fd_array for the queuing check.
		 */
		lttng_dynamic_pointer_array_clear(
				&client->communication.outbound.payload
						._fd_handles);
	}

end:
	if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
		/* Everything was sent: reset the outbound queue's state. */
		client->communication.outbound.queued_command_reply = false;
		client->communication.outbound.dropped_notification = false;
		lttng_payload_clear(&client->communication.outbound.payload);
	}

	return status;
error:
	return CLIENT_TRANSMISSION_STATUS_ERROR;
}
3848
3849 /* Client lock must _not_ be held by the caller. */
3850 static
3851 int client_send_command_reply(struct notification_client *client,
3852 struct notification_thread_state *state,
3853 enum lttng_notification_channel_status status)
3854 {
3855 int ret;
3856 struct lttng_notification_channel_command_reply reply = {
3857 .status = (int8_t) status,
3858 };
3859 struct lttng_notification_channel_message msg;
3860 char buffer[sizeof(msg) + sizeof(reply)];
3861 enum client_transmission_status transmission_status;
3862
3863 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY;
3864 msg.size = sizeof(reply);
3865 msg.fds = 0;
3866
3867 memcpy(buffer, &msg, sizeof(msg));
3868 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3869 DBG("Send command reply (%i)", (int) status);
3870
3871 pthread_mutex_lock(&client->lock);
3872 if (client->communication.outbound.queued_command_reply) {
3873 /* Protocol error. */
3874 goto error_unlock;
3875 }
3876
3877 /* Enqueue buffer to outgoing queue and flush it. */
3878 ret = lttng_dynamic_buffer_append(
3879 &client->communication.outbound.payload.buffer,
3880 buffer, sizeof(buffer));
3881 if (ret) {
3882 goto error_unlock;
3883 }
3884
3885 transmission_status = client_flush_outgoing_queue(client);
3886
3887 if (client_has_outbound_data_left(client)) {
3888 /* Queue could not be emptied. */
3889 client->communication.outbound.queued_command_reply = true;
3890 }
3891
3892 pthread_mutex_unlock(&client->lock);
3893 ret = client_handle_transmission_status(
3894 client, transmission_status, state);
3895 if (ret) {
3896 goto error;
3897 }
3898
3899 return 0;
3900 error_unlock:
3901 pthread_mutex_unlock(&client->lock);
3902 error:
3903 return -1;
3904 }
3905
3906 static
3907 int client_handle_message_unknown(struct notification_client *client,
3908 struct notification_thread_state *state __attribute__((unused)))
3909 {
3910 int ret;
3911 /*
3912 * Receiving message header. The function will be called again
3913 * once the rest of the message as been received and can be
3914 * interpreted.
3915 */
3916 const struct lttng_notification_channel_message *msg;
3917
3918 LTTNG_ASSERT(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
3919 msg = (const struct lttng_notification_channel_message *)
3920 client->communication.inbound.payload.buffer.data;
3921
3922 if (msg->size == 0 ||
3923 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3924 ERR("Invalid notification channel message: length = %u",
3925 msg->size);
3926 ret = -1;
3927 goto end;
3928 }
3929
3930 switch (msg->type) {
3931 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3932 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3933 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3934 break;
3935 default:
3936 ret = -1;
3937 ERR("Invalid notification channel message: unexpected message type");
3938 goto end;
3939 }
3940
3941 client->communication.inbound.bytes_to_receive = msg->size;
3942 client->communication.inbound.fds_to_receive = msg->fds;
3943 client->communication.inbound.msg_type =
3944 (enum lttng_notification_channel_message_type) msg->type;
3945 ret = lttng_dynamic_buffer_set_size(
3946 &client->communication.inbound.payload.buffer, msg->size);
3947
3948 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3949 msg = NULL;
3950 end:
3951 return ret;
3952 }
3953
3954 static
3955 int client_handle_message_handshake(struct notification_client *client,
3956 struct notification_thread_state *state)
3957 {
3958 int ret;
3959 struct lttng_notification_channel_command_handshake *handshake_client;
3960 const struct lttng_notification_channel_command_handshake handshake_reply = {
3961 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3962 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3963 };
3964 struct lttng_notification_channel_message msg_header;
3965 enum lttng_notification_channel_status status =
3966 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3967 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3968
3969 msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
3970 msg_header.size = sizeof(handshake_reply);
3971 msg_header.fds = 0;
3972
3973 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3974 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3975 sizeof(handshake_reply));
3976
3977 handshake_client =
3978 (struct lttng_notification_channel_command_handshake *)
3979 client->communication.inbound.payload.buffer
3980 .data;
3981 client->major = handshake_client->major;
3982 client->minor = handshake_client->minor;
3983 if (!client->communication.inbound.creds_received) {
3984 ERR("No credentials received from client");
3985 ret = -1;
3986 goto end;
3987 }
3988
3989 client->uid = LTTNG_SOCK_GET_UID_CRED(
3990 &client->communication.inbound.creds);
3991 client->gid = LTTNG_SOCK_GET_GID_CRED(
3992 &client->communication.inbound.creds);
3993 client->is_sessiond = LTTNG_SOCK_GET_PID_CRED(&client->communication.inbound.creds) == getpid();
3994 DBG("Received handshake from client: uid = %u, gid = %u, protocol version = %i.%i, client is sessiond = %s",
3995 client->uid, client->gid, (int) client->major,
3996 (int) client->minor,
3997 client->is_sessiond ? "true" : "false");
3998
3999 if (handshake_client->major !=
4000 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
4001 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
4002 }
4003
4004 pthread_mutex_lock(&client->lock);
4005 /* Outgoing queue will be flushed when the command reply is sent. */
4006 ret = lttng_dynamic_buffer_append(
4007 &client->communication.outbound.payload.buffer, send_buffer,
4008 sizeof(send_buffer));
4009 if (ret) {
4010 ERR("Failed to send protocol version to notification channel client");
4011 goto end_unlock;
4012 }
4013
4014 client->validated = true;
4015 client->communication.active = true;
4016 pthread_mutex_unlock(&client->lock);
4017
4018 /* Set reception state to receive the next message header. */
4019 ret = client_reset_inbound_state(client);
4020 if (ret) {
4021 ERR("Failed to reset client communication's inbound state");
4022 goto end;
4023 }
4024
4025 /* Flushes the outgoing queue. */
4026 ret = client_send_command_reply(client, state, status);
4027 if (ret) {
4028 ERR("Failed to send reply to notification channel client");
4029 goto end;
4030 }
4031
4032 goto end;
4033 end_unlock:
4034 pthread_mutex_unlock(&client->lock);
4035 end:
4036 return ret;
4037 }
4038
4039 static
4040 int client_handle_message_subscription(
4041 struct notification_client *client,
4042 enum lttng_notification_channel_message_type msg_type,
4043 struct notification_thread_state *state)
4044 {
4045 int ret;
4046 struct lttng_condition *condition;
4047 enum lttng_notification_channel_status status =
4048 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
4049 struct lttng_payload_view condition_view =
4050 lttng_payload_view_from_payload(
4051 &client->communication.inbound.payload,
4052 0, -1);
4053 size_t expected_condition_size;
4054
4055 /*
4056 * No need to lock client to sample the inbound state as the only
4057 * other thread accessing clients (action executor) only uses the
4058 * outbound state.
4059 */
4060 expected_condition_size = client->communication.inbound.payload.buffer.size;
4061 ret = lttng_condition_create_from_payload(&condition_view, &condition);
4062 if (ret != expected_condition_size) {
4063 ERR("Malformed condition received from client");
4064 goto end;
4065 }
4066
4067 /* Ownership of condition is always transferred. */
4068 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
4069 ret = notification_thread_client_subscribe(
4070 client, condition, state, &status);
4071 } else {
4072 ret = notification_thread_client_unsubscribe(
4073 client, condition, state, &status);
4074 }
4075
4076 if (ret) {
4077 goto end;
4078 }
4079
4080 /* Set reception state to receive the next message header. */
4081 ret = client_reset_inbound_state(client);
4082 if (ret) {
4083 ERR("Failed to reset client communication's inbound state");
4084 goto end;
4085 }
4086
4087 ret = client_send_command_reply(client, state, status);
4088 if (ret) {
4089 ERR("Failed to send reply to notification channel client");
4090 goto end;
4091 }
4092
4093 end:
4094 return ret;
4095 }
4096
4097 static
4098 int client_dispatch_message(struct notification_client *client,
4099 struct notification_thread_state *state)
4100 {
4101 int ret = 0;
4102
4103 if (client->communication.inbound.msg_type !=
4104 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
4105 client->communication.inbound.msg_type !=
4106 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
4107 !client->validated) {
4108 WARN("client attempted a command before handshake");
4109 ret = -1;
4110 goto end;
4111 }
4112
4113 switch (client->communication.inbound.msg_type) {
4114 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
4115 {
4116 ret = client_handle_message_unknown(client, state);
4117 break;
4118 }
4119 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
4120 {
4121 ret = client_handle_message_handshake(client, state);
4122 break;
4123 }
4124 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
4125 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
4126 {
4127 ret = client_handle_message_subscription(client,
4128 client->communication.inbound.msg_type, state);
4129 break;
4130 }
4131 default:
4132 abort();
4133 }
4134 end:
4135 return ret;
4136 }
4137
4138 /* Incoming data from client. */
4139 int handle_notification_thread_client_in(
4140 struct notification_thread_state *state, int socket)
4141 {
4142 int ret = 0;
4143 struct notification_client *client;
4144 ssize_t recv_ret;
4145 size_t offset;
4146
4147 rcu_read_lock();
4148 client = get_client_from_socket(socket, state);
4149 if (!client) {
4150 /* Internal error, abort. */
4151 ret = -1;
4152 goto end;
4153 }
4154
4155 if (client->communication.inbound.bytes_to_receive == 0 &&
4156 client->communication.inbound.fds_to_receive != 0) {
4157 /* Only FDs left to receive. */
4158 goto receive_fds;
4159 }
4160
4161 offset = client->communication.inbound.payload.buffer.size -
4162 client->communication.inbound.bytes_to_receive;
4163 if (client->communication.inbound.expect_creds) {
4164 recv_ret = lttcomm_recv_creds_unix_sock(socket,
4165 client->communication.inbound.payload.buffer.data + offset,
4166 client->communication.inbound.bytes_to_receive,
4167 &client->communication.inbound.creds);
4168 if (recv_ret > 0) {
4169 client->communication.inbound.expect_creds = false;
4170 client->communication.inbound.creds_received = true;
4171 }
4172 } else {
4173 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
4174 client->communication.inbound.payload.buffer.data + offset,
4175 client->communication.inbound.bytes_to_receive);
4176 }
4177 if (recv_ret >= 0) {
4178 client->communication.inbound.bytes_to_receive -= recv_ret;
4179 } else {
4180 goto error_disconnect_client;
4181 }
4182
4183 if (client->communication.inbound.bytes_to_receive != 0) {
4184 /* Message incomplete wait for more data. */
4185 ret = 0;
4186 goto end;
4187 }
4188
4189 receive_fds:
4190 LTTNG_ASSERT(client->communication.inbound.bytes_to_receive == 0);
4191
4192 /* Receive fds. */
4193 if (client->communication.inbound.fds_to_receive != 0) {
4194 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
4195 client->socket,
4196 client->communication.inbound.fds_to_receive,
4197 &client->communication.inbound.payload);
4198 if (ret > 0) {
4199 /*
4200 * Fds received. non blocking fds passing is all
4201 * or nothing.
4202 */
4203 ssize_t expected_size;
4204
4205 expected_size = sizeof(int) *
4206 client->communication.inbound
4207 .fds_to_receive;
4208 LTTNG_ASSERT(ret == expected_size);
4209 client->communication.inbound.fds_to_receive = 0;
4210 } else if (ret == 0) {
4211 /* Received nothing. */
4212 ret = 0;
4213 goto end;
4214 } else {
4215 goto error_disconnect_client;
4216 }
4217 }
4218
4219 /* At this point the message is complete.*/
4220 LTTNG_ASSERT(client->communication.inbound.bytes_to_receive == 0 &&
4221 client->communication.inbound.fds_to_receive == 0);
4222 ret = client_dispatch_message(client, state);
4223 if (ret) {
4224 /*
4225 * Only returns an error if this client must be
4226 * disconnected.
4227 */
4228 goto error_disconnect_client;
4229 }
4230
4231 end:
4232 rcu_read_unlock();
4233 return ret;
4234
4235 error_disconnect_client:
4236 ret = notification_thread_client_disconnect(client, state);
4237 goto end;
4238 }
4239
4240 /* Client ready to receive outgoing data. */
4241 int handle_notification_thread_client_out(
4242 struct notification_thread_state *state, int socket)
4243 {
4244 int ret;
4245 struct notification_client *client;
4246 enum client_transmission_status transmission_status;
4247
4248 rcu_read_lock();
4249 client = get_client_from_socket(socket, state);
4250 if (!client) {
4251 /* Internal error, abort. */
4252 ret = -1;
4253 goto end;
4254 }
4255
4256 pthread_mutex_lock(&client->lock);
4257 if (!client_has_outbound_data_left(client)) {
4258 /*
4259 * A client "out" event can be received when no payload is left
4260 * to send under some circumstances.
4261 *
4262 * Many threads can flush a client's outgoing queue and, if they
4263 * had to queue their message (socket was full), will use the
4264 * "communication update" command to signal the (e)poll thread
4265 * to monitor for space being made available in the socket.
4266 *
4267 * Commands are sent over an internal pipe serviced by the same
4268 * thread as the client sockets.
4269 *
4270 * When space is made available in the socket, there is a race
4271 * between the (e)poll thread and the other threads that may
4272 * wish to use the client's socket to flush its outgoing queue.
4273 *
4274 * A non-(e)poll thread may attempt (and succeed) in flushing
4275 * the queue before the (e)poll thread gets a chance to service
4276 * the client's "out" event.
4277 *
4278 * In this situation, the (e)poll thread processing the client
4279 * out event will see an empty payload: there is nothing to do
4280 * except unsubscribing (e)poll "out" events.
4281 *
4282 * Note that this thread is the (e)poll thread so it can modify
4283 * the (e)poll mask directly without using a communication
4284 * update command. Other threads that flush the outgoing queue
4285 * will use the "communication update" command to wake up this
4286 * thread and force it to monitor "out" events.
4287 *
4288 * When other threads succeed in emptying the outgoing queue,
4289 * they don't need to update the (e)poll mask: if the "out"
4290 * event is monitored, it will fire once and the (e)poll
4291 * thread will reach this condition, causing the event to
4292 * stop being monitored.
4293 */
4294 transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
4295 } else {
4296 transmission_status = client_flush_outgoing_queue(client);
4297 }
4298 pthread_mutex_unlock(&client->lock);
4299
4300 ret = client_handle_transmission_status(
4301 client, transmission_status, state);
4302 if (ret) {
4303 goto end;
4304 }
4305 end:
4306 rcu_read_unlock();
4307 return ret;
4308 }
4309
4310 static
4311 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
4312 const struct channel_state_sample *sample,
4313 uint64_t buffer_capacity)
4314 {
4315 bool result = false;
4316 uint64_t threshold;
4317 enum lttng_condition_type condition_type;
4318 const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of(
4319 condition, &lttng_condition_buffer_usage::parent);
4320
4321 if (use_condition->threshold_bytes.set) {
4322 threshold = use_condition->threshold_bytes.value;
4323 } else {
4324 /*
4325 * Threshold was expressed as a ratio.
4326 *
4327 * TODO the threshold (in bytes) of conditions expressed
4328 * as a ratio of total buffer size could be cached to
4329 * forego this double-multiplication or it could be performed
4330 * as fixed-point math.
4331 *
4332 * Note that caching should accommodates the case where the
4333 * condition applies to multiple channels (i.e. don't assume
4334 * that all channels matching my_chann* have the same size...)
4335 */
4336 threshold = (uint64_t) (use_condition->threshold_ratio.value *
4337 (double) buffer_capacity);
4338 }
4339
4340 condition_type = lttng_condition_get_type(condition);
4341 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
4342 DBG("Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
4343 threshold, sample->highest_usage);
4344
4345 /*
4346 * The low condition should only be triggered once _all_ of the
4347 * streams in a channel have gone below the "low" threshold.
4348 */
4349 if (sample->highest_usage <= threshold) {
4350 result = true;
4351 }
4352 } else {
4353 DBG("High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
4354 threshold, sample->highest_usage);
4355
4356 /*
4357 * For high buffer usage scenarios, we want to trigger whenever
4358 * _any_ of the streams has reached the "high" threshold.
4359 */
4360 if (sample->highest_usage >= threshold) {
4361 result = true;
4362 }
4363 }
4364
4365 return result;
4366 }
4367
4368 static
4369 bool evaluate_session_consumed_size_condition(
4370 const struct lttng_condition *condition,
4371 uint64_t session_consumed_size)
4372 {
4373 uint64_t threshold;
4374 const struct lttng_condition_session_consumed_size *size_condition =
4375 lttng::utils::container_of(condition,
4376 &lttng_condition_session_consumed_size::parent);
4377
4378 threshold = size_condition->consumed_threshold_bytes.value;
4379 DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
4380 threshold, session_consumed_size);
4381 return session_consumed_size >= threshold;
4382 }
4383
4384 static
4385 int evaluate_buffer_condition(const struct lttng_condition *condition,
4386 struct lttng_evaluation **evaluation,
4387 const struct notification_thread_state *state __attribute__((unused)),
4388 const struct channel_state_sample *previous_sample,
4389 const struct channel_state_sample *latest_sample,
4390 uint64_t previous_session_consumed_total,
4391 uint64_t latest_session_consumed_total,
4392 struct channel_info *channel_info)
4393 {
4394 int ret = 0;
4395 enum lttng_condition_type condition_type;
4396 const bool previous_sample_available = !!previous_sample;
4397 bool previous_sample_result = false;
4398 bool latest_sample_result;
4399
4400 condition_type = lttng_condition_get_type(condition);
4401
4402 switch (condition_type) {
4403 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
4404 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
4405 if (caa_likely(previous_sample_available)) {
4406 previous_sample_result =
4407 evaluate_buffer_usage_condition(condition,
4408 previous_sample, channel_info->capacity);
4409 }
4410 latest_sample_result = evaluate_buffer_usage_condition(
4411 condition, latest_sample,
4412 channel_info->capacity);
4413 break;
4414 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
4415 if (caa_likely(previous_sample_available)) {
4416 previous_sample_result =
4417 evaluate_session_consumed_size_condition(
4418 condition,
4419 previous_session_consumed_total);
4420 }
4421 latest_sample_result =
4422 evaluate_session_consumed_size_condition(
4423 condition,
4424 latest_session_consumed_total);
4425 break;
4426 default:
4427 /* Unknown condition type; internal error. */
4428 abort();
4429 }
4430
4431 if (!latest_sample_result ||
4432 (previous_sample_result == latest_sample_result)) {
4433 /*
4434 * Only trigger on a condition evaluation transition.
4435 *
4436 * NOTE: This edge-triggered logic may not be appropriate for
4437 * future condition types.
4438 */
4439 goto end;
4440 }
4441
4442 if (!evaluation || !latest_sample_result) {
4443 goto end;
4444 }
4445
4446 switch (condition_type) {
4447 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
4448 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
4449 *evaluation = lttng_evaluation_buffer_usage_create(
4450 condition_type,
4451 latest_sample->highest_usage,
4452 channel_info->capacity);
4453 break;
4454 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
4455 *evaluation = lttng_evaluation_session_consumed_size_create(
4456 latest_session_consumed_total);
4457 break;
4458 default:
4459 abort();
4460 }
4461
4462 if (!*evaluation) {
4463 ret = -1;
4464 goto end;
4465 }
4466 end:
4467 return ret;
4468 }
4469
4470 static
4471 int client_notification_overflow(struct notification_client *client)
4472 {
4473 int ret = 0;
4474 struct lttng_notification_channel_message msg;
4475
4476 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED;
4477 msg.size = 0;
4478 msg.fds = 0;
4479
4480 ASSERT_LOCKED(client->lock);
4481
4482 DBG("Dropping notification addressed to client (socket fd = %i)",
4483 client->socket);
4484 if (client->communication.outbound.dropped_notification) {
4485 /*
4486 * The client already has a "notification dropped" message
4487 * in its outgoing queue. Nothing to do since all
4488 * of those messages are coalesced.
4489 */
4490 goto end;
4491 }
4492
4493 client->communication.outbound.dropped_notification = true;
4494 ret = lttng_dynamic_buffer_append(
4495 &client->communication.outbound.payload.buffer, &msg,
4496 sizeof(msg));
4497 if (ret) {
4498 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
4499 client->socket);
4500 }
4501 end:
4502 return ret;
4503 }
4504
4505 static int client_handle_transmission_status_wrapper(
4506 struct notification_client *client,
4507 enum client_transmission_status status,
4508 void *user_data)
4509 {
4510 return client_handle_transmission_status(client, status,
4511 (struct notification_thread_state *) user_data);
4512 }
4513
4514 static
4515 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
4516 const struct lttng_evaluation *evaluation,
4517 struct notification_client_list* client_list,
4518 struct notification_thread_state *state,
4519 uid_t object_uid, gid_t object_gid)
4520 {
4521 const struct lttng_credentials creds = {
4522 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
4523 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
4524 };
4525
4526 return notification_client_list_send_evaluation(client_list,
4527 trigger, evaluation,
4528 &creds,
4529 client_handle_transmission_status_wrapper, state);
4530 }
4531
4532 /*
4533 * Permission checks relative to notification channel clients are performed
4534 * here. Notice how object, client, and trigger credentials are involved in
4535 * this check.
4536 *
4537 * The `object` credentials are the credentials associated with the "subject"
4538 * of a condition. For instance, a `rotation completed` condition applies
4539 * to a session. When that condition is met, it will produce an evaluation
4540 * against a session. Hence, in this case, the `object` credentials are the
4541 * credentials of the "subject" session.
4542 *
4543 * The `trigger` credentials are the credentials of the user that registered the
4544 * trigger.
4545 *
4546 * The `client` credentials are the credentials of the user that created a given
4547 * notification channel.
4548 *
4549 * In terms of visibility, it is expected that non-privilieged users can only
4550 * register triggers against "their" objects (their own sessions and
4551 * applications they are allowed to interact with). They can then open a
4552 * notification channel and subscribe to notifications associated with those
4553 * triggers.
4554 *
4555 * As for privilieged users, they can register triggers against the objects of
4556 * other users. They can then subscribe to the notifications associated to their
4557 * triggers. Privilieged users _can't_ subscribe to the notifications of
4558 * triggers owned by other users; they must create their own triggers.
4559 *
4560 * This is more a concern of usability than security. It would be difficult for
4561 * a root user reliably subscribe to a specific set of conditions without
4562 * interference from external users (those could, for instance, unregister
4563 * their triggers).
4564 */
4565 int notification_client_list_send_evaluation(
4566 struct notification_client_list *client_list,
4567 const struct lttng_trigger *trigger,
4568 const struct lttng_evaluation *evaluation,
4569 const struct lttng_credentials *source_object_creds,
4570 report_client_transmission_result_cb client_report,
4571 void *user_data)
4572 {
4573 int ret = 0;
4574 struct lttng_payload msg_payload;
4575 struct notification_client_list_element *client_list_element, *tmp;
4576 const struct lttng_notification notification = {
4577 .trigger = (struct lttng_trigger *) trigger,
4578 .evaluation = (struct lttng_evaluation *) evaluation,
4579 };
4580 struct lttng_notification_channel_message msg_header;
4581 const struct lttng_credentials *trigger_creds =
4582 lttng_trigger_get_credentials(trigger);
4583
4584 lttng_payload_init(&msg_payload);
4585
4586 msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
4587 msg_header.size = 0;
4588 msg_header.fds = 0;
4589
4590 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
4591 sizeof(msg_header));
4592 if (ret) {
4593 goto end;
4594 }
4595
4596 ret = lttng_notification_serialize(&notification, &msg_payload);
4597 if (ret) {
4598 ERR("Failed to serialize notification");
4599 ret = -1;
4600 goto end;
4601 }
4602
4603 /* Update payload size. */
4604 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4605 ->size = (uint32_t)(
4606 msg_payload.buffer.size - sizeof(msg_header));
4607
4608 /* Update the payload number of fds. */
4609 {
4610 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4611 &msg_payload, 0, -1);
4612
4613 ((struct lttng_notification_channel_message *)
4614 msg_payload.buffer.data)->fds = (uint32_t)
4615 lttng_payload_view_get_fd_handle_count(&pv);
4616 }
4617
4618 pthread_mutex_lock(&client_list->lock);
4619 cds_list_for_each_entry_safe(client_list_element, tmp,
4620 &client_list->clients_list, node) {
4621 enum client_transmission_status transmission_status;
4622 struct notification_client *client =
4623 client_list_element->client;
4624
4625 ret = 0;
4626 pthread_mutex_lock(&client->lock);
4627 if (!client->communication.active) {
4628 /*
4629 * Skip inactive client (protocol error or
4630 * disconnecting).
4631 */
4632 DBG("Skipping client at it is marked as inactive");
4633 goto skip_client;
4634 }
4635
4636 if (lttng_trigger_is_hidden(trigger) && !client->is_sessiond) {
4637 /*
4638 * Notifications resulting from an hidden trigger are
4639 * only sent to the session daemon.
4640 */
4641 DBG("Skipping client as the trigger is hidden and the client is not the session daemon");
4642 goto skip_client;
4643 }
4644
4645 if (source_object_creds) {
4646 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4647 client->gid != lttng_credentials_get_gid(source_object_creds) &&
4648 client->uid != 0) {
4649 /*
4650 * Client is not allowed to monitor this
4651 * object.
4652 */
4653 DBG("Skipping client at it does not have the object permission to receive notification for this trigger");
4654 goto skip_client;
4655 }
4656 }
4657
4658 if (client->uid != lttng_credentials_get_uid(trigger_creds)) {
4659 DBG("Skipping client at it does not have the permission to receive notification for this trigger");
4660 goto skip_client;
4661 }
4662
4663 DBG("Sending notification to client (fd = %i, %zu bytes)",
4664 client->socket, msg_payload.buffer.size);
4665
4666 if (client_has_outbound_data_left(client)) {
4667 /*
4668 * Outgoing data is already buffered for this client;
4669 * drop the notification and enqueue a "dropped
4670 * notification" message if this is the first dropped
4671 * notification since the socket spilled-over to the
4672 * queue.
4673 */
4674 ret = client_notification_overflow(client);
4675 if (ret) {
4676 /* Fatal error. */
4677 goto skip_client;
4678 }
4679 }
4680
4681 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
4682 if (ret) {
4683 /* Fatal error. */
4684 goto skip_client;
4685 }
4686
4687 transmission_status = client_flush_outgoing_queue(client);
4688 pthread_mutex_unlock(&client->lock);
4689 ret = client_report(client, transmission_status, user_data);
4690 if (ret) {
4691 /* Fatal error. */
4692 goto end_unlock_list;
4693 }
4694
4695 continue;
4696
4697 skip_client:
4698 pthread_mutex_unlock(&client->lock);
4699 if (ret) {
4700 /* Fatal error. */
4701 goto end_unlock_list;
4702 }
4703 }
4704 ret = 0;
4705
4706 end_unlock_list:
4707 pthread_mutex_unlock(&client_list->lock);
4708 end:
4709 lttng_payload_reset(&msg_payload);
4710 return ret;
4711 }
4712
4713 static
4714 struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
4715 int notification_pipe_read_fd, enum lttng_domain_type domain)
4716 {
4717 int ret;
4718 uint64_t token;
4719 struct lttng_event_notifier_notification *notification = NULL;
4720 char *capture_buffer = NULL;
4721 size_t capture_buffer_size;
4722 void *reception_buffer;
4723 size_t reception_size;
4724
4725 struct lttng_ust_abi_event_notifier_notification ust_notification;
4726 struct lttng_kernel_abi_event_notifier_notification kernel_notification;
4727
4728 /* Init lttng_event_notifier_notification */
4729 switch(domain) {
4730 case LTTNG_DOMAIN_UST:
4731 reception_buffer = (void *) &ust_notification;
4732 reception_size = sizeof(ust_notification);
4733 break;
4734 case LTTNG_DOMAIN_KERNEL:
4735 reception_buffer = (void *) &kernel_notification;
4736 reception_size = sizeof(kernel_notification);
4737 break;
4738 default:
4739 abort();
4740 }
4741
4742 /*
4743 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4744 * ensuring that read/write of tracer notifications are atomic.
4745 */
4746 ret = lttng_read(notification_pipe_read_fd, reception_buffer,
4747 reception_size);
4748 if (ret != reception_size) {
4749 PERROR("Failed to read from event source notification pipe: fd = %d, size to read = %zu, ret = %d",
4750 notification_pipe_read_fd, reception_size, ret);
4751 ret = -1;
4752 goto end;
4753 }
4754
4755 switch(domain) {
4756 case LTTNG_DOMAIN_UST:
4757 token = ust_notification.token;
4758 capture_buffer_size = ust_notification.capture_buf_size;
4759 break;
4760 case LTTNG_DOMAIN_KERNEL:
4761 token = kernel_notification.token;
4762 capture_buffer_size = kernel_notification.capture_buf_size;
4763 break;
4764 default:
4765 abort();
4766 }
4767
4768 if (capture_buffer_size == 0) {
4769 capture_buffer = NULL;
4770 goto skip_capture;
4771 }
4772
4773 if (capture_buffer_size > MAX_CAPTURE_SIZE) {
4774 ERR("Event notifier has a capture payload size which exceeds the maximum allowed size: capture_payload_size = %zu bytes, max allowed size = %d bytes",
4775 capture_buffer_size, MAX_CAPTURE_SIZE);
4776 goto end;
4777 }
4778
4779 capture_buffer = calloc<char>(capture_buffer_size);
4780 if (!capture_buffer) {
4781 ERR("Failed to allocate capture buffer");
4782 goto end;
4783 }
4784
4785 /* Fetch additional payload (capture). */
4786 ret = lttng_read(notification_pipe_read_fd, capture_buffer, capture_buffer_size);
4787 if (ret != capture_buffer_size) {
4788 ERR("Failed to read from event source pipe (fd = %i)",
4789 notification_pipe_read_fd);
4790 goto end;
4791 }
4792
4793 skip_capture:
4794 notification = lttng_event_notifier_notification_create(token, domain,
4795 capture_buffer, capture_buffer_size);
4796 if (notification == NULL) {
4797 goto end;
4798 }
4799
4800 /*
4801 * Ownership transfered to the lttng_event_notifier_notification object.
4802 */
4803 capture_buffer = NULL;
4804
4805 end:
4806 free(capture_buffer);
4807 return notification;
4808 }
4809
4810 static
4811 int dispatch_one_event_notifier_notification(struct notification_thread_state *state,
4812 struct lttng_event_notifier_notification *notification)
4813 {
4814 struct cds_lfht_node *node;
4815 struct cds_lfht_iter iter;
4816 struct notification_trigger_tokens_ht_element *element;
4817 struct lttng_evaluation *evaluation = NULL;
4818 enum action_executor_status executor_status;
4819 struct notification_client_list *client_list = NULL;
4820 int ret;
4821 unsigned int capture_count = 0;
4822
4823 /* Find triggers associated with this token. */
4824 rcu_read_lock();
4825 cds_lfht_lookup(state->trigger_tokens_ht,
4826 hash_key_u64(&notification->tracer_token, lttng_ht_seed),
4827 match_trigger_token, &notification->tracer_token, &iter);
4828 node = cds_lfht_iter_get_node(&iter);
4829 if (caa_unlikely(!node)) {
4830 /*
4831 * This is not an error, slow consumption of the tracer
4832 * notifications can lead to situations where a trigger is
4833 * removed but we still get tracer notifications matching a
4834 * trigger that no longer exists.
4835 */
4836 ret = 0;
4837 goto end_unlock;
4838 }
4839
4840 element = caa_container_of(node,
4841 struct notification_trigger_tokens_ht_element,
4842 node);
4843
4844 if (lttng_condition_event_rule_matches_get_capture_descriptor_count(
4845 lttng_trigger_get_const_condition(element->trigger),
4846 &capture_count) != LTTNG_CONDITION_STATUS_OK) {
4847 ERR("Failed to get capture count");
4848 ret = -1;
4849 goto end;
4850 }
4851
4852 if (!notification->capture_buffer && capture_count != 0) {
4853 ERR("Expected capture but capture buffer is null");
4854 ret = -1;
4855 goto end;
4856 }
4857
4858 evaluation = lttng_evaluation_event_rule_matches_create(
4859 lttng::utils::container_of(lttng_trigger_get_const_condition(
4860 element->trigger),
4861 &lttng_condition_event_rule_matches::parent),
4862 notification->capture_buffer,
4863 notification->capture_buf_size, false);
4864
4865 if (evaluation == NULL) {
4866 ERR("Failed to create event rule matches evaluation while creating and enqueuing action executor job");
4867 ret = -1;
4868 goto end_unlock;
4869 }
4870
4871 client_list = get_client_list_from_condition(state,
4872 lttng_trigger_get_const_condition(element->trigger));
4873 executor_status = action_executor_enqueue_trigger(state->executor,
4874 element->trigger, evaluation, NULL, client_list);
4875 switch (executor_status) {
4876 case ACTION_EXECUTOR_STATUS_OK:
4877 ret = 0;
4878 break;
4879 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4880 {
4881 struct notification_client_list_element *client_list_element,
4882 *tmp;
4883
4884 /*
4885 * Not a fatal error; this is expected and simply means the
4886 * executor has too much work queued already.
4887 */
4888 ret = 0;
4889
4890 /* No clients subscribed to notifications for this trigger. */
4891 if (!client_list) {
4892 break;
4893 }
4894
4895 /* Warn clients that a notification (or more) was dropped. */
4896 pthread_mutex_lock(&client_list->lock);
4897 cds_list_for_each_entry_safe(client_list_element, tmp,
4898 &client_list->clients_list, node) {
4899 enum client_transmission_status transmission_status;
4900 struct notification_client *client =
4901 client_list_element->client;
4902
4903 pthread_mutex_lock(&client->lock);
4904 ret = client_notification_overflow(client);
4905 if (ret) {
4906 /* Fatal error. */
4907 goto next_client;
4908 }
4909
4910 transmission_status =
4911 client_flush_outgoing_queue(client);
4912 ret = client_handle_transmission_status(
4913 client, transmission_status, state);
4914 if (ret) {
4915 /* Fatal error. */
4916 goto next_client;
4917 }
4918 next_client:
4919 pthread_mutex_unlock(&client->lock);
4920 if (ret) {
4921 break;
4922 }
4923 }
4924
4925 pthread_mutex_unlock(&client_list->lock);
4926 break;
4927 }
4928 case ACTION_EXECUTOR_STATUS_INVALID:
4929 case ACTION_EXECUTOR_STATUS_ERROR:
4930 /* Fatal error, shut down everything. */
4931 ERR("Fatal error encoutered while enqueuing action to the action executor");
4932 ret = -1;
4933 goto end_unlock;
4934 default:
4935 /* Unhandled error. */
4936 abort();
4937 }
4938
4939 end_unlock:
4940 notification_client_list_put(client_list);
4941 rcu_read_unlock();
4942 end:
4943 return ret;
4944 }
4945
4946 static
4947 int handle_one_event_notifier_notification(
4948 struct notification_thread_state *state,
4949 int pipe, enum lttng_domain_type domain)
4950 {
4951 int ret = 0;
4952 struct lttng_event_notifier_notification *notification = NULL;
4953
4954 notification = recv_one_event_notifier_notification(pipe, domain);
4955 if (notification == NULL) {
4956 /* Reception failed, don't consider it fatal. */
4957 ERR("Error receiving an event notifier notification from tracer: fd = %i, domain = %s",
4958 pipe, lttng_domain_type_str(domain));
4959 goto end;
4960 }
4961
4962 ret = dispatch_one_event_notifier_notification(state, notification);
4963 if (ret) {
4964 ERR("Error dispatching an event notifier notification from tracer: fd = %i, domain = %s",
4965 pipe, lttng_domain_type_str(domain));
4966 goto end;
4967 }
4968
4969 end:
4970 lttng_event_notifier_notification_destroy(notification);
4971 return ret;
4972 }
4973
4974 int handle_notification_thread_event_notification(struct notification_thread_state *state,
4975 int pipe, enum lttng_domain_type domain)
4976 {
4977 return handle_one_event_notifier_notification(state, pipe, domain);
4978 }
4979
4980 int handle_notification_thread_channel_sample(
4981 struct notification_thread_state *state, int pipe,
4982 enum lttng_domain_type domain)
4983 {
4984 int ret = 0;
4985 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4986 struct channel_info *channel_info;
4987 struct cds_lfht_node *node;
4988 struct cds_lfht_iter iter;
4989 struct lttng_channel_trigger_list *trigger_list;
4990 struct lttng_trigger_list_element *trigger_list_element;
4991 bool previous_sample_available = false;
4992 struct channel_state_sample previous_sample, latest_sample;
4993 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4994 struct lttng_credentials channel_creds;
4995
4996 /*
4997 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4998 * ensuring that read/write of sampling messages are atomic.
4999 */
5000 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
5001 if (ret != sizeof(sample_msg)) {
5002 ERR("Failed to read from monitoring pipe (fd = %i)",
5003 pipe);
5004 ret = -1;
5005 goto end;
5006 }
5007
5008 ret = 0;
5009 latest_sample.key.key = sample_msg.key;
5010 latest_sample.key.domain = domain;
5011 latest_sample.highest_usage = sample_msg.highest;
5012 latest_sample.lowest_usage = sample_msg.lowest;
5013 latest_sample.channel_total_consumed = sample_msg.total_consumed;
5014
5015 rcu_read_lock();
5016
5017 /* Retrieve the channel's informations */
5018 cds_lfht_lookup(state->channels_ht,
5019 hash_channel_key(&latest_sample.key),
5020 match_channel_info,
5021 &latest_sample.key,
5022 &iter);
5023 node = cds_lfht_iter_get_node(&iter);
5024 if (caa_unlikely(!node)) {
5025 /*
5026 * Not an error since the consumer can push a sample to the pipe
5027 * and the rest of the session daemon could notify us of the
5028 * channel's destruction before we get a chance to process that
5029 * sample.
5030 */
5031 DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
5032 latest_sample.key.key,
5033 lttng_domain_type_str(domain));
5034 goto end_unlock;
5035 }
5036 channel_info = caa_container_of(node, struct channel_info,
5037 channels_ht_node);
5038 DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
5039 channel_info->name,
5040 latest_sample.key.key,
5041 channel_info->session_info->name,
5042 latest_sample.highest_usage,
5043 latest_sample.lowest_usage,
5044 latest_sample.channel_total_consumed);
5045
5046 previous_session_consumed_total =
5047 channel_info->session_info->consumed_data_size;
5048
5049 /* Retrieve the channel's last sample, if it exists, and update it. */
5050 cds_lfht_lookup(state->channel_state_ht,
5051 hash_channel_key(&latest_sample.key),
5052 match_channel_state_sample,
5053 &latest_sample.key,
5054 &iter);
5055 node = cds_lfht_iter_get_node(&iter);
5056 if (caa_likely(node)) {
5057 struct channel_state_sample *stored_sample;
5058
5059 /* Update the sample stored. */
5060 stored_sample = caa_container_of(node,
5061 struct channel_state_sample,
5062 channel_state_ht_node);
5063
5064 memcpy(&previous_sample, stored_sample,
5065 sizeof(previous_sample));
5066 stored_sample->highest_usage = latest_sample.highest_usage;
5067 stored_sample->lowest_usage = latest_sample.lowest_usage;
5068 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
5069 previous_sample_available = true;
5070
5071 latest_session_consumed_total =
5072 previous_session_consumed_total +
5073 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
5074 } else {
5075 /*
5076 * This is the channel's first sample, allocate space for and
5077 * store the new sample.
5078 */
5079 struct channel_state_sample *stored_sample;
5080
5081 stored_sample = zmalloc<channel_state_sample>();
5082 if (!stored_sample) {
5083 ret = -1;
5084 goto end_unlock;
5085 }
5086
5087 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
5088 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
5089 cds_lfht_add(state->channel_state_ht,
5090 hash_channel_key(&stored_sample->key),
5091 &stored_sample->channel_state_ht_node);
5092
5093 latest_session_consumed_total =
5094 previous_session_consumed_total +
5095 latest_sample.channel_total_consumed;
5096 }
5097
5098 channel_info->session_info->consumed_data_size =
5099 latest_session_consumed_total;
5100
5101 /* Find triggers associated with this channel. */
5102 cds_lfht_lookup(state->channel_triggers_ht,
5103 hash_channel_key(&latest_sample.key),
5104 match_channel_trigger_list,
5105 &latest_sample.key,
5106 &iter);
5107 node = cds_lfht_iter_get_node(&iter);
5108 if (caa_likely(!node)) {
5109 goto end_unlock;
5110 }
5111
5112 channel_creds = (typeof(channel_creds)) {
5113 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
5114 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
5115 };
5116
5117 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
5118 channel_triggers_ht_node);
5119 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
5120 node) {
5121 const struct lttng_condition *condition;
5122 struct lttng_trigger *trigger;
5123 struct notification_client_list *client_list = NULL;
5124 struct lttng_evaluation *evaluation = NULL;
5125 enum action_executor_status executor_status;
5126
5127 ret = 0;
5128 trigger = trigger_list_element->trigger;
5129 condition = lttng_trigger_get_const_condition(trigger);
5130 LTTNG_ASSERT(condition);
5131
5132 /*
5133 * Check if any client is subscribed to the result of this
5134 * evaluation.
5135 */
5136 client_list = get_client_list_from_condition(state, condition);
5137
5138 ret = evaluate_buffer_condition(condition, &evaluation, state,
5139 previous_sample_available ? &previous_sample : NULL,
5140 &latest_sample,
5141 previous_session_consumed_total,
5142 latest_session_consumed_total,
5143 channel_info);
5144 if (caa_unlikely(ret)) {
5145 goto put_list;
5146 }
5147
5148 if (caa_likely(!evaluation)) {
5149 goto put_list;
5150 }
5151
5152 /*
5153 * Ownership of `evaluation` transferred to the action executor
5154 * no matter the result.
5155 */
5156 executor_status = action_executor_enqueue_trigger(
5157 state->executor, trigger, evaluation,
5158 &channel_creds, client_list);
5159 evaluation = NULL;
5160 switch (executor_status) {
5161 case ACTION_EXECUTOR_STATUS_OK:
5162 break;
5163 case ACTION_EXECUTOR_STATUS_ERROR:
5164 case ACTION_EXECUTOR_STATUS_INVALID:
5165 /*
5166 * TODO Add trigger identification (name/id) when
5167 * it is added to the API.
5168 */
5169 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
5170 ret = -1;
5171 goto put_list;
5172 case ACTION_EXECUTOR_STATUS_OVERFLOW:
5173 /*
5174 * TODO Add trigger identification (name/id) when
5175 * it is added to the API.
5176 *
5177 * Not a fatal error.
5178 */
5179 WARN("No space left when enqueuing action associated with buffer-condition trigger");
5180 ret = 0;
5181 goto put_list;
5182 default:
5183 abort();
5184 }
5185
5186 put_list:
5187 notification_client_list_put(client_list);
5188 if (caa_unlikely(ret)) {
5189 break;
5190 }
5191 }
5192 end_unlock:
5193 rcu_read_unlock();
5194 end:
5195 return ret;
5196 }
This page took 0.552071 seconds and 4 git commands to generate.