vscode: Add configurations to run the executables under the debugger
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.cpp
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "lttng/action/action.h"
9 #include "lttng/trigger/trigger-internal.hpp"
10 #define _LGPL_SOURCE
11 #include <urcu.h>
12 #include <urcu/rculfhash.h>
13
14 #include <common/defaults.hpp>
15 #include <common/error.hpp>
16 #include <common/futex.hpp>
17 #include <common/unix.hpp>
18 #include <common/dynamic-buffer.hpp>
19 #include <common/hashtable/utils.hpp>
20 #include <common/sessiond-comm/sessiond-comm.hpp>
21 #include <common/macros.hpp>
22 #include <lttng/condition/condition.h>
23 #include <lttng/action/action-internal.hpp>
24 #include <lttng/action/list-internal.hpp>
25 #include <lttng/domain-internal.hpp>
26 #include <lttng/notification/notification-internal.hpp>
27 #include <lttng/condition/condition-internal.hpp>
28 #include <lttng/condition/buffer-usage-internal.hpp>
29 #include <lttng/condition/session-consumed-size-internal.hpp>
30 #include <lttng/condition/session-rotation-internal.hpp>
31 #include <lttng/condition/event-rule-matches-internal.hpp>
32 #include <lttng/domain-internal.hpp>
33 #include <lttng/notification/channel-internal.hpp>
34 #include <lttng/trigger/trigger-internal.hpp>
35 #include <lttng/event-rule/event-rule-internal.hpp>
36
37 #include <time.h>
38 #include <unistd.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41
42 #include "condition-internal.hpp"
43 #include "event-notifier-error-accounting.hpp"
44 #include "notification-thread.hpp"
45 #include "notification-thread-events.hpp"
46 #include "notification-thread-commands.hpp"
47 #include "lttng-sessiond.hpp"
48 #include "kernel.hpp"
49
/* Poll mask: readable, error, hang-up and peer hang-up conditions. */
#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same mask as CLIENT_POLL_EVENTS_IN, with writability (LPOLLOUT) added. */
#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)

/* The tracers currently limit the capture size to PIPE_BUF (4 KiB on Linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
55
/*
 * Kind of object to which a condition is bound; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition's type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	/* The condition is not bound to any object. */
	LTTNG_OBJECT_TYPE_NONE,
	/* The condition is bound to a channel. */
	LTTNG_OBJECT_TYPE_CHANNEL,
	/* The condition is bound to a session. */
	LTTNG_OBJECT_TYPE_SESSION,
};
62
63 struct lttng_channel_trigger_list {
64 struct channel_key channel_key;
65 /* List of struct lttng_trigger_list_element. */
66 struct cds_list_head list;
67 /* Node in the channel_triggers_ht */
68 struct cds_lfht_node channel_triggers_ht_node;
69 /* call_rcu delayed reclaim. */
70 struct rcu_head rcu_node;
71 };
72
73 /*
74 * List of triggers applying to a given session.
75 *
76 * See:
77 * - lttng_session_trigger_list_create()
78 * - lttng_session_trigger_list_build()
79 * - lttng_session_trigger_list_destroy()
80 * - lttng_session_trigger_list_add()
81 */
82 struct lttng_session_trigger_list {
83 /*
84 * Not owned by this; points to the session_info structure's
85 * session name.
86 */
87 const char *session_name;
88 /* List of struct lttng_trigger_list_element. */
89 struct cds_list_head list;
90 /* Node in the session_triggers_ht */
91 struct cds_lfht_node session_triggers_ht_node;
92 /*
93 * Weak reference to the notification system's session triggers
94 * hashtable.
95 *
96 * The session trigger list structure structure is owned by
97 * the session's session_info.
98 *
99 * The session_info is kept alive the the channel_infos holding a
100 * reference to it (reference counting). When those channels are
101 * destroyed (at runtime or on teardown), the reference they hold
102 * to the session_info are released. On destruction of session_info,
103 * session_info_destroy() will remove the list of triggers applying
104 * to this session from the notification system's state.
105 *
106 * This implies that the session_triggers_ht must be destroyed
107 * after the channels.
108 */
109 struct cds_lfht *session_triggers_ht;
110 /* Used for delayed RCU reclaim. */
111 struct rcu_head rcu_node;
112 };
113
114 namespace {
115 struct lttng_trigger_list_element {
116 /* No ownership of the trigger object is assumed. */
117 struct lttng_trigger *trigger;
118 struct cds_list_head node;
119 };
120
121 struct lttng_trigger_ht_element {
122 struct lttng_trigger *trigger;
123 struct cds_lfht_node node;
124 struct cds_lfht_node node_by_name_uid;
125 struct cds_list_head client_list_trigger_node;
126 /* call_rcu delayed reclaim. */
127 struct rcu_head rcu_node;
128 };
129
130 struct lttng_condition_list_element {
131 struct lttng_condition *condition;
132 struct cds_list_head node;
133 };
134
135 struct channel_state_sample {
136 struct channel_key key;
137 struct cds_lfht_node channel_state_ht_node;
138 uint64_t highest_usage;
139 uint64_t lowest_usage;
140 uint64_t channel_total_consumed;
141 /* call_rcu delayed reclaim. */
142 struct rcu_head rcu_node;
143 };
144 } /* namespace */
145
146 static unsigned long hash_channel_key(struct channel_key *key);
147 static int evaluate_buffer_condition(const struct lttng_condition *condition,
148 struct lttng_evaluation **evaluation,
149 const struct notification_thread_state *state,
150 const struct channel_state_sample *previous_sample,
151 const struct channel_state_sample *latest_sample,
152 uint64_t previous_session_consumed_total,
153 uint64_t latest_session_consumed_total,
154 struct channel_info *channel_info);
155 static
156 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
157 const struct lttng_evaluation *evaluation,
158 struct notification_client_list *client_list,
159 struct notification_thread_state *state,
160 uid_t channel_uid, gid_t channel_gid);
161
162
163 /* session_info API */
164 static
165 void session_info_destroy(void *_data);
166 static
167 void session_info_get(struct session_info *session_info);
168 static
169 void session_info_put(struct session_info *session_info);
170 static
171 struct session_info *session_info_create(const char *name,
172 uid_t uid, gid_t gid,
173 struct lttng_session_trigger_list *trigger_list,
174 struct cds_lfht *sessions_ht);
175 static
176 void session_info_add_channel(struct session_info *session_info,
177 struct channel_info *channel_info);
178 static
179 void session_info_remove_channel(struct session_info *session_info,
180 struct channel_info *channel_info);
181
182 /* lttng_session_trigger_list API */
183 static
184 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
185 const char *session_name,
186 struct cds_lfht *session_triggers_ht);
187 static
188 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
189 const struct notification_thread_state *state,
190 const char *session_name);
191 static
192 void lttng_session_trigger_list_destroy(
193 struct lttng_session_trigger_list *list);
194 static
195 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
196 struct lttng_trigger *trigger);
197
198 static
199 int client_handle_transmission_status(
200 struct notification_client *client,
201 enum client_transmission_status transmission_status,
202 struct notification_thread_state *state);
203
204 static
205 int handle_one_event_notifier_notification(
206 struct notification_thread_state *state,
207 int pipe, enum lttng_domain_type domain);
208
209 static
210 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
211
212 static
213 int match_client_socket(struct cds_lfht_node *node, const void *key)
214 {
215 /* This double-cast is intended to supress pointer-to-cast warning. */
216 const int socket = (int) (intptr_t) key;
217 const struct notification_client *client = caa_container_of(node,
218 struct notification_client, client_socket_ht_node);
219
220 return client->socket == socket;
221 }
222
223 static
224 int match_client_id(struct cds_lfht_node *node, const void *key)
225 {
226 /* This double-cast is intended to supress pointer-to-cast warning. */
227 const notification_client_id id = *((notification_client_id *) key);
228 const struct notification_client *client = caa_container_of(
229 node, struct notification_client, client_id_ht_node);
230
231 return client->id == id;
232 }
233
234 static
235 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
236 {
237 struct channel_key *channel_key = (struct channel_key *) key;
238 struct lttng_channel_trigger_list *trigger_list;
239
240 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
241 channel_triggers_ht_node);
242
243 return !!((channel_key->key == trigger_list->channel_key.key) &&
244 (channel_key->domain == trigger_list->channel_key.domain));
245 }
246
247 static
248 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
249 {
250 const char *session_name = (const char *) key;
251 struct lttng_session_trigger_list *trigger_list;
252
253 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
254 session_triggers_ht_node);
255
256 return !!(strcmp(trigger_list->session_name, session_name) == 0);
257 }
258
259 static
260 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
261 {
262 struct channel_key *channel_key = (struct channel_key *) key;
263 struct channel_state_sample *sample;
264
265 sample = caa_container_of(node, struct channel_state_sample,
266 channel_state_ht_node);
267
268 return !!((channel_key->key == sample->key.key) &&
269 (channel_key->domain == sample->key.domain));
270 }
271
272 static
273 int match_channel_info(struct cds_lfht_node *node, const void *key)
274 {
275 struct channel_key *channel_key = (struct channel_key *) key;
276 struct channel_info *channel_info;
277
278 channel_info = caa_container_of(node, struct channel_info,
279 channels_ht_node);
280
281 return !!((channel_key->key == channel_info->key.key) &&
282 (channel_key->domain == channel_info->key.domain));
283 }
284
285 static
286 int match_trigger(struct cds_lfht_node *node, const void *key)
287 {
288 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
289 struct lttng_trigger_ht_element *trigger_ht_element;
290
291 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
292 node);
293
294 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
295 }
296
297 static
298 int match_trigger_token(struct cds_lfht_node *node, const void *key)
299 {
300 const uint64_t *_key = (uint64_t *) key;
301 struct notification_trigger_tokens_ht_element *element;
302
303 element = caa_container_of(node,
304 struct notification_trigger_tokens_ht_element, node);
305 return *_key == element->token;
306 }
307
308 static
309 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
310 {
311 struct lttng_condition *condition_key = (struct lttng_condition *) key;
312 struct notification_client_list *client_list;
313 const struct lttng_condition *condition;
314
315 LTTNG_ASSERT(condition_key);
316
317 client_list = caa_container_of(node, struct notification_client_list,
318 notification_trigger_clients_ht_node);
319 condition = client_list->condition;
320
321 return !!lttng_condition_is_equal(condition_key, condition);
322 }
323
324 static
325 int match_session(struct cds_lfht_node *node, const void *key)
326 {
327 const char *name = (const char *) key;
328 struct session_info *session_info = caa_container_of(
329 node, struct session_info, sessions_ht_node);
330
331 return !strcmp(session_info->name, name);
332 }
333
334 static
335 const char *notification_command_type_str(
336 enum notification_thread_command_type type)
337 {
338 switch (type) {
339 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
340 return "REGISTER_TRIGGER";
341 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
342 return "UNREGISTER_TRIGGER";
343 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
344 return "ADD_CHANNEL";
345 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
346 return "REMOVE_CHANNEL";
347 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
348 return "SESSION_ROTATION_ONGOING";
349 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
350 return "SESSION_ROTATION_COMPLETED";
351 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
352 return "ADD_TRACER_EVENT_SOURCE";
353 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
354 return "REMOVE_TRACER_EVENT_SOURCE";
355 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
356 return "LIST_TRIGGERS";
357 case NOTIFICATION_COMMAND_TYPE_GET_TRIGGER:
358 return "GET_TRIGGER";
359 case NOTIFICATION_COMMAND_TYPE_QUIT:
360 return "QUIT";
361 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
362 return "CLIENT_COMMUNICATION_UPDATE";
363 default:
364 abort();
365 }
366 }
367
368 /*
369 * Match trigger based on name and credentials only.
370 * Name duplication is NOT allowed for the same uid.
371 */
372 static
373 int match_trigger_by_name_uid(struct cds_lfht_node *node,
374 const void *key)
375 {
376 bool match = false;
377 const char *element_trigger_name;
378 const char *key_name;
379 enum lttng_trigger_status status;
380 const struct lttng_credentials *key_creds;
381 const struct lttng_credentials *node_creds;
382 const struct lttng_trigger *trigger_key =
383 (const struct lttng_trigger *) key;
384 const struct lttng_trigger_ht_element *trigger_ht_element =
385 caa_container_of(node,
386 struct lttng_trigger_ht_element,
387 node_by_name_uid);
388
389 status = lttng_trigger_get_name(trigger_ht_element->trigger,
390 &element_trigger_name);
391 element_trigger_name = status == LTTNG_TRIGGER_STATUS_OK ?
392 element_trigger_name : NULL;
393
394 status = lttng_trigger_get_name(trigger_key, &key_name);
395 key_name = status == LTTNG_TRIGGER_STATUS_OK ? key_name : NULL;
396
397 /*
398 * Compare the names.
399 * Consider null names as not equal. This is to maintain backwards
400 * compatibility with pre-2.13 anonymous triggers. Multiples anonymous
401 * triggers are allowed for a given user.
402 */
403 if (!element_trigger_name || !key_name) {
404 goto end;
405 }
406
407 if (strcmp(element_trigger_name, key_name) != 0) {
408 goto end;
409 }
410
411 /* Compare the owners' UIDs. */
412 key_creds = lttng_trigger_get_credentials(trigger_key);
413 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
414
415 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
416
417 end:
418 return match;
419 }
420
421 /*
422 * Hash trigger based on name and credentials only.
423 */
424 static
425 unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
426 {
427 unsigned long hash = 0;
428 const struct lttng_credentials *trigger_creds;
429 const char *trigger_name;
430 enum lttng_trigger_status status;
431
432 status = lttng_trigger_get_name(trigger, &trigger_name);
433 if (status == LTTNG_TRIGGER_STATUS_OK) {
434 hash = hash_key_str(trigger_name, lttng_ht_seed);
435 }
436
437 trigger_creds = lttng_trigger_get_credentials(trigger);
438 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
439 lttng_ht_seed);
440
441 return hash;
442 }
443
444 static
445 unsigned long hash_channel_key(struct channel_key *key)
446 {
447 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
448 unsigned long domain_hash = hash_key_ulong(
449 (void *) (unsigned long) key->domain, lttng_ht_seed);
450
451 return key_hash ^ domain_hash;
452 }
453
454 static
455 unsigned long hash_client_socket(int socket)
456 {
457 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
458 }
459
460 static
461 unsigned long hash_client_id(notification_client_id id)
462 {
463 return hash_key_u64(&id, lttng_ht_seed);
464 }
465
466 /*
467 * Get the type of object to which a given condition applies. Bindings let
468 * the notification system evaluate a trigger's condition when a given
469 * object's state is updated.
470 *
471 * For instance, a condition bound to a channel will be evaluated everytime
472 * the channel's state is changed by a channel monitoring sample.
473 */
474 static
475 enum lttng_object_type get_condition_binding_object(
476 const struct lttng_condition *condition)
477 {
478 switch (lttng_condition_get_type(condition)) {
479 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
480 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
481 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
482 return LTTNG_OBJECT_TYPE_CHANNEL;
483 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
484 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
485 return LTTNG_OBJECT_TYPE_SESSION;
486 case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
487 return LTTNG_OBJECT_TYPE_NONE;
488 default:
489 return LTTNG_OBJECT_TYPE_UNKNOWN;
490 }
491 }
492
493 static
494 void free_channel_info_rcu(struct rcu_head *node)
495 {
496 free(caa_container_of(node, struct channel_info, rcu_node));
497 }
498
499 static
500 void channel_info_destroy(struct channel_info *channel_info)
501 {
502 if (!channel_info) {
503 return;
504 }
505
506 if (channel_info->session_info) {
507 session_info_remove_channel(channel_info->session_info,
508 channel_info);
509 session_info_put(channel_info->session_info);
510 }
511 if (channel_info->name) {
512 free(channel_info->name);
513 }
514 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
515 }
516
517 static
518 void free_session_info_rcu(struct rcu_head *node)
519 {
520 free(caa_container_of(node, struct session_info, rcu_node));
521 }
522
523 /* Don't call directly, use the ref-counting mechanism. */
524 static
525 void session_info_destroy(void *_data)
526 {
527 struct session_info *session_info = (struct session_info *) _data;
528 int ret;
529
530 LTTNG_ASSERT(session_info);
531 if (session_info->channel_infos_ht) {
532 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
533 if (ret) {
534 ERR("Failed to destroy channel information hash table");
535 }
536 }
537 lttng_session_trigger_list_destroy(session_info->trigger_list);
538
539 rcu_read_lock();
540 cds_lfht_del(session_info->sessions_ht,
541 &session_info->sessions_ht_node);
542 rcu_read_unlock();
543 free(session_info->name);
544 call_rcu(&session_info->rcu_node, free_session_info_rcu);
545 }
546
547 static
548 void session_info_get(struct session_info *session_info)
549 {
550 if (!session_info) {
551 return;
552 }
553 lttng_ref_get(&session_info->ref);
554 }
555
556 static
557 void session_info_put(struct session_info *session_info)
558 {
559 if (!session_info) {
560 return;
561 }
562 lttng_ref_put(&session_info->ref);
563 }
564
565 static
566 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
567 struct lttng_session_trigger_list *trigger_list,
568 struct cds_lfht *sessions_ht)
569 {
570 struct session_info *session_info;
571
572 LTTNG_ASSERT(name);
573
574 session_info = zmalloc<struct session_info>();
575 if (!session_info) {
576 goto end;
577 }
578 lttng_ref_init(&session_info->ref, session_info_destroy);
579
580 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
581 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
582 if (!session_info->channel_infos_ht) {
583 goto error;
584 }
585
586 cds_lfht_node_init(&session_info->sessions_ht_node);
587 session_info->name = strdup(name);
588 if (!session_info->name) {
589 goto error;
590 }
591 session_info->uid = uid;
592 session_info->gid = gid;
593 session_info->trigger_list = trigger_list;
594 session_info->sessions_ht = sessions_ht;
595 end:
596 return session_info;
597 error:
598 session_info_put(session_info);
599 return NULL;
600 }
601
602 static
603 void session_info_add_channel(struct session_info *session_info,
604 struct channel_info *channel_info)
605 {
606 rcu_read_lock();
607 cds_lfht_add(session_info->channel_infos_ht,
608 hash_channel_key(&channel_info->key),
609 &channel_info->session_info_channels_ht_node);
610 rcu_read_unlock();
611 }
612
613 static
614 void session_info_remove_channel(struct session_info *session_info,
615 struct channel_info *channel_info)
616 {
617 rcu_read_lock();
618 cds_lfht_del(session_info->channel_infos_ht,
619 &channel_info->session_info_channels_ht_node);
620 rcu_read_unlock();
621 }
622
623 static
624 struct channel_info *channel_info_create(const char *channel_name,
625 struct channel_key *channel_key, uint64_t channel_capacity,
626 struct session_info *session_info)
627 {
628 struct channel_info *channel_info = zmalloc<struct channel_info>();
629
630 if (!channel_info) {
631 goto end;
632 }
633
634 cds_lfht_node_init(&channel_info->channels_ht_node);
635 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
636 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
637 channel_info->capacity = channel_capacity;
638
639 channel_info->name = strdup(channel_name);
640 if (!channel_info->name) {
641 goto error;
642 }
643
644 /*
645 * Set the references between session and channel infos:
646 * - channel_info holds a strong reference to session_info
647 * - session_info holds a weak reference to channel_info
648 */
649 session_info_get(session_info);
650 session_info_add_channel(session_info, channel_info);
651 channel_info->session_info = session_info;
652 end:
653 return channel_info;
654 error:
655 channel_info_destroy(channel_info);
656 return NULL;
657 }
658
659 bool notification_client_list_get(struct notification_client_list *list)
660 {
661 return urcu_ref_get_unless_zero(&list->ref);
662 }
663
664 static
665 void free_notification_client_list_rcu(struct rcu_head *node)
666 {
667 free(caa_container_of(node, struct notification_client_list,
668 rcu_node));
669 }
670
671 static
672 void notification_client_list_release(struct urcu_ref *list_ref)
673 {
674 struct notification_client_list *list =
675 container_of(list_ref, typeof(*list), ref);
676 struct notification_client_list_element *client_list_element, *tmp;
677
678 lttng_condition_put(list->condition);
679
680 if (list->notification_trigger_clients_ht) {
681 rcu_read_lock();
682
683 cds_lfht_del(list->notification_trigger_clients_ht,
684 &list->notification_trigger_clients_ht_node);
685 rcu_read_unlock();
686 list->notification_trigger_clients_ht = NULL;
687 }
688 cds_list_for_each_entry_safe(client_list_element, tmp,
689 &list->clients_list, node) {
690 free(client_list_element);
691 }
692
693 LTTNG_ASSERT(cds_list_empty(&list->triggers_list));
694
695 pthread_mutex_destroy(&list->lock);
696 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
697 }
698
699 static
700 bool condition_applies_to_client(const struct lttng_condition *condition,
701 struct notification_client *client)
702 {
703 bool applies = false;
704 struct lttng_condition_list_element *condition_list_element;
705
706 cds_list_for_each_entry(condition_list_element, &client->condition_list,
707 node) {
708 applies = lttng_condition_is_equal(
709 condition_list_element->condition,
710 condition);
711 if (applies) {
712 break;
713 }
714 }
715
716 return applies;
717 }
718
719 static
720 struct notification_client_list *notification_client_list_create(
721 struct notification_thread_state *state,
722 const struct lttng_condition *condition)
723 {
724 struct notification_client *client;
725 struct cds_lfht_iter iter;
726 struct notification_client_list *client_list;
727
728 client_list = zmalloc<notification_client_list>();
729 if (!client_list) {
730 PERROR("Failed to allocate notification client list");
731 goto end;
732 }
733
734 pthread_mutex_init(&client_list->lock, NULL);
735 /*
736 * The trigger that owns the condition has the first reference to this
737 * client list.
738 */
739 urcu_ref_init(&client_list->ref);
740 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
741 CDS_INIT_LIST_HEAD(&client_list->clients_list);
742 CDS_INIT_LIST_HEAD(&client_list->triggers_list);
743
744 /*
745 * Create a copy of the condition so that it's independent of any
746 * trigger. The client list may outlive the trigger object (which owns
747 * the condition) that is used to create it.
748 */
749 client_list->condition = lttng_condition_copy(condition);
750
751 /* Build a list of clients to which this new condition applies. */
752 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
753 client_socket_ht_node) {
754 struct notification_client_list_element *client_list_element;
755
756 if (!condition_applies_to_client(condition, client)) {
757 continue;
758 }
759
760 client_list_element = zmalloc<notification_client_list_element>();
761 if (!client_list_element) {
762 goto error_put_client_list;
763 }
764
765 CDS_INIT_LIST_HEAD(&client_list_element->node);
766 client_list_element->client = client;
767 cds_list_add(&client_list_element->node, &client_list->clients_list);
768 }
769
770 client_list->notification_trigger_clients_ht =
771 state->notification_trigger_clients_ht;
772
773 rcu_read_lock();
774 /*
775 * Add the client list to the global list of client list.
776 */
777 cds_lfht_add_unique(state->notification_trigger_clients_ht,
778 lttng_condition_hash(client_list->condition),
779 match_client_list_condition,
780 client_list->condition,
781 &client_list->notification_trigger_clients_ht_node);
782 rcu_read_unlock();
783 goto end;
784
785 error_put_client_list:
786 notification_client_list_put(client_list);
787 client_list = NULL;
788
789 end:
790 return client_list;
791 }
792
793 void notification_client_list_put(struct notification_client_list *list)
794 {
795 if (!list) {
796 return;
797 }
798 return urcu_ref_put(&list->ref, notification_client_list_release);
799 }
800
801 /* Provides a reference to the returned list. */
802 static
803 struct notification_client_list *get_client_list_from_condition(
804 struct notification_thread_state *state,
805 const struct lttng_condition *condition)
806 {
807 struct cds_lfht_node *node;
808 struct cds_lfht_iter iter;
809 struct notification_client_list *list = NULL;
810
811 rcu_read_lock();
812 cds_lfht_lookup(state->notification_trigger_clients_ht,
813 lttng_condition_hash(condition),
814 match_client_list_condition,
815 condition,
816 &iter);
817 node = cds_lfht_iter_get_node(&iter);
818 if (node) {
819 list = container_of(node, struct notification_client_list,
820 notification_trigger_clients_ht_node);
821 list = notification_client_list_get(list) ? list : NULL;
822 }
823
824 rcu_read_unlock();
825 return list;
826 }
827
828 static
829 int evaluate_channel_condition_for_client(
830 const struct lttng_condition *condition,
831 struct notification_thread_state *state,
832 struct lttng_evaluation **evaluation,
833 uid_t *session_uid, gid_t *session_gid)
834 {
835 int ret;
836 struct cds_lfht_iter iter;
837 struct cds_lfht_node *node;
838 struct channel_info *channel_info = NULL;
839 struct channel_key *channel_key = NULL;
840 struct channel_state_sample *last_sample = NULL;
841 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
842
843 rcu_read_lock();
844
845 /* Find the channel associated with the condition. */
846 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
847 channel_trigger_list, channel_triggers_ht_node) {
848 struct lttng_trigger_list_element *element;
849
850 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
851 const struct lttng_condition *current_condition =
852 lttng_trigger_get_const_condition(
853 element->trigger);
854
855 LTTNG_ASSERT(current_condition);
856 if (!lttng_condition_is_equal(condition,
857 current_condition)) {
858 continue;
859 }
860
861 /* Found the trigger, save the channel key. */
862 channel_key = &channel_trigger_list->channel_key;
863 break;
864 }
865 if (channel_key) {
866 /* The channel key was found stop iteration. */
867 break;
868 }
869 }
870
871 if (!channel_key){
872 /* No channel found; normal exit. */
873 DBG("No known channel associated with newly subscribed-to condition");
874 ret = 0;
875 goto end;
876 }
877
878 /* Fetch channel info for the matching channel. */
879 cds_lfht_lookup(state->channels_ht,
880 hash_channel_key(channel_key),
881 match_channel_info,
882 channel_key,
883 &iter);
884 node = cds_lfht_iter_get_node(&iter);
885 LTTNG_ASSERT(node);
886 channel_info = caa_container_of(node, struct channel_info,
887 channels_ht_node);
888
889 /* Retrieve the channel's last sample, if it exists. */
890 cds_lfht_lookup(state->channel_state_ht,
891 hash_channel_key(channel_key),
892 match_channel_state_sample,
893 channel_key,
894 &iter);
895 node = cds_lfht_iter_get_node(&iter);
896 if (node) {
897 last_sample = caa_container_of(node,
898 struct channel_state_sample,
899 channel_state_ht_node);
900 } else {
901 /* Nothing to evaluate, no sample was ever taken. Normal exit */
902 DBG("No channel sample associated with newly subscribed-to condition");
903 ret = 0;
904 goto end;
905 }
906
907 ret = evaluate_buffer_condition(condition, evaluation, state,
908 NULL, last_sample,
909 0, channel_info->session_info->consumed_data_size,
910 channel_info);
911 if (ret) {
912 WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
913 goto end;
914 }
915
916 *session_uid = channel_info->session_info->uid;
917 *session_gid = channel_info->session_info->gid;
918 end:
919 rcu_read_unlock();
920 return ret;
921 }
922
923 static
924 const char *get_condition_session_name(const struct lttng_condition *condition)
925 {
926 const char *session_name = NULL;
927 enum lttng_condition_status status;
928
929 switch (lttng_condition_get_type(condition)) {
930 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
931 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
932 status = lttng_condition_buffer_usage_get_session_name(
933 condition, &session_name);
934 break;
935 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
936 status = lttng_condition_session_consumed_size_get_session_name(
937 condition, &session_name);
938 break;
939 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
940 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
941 status = lttng_condition_session_rotation_get_session_name(
942 condition, &session_name);
943 break;
944 default:
945 abort();
946 }
947 if (status != LTTNG_CONDITION_STATUS_OK) {
948 ERR("Failed to retrieve session rotation condition's session name");
949 goto end;
950 }
951 end:
952 return session_name;
953 }
954
955 static
956 int evaluate_session_condition_for_client(
957 const struct lttng_condition *condition,
958 struct notification_thread_state *state,
959 struct lttng_evaluation **evaluation,
960 uid_t *session_uid, gid_t *session_gid)
961 {
962 int ret;
963 struct cds_lfht_iter iter;
964 struct cds_lfht_node *node;
965 const char *session_name;
966 struct session_info *session_info = NULL;
967
968 rcu_read_lock();
969 session_name = get_condition_session_name(condition);
970
971 /* Find the session associated with the trigger. */
972 cds_lfht_lookup(state->sessions_ht,
973 hash_key_str(session_name, lttng_ht_seed),
974 match_session,
975 session_name,
976 &iter);
977 node = cds_lfht_iter_get_node(&iter);
978 if (!node) {
979 DBG("No known session matching name \"%s\"",
980 session_name);
981 ret = 0;
982 goto end;
983 }
984
985 session_info = caa_container_of(node, struct session_info,
986 sessions_ht_node);
987 session_info_get(session_info);
988
989 /*
990 * Evaluation is performed in-line here since only one type of
991 * session-bound condition is handled for the moment.
992 */
993 switch (lttng_condition_get_type(condition)) {
994 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
995 if (!session_info->rotation.ongoing) {
996 ret = 0;
997 goto end_session_put;
998 }
999
1000 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1001 session_info->rotation.id);
1002 if (!*evaluation) {
1003 /* Fatal error. */
1004 ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
1005 session_info->name);
1006 ret = -1;
1007 goto end_session_put;
1008 }
1009 ret = 0;
1010 break;
1011 default:
1012 ret = 0;
1013 goto end_session_put;
1014 }
1015
1016 *session_uid = session_info->uid;
1017 *session_gid = session_info->gid;
1018
1019 end_session_put:
1020 session_info_put(session_info);
1021 end:
1022 rcu_read_unlock();
1023 return ret;
1024 }
1025
1026 static
1027 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1028 const struct lttng_condition *condition,
1029 struct notification_client *client,
1030 struct notification_thread_state *state)
1031 {
1032 int ret;
1033 struct lttng_evaluation *evaluation = NULL;
1034 struct notification_client_list client_list = {
1035 .lock = PTHREAD_MUTEX_INITIALIZER,
1036 .ref = {},
1037 .condition = NULL,
1038 .triggers_list = {},
1039 .clients_list = {},
1040 .notification_trigger_clients_ht = NULL,
1041 .notification_trigger_clients_ht_node = {},
1042 .rcu_node = {},
1043 };
1044 struct notification_client_list_element client_list_element = {};
1045 uid_t object_uid = 0;
1046 gid_t object_gid = 0;
1047
1048 LTTNG_ASSERT(trigger);
1049 LTTNG_ASSERT(condition);
1050 LTTNG_ASSERT(client);
1051 LTTNG_ASSERT(state);
1052
1053 switch (get_condition_binding_object(condition)) {
1054 case LTTNG_OBJECT_TYPE_SESSION:
1055 ret = evaluate_session_condition_for_client(condition, state,
1056 &evaluation, &object_uid, &object_gid);
1057 break;
1058 case LTTNG_OBJECT_TYPE_CHANNEL:
1059 ret = evaluate_channel_condition_for_client(condition, state,
1060 &evaluation, &object_uid, &object_gid);
1061 break;
1062 case LTTNG_OBJECT_TYPE_NONE:
1063 DBG("Newly subscribed-to condition not bound to object, nothing to evaluate");
1064 ret = 0;
1065 goto end;
1066 case LTTNG_OBJECT_TYPE_UNKNOWN:
1067 default:
1068 ret = -1;
1069 goto end;
1070 }
1071 if (ret) {
1072 /* Fatal error. */
1073 goto end;
1074 }
1075 if (!evaluation) {
1076 /* Evaluation yielded nothing. Normal exit. */
1077 DBG("Newly subscribed-to condition evaluated to false, nothing to report to client");
1078 ret = 0;
1079 goto end;
1080 }
1081
1082 /*
1083 * Create a temporary client list with the client currently
1084 * subscribing.
1085 */
1086 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1087 CDS_INIT_LIST_HEAD(&client_list.clients_list);
1088
1089 CDS_INIT_LIST_HEAD(&client_list_element.node);
1090 client_list_element.client = client;
1091 cds_list_add(&client_list_element.node, &client_list.clients_list);
1092
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("Newly subscribed-to condition evaluated to true, notifying client");
1095 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1096 state, object_uid, object_gid);
1097
1098 end:
1099 return ret;
1100 }
1101
1102 static
1103 int notification_thread_client_subscribe(struct notification_client *client,
1104 struct lttng_condition *condition,
1105 struct notification_thread_state *state,
1106 enum lttng_notification_channel_status *_status)
1107 {
1108 int ret = 0;
1109 struct notification_client_list *client_list = NULL;
1110 struct lttng_condition_list_element *condition_list_element = NULL;
1111 struct notification_client_list_element *client_list_element = NULL;
1112 struct lttng_trigger_ht_element *trigger_ht_element;
1113 enum lttng_notification_channel_status status =
1114 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1115
1116 /*
1117 * Ensure that the client has not already subscribed to this condition
1118 * before.
1119 */
1120 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1121 if (lttng_condition_is_equal(condition_list_element->condition,
1122 condition)) {
1123 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1124 goto end;
1125 }
1126 }
1127
1128 condition_list_element = zmalloc<lttng_condition_list_element>();
1129 if (!condition_list_element) {
1130 ret = -1;
1131 goto error;
1132 }
1133 client_list_element = zmalloc<notification_client_list_element>();
1134 if (!client_list_element) {
1135 ret = -1;
1136 goto error;
1137 }
1138
1139 /*
1140 * Add the newly-subscribed condition to the client's subscription list.
1141 */
1142 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1143 condition_list_element->condition = condition;
1144 condition = NULL;
1145 cds_list_add(&condition_list_element->node, &client->condition_list);
1146
1147 client_list = get_client_list_from_condition(
1148 state, condition_list_element->condition);
1149 if (!client_list) {
1150 /*
1151 * No notification-emiting trigger registered with this
1152 * condition. We don't evaluate the condition right away
1153 * since this trigger is not registered yet.
1154 */
1155 free(client_list_element);
1156 goto end;
1157 }
1158
1159 /*
1160 * The condition to which the client just subscribed is evaluated
1161 * at this point so that conditions that are already TRUE result
1162 * in a notification being sent out.
1163 *
1164 * Note the iteration on all triggers which share an identical
1165 * `condition` than the one to which the client is registering. This is
1166 * done to ensure that the client receives a distinct notification for
1167 * all triggers that have a `notify` action that have this condition.
1168 */
1169 pthread_mutex_lock(&client_list->lock);
1170 cds_list_for_each_entry(trigger_ht_element,
1171 &client_list->triggers_list, client_list_trigger_node) {
1172 if (evaluate_condition_for_client(trigger_ht_element->trigger, condition_list_element->condition,
1173 client, state)) {
1174 WARN("Evaluation of a condition on client subscription failed, aborting.");
1175 ret = -1;
1176 free(client_list_element);
1177 pthread_mutex_unlock(&client_list->lock);
1178 goto end;
1179 }
1180 }
1181 pthread_mutex_unlock(&client_list->lock);
1182
1183 /*
1184 * Add the client to the list of clients interested in a given trigger
1185 * if a "notification" trigger with a corresponding condition was
1186 * added prior.
1187 */
1188 client_list_element->client = client;
1189 CDS_INIT_LIST_HEAD(&client_list_element->node);
1190
1191 pthread_mutex_lock(&client_list->lock);
1192 cds_list_add(&client_list_element->node, &client_list->clients_list);
1193 pthread_mutex_unlock(&client_list->lock);
1194 end:
1195 if (_status) {
1196 *_status = status;
1197 }
1198 if (client_list) {
1199 notification_client_list_put(client_list);
1200 }
1201 lttng_condition_destroy(condition);
1202 return ret;
1203 error:
1204 free(condition_list_element);
1205 free(client_list_element);
1206 lttng_condition_destroy(condition);
1207 return ret;
1208 }
1209
1210 static
1211 int notification_thread_client_unsubscribe(
1212 struct notification_client *client,
1213 struct lttng_condition *condition,
1214 struct notification_thread_state *state,
1215 enum lttng_notification_channel_status *_status)
1216 {
1217 struct notification_client_list *client_list;
1218 struct lttng_condition_list_element *condition_list_element,
1219 *condition_tmp;
1220 struct notification_client_list_element *client_list_element,
1221 *client_tmp;
1222 bool condition_found = false;
1223 enum lttng_notification_channel_status status =
1224 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1225
1226 /* Remove the condition from the client's condition list. */
1227 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1228 &client->condition_list, node) {
1229 if (!lttng_condition_is_equal(condition_list_element->condition,
1230 condition)) {
1231 continue;
1232 }
1233
1234 cds_list_del(&condition_list_element->node);
1235 /*
1236 * The caller may be iterating on the client's conditions to
1237 * tear down a client's connection. In this case, the condition
1238 * will be destroyed at the end.
1239 */
1240 if (condition != condition_list_element->condition) {
1241 lttng_condition_destroy(
1242 condition_list_element->condition);
1243 }
1244 free(condition_list_element);
1245 condition_found = true;
1246 break;
1247 }
1248
1249 if (!condition_found) {
1250 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1251 goto end;
1252 }
1253
1254 /*
1255 * Remove the client from the list of clients interested the trigger
1256 * matching the condition.
1257 */
1258 client_list = get_client_list_from_condition(state, condition);
1259 if (!client_list) {
1260 goto end;
1261 }
1262
1263 pthread_mutex_lock(&client_list->lock);
1264 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1265 &client_list->clients_list, node) {
1266 if (client_list_element->client->id != client->id) {
1267 continue;
1268 }
1269 cds_list_del(&client_list_element->node);
1270 free(client_list_element);
1271 break;
1272 }
1273 pthread_mutex_unlock(&client_list->lock);
1274 notification_client_list_put(client_list);
1275 client_list = NULL;
1276 end:
1277 lttng_condition_destroy(condition);
1278 if (_status) {
1279 *_status = status;
1280 }
1281 return 0;
1282 }
1283
1284 static
1285 void free_notification_client_rcu(struct rcu_head *node)
1286 {
1287 free(caa_container_of(node, struct notification_client, rcu_node));
1288 }
1289
1290 static
1291 void notification_client_destroy(struct notification_client *client)
1292 {
1293 if (!client) {
1294 return;
1295 }
1296
1297 /*
1298 * The client object is not reachable by other threads, no need to lock
1299 * the client here.
1300 */
1301 if (client->socket >= 0) {
1302 (void) lttcomm_close_unix_sock(client->socket);
1303 client->socket = -1;
1304 }
1305 client->communication.active = false;
1306 lttng_payload_reset(&client->communication.inbound.payload);
1307 lttng_payload_reset(&client->communication.outbound.payload);
1308 pthread_mutex_destroy(&client->lock);
1309 call_rcu(&client->rcu_node, free_notification_client_rcu);
1310 }
1311
1312 /*
1313 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1314 * client pointer).
1315 */
1316 static
1317 struct notification_client *get_client_from_socket(int socket,
1318 struct notification_thread_state *state)
1319 {
1320 struct cds_lfht_iter iter;
1321 struct cds_lfht_node *node;
1322 struct notification_client *client = NULL;
1323
1324 ASSERT_RCU_READ_LOCKED();
1325
1326 cds_lfht_lookup(state->client_socket_ht,
1327 hash_client_socket(socket),
1328 match_client_socket,
1329 (void *) (unsigned long) socket,
1330 &iter);
1331 node = cds_lfht_iter_get_node(&iter);
1332 if (!node) {
1333 goto end;
1334 }
1335
1336 client = caa_container_of(node, struct notification_client,
1337 client_socket_ht_node);
1338 end:
1339 return client;
1340 }
1341
1342 /*
1343 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1344 * client pointer).
1345 */
1346 static
1347 struct notification_client *get_client_from_id(notification_client_id id,
1348 struct notification_thread_state *state)
1349 {
1350 struct cds_lfht_iter iter;
1351 struct cds_lfht_node *node;
1352 struct notification_client *client = NULL;
1353
1354 ASSERT_RCU_READ_LOCKED();
1355
1356 cds_lfht_lookup(state->client_id_ht,
1357 hash_client_id(id),
1358 match_client_id,
1359 &id,
1360 &iter);
1361 node = cds_lfht_iter_get_node(&iter);
1362 if (!node) {
1363 goto end;
1364 }
1365
1366 client = caa_container_of(node, struct notification_client,
1367 client_id_ht_node);
1368 end:
1369 return client;
1370 }
1371
1372 static
1373 bool buffer_usage_condition_applies_to_channel(
1374 const struct lttng_condition *condition,
1375 const struct channel_info *channel_info)
1376 {
1377 enum lttng_condition_status status;
1378 enum lttng_domain_type condition_domain;
1379 const char *condition_session_name = NULL;
1380 const char *condition_channel_name = NULL;
1381
1382 status = lttng_condition_buffer_usage_get_domain_type(condition,
1383 &condition_domain);
1384 LTTNG_ASSERT(status == LTTNG_CONDITION_STATUS_OK);
1385 if (channel_info->key.domain != condition_domain) {
1386 goto fail;
1387 }
1388
1389 status = lttng_condition_buffer_usage_get_session_name(
1390 condition, &condition_session_name);
1391 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1392
1393 status = lttng_condition_buffer_usage_get_channel_name(
1394 condition, &condition_channel_name);
1395 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1396
1397 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1398 goto fail;
1399 }
1400 if (strcmp(channel_info->name, condition_channel_name)) {
1401 goto fail;
1402 }
1403
1404 return true;
1405 fail:
1406 return false;
1407 }
1408
1409 static
1410 bool session_consumed_size_condition_applies_to_channel(
1411 const struct lttng_condition *condition,
1412 const struct channel_info *channel_info)
1413 {
1414 enum lttng_condition_status status;
1415 const char *condition_session_name = NULL;
1416
1417 status = lttng_condition_session_consumed_size_get_session_name(
1418 condition, &condition_session_name);
1419 LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1420
1421 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1422 goto fail;
1423 }
1424
1425 return true;
1426 fail:
1427 return false;
1428 }
1429
1430 static
1431 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1432 const struct channel_info *channel_info)
1433 {
1434 const struct lttng_condition *condition;
1435 bool trigger_applies;
1436
1437 condition = lttng_trigger_get_const_condition(trigger);
1438 if (!condition) {
1439 goto fail;
1440 }
1441
1442 switch (lttng_condition_get_type(condition)) {
1443 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1444 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1445 trigger_applies = buffer_usage_condition_applies_to_channel(
1446 condition, channel_info);
1447 break;
1448 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1449 trigger_applies = session_consumed_size_condition_applies_to_channel(
1450 condition, channel_info);
1451 break;
1452 default:
1453 goto fail;
1454 }
1455
1456 return trigger_applies;
1457 fail:
1458 return false;
1459 }
1460
1461 /* Must be called with RCU read lock held. */
1462 static
1463 struct lttng_session_trigger_list *get_session_trigger_list(
1464 struct notification_thread_state *state,
1465 const char *session_name)
1466 {
1467 struct lttng_session_trigger_list *list = NULL;
1468 struct cds_lfht_node *node;
1469 struct cds_lfht_iter iter;
1470
1471 ASSERT_RCU_READ_LOCKED();
1472
1473 cds_lfht_lookup(state->session_triggers_ht,
1474 hash_key_str(session_name, lttng_ht_seed),
1475 match_session_trigger_list,
1476 session_name,
1477 &iter);
1478 node = cds_lfht_iter_get_node(&iter);
1479 if (!node) {
1480 /*
1481 * Not an error, the list of triggers applying to that session
1482 * will be initialized when the session is created.
1483 */
1484 DBG("No trigger list found for session \"%s\" as it is not yet known to the notification system",
1485 session_name);
1486 goto end;
1487 }
1488
1489 list = caa_container_of(node,
1490 struct lttng_session_trigger_list,
1491 session_triggers_ht_node);
1492 end:
1493 return list;
1494 }
1495
1496 /*
1497 * Allocate an empty lttng_session_trigger_list for the session named
1498 * 'session_name'.
1499 *
1500 * No ownership of 'session_name' is assumed by the session trigger list.
1501 * It is the caller's responsability to ensure the session name is alive
1502 * for as long as this list is.
1503 */
1504 static
1505 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1506 const char *session_name,
1507 struct cds_lfht *session_triggers_ht)
1508 {
1509 struct lttng_session_trigger_list *list;
1510
1511 list = zmalloc<lttng_session_trigger_list>();
1512 if (!list) {
1513 goto end;
1514 }
1515 list->session_name = session_name;
1516 CDS_INIT_LIST_HEAD(&list->list);
1517 cds_lfht_node_init(&list->session_triggers_ht_node);
1518 list->session_triggers_ht = session_triggers_ht;
1519
1520 rcu_read_lock();
1521 /* Publish the list through the session_triggers_ht. */
1522 cds_lfht_add(session_triggers_ht,
1523 hash_key_str(session_name, lttng_ht_seed),
1524 &list->session_triggers_ht_node);
1525 rcu_read_unlock();
1526 end:
1527 return list;
1528 }
1529
1530 static
1531 void free_session_trigger_list_rcu(struct rcu_head *node)
1532 {
1533 free(caa_container_of(node, struct lttng_session_trigger_list,
1534 rcu_node));
1535 }
1536
1537 static
1538 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1539 {
1540 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1541
1542 /* Empty the list element by element, and then free the list itself. */
1543 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1544 &list->list, node) {
1545 cds_list_del(&trigger_list_element->node);
1546 free(trigger_list_element);
1547 }
1548 rcu_read_lock();
1549 /* Unpublish the list from the session_triggers_ht. */
1550 cds_lfht_del(list->session_triggers_ht,
1551 &list->session_triggers_ht_node);
1552 rcu_read_unlock();
1553 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1554 }
1555
1556 static
1557 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1558 struct lttng_trigger *trigger)
1559 {
1560 int ret = 0;
1561 struct lttng_trigger_list_element *new_element =
1562 zmalloc<lttng_trigger_list_element>();
1563
1564 if (!new_element) {
1565 ret = -1;
1566 goto end;
1567 }
1568 CDS_INIT_LIST_HEAD(&new_element->node);
1569 new_element->trigger = trigger;
1570 cds_list_add(&new_element->node, &list->list);
1571 end:
1572 return ret;
1573 }
1574
1575 static
1576 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1577 const char *session_name)
1578 {
1579 bool applies = false;
1580 const struct lttng_condition *condition;
1581
1582 condition = lttng_trigger_get_const_condition(trigger);
1583 switch (lttng_condition_get_type(condition)) {
1584 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1585 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1586 {
1587 enum lttng_condition_status condition_status;
1588 const char *condition_session_name;
1589
1590 condition_status = lttng_condition_session_rotation_get_session_name(
1591 condition, &condition_session_name);
1592 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1593 ERR("Failed to retrieve session rotation condition's session name");
1594 goto end;
1595 }
1596
1597 LTTNG_ASSERT(condition_session_name);
1598 applies = !strcmp(condition_session_name, session_name);
1599 break;
1600 }
1601 default:
1602 goto end;
1603 }
1604 end:
1605 return applies;
1606 }
1607
1608 /*
1609 * Allocate and initialize an lttng_session_trigger_list which contains
1610 * all triggers that apply to the session named 'session_name'.
1611 *
1612 * No ownership of 'session_name' is assumed by the session trigger list.
1613 * It is the caller's responsability to ensure the session name is alive
1614 * for as long as this list is.
1615 */
1616 static
1617 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1618 const struct notification_thread_state *state,
1619 const char *session_name)
1620 {
1621 int trigger_count = 0;
1622 struct lttng_session_trigger_list *session_trigger_list = NULL;
1623 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1624 struct cds_lfht_iter iter;
1625
1626 session_trigger_list = lttng_session_trigger_list_create(session_name,
1627 state->session_triggers_ht);
1628
1629 /* Add all triggers applying to the session named 'session_name'. */
1630 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1631 node) {
1632 int ret;
1633
1634 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1635 session_name)) {
1636 continue;
1637 }
1638
1639 ret = lttng_session_trigger_list_add(session_trigger_list,
1640 trigger_ht_element->trigger);
1641 if (ret) {
1642 goto error;
1643 }
1644
1645 trigger_count++;
1646 }
1647
1648 DBG("Found %i triggers that apply to newly created session",
1649 trigger_count);
1650 return session_trigger_list;
1651 error:
1652 lttng_session_trigger_list_destroy(session_trigger_list);
1653 return NULL;
1654 }
1655
1656 static
1657 struct session_info *find_or_create_session_info(
1658 struct notification_thread_state *state,
1659 const char *name, uid_t uid, gid_t gid)
1660 {
1661 struct session_info *session = NULL;
1662 struct cds_lfht_node *node;
1663 struct cds_lfht_iter iter;
1664 struct lttng_session_trigger_list *trigger_list;
1665
1666 rcu_read_lock();
1667 cds_lfht_lookup(state->sessions_ht,
1668 hash_key_str(name, lttng_ht_seed),
1669 match_session,
1670 name,
1671 &iter);
1672 node = cds_lfht_iter_get_node(&iter);
1673 if (node) {
1674 DBG("Found session info of session \"%s\" (uid = %i, gid = %i)",
1675 name, uid, gid);
1676 session = caa_container_of(node, struct session_info,
1677 sessions_ht_node);
1678 LTTNG_ASSERT(session->uid == uid);
1679 LTTNG_ASSERT(session->gid == gid);
1680 session_info_get(session);
1681 goto end;
1682 }
1683
1684 trigger_list = lttng_session_trigger_list_build(state, name);
1685 if (!trigger_list) {
1686 goto error;
1687 }
1688
1689 session = session_info_create(name, uid, gid, trigger_list,
1690 state->sessions_ht);
1691 if (!session) {
1692 ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1693 name, uid, gid);
1694 lttng_session_trigger_list_destroy(trigger_list);
1695 goto error;
1696 }
1697 trigger_list = NULL;
1698
1699 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1700 &session->sessions_ht_node);
1701 end:
1702 rcu_read_unlock();
1703 return session;
1704 error:
1705 rcu_read_unlock();
1706 session_info_put(session);
1707 return NULL;
1708 }
1709
1710 static
1711 int handle_notification_thread_command_add_channel(
1712 struct notification_thread_state *state,
1713 const char *session_name, uid_t session_uid, gid_t session_gid,
1714 const char *channel_name, enum lttng_domain_type channel_domain,
1715 uint64_t channel_key_int, uint64_t channel_capacity,
1716 enum lttng_error_code *cmd_result)
1717 {
1718 struct cds_list_head trigger_list;
1719 struct channel_info *new_channel_info = NULL;
1720 struct channel_key channel_key = {
1721 .key = channel_key_int,
1722 .domain = channel_domain,
1723 };
1724 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1725 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1726 int trigger_count = 0;
1727 struct cds_lfht_iter iter;
1728 struct session_info *session_info = NULL;
1729
1730 DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1731 channel_name, session_name, channel_key_int,
1732 lttng_domain_type_str(channel_domain));
1733
1734 CDS_INIT_LIST_HEAD(&trigger_list);
1735
1736 session_info = find_or_create_session_info(state, session_name,
1737 session_uid, session_gid);
1738 if (!session_info) {
1739 /* Allocation error or an internal error occurred. */
1740 goto error;
1741 }
1742
1743 new_channel_info = channel_info_create(channel_name, &channel_key,
1744 channel_capacity, session_info);
1745 if (!new_channel_info) {
1746 goto error;
1747 }
1748
1749 rcu_read_lock();
1750 /* Build a list of all triggers applying to the new channel. */
1751 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1752 node) {
1753 struct lttng_trigger_list_element *new_element;
1754
1755 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1756 new_channel_info)) {
1757 continue;
1758 }
1759
1760 new_element = zmalloc<lttng_trigger_list_element>();
1761 if (!new_element) {
1762 rcu_read_unlock();
1763 goto error;
1764 }
1765 CDS_INIT_LIST_HEAD(&new_element->node);
1766 new_element->trigger = trigger_ht_element->trigger;
1767 cds_list_add(&new_element->node, &trigger_list);
1768 trigger_count++;
1769 }
1770 rcu_read_unlock();
1771
1772 DBG("Found %i triggers that apply to newly added channel",
1773 trigger_count);
1774 channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
1775 if (!channel_trigger_list) {
1776 goto error;
1777 }
1778 channel_trigger_list->channel_key = new_channel_info->key;
1779 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1780 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1781 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1782
1783 rcu_read_lock();
1784 /* Add channel to the channel_ht which owns the channel_infos. */
1785 cds_lfht_add(state->channels_ht,
1786 hash_channel_key(&new_channel_info->key),
1787 &new_channel_info->channels_ht_node);
1788 /*
1789 * Add the list of triggers associated with this channel to the
1790 * channel_triggers_ht.
1791 */
1792 cds_lfht_add(state->channel_triggers_ht,
1793 hash_channel_key(&new_channel_info->key),
1794 &channel_trigger_list->channel_triggers_ht_node);
1795 rcu_read_unlock();
1796 session_info_put(session_info);
1797 *cmd_result = LTTNG_OK;
1798 return 0;
1799 error:
1800 channel_info_destroy(new_channel_info);
1801 session_info_put(session_info);
1802 return 1;
1803 }
1804
1805 static
1806 void free_channel_trigger_list_rcu(struct rcu_head *node)
1807 {
1808 free(caa_container_of(node, struct lttng_channel_trigger_list,
1809 rcu_node));
1810 }
1811
1812 static
1813 void free_channel_state_sample_rcu(struct rcu_head *node)
1814 {
1815 free(caa_container_of(node, struct channel_state_sample,
1816 rcu_node));
1817 }
1818
1819 static
1820 int handle_notification_thread_command_remove_channel(
1821 struct notification_thread_state *state,
1822 uint64_t channel_key, enum lttng_domain_type domain,
1823 enum lttng_error_code *cmd_result)
1824 {
1825 struct cds_lfht_node *node;
1826 struct cds_lfht_iter iter;
1827 struct lttng_channel_trigger_list *trigger_list;
1828 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1829 struct channel_key key = { .key = channel_key, .domain = domain };
1830 struct channel_info *channel_info;
1831
1832 DBG("Removing channel key = %" PRIu64 " in %s domain",
1833 channel_key, lttng_domain_type_str(domain));
1834
1835 rcu_read_lock();
1836
1837 cds_lfht_lookup(state->channel_triggers_ht,
1838 hash_channel_key(&key),
1839 match_channel_trigger_list,
1840 &key,
1841 &iter);
1842 node = cds_lfht_iter_get_node(&iter);
1843 /*
1844 * There is a severe internal error if we are being asked to remove a
1845 * channel that doesn't exist.
1846 */
1847 if (!node) {
1848 ERR("Channel being removed is unknown to the notification thread");
1849 goto end;
1850 }
1851
1852 /* Free the list of triggers associated with this channel. */
1853 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1854 channel_triggers_ht_node);
1855 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1856 &trigger_list->list, node) {
1857 cds_list_del(&trigger_list_element->node);
1858 free(trigger_list_element);
1859 }
1860 cds_lfht_del(state->channel_triggers_ht, node);
1861 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1862
1863 /* Free sampled channel state. */
1864 cds_lfht_lookup(state->channel_state_ht,
1865 hash_channel_key(&key),
1866 match_channel_state_sample,
1867 &key,
1868 &iter);
1869 node = cds_lfht_iter_get_node(&iter);
1870 /*
1871 * This is expected to be NULL if the channel is destroyed before we
1872 * received a sample.
1873 */
1874 if (node) {
1875 struct channel_state_sample *sample = caa_container_of(node,
1876 struct channel_state_sample,
1877 channel_state_ht_node);
1878
1879 cds_lfht_del(state->channel_state_ht, node);
1880 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1881 }
1882
1883 /* Remove the channel from the channels_ht and free it. */
1884 cds_lfht_lookup(state->channels_ht,
1885 hash_channel_key(&key),
1886 match_channel_info,
1887 &key,
1888 &iter);
1889 node = cds_lfht_iter_get_node(&iter);
1890 LTTNG_ASSERT(node);
1891 channel_info = caa_container_of(node, struct channel_info,
1892 channels_ht_node);
1893 cds_lfht_del(state->channels_ht, node);
1894 channel_info_destroy(channel_info);
1895 end:
1896 rcu_read_unlock();
1897 *cmd_result = LTTNG_OK;
1898 return 0;
1899 }
1900
1901 static
1902 int handle_notification_thread_command_session_rotation(
1903 struct notification_thread_state *state,
1904 enum notification_thread_command_type cmd_type,
1905 const char *session_name, uid_t session_uid, gid_t session_gid,
1906 uint64_t trace_archive_chunk_id,
1907 struct lttng_trace_archive_location *location,
1908 enum lttng_error_code *_cmd_result)
1909 {
1910 int ret = 0;
1911 enum lttng_error_code cmd_result = LTTNG_OK;
1912 struct lttng_session_trigger_list *trigger_list;
1913 struct lttng_trigger_list_element *trigger_list_element;
1914 struct session_info *session_info;
1915 const struct lttng_credentials session_creds = {
1916 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1917 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
1918 };
1919
1920 rcu_read_lock();
1921
1922 session_info = find_or_create_session_info(state, session_name,
1923 session_uid, session_gid);
1924 if (!session_info) {
1925 /* Allocation error or an internal error occurred. */
1926 ret = -1;
1927 cmd_result = LTTNG_ERR_NOMEM;
1928 goto end;
1929 }
1930
1931 session_info->rotation.ongoing =
1932 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1933 session_info->rotation.id = trace_archive_chunk_id;
1934 trigger_list = get_session_trigger_list(state, session_name);
1935 if (!trigger_list) {
1936 DBG("No triggers applying to session \"%s\" found",
1937 session_name);
1938 goto end;
1939 }
1940
1941 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1942 node) {
1943 const struct lttng_condition *condition;
1944 struct lttng_trigger *trigger;
1945 struct notification_client_list *client_list;
1946 struct lttng_evaluation *evaluation = NULL;
1947 enum lttng_condition_type condition_type;
1948 enum action_executor_status executor_status;
1949
1950 trigger = trigger_list_element->trigger;
1951 condition = lttng_trigger_get_const_condition(trigger);
1952 LTTNG_ASSERT(condition);
1953 condition_type = lttng_condition_get_type(condition);
1954
1955 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1956 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1957 continue;
1958 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1959 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1960 continue;
1961 }
1962
1963 client_list = get_client_list_from_condition(state, condition);
1964 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1965 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1966 trace_archive_chunk_id);
1967 } else {
1968 evaluation = lttng_evaluation_session_rotation_completed_create(
1969 trace_archive_chunk_id, location);
1970 }
1971
1972 if (!evaluation) {
1973 /* Internal error */
1974 ret = -1;
1975 cmd_result = LTTNG_ERR_UNK;
1976 goto put_list;
1977 }
1978
1979 /*
1980 * Ownership of `evaluation` transferred to the action executor
1981 * no matter the result.
1982 */
1983 executor_status = action_executor_enqueue_trigger(
1984 state->executor, trigger, evaluation,
1985 &session_creds, client_list);
1986 evaluation = NULL;
1987 switch (executor_status) {
1988 case ACTION_EXECUTOR_STATUS_OK:
1989 break;
1990 case ACTION_EXECUTOR_STATUS_ERROR:
1991 case ACTION_EXECUTOR_STATUS_INVALID:
1992 /*
1993 * TODO Add trigger identification (name/id) when
1994 * it is added to the API.
1995 */
1996 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1997 ret = -1;
1998 goto put_list;
1999 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2000 /*
2001 * TODO Add trigger identification (name/id) when
2002 * it is added to the API.
2003 *
2004 * Not a fatal error.
2005 */
2006 WARN("No space left when enqueuing action associated with session rotation trigger");
2007 ret = 0;
2008 goto put_list;
2009 default:
2010 abort();
2011 }
2012
2013 put_list:
2014 notification_client_list_put(client_list);
2015 if (caa_unlikely(ret)) {
2016 break;
2017 }
2018 }
2019 end:
2020 session_info_put(session_info);
2021 *_cmd_result = cmd_result;
2022 rcu_read_unlock();
2023 return ret;
2024 }
2025
2026 static
2027 int handle_notification_thread_command_add_tracer_event_source(
2028 struct notification_thread_state *state,
2029 int tracer_event_source_fd,
2030 enum lttng_domain_type domain_type,
2031 enum lttng_error_code *_cmd_result)
2032 {
2033 int ret = 0;
2034 enum lttng_error_code cmd_result = LTTNG_OK;
2035 struct notification_event_tracer_event_source_element *element = NULL;
2036
2037 element = zmalloc<notification_event_tracer_event_source_element>();
2038 if (!element) {
2039 cmd_result = LTTNG_ERR_NOMEM;
2040 ret = -1;
2041 goto end;
2042 }
2043
2044 element->fd = tracer_event_source_fd;
2045 element->domain = domain_type;
2046
2047 cds_list_add(&element->node, &state->tracer_event_sources_list);
2048
2049 DBG3("Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
2050 tracer_event_source_fd,
2051 lttng_domain_type_str(domain_type));
2052
2053 /* Adding the read side pipe to the event poll. */
2054 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
2055 if (ret < 0) {
2056 ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
2057 tracer_event_source_fd,
2058 lttng_domain_type_str(element->domain));
2059 cds_list_del(&element->node);
2060 free(element);
2061 goto end;
2062 }
2063
2064 element->is_fd_in_poll_set = true;
2065
2066 end:
2067 *_cmd_result = cmd_result;
2068 return ret;
2069 }
2070
2071 static
2072 int drain_event_notifier_notification_pipe(
2073 struct notification_thread_state *state,
2074 int pipe, enum lttng_domain_type domain)
2075 {
2076 struct lttng_poll_event events = {};
2077 int ret;
2078
2079 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
2080 if (ret < 0) {
2081 ERR("Error creating lttng_poll_event");
2082 goto end;
2083 }
2084
2085 ret = lttng_poll_add(&events, pipe, LPOLLIN);
2086 if (ret < 0) {
2087 ERR("Error adding fd event notifier notification pipe to lttng_poll_event: fd = %d",
2088 pipe);
2089 goto end;
2090 }
2091
2092 while (true) {
2093 /*
2094 * Continue to consume notifications as long as there are new
2095 * ones coming in. The tracer has been asked to stop producing
2096 * them.
2097 *
2098 * LPOLLIN is explicitly checked since LPOLLHUP is implicitly
2099 * monitored (on Linux, at least) and will be returned when
2100 * the pipe is closed but empty.
2101 */
2102 ret = lttng_poll_wait_interruptible(&events, 0);
2103 if (ret == 0 || (LTTNG_POLL_GETEV(&events, 0) & LPOLLIN) == 0) {
2104 /* No more notification to be read on this pipe. */
2105 ret = 0;
2106 goto end;
2107 } else if (ret < 0) {
2108 PERROR("Failed on lttng_poll_wait_interruptible() call");
2109 ret = -1;
2110 goto end;
2111 }
2112
2113 ret = handle_one_event_notifier_notification(state, pipe, domain);
2114 if (ret) {
2115 ERR("Error consuming an event notifier notification from pipe: fd = %d",
2116 pipe);
2117 }
2118 }
2119 end:
2120 lttng_poll_clean(&events);
2121 return ret;
2122 }
2123
2124 static
2125 struct notification_event_tracer_event_source_element *
2126 find_tracer_event_source_element(struct notification_thread_state *state,
2127 int tracer_event_source_fd)
2128 {
2129 struct notification_event_tracer_event_source_element *source_element;
2130
2131 cds_list_for_each_entry(source_element,
2132 &state->tracer_event_sources_list, node) {
2133 if (source_element->fd == tracer_event_source_fd) {
2134 goto end;
2135 }
2136 }
2137
2138 source_element = NULL;
2139 end:
2140 return source_element;
2141 }
2142
2143 static
2144 int remove_tracer_event_source_from_pollset(
2145 struct notification_thread_state *state,
2146 struct notification_event_tracer_event_source_element *source_element)
2147 {
2148 int ret = 0;
2149
2150 LTTNG_ASSERT(source_element->is_fd_in_poll_set);
2151
2152 DBG3("Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2153 source_element->fd,
2154 lttng_domain_type_str(source_element->domain));
2155
2156 /* Removing the fd from the event poll set. */
2157 ret = lttng_poll_del(&state->events, source_element->fd);
2158 if (ret < 0) {
2159 ERR("Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2160 source_element->fd,
2161 lttng_domain_type_str(source_element->domain));
2162 ret = -1;
2163 goto end;
2164 }
2165
2166 source_element->is_fd_in_poll_set = false;
2167
2168 /*
2169 * Force the notification thread to restart the poll() loop to ensure
2170 * that any events from the removed fd are removed.
2171 */
2172 state->restart_poll = true;
2173
2174 ret = drain_event_notifier_notification_pipe(state, source_element->fd,
2175 source_element->domain);
2176 if (ret) {
2177 ERR("Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s",
2178 source_element->fd,
2179 lttng_domain_type_str(source_element->domain));
2180 ret = -1;
2181 goto end;
2182 }
2183
2184 end:
2185 return ret;
2186 }
2187
2188 int handle_notification_thread_tracer_event_source_died(
2189 struct notification_thread_state *state,
2190 int tracer_event_source_fd)
2191 {
2192 int ret = 0;
2193 struct notification_event_tracer_event_source_element *source_element;
2194
2195 source_element = find_tracer_event_source_element(state,
2196 tracer_event_source_fd);
2197
2198 LTTNG_ASSERT(source_element);
2199
2200 ret = remove_tracer_event_source_from_pollset(state, source_element);
2201 if (ret) {
2202 ERR("Failed to remove dead tracer event source from poll set");
2203 }
2204
2205 return ret;
2206 }
2207
2208 static
2209 int handle_notification_thread_command_remove_tracer_event_source(
2210 struct notification_thread_state *state,
2211 int tracer_event_source_fd,
2212 enum lttng_error_code *_cmd_result)
2213 {
2214 int ret = 0;
2215 enum lttng_error_code cmd_result = LTTNG_OK;
2216 struct notification_event_tracer_event_source_element *source_element = NULL;
2217
2218 source_element = find_tracer_event_source_element(state,
2219 tracer_event_source_fd);
2220
2221 LTTNG_ASSERT(source_element);
2222
2223 /* Remove the tracer source from the list. */
2224 cds_list_del(&source_element->node);
2225
2226 if (!source_element->is_fd_in_poll_set) {
2227 /* Skip the poll set removal. */
2228 goto end;
2229 }
2230
2231 ret = remove_tracer_event_source_from_pollset(state, source_element);
2232 if (ret) {
2233 ERR("Failed to remove tracer event source from poll set");
2234 cmd_result = LTTNG_ERR_FATAL;
2235 }
2236
2237 end:
2238 free(source_element);
2239 *_cmd_result = cmd_result;
2240 return ret;
2241 }
2242
2243 static int handle_notification_thread_command_list_triggers(
2244 struct notification_thread_handle *handle __attribute__((unused)),
2245 struct notification_thread_state *state,
2246 uid_t client_uid,
2247 struct lttng_triggers **triggers,
2248 enum lttng_error_code *_cmd_result)
2249 {
2250 int ret = 0;
2251 enum lttng_error_code cmd_result = LTTNG_OK;
2252 struct cds_lfht_iter iter;
2253 struct lttng_trigger_ht_element *trigger_ht_element;
2254 struct lttng_triggers *local_triggers = NULL;
2255 const struct lttng_credentials *creds;
2256
2257 rcu_read_lock();
2258
2259 local_triggers = lttng_triggers_create();
2260 if (!local_triggers) {
2261 /* Not a fatal error. */
2262 cmd_result = LTTNG_ERR_NOMEM;
2263 goto end;
2264 }
2265
2266 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2267 trigger_ht_element, node) {
2268 /*
2269 * Only return the triggers to which the client has access.
2270 * The root user has visibility over all triggers.
2271 */
2272 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2273 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2274 continue;
2275 }
2276
2277 ret = lttng_triggers_add(local_triggers,
2278 trigger_ht_element->trigger);
2279 if (ret < 0) {
2280 /* Not a fatal error. */
2281 ret = 0;
2282 cmd_result = LTTNG_ERR_NOMEM;
2283 goto end;
2284 }
2285 }
2286
2287 /* Transferring ownership to the caller. */
2288 *triggers = local_triggers;
2289 local_triggers = NULL;
2290
2291 end:
2292 rcu_read_unlock();
2293 lttng_triggers_destroy(local_triggers);
2294 *_cmd_result = cmd_result;
2295 return ret;
2296 }
2297
2298 static inline void get_trigger_info_for_log(const struct lttng_trigger *trigger,
2299 const char **trigger_name,
2300 uid_t *trigger_owner_uid)
2301 {
2302 enum lttng_trigger_status trigger_status;
2303
2304 trigger_status = lttng_trigger_get_name(trigger, trigger_name);
2305 switch (trigger_status) {
2306 case LTTNG_TRIGGER_STATUS_OK:
2307 break;
2308 case LTTNG_TRIGGER_STATUS_UNSET:
2309 *trigger_name = "(anonymous)";
2310 break;
2311 default:
2312 abort();
2313 }
2314
2315 trigger_status = lttng_trigger_get_owner_uid(trigger,
2316 trigger_owner_uid);
2317 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
2318 }
2319
2320 static int handle_notification_thread_command_get_trigger(
2321 struct notification_thread_state *state,
2322 const struct lttng_trigger *trigger,
2323 struct lttng_trigger **registered_trigger,
2324 enum lttng_error_code *_cmd_result)
2325 {
2326 int ret = -1;
2327 struct cds_lfht_iter iter;
2328 struct lttng_trigger_ht_element *trigger_ht_element;
2329 enum lttng_error_code cmd_result = LTTNG_ERR_TRIGGER_NOT_FOUND;
2330 const char *trigger_name;
2331 uid_t trigger_owner_uid;
2332
2333 rcu_read_lock();
2334
2335 cds_lfht_for_each_entry(
2336 state->triggers_ht, &iter, trigger_ht_element, node) {
2337 if (lttng_trigger_is_equal(
2338 trigger, trigger_ht_element->trigger)) {
2339 /* Take one reference on the return trigger. */
2340 *registered_trigger = trigger_ht_element->trigger;
2341 lttng_trigger_get(*registered_trigger);
2342 ret = 0;
2343 cmd_result = LTTNG_OK;
2344 goto end;
2345 }
2346 }
2347
2348 /* Not a fatal error if the trigger is not found. */
2349 get_trigger_info_for_log(trigger, &trigger_name, &trigger_owner_uid);
2350 DBG("Failed to retrieve registered version of trigger: trigger name = '%s', trigger owner uid = %d",
2351 trigger_name, (int) trigger_owner_uid);
2352
2353 ret = 0;
2354
2355 end:
2356 rcu_read_unlock();
2357 *_cmd_result = cmd_result;
2358 return ret;
2359 }
2360
2361 static
2362 bool condition_is_supported(struct lttng_condition *condition)
2363 {
2364 bool is_supported;
2365
2366 switch (lttng_condition_get_type(condition)) {
2367 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2368 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2369 {
2370 int ret;
2371 enum lttng_domain_type domain;
2372
2373 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2374 &domain);
2375 LTTNG_ASSERT(ret == 0);
2376
2377 if (domain != LTTNG_DOMAIN_KERNEL) {
2378 is_supported = true;
2379 goto end;
2380 }
2381
2382 /*
2383 * Older kernel tracers don't expose the API to monitor their
2384 * buffers. Therefore, we reject triggers that require that
2385 * mechanism to be available to be evaluated.
2386 *
2387 * Assume unsupported on error.
2388 */
2389 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2390 break;
2391 }
2392 case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
2393 {
2394 const struct lttng_event_rule *event_rule;
2395 enum lttng_domain_type domain;
2396 const enum lttng_condition_status status =
2397 lttng_condition_event_rule_matches_get_rule(
2398 condition, &event_rule);
2399
2400 LTTNG_ASSERT(status == LTTNG_CONDITION_STATUS_OK);
2401
2402 domain = lttng_event_rule_get_domain_type(event_rule);
2403 if (domain != LTTNG_DOMAIN_KERNEL) {
2404 is_supported = true;
2405 goto end;
2406 }
2407
2408 /*
2409 * Older kernel tracers can't emit notification. Therefore, we
2410 * reject triggers that require that mechanism to be available
2411 * to be evaluated.
2412 *
2413 * Assume unsupported on error.
2414 */
2415 is_supported = kernel_supports_event_notifiers() == 1;
2416 break;
2417 }
2418 default:
2419 is_supported = true;
2420 }
2421 end:
2422 return is_supported;
2423 }
2424
2425 /* Must be called with RCU read lock held. */
2426 static
2427 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2428 struct notification_thread_state *state)
2429 {
2430 int ret = 0;
2431 const struct lttng_condition *condition;
2432 const char *session_name;
2433 struct lttng_session_trigger_list *trigger_list;
2434
2435 ASSERT_RCU_READ_LOCKED();
2436
2437 condition = lttng_trigger_get_const_condition(trigger);
2438 switch (lttng_condition_get_type(condition)) {
2439 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2440 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2441 {
2442 enum lttng_condition_status status;
2443
2444 status = lttng_condition_session_rotation_get_session_name(
2445 condition, &session_name);
2446 if (status != LTTNG_CONDITION_STATUS_OK) {
2447 ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2448 ret = -1;
2449 goto end;
2450 }
2451 break;
2452 }
2453 default:
2454 ret = -1;
2455 goto end;
2456 }
2457
2458 trigger_list = get_session_trigger_list(state, session_name);
2459 if (!trigger_list) {
2460 DBG("Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2461 session_name);
2462 goto end;
2463
2464 }
2465
2466 DBG("Newly registered trigger bound to session \"%s\"",
2467 session_name);
2468 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2469 end:
2470 return ret;
2471 }
2472
2473 /* Must be called with RCU read lock held. */
2474 static
2475 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2476 struct notification_thread_state *state)
2477 {
2478 int ret = 0;
2479 struct cds_lfht_node *node;
2480 struct cds_lfht_iter iter;
2481 struct channel_info *channel;
2482
2483 ASSERT_RCU_READ_LOCKED();
2484
2485 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2486 channels_ht_node) {
2487 struct lttng_trigger_list_element *trigger_list_element;
2488 struct lttng_channel_trigger_list *trigger_list;
2489 struct cds_lfht_iter lookup_iter;
2490
2491 if (!trigger_applies_to_channel(trigger, channel)) {
2492 continue;
2493 }
2494
2495 cds_lfht_lookup(state->channel_triggers_ht,
2496 hash_channel_key(&channel->key),
2497 match_channel_trigger_list,
2498 &channel->key,
2499 &lookup_iter);
2500 node = cds_lfht_iter_get_node(&lookup_iter);
2501 LTTNG_ASSERT(node);
2502 trigger_list = caa_container_of(node,
2503 struct lttng_channel_trigger_list,
2504 channel_triggers_ht_node);
2505
2506 trigger_list_element = zmalloc<lttng_trigger_list_element>();
2507 if (!trigger_list_element) {
2508 ret = -1;
2509 goto end;
2510 }
2511 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2512 trigger_list_element->trigger = trigger;
2513 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2514 DBG("Newly registered trigger bound to channel \"%s\"",
2515 channel->name);
2516 }
2517 end:
2518 return ret;
2519 }
2520
2521 static
2522 bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2523 {
2524 bool is_notify = false;
2525 unsigned int i, count;
2526 enum lttng_action_status action_status;
2527 const struct lttng_action *action =
2528 lttng_trigger_get_const_action(trigger);
2529 enum lttng_action_type action_type;
2530
2531 LTTNG_ASSERT(action);
2532 action_type = lttng_action_get_type(action);
2533 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2534 is_notify = true;
2535 goto end;
2536 } else if (action_type != LTTNG_ACTION_TYPE_LIST) {
2537 goto end;
2538 }
2539
2540 action_status = lttng_action_list_get_count(action, &count);
2541 LTTNG_ASSERT(action_status == LTTNG_ACTION_STATUS_OK);
2542
2543 for (i = 0; i < count; i++) {
2544 const struct lttng_action *inner_action =
2545 lttng_action_list_get_at_index(
2546 action, i);
2547
2548 action_type = lttng_action_get_type(inner_action);
2549 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2550 is_notify = true;
2551 goto end;
2552 }
2553 }
2554
2555 end:
2556 return is_notify;
2557 }
2558
2559 static bool trigger_name_taken(struct notification_thread_state *state,
2560 const struct lttng_trigger *trigger)
2561 {
2562 struct cds_lfht_iter iter;
2563
2564 /*
2565 * No duplicata is allowed in the triggers_by_name_uid_ht.
2566 * The match is done against the trigger name and uid.
2567 */
2568 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2569 hash_trigger_by_name_uid(trigger),
2570 match_trigger_by_name_uid,
2571 trigger,
2572 &iter);
2573 return !!cds_lfht_iter_get_node(&iter);
2574 }
2575
2576 static
2577 enum lttng_error_code generate_trigger_name(
2578 struct notification_thread_state *state,
2579 struct lttng_trigger *trigger, const char **name)
2580 {
2581 enum lttng_error_code ret_code = LTTNG_OK;
2582 bool taken = false;
2583 enum lttng_trigger_status status;
2584
2585 do {
2586 const int ret = lttng_trigger_generate_name(trigger,
2587 state->trigger_id.name_offset++);
2588 if (ret) {
2589 /* The only reason this can fail right now. */
2590 ret_code = LTTNG_ERR_NOMEM;
2591 break;
2592 }
2593
2594 status = lttng_trigger_get_name(trigger, name);
2595 LTTNG_ASSERT(status == LTTNG_TRIGGER_STATUS_OK);
2596
2597 taken = trigger_name_taken(state, trigger);
2598 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2599
2600 return ret_code;
2601 }
2602
2603 static inline
2604 void notif_thread_state_remove_trigger_ht_elem(
2605 struct notification_thread_state *state,
2606 struct lttng_trigger_ht_element *trigger_ht_element)
2607 {
2608 LTTNG_ASSERT(state);
2609 LTTNG_ASSERT(trigger_ht_element);
2610
2611 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2612 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2613 }
2614
2615 static
2616 enum lttng_error_code setup_tracer_notifier(
2617 struct notification_thread_state *state,
2618 struct lttng_trigger *trigger)
2619 {
2620 enum lttng_error_code ret;
2621 enum event_notifier_error_accounting_status error_accounting_status;
2622 struct cds_lfht_node *node;
2623 uint64_t error_counter_index = 0;
2624 struct lttng_condition *condition = lttng_trigger_get_condition(trigger);
2625 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2626
2627 trigger_tokens_ht_element = zmalloc<notification_trigger_tokens_ht_element>();
2628 if (!trigger_tokens_ht_element) {
2629 ret = LTTNG_ERR_NOMEM;
2630 goto end;
2631 }
2632
2633 /* Add trigger token to the trigger_tokens_ht. */
2634 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2635 trigger_tokens_ht_element->token = LTTNG_OPTIONAL_GET(trigger->tracer_token);
2636 trigger_tokens_ht_element->trigger = trigger;
2637
2638 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2639 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2640 match_trigger_token,
2641 &trigger_tokens_ht_element->token,
2642 &trigger_tokens_ht_element->node);
2643 if (node != &trigger_tokens_ht_element->node) {
2644 ret = LTTNG_ERR_TRIGGER_EXISTS;
2645 goto error_free_ht_element;
2646 }
2647
2648 error_accounting_status = event_notifier_error_accounting_register_event_notifier(
2649 trigger, &error_counter_index);
2650 if (error_accounting_status != EVENT_NOTIFIER_ERROR_ACCOUNTING_STATUS_OK) {
2651 if (error_accounting_status == EVENT_NOTIFIER_ERROR_ACCOUNTING_STATUS_NO_INDEX_AVAILABLE) {
2652 DBG("Trigger list error accounting counter full.");
2653 ret = LTTNG_ERR_EVENT_NOTIFIER_ERROR_ACCOUNTING_FULL;
2654 } else {
2655 ERR("Error registering trigger for error accounting");
2656 ret = LTTNG_ERR_EVENT_NOTIFIER_REGISTRATION;
2657 }
2658
2659 goto error_remove_ht_element;
2660 }
2661
2662 lttng_condition_event_rule_matches_set_error_counter_index(
2663 condition, error_counter_index);
2664
2665 ret = LTTNG_OK;
2666 goto end;
2667
2668 error_remove_ht_element:
2669 cds_lfht_del(state->trigger_tokens_ht, &trigger_tokens_ht_element->node);
2670 error_free_ht_element:
2671 free(trigger_tokens_ht_element);
2672 end:
2673 return ret;
2674 }
2675
2676 /*
2677 * FIXME A client's credentials are not checked when registering a trigger.
2678 *
2679 * The effects of this are benign since:
2680 * - The client will succeed in registering the trigger, as it is valid,
2681 * - The trigger will, internally, be bound to the channel/session,
2682 * - The notifications will not be sent since the client's credentials
2683 * are checked against the channel at that moment.
2684 *
2685 * If this function returns a non-zero value, it means something is
2686 * fundamentally broken and the whole subsystem/thread will be torn down.
2687 *
2688 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2689 * error code.
2690 */
2691 static
2692 int handle_notification_thread_command_register_trigger(
2693 struct notification_thread_state *state,
2694 struct lttng_trigger *trigger,
2695 bool is_trigger_anonymous,
2696 enum lttng_error_code *cmd_result)
2697 {
2698 int ret = 0;
2699 struct lttng_condition *condition;
2700 struct notification_client_list *client_list = NULL;
2701 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2702 struct cds_lfht_node *node;
2703 const char* trigger_name;
2704 bool free_trigger = true;
2705 struct lttng_evaluation *evaluation = NULL;
2706 struct lttng_credentials object_creds;
2707 uid_t object_uid;
2708 gid_t object_gid;
2709 enum action_executor_status executor_status;
2710 const uint64_t trigger_tracer_token =
2711 state->trigger_id.next_tracer_token++;
2712
2713 rcu_read_lock();
2714
2715 /* Set the trigger's tracer token. */
2716 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2717
2718 if (!is_trigger_anonymous) {
2719 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2720 LTTNG_TRIGGER_STATUS_UNSET) {
2721 const enum lttng_error_code ret_code =
2722 generate_trigger_name(state, trigger,
2723 &trigger_name);
2724
2725 if (ret_code != LTTNG_OK) {
2726 /* Fatal error. */
2727 ret = -1;
2728 *cmd_result = ret_code;
2729 goto error;
2730 }
2731 } else if (trigger_name_taken(state, trigger)) {
2732 /* Not a fatal error. */
2733 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2734 ret = 0;
2735 goto error;
2736 }
2737 } else {
2738 trigger_name = "(anonymous)";
2739 }
2740
2741 condition = lttng_trigger_get_condition(trigger);
2742 LTTNG_ASSERT(condition);
2743
2744 /* Some conditions require tracers to implement a minimal ABI version. */
2745 if (!condition_is_supported(condition)) {
2746 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2747 goto error;
2748 }
2749
2750 trigger_ht_element = zmalloc<lttng_trigger_ht_element>();
2751 if (!trigger_ht_element) {
2752 ret = -1;
2753 goto error;
2754 }
2755
2756 /* Add trigger to the trigger_ht. */
2757 cds_lfht_node_init(&trigger_ht_element->node);
2758 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
2759 trigger_ht_element->trigger = trigger;
2760
2761 node = cds_lfht_add_unique(state->triggers_ht,
2762 lttng_condition_hash(condition),
2763 match_trigger,
2764 trigger,
2765 &trigger_ht_element->node);
2766 if (node != &trigger_ht_element->node) {
2767 /* Not a fatal error, simply report it to the client. */
2768 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2769 goto error_free_ht_element;
2770 }
2771
2772 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2773 hash_trigger_by_name_uid(trigger),
2774 match_trigger_by_name_uid,
2775 trigger,
2776 &trigger_ht_element->node_by_name_uid);
2777 if (node != &trigger_ht_element->node_by_name_uid) {
2778 /* Internal error: add to triggers_ht should have failed. */
2779 ret = -1;
2780 goto error_free_ht_element;
2781 }
2782
2783 /* From this point consider the trigger registered. */
2784 lttng_trigger_set_as_registered(trigger);
2785
2786 /*
2787 * Some triggers might need a tracer notifier depending on its
2788 * condition and actions.
2789 */
2790 if (lttng_trigger_needs_tracer_notifier(trigger)) {
2791 enum lttng_error_code error_code;
2792
2793 error_code = setup_tracer_notifier(state, trigger);
2794 if (error_code != LTTNG_OK) {
2795 notif_thread_state_remove_trigger_ht_elem(state,
2796 trigger_ht_element);
2797 if (error_code == LTTNG_ERR_NOMEM) {
2798 ret = -1;
2799 } else {
2800 *cmd_result = error_code;
2801 ret = 0;
2802 }
2803
2804 goto error_free_ht_element;
2805 }
2806 }
2807
2808 /*
2809 * The rest only applies to triggers that have a "notify" action.
2810 * It is not skipped as this is the only action type currently
2811 * supported.
2812 */
2813 if (is_trigger_action_notify(trigger)) {
2814 /*
2815 * Find or create the client list of this condition. It may
2816 * already be present if another trigger is already registered
2817 * with the same condition.
2818 */
2819 client_list = get_client_list_from_condition(state, condition);
2820 if (!client_list) {
2821 /*
2822 * No client list for this condition yet. We create new
2823 * one and build it up.
2824 */
2825 client_list = notification_client_list_create(state, condition);
2826 if (!client_list) {
2827 ERR("Error creating notification client list for trigger %s", trigger->name);
2828 goto error_free_ht_element;
2829 }
2830 }
2831
2832 CDS_INIT_LIST_HEAD(&trigger_ht_element->client_list_trigger_node);
2833
2834 pthread_mutex_lock(&client_list->lock);
2835 cds_list_add(&trigger_ht_element->client_list_trigger_node, &client_list->triggers_list);
2836 pthread_mutex_unlock(&client_list->lock);
2837 }
2838
2839 /*
2840 * Ownership of the trigger and of its wrapper was transfered to
2841 * the triggers_ht. Same for token ht element if necessary.
2842 */
2843 trigger_ht_element = NULL;
2844 free_trigger = false;
2845
2846 switch (get_condition_binding_object(condition)) {
2847 case LTTNG_OBJECT_TYPE_SESSION:
2848 /* Add the trigger to the list if it matches a known session. */
2849 ret = bind_trigger_to_matching_session(trigger, state);
2850 if (ret) {
2851 goto error_free_ht_element;
2852 }
2853 break;
2854 case LTTNG_OBJECT_TYPE_CHANNEL:
2855 /*
2856 * Add the trigger to list of triggers bound to the channels
2857 * currently known.
2858 */
2859 ret = bind_trigger_to_matching_channels(trigger, state);
2860 if (ret) {
2861 goto error_free_ht_element;
2862 }
2863 break;
2864 case LTTNG_OBJECT_TYPE_NONE:
2865 break;
2866 default:
2867 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
2868 ret = -1;
2869 goto error_free_ht_element;
2870 }
2871
2872 /*
2873 * The new trigger's condition must be evaluated against the current
2874 * state.
2875 *
2876 * In the case of `notify` action, nothing preventing clients from
2877 * subscribing to a condition before the corresponding trigger is
2878 * registered, we have to evaluate this new condition right away.
2879 *
2880 * At some point, we were waiting for the next "evaluation" (e.g. on
2881 * reception of a channel sample) to evaluate this new condition, but
2882 * that was broken.
2883 *
2884 * The reason it was broken is that waiting for the next sample
2885 * does not allow us to properly handle transitions for edge-triggered
2886 * conditions.
2887 *
2888 * Consider this example: when we handle a new channel sample, we
2889 * evaluate each conditions twice: once with the previous state, and
2890 * again with the newest state. We then use those two results to
2891 * determine whether a state change happened: a condition was false and
2892 * became true. If a state change happened, we have to notify clients.
2893 *
2894 * Now, if a client subscribes to a given notification and registers
2895 * a trigger *after* that subscription, we have to make sure the
2896 * condition is evaluated at this point while considering only the
2897 * current state. Otherwise, the next evaluation cycle may only see
2898 * that the evaluations remain the same (true for samples n-1 and n) and
2899 * the client will never know that the condition has been met.
2900 */
2901 switch (get_condition_binding_object(condition)) {
2902 case LTTNG_OBJECT_TYPE_SESSION:
2903 ret = evaluate_session_condition_for_client(condition, state,
2904 &evaluation, &object_uid,
2905 &object_gid);
2906 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2907 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2908 break;
2909 case LTTNG_OBJECT_TYPE_CHANNEL:
2910 ret = evaluate_channel_condition_for_client(condition, state,
2911 &evaluation, &object_uid,
2912 &object_gid);
2913 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2914 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2915 break;
2916 case LTTNG_OBJECT_TYPE_NONE:
2917 ret = 0;
2918 break;
2919 case LTTNG_OBJECT_TYPE_UNKNOWN:
2920 default:
2921 ret = -1;
2922 break;
2923 }
2924
2925 if (ret) {
2926 /* Fatal error. */
2927 goto error_free_ht_element;
2928 }
2929
2930 DBG("Newly registered trigger's condition evaluated to %s",
2931 evaluation ? "true" : "false");
2932 if (!evaluation) {
2933 /* Evaluation yielded nothing. Normal exit. */
2934 ret = 0;
2935 goto success;
2936 }
2937
2938 /*
2939 * Ownership of `evaluation` transferred to the action executor
2940 * no matter the result.
2941 */
2942 executor_status = action_executor_enqueue_trigger(state->executor,
2943 trigger, evaluation, &object_creds, client_list);
2944 evaluation = NULL;
2945 switch (executor_status) {
2946 case ACTION_EXECUTOR_STATUS_OK:
2947 break;
2948 case ACTION_EXECUTOR_STATUS_ERROR:
2949 case ACTION_EXECUTOR_STATUS_INVALID:
2950 /*
2951 * TODO Add trigger identification (name/id) when
2952 * it is added to the API.
2953 */
2954 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2955 ret = -1;
2956 goto error_free_ht_element;
2957 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2958 /*
2959 * TODO Add trigger identification (name/id) when
2960 * it is added to the API.
2961 *
2962 * Not a fatal error.
2963 */
2964 WARN("No space left when enqueuing action associated to newly registered trigger");
2965 ret = 0;
2966 goto success;
2967 default:
2968 abort();
2969 }
2970
2971 success:
2972 *cmd_result = LTTNG_OK;
2973 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
2974 trigger_name, trigger_tracer_token);
2975 goto end;
2976
2977 error_free_ht_element:
2978 if (trigger_ht_element) {
2979 /* Delayed removal due to RCU constraint on delete. */
2980 call_rcu(&trigger_ht_element->rcu_node,
2981 free_lttng_trigger_ht_element_rcu);
2982 }
2983 error:
2984 if (free_trigger) {
2985 /*
2986 * Other objects might have a reference to the trigger, mark it
2987 * as unregistered.
2988 */
2989 lttng_trigger_set_as_unregistered(trigger);
2990 lttng_trigger_destroy(trigger);
2991 }
2992 end:
2993 rcu_read_unlock();
2994 return ret;
2995 }
2996
2997 static
2998 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2999 {
3000 free(caa_container_of(node, struct lttng_trigger_ht_element,
3001 rcu_node));
3002 }
3003
3004 static
3005 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
3006 {
3007 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
3008 rcu_node));
3009 }
3010
3011 static
3012 void teardown_tracer_notifier(struct notification_thread_state *state,
3013 const struct lttng_trigger *trigger)
3014 {
3015 struct cds_lfht_iter iter;
3016 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
3017
3018 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter,
3019 trigger_tokens_ht_element, node) {
3020
3021 if (!lttng_trigger_is_equal(trigger,
3022 trigger_tokens_ht_element->trigger)) {
3023 continue;
3024 }
3025
3026 event_notifier_error_accounting_unregister_event_notifier(
3027 trigger_tokens_ht_element->trigger);
3028
3029 /* TODO talk to all app and remove it */
3030 DBG("Removed trigger from tokens_ht");
3031 cds_lfht_del(state->trigger_tokens_ht,
3032 &trigger_tokens_ht_element->node);
3033
3034 call_rcu(&trigger_tokens_ht_element->rcu_node,
3035 free_notification_trigger_tokens_ht_element_rcu);
3036
3037 break;
3038 }
3039 }
3040
3041 static
3042 int handle_notification_thread_command_unregister_trigger(
3043 struct notification_thread_state *state,
3044 const struct lttng_trigger *trigger,
3045 enum lttng_error_code *_cmd_reply)
3046 {
3047 struct cds_lfht_iter iter;
3048 struct cds_lfht_node *triggers_ht_node;
3049 struct lttng_channel_trigger_list *trigger_list;
3050 struct notification_client_list *client_list;
3051 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
3052 const struct lttng_condition *condition = lttng_trigger_get_const_condition(
3053 trigger);
3054 enum lttng_error_code cmd_reply;
3055
3056 rcu_read_lock();
3057
3058 cds_lfht_lookup(state->triggers_ht,
3059 lttng_condition_hash(condition),
3060 match_trigger,
3061 trigger,
3062 &iter);
3063 triggers_ht_node = cds_lfht_iter_get_node(&iter);
3064 if (!triggers_ht_node) {
3065 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
3066 goto end;
3067 } else {
3068 cmd_reply = LTTNG_OK;
3069 }
3070
3071 trigger_ht_element = caa_container_of(triggers_ht_node,
3072 struct lttng_trigger_ht_element, node);
3073
3074 /* Remove trigger from channel_triggers_ht. */
3075 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
3076 channel_triggers_ht_node) {
3077 struct lttng_trigger_list_element *trigger_element, *tmp;
3078
3079 cds_list_for_each_entry_safe(trigger_element, tmp,
3080 &trigger_list->list, node) {
3081 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
3082 continue;
3083 }
3084
3085 DBG("Removed trigger from channel_triggers_ht");
3086 cds_list_del(&trigger_element->node);
3087 /* A trigger can only appear once per channel */
3088 break;
3089 }
3090 }
3091
3092 if (lttng_trigger_needs_tracer_notifier(trigger)) {
3093 teardown_tracer_notifier(state, trigger);
3094 }
3095
3096 if (is_trigger_action_notify(trigger)) {
3097 /*
3098 * Remove and release the client list from
3099 * notification_trigger_clients_ht.
3100 */
3101 client_list = get_client_list_from_condition(state, condition);
3102 LTTNG_ASSERT(client_list);
3103
3104 pthread_mutex_lock(&client_list->lock);
3105 cds_list_del(&trigger_ht_element->client_list_trigger_node);
3106 pthread_mutex_unlock(&client_list->lock);
3107
3108 /* Put new reference and the hashtable's reference. */
3109 notification_client_list_put(client_list);
3110 notification_client_list_put(client_list);
3111 client_list = NULL;
3112 }
3113
3114 /* Remove trigger from triggers_ht. */
3115 notif_thread_state_remove_trigger_ht_elem(state, trigger_ht_element);
3116
3117 /* Release the ownership of the trigger. */
3118 lttng_trigger_destroy(trigger_ht_element->trigger);
3119 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
3120 end:
3121 rcu_read_unlock();
3122 if (_cmd_reply) {
3123 *_cmd_reply = cmd_reply;
3124 }
3125 return 0;
3126 }
3127
3128 static
3129 int pop_cmd_queue(struct notification_thread_handle *handle,
3130 struct notification_thread_command **cmd)
3131 {
3132 int ret;
3133 uint64_t counter;
3134
3135 pthread_mutex_lock(&handle->cmd_queue.lock);
3136 ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
3137 if (ret != sizeof(counter)) {
3138 ret = -1;
3139 goto error_unlock;
3140 }
3141
3142 *cmd = cds_list_first_entry(&handle->cmd_queue.list,
3143 struct notification_thread_command, cmd_list_node);
3144 cds_list_del(&((*cmd)->cmd_list_node));
3145 ret = 0;
3146
3147 error_unlock:
3148 pthread_mutex_unlock(&handle->cmd_queue.lock);
3149 return ret;
3150 }
3151
3152 /* Returns 0 on success, 1 on exit requested, negative value on error. */
3153 int handle_notification_thread_command(
3154 struct notification_thread_handle *handle,
3155 struct notification_thread_state *state)
3156 {
3157 int ret;
3158 struct notification_thread_command *cmd;
3159
3160 ret = pop_cmd_queue(handle, &cmd);
3161 if (ret) {
3162 goto error;
3163 }
3164
3165 DBG("Received `%s` command",
3166 notification_command_type_str(cmd->type));
3167 switch (cmd->type) {
3168 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
3169 ret = handle_notification_thread_command_register_trigger(state,
3170 cmd->parameters.register_trigger.trigger,
3171 cmd->parameters.register_trigger.is_trigger_anonymous,
3172 &cmd->reply_code);
3173 break;
3174 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
3175 ret = handle_notification_thread_command_unregister_trigger(
3176 state,
3177 cmd->parameters.unregister_trigger.trigger,
3178 &cmd->reply_code);
3179 break;
3180 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
3181 ret = handle_notification_thread_command_add_channel(
3182 state,
3183 cmd->parameters.add_channel.session.name,
3184 cmd->parameters.add_channel.session.uid,
3185 cmd->parameters.add_channel.session.gid,
3186 cmd->parameters.add_channel.channel.name,
3187 cmd->parameters.add_channel.channel.domain,
3188 cmd->parameters.add_channel.channel.key,
3189 cmd->parameters.add_channel.channel.capacity,
3190 &cmd->reply_code);
3191 break;
3192 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
3193 ret = handle_notification_thread_command_remove_channel(
3194 state, cmd->parameters.remove_channel.key,
3195 cmd->parameters.remove_channel.domain,
3196 &cmd->reply_code);
3197 break;
3198 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
3199 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
3200 ret = handle_notification_thread_command_session_rotation(
3201 state,
3202 cmd->type,
3203 cmd->parameters.session_rotation.session_name,
3204 cmd->parameters.session_rotation.uid,
3205 cmd->parameters.session_rotation.gid,
3206 cmd->parameters.session_rotation.trace_archive_chunk_id,
3207 cmd->parameters.session_rotation.location,
3208 &cmd->reply_code);
3209 break;
3210 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
3211 ret = handle_notification_thread_command_add_tracer_event_source(
3212 state,
3213 cmd->parameters.tracer_event_source.tracer_event_source_fd,
3214 cmd->parameters.tracer_event_source.domain,
3215 &cmd->reply_code);
3216 break;
3217 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
3218 ret = handle_notification_thread_command_remove_tracer_event_source(
3219 state,
3220 cmd->parameters.tracer_event_source.tracer_event_source_fd,
3221 &cmd->reply_code);
3222 break;
3223 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
3224 {
3225 struct lttng_triggers *triggers = NULL;
3226
3227 ret = handle_notification_thread_command_list_triggers(
3228 handle,
3229 state,
3230 cmd->parameters.list_triggers.uid,
3231 &triggers,
3232 &cmd->reply_code);
3233 cmd->reply.list_triggers.triggers = triggers;
3234 ret = 0;
3235 break;
3236 }
3237 case NOTIFICATION_COMMAND_TYPE_QUIT:
3238 cmd->reply_code = LTTNG_OK;
3239 ret = 1;
3240 goto end;
3241 case NOTIFICATION_COMMAND_TYPE_GET_TRIGGER:
3242 {
3243 struct lttng_trigger *trigger = NULL;
3244
3245 ret = handle_notification_thread_command_get_trigger(state,
3246 cmd->parameters.get_trigger.trigger, &trigger,
3247 &cmd->reply_code);
3248 cmd->reply.get_trigger.trigger = trigger;
3249 break;
3250 }
3251 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
3252 {
3253 const enum client_transmission_status client_status =
3254 cmd->parameters.client_communication_update
3255 .status;
3256 const notification_client_id client_id =
3257 cmd->parameters.client_communication_update.id;
3258 struct notification_client *client;
3259
3260 rcu_read_lock();
3261 client = get_client_from_id(client_id, state);
3262
3263 if (!client) {
3264 /*
3265 * Client error was probably already picked-up by the
3266 * notification thread or it has disconnected
3267 * gracefully while this command was queued.
3268 */
3269 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
3270 client_id);
3271 ret = 0;
3272 } else {
3273 ret = client_handle_transmission_status(
3274 client, client_status, state);
3275 }
3276 rcu_read_unlock();
3277 break;
3278 }
3279 default:
3280 ERR("Unknown internal command received");
3281 goto error_unlock;
3282 }
3283
3284 if (ret) {
3285 goto error_unlock;
3286 }
3287 end:
3288 if (cmd->is_async) {
3289 free(cmd);
3290 cmd = NULL;
3291 } else {
3292 lttng_waiter_wake_up(&cmd->reply_waiter);
3293 }
3294 return ret;
3295 error_unlock:
3296 /* Wake-up and return a fatal error to the calling thread. */
3297 lttng_waiter_wake_up(&cmd->reply_waiter);
3298 cmd->reply_code = LTTNG_ERR_FATAL;
3299 error:
3300 /* Indicate a fatal error to the caller. */
3301 return -1;
3302 }
3303
3304 static
3305 int socket_set_non_blocking(int socket)
3306 {
3307 int ret, flags;
3308
3309 /* Set the pipe as non-blocking. */
3310 ret = fcntl(socket, F_GETFL, 0);
3311 if (ret == -1) {
3312 PERROR("fcntl get socket flags");
3313 goto end;
3314 }
3315 flags = ret;
3316
3317 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
3318 if (ret == -1) {
3319 PERROR("fcntl set O_NONBLOCK socket flag");
3320 goto end;
3321 }
3322 DBG("Client socket (fd = %i) set as non-blocking", socket);
3323 end:
3324 return ret;
3325 }
3326
3327 static
3328 int client_reset_inbound_state(struct notification_client *client)
3329 {
3330 int ret;
3331
3332
3333 lttng_payload_clear(&client->communication.inbound.payload);
3334
3335 client->communication.inbound.bytes_to_receive =
3336 sizeof(struct lttng_notification_channel_message);
3337 client->communication.inbound.msg_type =
3338 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
3339 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3340 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
3341 ret = lttng_dynamic_buffer_set_size(
3342 &client->communication.inbound.payload.buffer,
3343 client->communication.inbound.bytes_to_receive);
3344
3345 return ret;
3346 }
3347
3348 int handle_notification_thread_client_connect(
3349 struct notification_thread_state *state)
3350 {
3351 int ret;
3352 struct notification_client *client;
3353
3354 DBG("Handling new notification channel client connection");
3355
3356 client = zmalloc<notification_client>();
3357 if (!client) {
3358 /* Fatal error. */
3359 ret = -1;
3360 goto error;
3361 }
3362
3363 pthread_mutex_init(&client->lock, NULL);
3364 client->id = state->next_notification_client_id++;
3365 CDS_INIT_LIST_HEAD(&client->condition_list);
3366 lttng_payload_init(&client->communication.inbound.payload);
3367 lttng_payload_init(&client->communication.outbound.payload);
3368 client->communication.inbound.expect_creds = true;
3369
3370 ret = client_reset_inbound_state(client);
3371 if (ret) {
3372 ERR("Failed to reset client communication's inbound state");
3373 ret = 0;
3374 goto error;
3375 }
3376
3377 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3378 if (ret < 0) {
3379 ERR("Failed to accept new notification channel client connection");
3380 ret = 0;
3381 goto error;
3382 }
3383
3384 client->socket = ret;
3385
3386 ret = socket_set_non_blocking(client->socket);
3387 if (ret) {
3388 ERR("Failed to set new notification channel client connection socket as non-blocking");
3389 goto error;
3390 }
3391
3392 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3393 if (ret < 0) {
3394 ERR("Failed to set socket options on new notification channel client socket");
3395 ret = 0;
3396 goto error;
3397 }
3398
3399 client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
3400 ret = lttng_poll_add(&state->events, client->socket,
3401 client->communication.current_poll_events);
3402 if (ret < 0) {
3403 ERR("Failed to add notification channel client socket to poll set");
3404 ret = 0;
3405 goto error;
3406 }
3407 DBG("Added new notification channel client socket (%i) to poll set",
3408 client->socket);
3409
3410 rcu_read_lock();
3411 cds_lfht_add(state->client_socket_ht,
3412 hash_client_socket(client->socket),
3413 &client->client_socket_ht_node);
3414 cds_lfht_add(state->client_id_ht,
3415 hash_client_id(client->id),
3416 &client->client_id_ht_node);
3417 rcu_read_unlock();
3418
3419 return ret;
3420
3421 error:
3422 notification_client_destroy(client);
3423 return ret;
3424 }
3425
3426 /*
3427 * RCU read-lock must be held by the caller.
3428 * Client lock must _not_ be held by the caller.
3429 */
3430 static
3431 int notification_thread_client_disconnect(
3432 struct notification_client *client,
3433 struct notification_thread_state *state)
3434 {
3435 int ret;
3436 struct lttng_condition_list_element *condition_list_element, *tmp;
3437
3438 ASSERT_RCU_READ_LOCKED();
3439
3440 /* Acquire the client lock to disable its communication atomically. */
3441 pthread_mutex_lock(&client->lock);
3442 client->communication.active = false;
3443 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3444 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3445 pthread_mutex_unlock(&client->lock);
3446
3447 ret = lttng_poll_del(&state->events, client->socket);
3448 if (ret) {
3449 ERR("Failed to remove client socket %d from poll set",
3450 client->socket);
3451 }
3452
3453 /* Release all conditions to which the client was subscribed. */
3454 cds_list_for_each_entry_safe(condition_list_element, tmp,
3455 &client->condition_list, node) {
3456 (void) notification_thread_client_unsubscribe(client,
3457 condition_list_element->condition, state, NULL);
3458 }
3459
3460 /*
3461 * Client no longer accessible to other threads (through the
3462 * client lists).
3463 */
3464 notification_client_destroy(client);
3465 return ret;
3466 }
3467
3468 int handle_notification_thread_client_disconnect(
3469 int client_socket, struct notification_thread_state *state)
3470 {
3471 int ret = 0;
3472 struct notification_client *client;
3473
3474 rcu_read_lock();
3475 DBG("Closing client connection (socket fd = %i)",
3476 client_socket);
3477 client = get_client_from_socket(client_socket, state);
3478 if (!client) {
3479 /* Internal state corruption, fatal error. */
3480 ERR("Unable to find client (socket fd = %i)",
3481 client_socket);
3482 ret = -1;
3483 goto end;
3484 }
3485
3486 ret = notification_thread_client_disconnect(client, state);
3487 end:
3488 rcu_read_unlock();
3489 return ret;
3490 }
3491
3492 int handle_notification_thread_client_disconnect_all(
3493 struct notification_thread_state *state)
3494 {
3495 struct cds_lfht_iter iter;
3496 struct notification_client *client;
3497 bool error_encoutered = false;
3498
3499 rcu_read_lock();
3500 DBG("Closing all client connections");
3501 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3502 client_socket_ht_node) {
3503 int ret;
3504
3505 ret = notification_thread_client_disconnect(
3506 client, state);
3507 if (ret) {
3508 error_encoutered = true;
3509 }
3510 }
3511 rcu_read_unlock();
3512 return error_encoutered ? 1 : 0;
3513 }
3514
3515 int handle_notification_thread_trigger_unregister_all(
3516 struct notification_thread_state *state)
3517 {
3518 bool error_occurred = false;
3519 struct cds_lfht_iter iter;
3520 struct lttng_trigger_ht_element *trigger_ht_element;
3521
3522 rcu_read_lock();
3523 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3524 node) {
3525 int ret = handle_notification_thread_command_unregister_trigger(
3526 state, trigger_ht_element->trigger, NULL);
3527 if (ret) {
3528 error_occurred = true;
3529 }
3530 }
3531 rcu_read_unlock();
3532 return error_occurred ? -1 : 0;
3533 }
3534
3535 static
3536 bool client_has_outbound_data_left(
3537 const struct notification_client *client)
3538 {
3539 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3540 &client->communication.outbound.payload, 0, -1);
3541 const bool has_data = pv.buffer.size != 0;
3542 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3543
3544 return has_data || has_fds;
3545 }
3546
3547 static
3548 int client_handle_transmission_status(
3549 struct notification_client *client,
3550 enum client_transmission_status transmission_status,
3551 struct notification_thread_state *state)
3552 {
3553 int ret = 0;
3554
3555 switch (transmission_status) {
3556 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3557 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3558 {
3559 int current_poll_events;
3560 int new_poll_events;
3561 /*
3562 * We want to be notified whenever there is buffer space
3563 * available to send the rest of the payload if we are
3564 * waiting to send data to the client.
3565 *
3566 * The state of the outbound queue being sampled here is
3567 * fine since:
3568 * - it is okay to wake-up "for nothing" in case we see
3569 * that data is left, but another thread succeeds in
3570 * flushing it before us when handling the client "out"
3571 * event. We will simply stop monitoring that event the next
3572 * time it wakes us up and we see no data left to be sent,
3573 * - if another thread fails to flush the entire client
3574 * outgoing queue, it will issue a "communication update"
3575 * command and cause the client's (e)poll mask to be
3576 * re-evaluated.
3577 *
3578 * The situation we seek to avoid would be to disable the
3579 * monitoring of "out" client events indefinitely when there is
3580 * data to be sent, which can't happen because of the
3581 * aforementioned "communication update" mechanism.
3582 */
3583 pthread_mutex_lock(&client->lock);
3584 current_poll_events = client->communication.current_poll_events;
3585 new_poll_events = client_has_outbound_data_left(client) ?
3586 CLIENT_POLL_EVENTS_IN_OUT :
3587 CLIENT_POLL_EVENTS_IN;
3588 client->communication.current_poll_events = new_poll_events;
3589 pthread_mutex_unlock(&client->lock);
3590
3591 /* Update the monitored event set only if it changed. */
3592 if (current_poll_events != new_poll_events) {
3593 ret = lttng_poll_mod(&state->events, client->socket,
3594 new_poll_events);
3595 if (ret) {
3596 goto end;
3597 }
3598 }
3599
3600 break;
3601 }
3602 case CLIENT_TRANSMISSION_STATUS_FAIL:
3603 ret = notification_thread_client_disconnect(client, state);
3604 if (ret) {
3605 goto end;
3606 }
3607 break;
3608 case CLIENT_TRANSMISSION_STATUS_ERROR:
3609 ret = -1;
3610 goto end;
3611 default:
3612 abort();
3613 }
3614 end:
3615 return ret;
3616 }
3617
3618 /* Client lock must be acquired by caller. */
3619 static
3620 enum client_transmission_status client_flush_outgoing_queue(
3621 struct notification_client *client)
3622 {
3623 ssize_t ret;
3624 size_t to_send_count;
3625 enum client_transmission_status status;
3626 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3627 &client->communication.outbound.payload, 0, -1);
3628 const int fds_to_send_count =
3629 lttng_payload_view_get_fd_handle_count(&pv);
3630
3631 ASSERT_LOCKED(client->lock);
3632
3633 if (!client->communication.active) {
3634 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3635 goto end;
3636 }
3637
3638 if (pv.buffer.size == 0) {
3639 /*
3640 * If both data and fds are equal to zero, we are in an invalid
3641 * state.
3642 */
3643 LTTNG_ASSERT(fds_to_send_count != 0);
3644 goto send_fds;
3645 }
3646
3647 /* Send data. */
3648 to_send_count = pv.buffer.size;
3649 DBG("Flushing client (socket fd = %i) outgoing queue",
3650 client->socket);
3651
3652 ret = lttcomm_send_unix_sock_non_block(client->socket,
3653 pv.buffer.data,
3654 to_send_count);
3655 if ((ret >= 0 && ret < to_send_count)) {
3656 DBG("Client (socket fd = %i) outgoing queue could not be completely flushed",
3657 client->socket);
3658 to_send_count -= std::max(ret, (ssize_t) 0);
3659
3660 memmove(client->communication.outbound.payload.buffer.data,
3661 pv.buffer.data +
3662 pv.buffer.size - to_send_count,
3663 to_send_count);
3664 ret = lttng_dynamic_buffer_set_size(
3665 &client->communication.outbound.payload.buffer,
3666 to_send_count);
3667 if (ret) {
3668 goto error;
3669 }
3670
3671 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3672 goto end;
3673 } else if (ret < 0) {
3674 /* Generic error, disable the client's communication. */
3675 ERR("Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3676 client->socket);
3677 client->communication.active = false;
3678 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3679 goto end;
3680 } else {
3681 /*
3682 * No error and flushed the queue completely.
3683 *
3684 * The payload buffer size is used later to
3685 * check if there is notifications queued. So albeit that the
3686 * direct caller knows that the transmission is complete, we
3687 * need to set the buffer size to zero.
3688 */
3689 ret = lttng_dynamic_buffer_set_size(
3690 &client->communication.outbound.payload.buffer, 0);
3691 if (ret) {
3692 goto error;
3693 }
3694 }
3695
3696 send_fds:
3697 /* No fds to send, transmission is complete. */
3698 if (fds_to_send_count == 0) {
3699 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3700 goto end;
3701 }
3702
3703 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3704 client->socket, &pv);
3705 if (ret < 0) {
3706 /* Generic error, disable the client's communication. */
3707 ERR("Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3708 client->socket);
3709 client->communication.active = false;
3710 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3711 goto end;
3712 } else if (ret == 0) {
3713 /* Nothing could be sent. */
3714 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3715 } else {
3716 /* Fd passing is an all or nothing kind of thing. */
3717 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3718 /*
3719 * The payload _fd_array count is used later to
3720 * check if there is notifications queued. So although the
3721 * direct caller knows that the transmission is complete, we
3722 * need to clear the _fd_array for the queuing check.
3723 */
3724 lttng_dynamic_pointer_array_clear(
3725 &client->communication.outbound.payload
3726 ._fd_handles);
3727 }
3728
3729 end:
3730 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3731 client->communication.outbound.queued_command_reply = false;
3732 client->communication.outbound.dropped_notification = false;
3733 lttng_payload_clear(&client->communication.outbound.payload);
3734 }
3735
3736 return status;
3737 error:
3738 return CLIENT_TRANSMISSION_STATUS_ERROR;
3739 }
3740
3741 /* Client lock must _not_ be held by the caller. */
3742 static
3743 int client_send_command_reply(struct notification_client *client,
3744 struct notification_thread_state *state,
3745 enum lttng_notification_channel_status status)
3746 {
3747 int ret;
3748 struct lttng_notification_channel_command_reply reply = {
3749 .status = (int8_t) status,
3750 };
3751 struct lttng_notification_channel_message msg = {
3752 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3753 .size = sizeof(reply),
3754 .fds = 0,
3755 };
3756 char buffer[sizeof(msg) + sizeof(reply)];
3757 enum client_transmission_status transmission_status;
3758
3759 memcpy(buffer, &msg, sizeof(msg));
3760 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3761 DBG("Send command reply (%i)", (int) status);
3762
3763 pthread_mutex_lock(&client->lock);
3764 if (client->communication.outbound.queued_command_reply) {
3765 /* Protocol error. */
3766 goto error_unlock;
3767 }
3768
3769 /* Enqueue buffer to outgoing queue and flush it. */
3770 ret = lttng_dynamic_buffer_append(
3771 &client->communication.outbound.payload.buffer,
3772 buffer, sizeof(buffer));
3773 if (ret) {
3774 goto error_unlock;
3775 }
3776
3777 transmission_status = client_flush_outgoing_queue(client);
3778
3779 if (client_has_outbound_data_left(client)) {
3780 /* Queue could not be emptied. */
3781 client->communication.outbound.queued_command_reply = true;
3782 }
3783
3784 pthread_mutex_unlock(&client->lock);
3785 ret = client_handle_transmission_status(
3786 client, transmission_status, state);
3787 if (ret) {
3788 goto error;
3789 }
3790
3791 return 0;
3792 error_unlock:
3793 pthread_mutex_unlock(&client->lock);
3794 error:
3795 return -1;
3796 }
3797
3798 static
3799 int client_handle_message_unknown(struct notification_client *client,
3800 struct notification_thread_state *state __attribute__((unused)))
3801 {
3802 int ret;
3803 /*
3804 * Receiving message header. The function will be called again
3805 * once the rest of the message as been received and can be
3806 * interpreted.
3807 */
3808 const struct lttng_notification_channel_message *msg;
3809
3810 LTTNG_ASSERT(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
3811 msg = (const struct lttng_notification_channel_message *)
3812 client->communication.inbound.payload.buffer.data;
3813
3814 if (msg->size == 0 ||
3815 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3816 ERR("Invalid notification channel message: length = %u",
3817 msg->size);
3818 ret = -1;
3819 goto end;
3820 }
3821
3822 switch (msg->type) {
3823 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3824 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3825 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3826 break;
3827 default:
3828 ret = -1;
3829 ERR("Invalid notification channel message: unexpected message type");
3830 goto end;
3831 }
3832
3833 client->communication.inbound.bytes_to_receive = msg->size;
3834 client->communication.inbound.fds_to_receive = msg->fds;
3835 client->communication.inbound.msg_type =
3836 (enum lttng_notification_channel_message_type) msg->type;
3837 ret = lttng_dynamic_buffer_set_size(
3838 &client->communication.inbound.payload.buffer, msg->size);
3839
3840 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3841 msg = NULL;
3842 end:
3843 return ret;
3844 }
3845
3846 static
3847 int client_handle_message_handshake(struct notification_client *client,
3848 struct notification_thread_state *state)
3849 {
3850 int ret;
3851 struct lttng_notification_channel_command_handshake *handshake_client;
3852 const struct lttng_notification_channel_command_handshake handshake_reply = {
3853 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3854 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3855 };
3856 const struct lttng_notification_channel_message msg_header = {
3857 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3858 .size = sizeof(handshake_reply),
3859 .fds = 0,
3860 };
3861 enum lttng_notification_channel_status status =
3862 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3863 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3864
3865 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3866 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3867 sizeof(handshake_reply));
3868
3869 handshake_client =
3870 (struct lttng_notification_channel_command_handshake *)
3871 client->communication.inbound.payload.buffer
3872 .data;
3873 client->major = handshake_client->major;
3874 client->minor = handshake_client->minor;
3875 if (!client->communication.inbound.creds_received) {
3876 ERR("No credentials received from client");
3877 ret = -1;
3878 goto end;
3879 }
3880
3881 client->uid = LTTNG_SOCK_GET_UID_CRED(
3882 &client->communication.inbound.creds);
3883 client->gid = LTTNG_SOCK_GET_GID_CRED(
3884 &client->communication.inbound.creds);
3885 client->is_sessiond = LTTNG_SOCK_GET_PID_CRED(&client->communication.inbound.creds) == getpid();
3886 DBG("Received handshake from client: uid = %u, gid = %u, protocol version = %i.%i, client is sessiond = %s",
3887 client->uid, client->gid, (int) client->major,
3888 (int) client->minor,
3889 client->is_sessiond ? "true" : "false");
3890
3891 if (handshake_client->major !=
3892 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3893 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3894 }
3895
3896 pthread_mutex_lock(&client->lock);
3897 /* Outgoing queue will be flushed when the command reply is sent. */
3898 ret = lttng_dynamic_buffer_append(
3899 &client->communication.outbound.payload.buffer, send_buffer,
3900 sizeof(send_buffer));
3901 if (ret) {
3902 ERR("Failed to send protocol version to notification channel client");
3903 goto end_unlock;
3904 }
3905
3906 client->validated = true;
3907 client->communication.active = true;
3908 pthread_mutex_unlock(&client->lock);
3909
3910 /* Set reception state to receive the next message header. */
3911 ret = client_reset_inbound_state(client);
3912 if (ret) {
3913 ERR("Failed to reset client communication's inbound state");
3914 goto end;
3915 }
3916
3917 /* Flushes the outgoing queue. */
3918 ret = client_send_command_reply(client, state, status);
3919 if (ret) {
3920 ERR("Failed to send reply to notification channel client");
3921 goto end;
3922 }
3923
3924 goto end;
3925 end_unlock:
3926 pthread_mutex_unlock(&client->lock);
3927 end:
3928 return ret;
3929 }
3930
3931 static
3932 int client_handle_message_subscription(
3933 struct notification_client *client,
3934 enum lttng_notification_channel_message_type msg_type,
3935 struct notification_thread_state *state)
3936 {
3937 int ret;
3938 struct lttng_condition *condition;
3939 enum lttng_notification_channel_status status =
3940 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3941 struct lttng_payload_view condition_view =
3942 lttng_payload_view_from_payload(
3943 &client->communication.inbound.payload,
3944 0, -1);
3945 size_t expected_condition_size;
3946
3947 /*
3948 * No need to lock client to sample the inbound state as the only
3949 * other thread accessing clients (action executor) only uses the
3950 * outbound state.
3951 */
3952 expected_condition_size = client->communication.inbound.payload.buffer.size;
3953 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3954 if (ret != expected_condition_size) {
3955 ERR("Malformed condition received from client");
3956 goto end;
3957 }
3958
3959 /* Ownership of condition is always transferred. */
3960 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3961 ret = notification_thread_client_subscribe(
3962 client, condition, state, &status);
3963 } else {
3964 ret = notification_thread_client_unsubscribe(
3965 client, condition, state, &status);
3966 }
3967
3968 if (ret) {
3969 goto end;
3970 }
3971
3972 /* Set reception state to receive the next message header. */
3973 ret = client_reset_inbound_state(client);
3974 if (ret) {
3975 ERR("Failed to reset client communication's inbound state");
3976 goto end;
3977 }
3978
3979 ret = client_send_command_reply(client, state, status);
3980 if (ret) {
3981 ERR("Failed to send reply to notification channel client");
3982 goto end;
3983 }
3984
3985 end:
3986 return ret;
3987 }
3988
3989 static
3990 int client_dispatch_message(struct notification_client *client,
3991 struct notification_thread_state *state)
3992 {
3993 int ret = 0;
3994
3995 if (client->communication.inbound.msg_type !=
3996 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3997 client->communication.inbound.msg_type !=
3998 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3999 !client->validated) {
4000 WARN("client attempted a command before handshake");
4001 ret = -1;
4002 goto end;
4003 }
4004
4005 switch (client->communication.inbound.msg_type) {
4006 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
4007 {
4008 ret = client_handle_message_unknown(client, state);
4009 break;
4010 }
4011 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
4012 {
4013 ret = client_handle_message_handshake(client, state);
4014 break;
4015 }
4016 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
4017 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
4018 {
4019 ret = client_handle_message_subscription(client,
4020 client->communication.inbound.msg_type, state);
4021 break;
4022 }
4023 default:
4024 abort();
4025 }
4026 end:
4027 return ret;
4028 }
4029
4030 /* Incoming data from client. */
4031 int handle_notification_thread_client_in(
4032 struct notification_thread_state *state, int socket)
4033 {
4034 int ret = 0;
4035 struct notification_client *client;
4036 ssize_t recv_ret;
4037 size_t offset;
4038
4039 rcu_read_lock();
4040 client = get_client_from_socket(socket, state);
4041 if (!client) {
4042 /* Internal error, abort. */
4043 ret = -1;
4044 goto end;
4045 }
4046
4047 if (client->communication.inbound.bytes_to_receive == 0 &&
4048 client->communication.inbound.fds_to_receive != 0) {
4049 /* Only FDs left to receive. */
4050 goto receive_fds;
4051 }
4052
4053 offset = client->communication.inbound.payload.buffer.size -
4054 client->communication.inbound.bytes_to_receive;
4055 if (client->communication.inbound.expect_creds) {
4056 recv_ret = lttcomm_recv_creds_unix_sock(socket,
4057 client->communication.inbound.payload.buffer.data + offset,
4058 client->communication.inbound.bytes_to_receive,
4059 &client->communication.inbound.creds);
4060 if (recv_ret > 0) {
4061 client->communication.inbound.expect_creds = false;
4062 client->communication.inbound.creds_received = true;
4063 }
4064 } else {
4065 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
4066 client->communication.inbound.payload.buffer.data + offset,
4067 client->communication.inbound.bytes_to_receive);
4068 }
4069 if (recv_ret >= 0) {
4070 client->communication.inbound.bytes_to_receive -= recv_ret;
4071 } else {
4072 goto error_disconnect_client;
4073 }
4074
4075 if (client->communication.inbound.bytes_to_receive != 0) {
4076 /* Message incomplete wait for more data. */
4077 ret = 0;
4078 goto end;
4079 }
4080
4081 receive_fds:
4082 LTTNG_ASSERT(client->communication.inbound.bytes_to_receive == 0);
4083
4084 /* Receive fds. */
4085 if (client->communication.inbound.fds_to_receive != 0) {
4086 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
4087 client->socket,
4088 client->communication.inbound.fds_to_receive,
4089 &client->communication.inbound.payload);
4090 if (ret > 0) {
4091 /*
4092 * Fds received. non blocking fds passing is all
4093 * or nothing.
4094 */
4095 ssize_t expected_size;
4096
4097 expected_size = sizeof(int) *
4098 client->communication.inbound
4099 .fds_to_receive;
4100 LTTNG_ASSERT(ret == expected_size);
4101 client->communication.inbound.fds_to_receive = 0;
4102 } else if (ret == 0) {
4103 /* Received nothing. */
4104 ret = 0;
4105 goto end;
4106 } else {
4107 goto error_disconnect_client;
4108 }
4109 }
4110
4111 /* At this point the message is complete.*/
4112 LTTNG_ASSERT(client->communication.inbound.bytes_to_receive == 0 &&
4113 client->communication.inbound.fds_to_receive == 0);
4114 ret = client_dispatch_message(client, state);
4115 if (ret) {
4116 /*
4117 * Only returns an error if this client must be
4118 * disconnected.
4119 */
4120 goto error_disconnect_client;
4121 }
4122
4123 end:
4124 rcu_read_unlock();
4125 return ret;
4126
4127 error_disconnect_client:
4128 ret = notification_thread_client_disconnect(client, state);
4129 goto end;
4130 }
4131
4132 /* Client ready to receive outgoing data. */
4133 int handle_notification_thread_client_out(
4134 struct notification_thread_state *state, int socket)
4135 {
4136 int ret;
4137 struct notification_client *client;
4138 enum client_transmission_status transmission_status;
4139
4140 rcu_read_lock();
4141 client = get_client_from_socket(socket, state);
4142 if (!client) {
4143 /* Internal error, abort. */
4144 ret = -1;
4145 goto end;
4146 }
4147
4148 pthread_mutex_lock(&client->lock);
4149 if (!client_has_outbound_data_left(client)) {
4150 /*
4151 * A client "out" event can be received when no payload is left
4152 * to send under some circumstances.
4153 *
4154 * Many threads can flush a client's outgoing queue and, if they
4155 * had to queue their message (socket was full), will use the
4156 * "communication update" command to signal the (e)poll thread
4157 * to monitor for space being made available in the socket.
4158 *
4159 * Commands are sent over an internal pipe serviced by the same
4160 * thread as the client sockets.
4161 *
4162 * When space is made available in the socket, there is a race
4163 * between the (e)poll thread and the other threads that may
4164 * wish to use the client's socket to flush its outgoing queue.
4165 *
4166 * A non-(e)poll thread may attempt (and succeed) in flushing
4167 * the queue before the (e)poll thread gets a chance to service
4168 * the client's "out" event.
4169 *
4170 * In this situation, the (e)poll thread processing the client
4171 * out event will see an empty payload: there is nothing to do
4172 * except unsubscribing (e)poll "out" events.
4173 *
4174 * Note that this thread is the (e)poll thread so it can modify
4175 * the (e)poll mask directly without using a communication
4176 * update command. Other threads that flush the outgoing queue
4177 * will use the "communication update" command to wake up this
4178 * thread and force it to monitor "out" events.
4179 *
4180 * When other threads succeed in emptying the outgoing queue,
4181 * they don't need to update the (e)poll mask: if the "out"
4182 * event is monitored, it will fire once and the (e)poll
4183 * thread will reach this condition, causing the event to
4184 * stop being monitored.
4185 */
4186 transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
4187 } else {
4188 transmission_status = client_flush_outgoing_queue(client);
4189 }
4190 pthread_mutex_unlock(&client->lock);
4191
4192 ret = client_handle_transmission_status(
4193 client, transmission_status, state);
4194 if (ret) {
4195 goto end;
4196 }
4197 end:
4198 rcu_read_unlock();
4199 return ret;
4200 }
4201
4202 static
4203 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
4204 const struct channel_state_sample *sample,
4205 uint64_t buffer_capacity)
4206 {
4207 bool result = false;
4208 uint64_t threshold;
4209 enum lttng_condition_type condition_type;
4210 const struct lttng_condition_buffer_usage *use_condition = container_of(
4211 condition, struct lttng_condition_buffer_usage,
4212 parent);
4213
4214 if (use_condition->threshold_bytes.set) {
4215 threshold = use_condition->threshold_bytes.value;
4216 } else {
4217 /*
4218 * Threshold was expressed as a ratio.
4219 *
4220 * TODO the threshold (in bytes) of conditions expressed
4221 * as a ratio of total buffer size could be cached to
4222 * forego this double-multiplication or it could be performed
4223 * as fixed-point math.
4224 *
4225 * Note that caching should accommodates the case where the
4226 * condition applies to multiple channels (i.e. don't assume
4227 * that all channels matching my_chann* have the same size...)
4228 */
4229 threshold = (uint64_t) (use_condition->threshold_ratio.value *
4230 (double) buffer_capacity);
4231 }
4232
4233 condition_type = lttng_condition_get_type(condition);
4234 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
4235 DBG("Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
4236 threshold, sample->highest_usage);
4237
4238 /*
4239 * The low condition should only be triggered once _all_ of the
4240 * streams in a channel have gone below the "low" threshold.
4241 */
4242 if (sample->highest_usage <= threshold) {
4243 result = true;
4244 }
4245 } else {
4246 DBG("High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
4247 threshold, sample->highest_usage);
4248
4249 /*
4250 * For high buffer usage scenarios, we want to trigger whenever
4251 * _any_ of the streams has reached the "high" threshold.
4252 */
4253 if (sample->highest_usage >= threshold) {
4254 result = true;
4255 }
4256 }
4257
4258 return result;
4259 }
4260
4261 static
4262 bool evaluate_session_consumed_size_condition(
4263 const struct lttng_condition *condition,
4264 uint64_t session_consumed_size)
4265 {
4266 uint64_t threshold;
4267 const struct lttng_condition_session_consumed_size *size_condition =
4268 container_of(condition,
4269 struct lttng_condition_session_consumed_size,
4270 parent);
4271
4272 threshold = size_condition->consumed_threshold_bytes.value;
4273 DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
4274 threshold, session_consumed_size);
4275 return session_consumed_size >= threshold;
4276 }
4277
4278 static
4279 int evaluate_buffer_condition(const struct lttng_condition *condition,
4280 struct lttng_evaluation **evaluation,
4281 const struct notification_thread_state *state __attribute__((unused)),
4282 const struct channel_state_sample *previous_sample,
4283 const struct channel_state_sample *latest_sample,
4284 uint64_t previous_session_consumed_total,
4285 uint64_t latest_session_consumed_total,
4286 struct channel_info *channel_info)
4287 {
4288 int ret = 0;
4289 enum lttng_condition_type condition_type;
4290 const bool previous_sample_available = !!previous_sample;
4291 bool previous_sample_result = false;
4292 bool latest_sample_result;
4293
4294 condition_type = lttng_condition_get_type(condition);
4295
4296 switch (condition_type) {
4297 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
4298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
4299 if (caa_likely(previous_sample_available)) {
4300 previous_sample_result =
4301 evaluate_buffer_usage_condition(condition,
4302 previous_sample, channel_info->capacity);
4303 }
4304 latest_sample_result = evaluate_buffer_usage_condition(
4305 condition, latest_sample,
4306 channel_info->capacity);
4307 break;
4308 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
4309 if (caa_likely(previous_sample_available)) {
4310 previous_sample_result =
4311 evaluate_session_consumed_size_condition(
4312 condition,
4313 previous_session_consumed_total);
4314 }
4315 latest_sample_result =
4316 evaluate_session_consumed_size_condition(
4317 condition,
4318 latest_session_consumed_total);
4319 break;
4320 default:
4321 /* Unknown condition type; internal error. */
4322 abort();
4323 }
4324
4325 if (!latest_sample_result ||
4326 (previous_sample_result == latest_sample_result)) {
4327 /*
4328 * Only trigger on a condition evaluation transition.
4329 *
4330 * NOTE: This edge-triggered logic may not be appropriate for
4331 * future condition types.
4332 */
4333 goto end;
4334 }
4335
4336 if (!evaluation || !latest_sample_result) {
4337 goto end;
4338 }
4339
4340 switch (condition_type) {
4341 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
4342 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
4343 *evaluation = lttng_evaluation_buffer_usage_create(
4344 condition_type,
4345 latest_sample->highest_usage,
4346 channel_info->capacity);
4347 break;
4348 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
4349 *evaluation = lttng_evaluation_session_consumed_size_create(
4350 latest_session_consumed_total);
4351 break;
4352 default:
4353 abort();
4354 }
4355
4356 if (!*evaluation) {
4357 ret = -1;
4358 goto end;
4359 }
4360 end:
4361 return ret;
4362 }
4363
4364 static
4365 int client_notification_overflow(struct notification_client *client)
4366 {
4367 int ret = 0;
4368 const struct lttng_notification_channel_message msg = {
4369 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
4370 .size = 0,
4371 .fds = 0,
4372 };
4373
4374 ASSERT_LOCKED(client->lock);
4375
4376 DBG("Dropping notification addressed to client (socket fd = %i)",
4377 client->socket);
4378 if (client->communication.outbound.dropped_notification) {
4379 /*
4380 * The client already has a "notification dropped" message
4381 * in its outgoing queue. Nothing to do since all
4382 * of those messages are coalesced.
4383 */
4384 goto end;
4385 }
4386
4387 client->communication.outbound.dropped_notification = true;
4388 ret = lttng_dynamic_buffer_append(
4389 &client->communication.outbound.payload.buffer, &msg,
4390 sizeof(msg));
4391 if (ret) {
4392 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
4393 client->socket);
4394 }
4395 end:
4396 return ret;
4397 }
4398
4399 static int client_handle_transmission_status_wrapper(
4400 struct notification_client *client,
4401 enum client_transmission_status status,
4402 void *user_data)
4403 {
4404 return client_handle_transmission_status(client, status,
4405 (struct notification_thread_state *) user_data);
4406 }
4407
4408 static
4409 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
4410 const struct lttng_evaluation *evaluation,
4411 struct notification_client_list* client_list,
4412 struct notification_thread_state *state,
4413 uid_t object_uid, gid_t object_gid)
4414 {
4415 const struct lttng_credentials creds = {
4416 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
4417 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
4418 };
4419
4420 return notification_client_list_send_evaluation(client_list,
4421 trigger, evaluation,
4422 &creds,
4423 client_handle_transmission_status_wrapper, state);
4424 }
4425
4426 /*
4427 * Permission checks relative to notification channel clients are performed
4428 * here. Notice how object, client, and trigger credentials are involved in
4429 * this check.
4430 *
4431 * The `object` credentials are the credentials associated with the "subject"
4432 * of a condition. For instance, a `rotation completed` condition applies
4433 * to a session. When that condition is met, it will produce an evaluation
4434 * against a session. Hence, in this case, the `object` credentials are the
4435 * credentials of the "subject" session.
4436 *
4437 * The `trigger` credentials are the credentials of the user that registered the
4438 * trigger.
4439 *
4440 * The `client` credentials are the credentials of the user that created a given
4441 * notification channel.
4442 *
4443 * In terms of visibility, it is expected that non-privilieged users can only
4444 * register triggers against "their" objects (their own sessions and
4445 * applications they are allowed to interact with). They can then open a
4446 * notification channel and subscribe to notifications associated with those
4447 * triggers.
4448 *
4449 * As for privilieged users, they can register triggers against the objects of
4450 * other users. They can then subscribe to the notifications associated to their
4451 * triggers. Privilieged users _can't_ subscribe to the notifications of
4452 * triggers owned by other users; they must create their own triggers.
4453 *
4454 * This is more a concern of usability than security. It would be difficult for
4455 * a root user reliably subscribe to a specific set of conditions without
4456 * interference from external users (those could, for instance, unregister
4457 * their triggers).
4458 */
4459 int notification_client_list_send_evaluation(
4460 struct notification_client_list *client_list,
4461 const struct lttng_trigger *trigger,
4462 const struct lttng_evaluation *evaluation,
4463 const struct lttng_credentials *source_object_creds,
4464 report_client_transmission_result_cb client_report,
4465 void *user_data)
4466 {
4467 int ret = 0;
4468 struct lttng_payload msg_payload;
4469 struct notification_client_list_element *client_list_element, *tmp;
4470 const struct lttng_notification notification = {
4471 .trigger = (struct lttng_trigger *) trigger,
4472 .evaluation = (struct lttng_evaluation *) evaluation,
4473 };
4474 struct lttng_notification_channel_message msg_header = {
4475 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4476 .size = 0,
4477 .fds = 0,
4478 };
4479 const struct lttng_credentials *trigger_creds =
4480 lttng_trigger_get_credentials(trigger);
4481
4482 lttng_payload_init(&msg_payload);
4483
4484 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
4485 sizeof(msg_header));
4486 if (ret) {
4487 goto end;
4488 }
4489
4490 ret = lttng_notification_serialize(&notification, &msg_payload);
4491 if (ret) {
4492 ERR("Failed to serialize notification");
4493 ret = -1;
4494 goto end;
4495 }
4496
4497 /* Update payload size. */
4498 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4499 ->size = (uint32_t)(
4500 msg_payload.buffer.size - sizeof(msg_header));
4501
4502 /* Update the payload number of fds. */
4503 {
4504 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4505 &msg_payload, 0, -1);
4506
4507 ((struct lttng_notification_channel_message *)
4508 msg_payload.buffer.data)->fds = (uint32_t)
4509 lttng_payload_view_get_fd_handle_count(&pv);
4510 }
4511
4512 pthread_mutex_lock(&client_list->lock);
4513 cds_list_for_each_entry_safe(client_list_element, tmp,
4514 &client_list->clients_list, node) {
4515 enum client_transmission_status transmission_status;
4516 struct notification_client *client =
4517 client_list_element->client;
4518
4519 ret = 0;
4520 pthread_mutex_lock(&client->lock);
4521 if (!client->communication.active) {
4522 /*
4523 * Skip inactive client (protocol error or
4524 * disconnecting).
4525 */
4526 DBG("Skipping client at it is marked as inactive");
4527 goto skip_client;
4528 }
4529
4530 if (lttng_trigger_is_hidden(trigger) && !client->is_sessiond) {
4531 /*
4532 * Notifications resulting from an hidden trigger are
4533 * only sent to the session daemon.
4534 */
4535 DBG("Skipping client as the trigger is hidden and the client is not the session daemon");
4536 goto skip_client;
4537 }
4538
4539 if (source_object_creds) {
4540 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4541 client->gid != lttng_credentials_get_gid(source_object_creds) &&
4542 client->uid != 0) {
4543 /*
4544 * Client is not allowed to monitor this
4545 * object.
4546 */
4547 DBG("Skipping client at it does not have the object permission to receive notification for this trigger");
4548 goto skip_client;
4549 }
4550 }
4551
4552 if (client->uid != lttng_credentials_get_uid(trigger_creds)) {
4553 DBG("Skipping client at it does not have the permission to receive notification for this trigger");
4554 goto skip_client;
4555 }
4556
4557 DBG("Sending notification to client (fd = %i, %zu bytes)",
4558 client->socket, msg_payload.buffer.size);
4559
4560 if (client_has_outbound_data_left(client)) {
4561 /*
4562 * Outgoing data is already buffered for this client;
4563 * drop the notification and enqueue a "dropped
4564 * notification" message if this is the first dropped
4565 * notification since the socket spilled-over to the
4566 * queue.
4567 */
4568 ret = client_notification_overflow(client);
4569 if (ret) {
4570 /* Fatal error. */
4571 goto skip_client;
4572 }
4573 }
4574
4575 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
4576 if (ret) {
4577 /* Fatal error. */
4578 goto skip_client;
4579 }
4580
4581 transmission_status = client_flush_outgoing_queue(client);
4582 pthread_mutex_unlock(&client->lock);
4583 ret = client_report(client, transmission_status, user_data);
4584 if (ret) {
4585 /* Fatal error. */
4586 goto end_unlock_list;
4587 }
4588
4589 continue;
4590
4591 skip_client:
4592 pthread_mutex_unlock(&client->lock);
4593 if (ret) {
4594 /* Fatal error. */
4595 goto end_unlock_list;
4596 }
4597 }
4598 ret = 0;
4599
4600 end_unlock_list:
4601 pthread_mutex_unlock(&client_list->lock);
4602 end:
4603 lttng_payload_reset(&msg_payload);
4604 return ret;
4605 }
4606
4607 static
4608 struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
4609 int notification_pipe_read_fd, enum lttng_domain_type domain)
4610 {
4611 int ret;
4612 uint64_t token;
4613 struct lttng_event_notifier_notification *notification = NULL;
4614 char *capture_buffer = NULL;
4615 size_t capture_buffer_size;
4616 void *reception_buffer;
4617 size_t reception_size;
4618
4619 struct lttng_ust_abi_event_notifier_notification ust_notification;
4620 struct lttng_kernel_abi_event_notifier_notification kernel_notification;
4621
4622 /* Init lttng_event_notifier_notification */
4623 switch(domain) {
4624 case LTTNG_DOMAIN_UST:
4625 reception_buffer = (void *) &ust_notification;
4626 reception_size = sizeof(ust_notification);
4627 break;
4628 case LTTNG_DOMAIN_KERNEL:
4629 reception_buffer = (void *) &kernel_notification;
4630 reception_size = sizeof(kernel_notification);
4631 break;
4632 default:
4633 abort();
4634 }
4635
4636 /*
4637 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4638 * ensuring that read/write of tracer notifications are atomic.
4639 */
4640 ret = lttng_read(notification_pipe_read_fd, reception_buffer,
4641 reception_size);
4642 if (ret != reception_size) {
4643 PERROR("Failed to read from event source notification pipe: fd = %d, size to read = %zu, ret = %d",
4644 notification_pipe_read_fd, reception_size, ret);
4645 ret = -1;
4646 goto end;
4647 }
4648
4649 switch(domain) {
4650 case LTTNG_DOMAIN_UST:
4651 token = ust_notification.token;
4652 capture_buffer_size = ust_notification.capture_buf_size;
4653 break;
4654 case LTTNG_DOMAIN_KERNEL:
4655 token = kernel_notification.token;
4656 capture_buffer_size = kernel_notification.capture_buf_size;
4657 break;
4658 default:
4659 abort();
4660 }
4661
4662 if (capture_buffer_size == 0) {
4663 capture_buffer = NULL;
4664 goto skip_capture;
4665 }
4666
4667 if (capture_buffer_size > MAX_CAPTURE_SIZE) {
4668 ERR("Event notifier has a capture payload size which exceeds the maximum allowed size: capture_payload_size = %zu bytes, max allowed size = %d bytes",
4669 capture_buffer_size, MAX_CAPTURE_SIZE);
4670 goto end;
4671 }
4672
4673 capture_buffer = calloc<char>(capture_buffer_size);
4674 if (!capture_buffer) {
4675 ERR("Failed to allocate capture buffer");
4676 goto end;
4677 }
4678
4679 /* Fetch additional payload (capture). */
4680 ret = lttng_read(notification_pipe_read_fd, capture_buffer, capture_buffer_size);
4681 if (ret != capture_buffer_size) {
4682 ERR("Failed to read from event source pipe (fd = %i)",
4683 notification_pipe_read_fd);
4684 goto end;
4685 }
4686
4687 skip_capture:
4688 notification = lttng_event_notifier_notification_create(token, domain,
4689 capture_buffer, capture_buffer_size);
4690 if (notification == NULL) {
4691 goto end;
4692 }
4693
4694 /*
4695 * Ownership transfered to the lttng_event_notifier_notification object.
4696 */
4697 capture_buffer = NULL;
4698
4699 end:
4700 free(capture_buffer);
4701 return notification;
4702 }
4703
4704 static
4705 int dispatch_one_event_notifier_notification(struct notification_thread_state *state,
4706 struct lttng_event_notifier_notification *notification)
4707 {
4708 struct cds_lfht_node *node;
4709 struct cds_lfht_iter iter;
4710 struct notification_trigger_tokens_ht_element *element;
4711 struct lttng_evaluation *evaluation = NULL;
4712 enum action_executor_status executor_status;
4713 struct notification_client_list *client_list = NULL;
4714 int ret;
4715 unsigned int capture_count = 0;
4716
4717 /* Find triggers associated with this token. */
4718 rcu_read_lock();
4719 cds_lfht_lookup(state->trigger_tokens_ht,
4720 hash_key_u64(&notification->tracer_token, lttng_ht_seed),
4721 match_trigger_token, &notification->tracer_token, &iter);
4722 node = cds_lfht_iter_get_node(&iter);
4723 if (caa_unlikely(!node)) {
4724 /*
4725 * This is not an error, slow consumption of the tracer
4726 * notifications can lead to situations where a trigger is
4727 * removed but we still get tracer notifications matching a
4728 * trigger that no longer exists.
4729 */
4730 ret = 0;
4731 goto end_unlock;
4732 }
4733
4734 element = caa_container_of(node,
4735 struct notification_trigger_tokens_ht_element,
4736 node);
4737
4738 if (lttng_condition_event_rule_matches_get_capture_descriptor_count(
4739 lttng_trigger_get_const_condition(element->trigger),
4740 &capture_count) != LTTNG_CONDITION_STATUS_OK) {
4741 ERR("Failed to get capture count");
4742 ret = -1;
4743 goto end;
4744 }
4745
4746 if (!notification->capture_buffer && capture_count != 0) {
4747 ERR("Expected capture but capture buffer is null");
4748 ret = -1;
4749 goto end;
4750 }
4751
4752 evaluation = lttng_evaluation_event_rule_matches_create(
4753 container_of(lttng_trigger_get_const_condition(
4754 element->trigger),
4755 struct lttng_condition_event_rule_matches,
4756 parent),
4757 notification->capture_buffer,
4758 notification->capture_buf_size, false);
4759
4760 if (evaluation == NULL) {
4761 ERR("Failed to create event rule matches evaluation while creating and enqueuing action executor job");
4762 ret = -1;
4763 goto end_unlock;
4764 }
4765
4766 client_list = get_client_list_from_condition(state,
4767 lttng_trigger_get_const_condition(element->trigger));
4768 executor_status = action_executor_enqueue_trigger(state->executor,
4769 element->trigger, evaluation, NULL, client_list);
4770 switch (executor_status) {
4771 case ACTION_EXECUTOR_STATUS_OK:
4772 ret = 0;
4773 break;
4774 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4775 {
4776 struct notification_client_list_element *client_list_element,
4777 *tmp;
4778
4779 /*
4780 * Not a fatal error; this is expected and simply means the
4781 * executor has too much work queued already.
4782 */
4783 ret = 0;
4784
4785 /* No clients subscribed to notifications for this trigger. */
4786 if (!client_list) {
4787 break;
4788 }
4789
4790 /* Warn clients that a notification (or more) was dropped. */
4791 pthread_mutex_lock(&client_list->lock);
4792 cds_list_for_each_entry_safe(client_list_element, tmp,
4793 &client_list->clients_list, node) {
4794 enum client_transmission_status transmission_status;
4795 struct notification_client *client =
4796 client_list_element->client;
4797
4798 pthread_mutex_lock(&client->lock);
4799 ret = client_notification_overflow(client);
4800 if (ret) {
4801 /* Fatal error. */
4802 goto next_client;
4803 }
4804
4805 transmission_status =
4806 client_flush_outgoing_queue(client);
4807 ret = client_handle_transmission_status(
4808 client, transmission_status, state);
4809 if (ret) {
4810 /* Fatal error. */
4811 goto next_client;
4812 }
4813 next_client:
4814 pthread_mutex_unlock(&client->lock);
4815 if (ret) {
4816 break;
4817 }
4818 }
4819
4820 pthread_mutex_unlock(&client_list->lock);
4821 break;
4822 }
4823 case ACTION_EXECUTOR_STATUS_INVALID:
4824 case ACTION_EXECUTOR_STATUS_ERROR:
4825 /* Fatal error, shut down everything. */
4826 ERR("Fatal error encoutered while enqueuing action to the action executor");
4827 ret = -1;
4828 goto end_unlock;
4829 default:
4830 /* Unhandled error. */
4831 abort();
4832 }
4833
4834 end_unlock:
4835 notification_client_list_put(client_list);
4836 rcu_read_unlock();
4837 end:
4838 return ret;
4839 }
4840
4841 static
4842 int handle_one_event_notifier_notification(
4843 struct notification_thread_state *state,
4844 int pipe, enum lttng_domain_type domain)
4845 {
4846 int ret = 0;
4847 struct lttng_event_notifier_notification *notification = NULL;
4848
4849 notification = recv_one_event_notifier_notification(pipe, domain);
4850 if (notification == NULL) {
4851 /* Reception failed, don't consider it fatal. */
4852 ERR("Error receiving an event notifier notification from tracer: fd = %i, domain = %s",
4853 pipe, lttng_domain_type_str(domain));
4854 goto end;
4855 }
4856
4857 ret = dispatch_one_event_notifier_notification(state, notification);
4858 if (ret) {
4859 ERR("Error dispatching an event notifier notification from tracer: fd = %i, domain = %s",
4860 pipe, lttng_domain_type_str(domain));
4861 goto end;
4862 }
4863
4864 end:
4865 lttng_event_notifier_notification_destroy(notification);
4866 return ret;
4867 }
4868
4869 int handle_notification_thread_event_notification(struct notification_thread_state *state,
4870 int pipe, enum lttng_domain_type domain)
4871 {
4872 return handle_one_event_notifier_notification(state, pipe, domain);
4873 }
4874
4875 int handle_notification_thread_channel_sample(
4876 struct notification_thread_state *state, int pipe,
4877 enum lttng_domain_type domain)
4878 {
4879 int ret = 0;
4880 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4881 struct channel_info *channel_info;
4882 struct cds_lfht_node *node;
4883 struct cds_lfht_iter iter;
4884 struct lttng_channel_trigger_list *trigger_list;
4885 struct lttng_trigger_list_element *trigger_list_element;
4886 bool previous_sample_available = false;
4887 struct channel_state_sample previous_sample, latest_sample;
4888 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4889 struct lttng_credentials channel_creds;
4890
4891 /*
4892 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4893 * ensuring that read/write of sampling messages are atomic.
4894 */
4895 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4896 if (ret != sizeof(sample_msg)) {
4897 ERR("Failed to read from monitoring pipe (fd = %i)",
4898 pipe);
4899 ret = -1;
4900 goto end;
4901 }
4902
4903 ret = 0;
4904 latest_sample.key.key = sample_msg.key;
4905 latest_sample.key.domain = domain;
4906 latest_sample.highest_usage = sample_msg.highest;
4907 latest_sample.lowest_usage = sample_msg.lowest;
4908 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4909
4910 rcu_read_lock();
4911
4912 /* Retrieve the channel's informations */
4913 cds_lfht_lookup(state->channels_ht,
4914 hash_channel_key(&latest_sample.key),
4915 match_channel_info,
4916 &latest_sample.key,
4917 &iter);
4918 node = cds_lfht_iter_get_node(&iter);
4919 if (caa_unlikely(!node)) {
4920 /*
4921 * Not an error since the consumer can push a sample to the pipe
4922 * and the rest of the session daemon could notify us of the
4923 * channel's destruction before we get a chance to process that
4924 * sample.
4925 */
4926 DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4927 latest_sample.key.key,
4928 lttng_domain_type_str(domain));
4929 goto end_unlock;
4930 }
4931 channel_info = caa_container_of(node, struct channel_info,
4932 channels_ht_node);
4933 DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4934 channel_info->name,
4935 latest_sample.key.key,
4936 channel_info->session_info->name,
4937 latest_sample.highest_usage,
4938 latest_sample.lowest_usage,
4939 latest_sample.channel_total_consumed);
4940
4941 previous_session_consumed_total =
4942 channel_info->session_info->consumed_data_size;
4943
4944 /* Retrieve the channel's last sample, if it exists, and update it. */
4945 cds_lfht_lookup(state->channel_state_ht,
4946 hash_channel_key(&latest_sample.key),
4947 match_channel_state_sample,
4948 &latest_sample.key,
4949 &iter);
4950 node = cds_lfht_iter_get_node(&iter);
4951 if (caa_likely(node)) {
4952 struct channel_state_sample *stored_sample;
4953
4954 /* Update the sample stored. */
4955 stored_sample = caa_container_of(node,
4956 struct channel_state_sample,
4957 channel_state_ht_node);
4958
4959 memcpy(&previous_sample, stored_sample,
4960 sizeof(previous_sample));
4961 stored_sample->highest_usage = latest_sample.highest_usage;
4962 stored_sample->lowest_usage = latest_sample.lowest_usage;
4963 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4964 previous_sample_available = true;
4965
4966 latest_session_consumed_total =
4967 previous_session_consumed_total +
4968 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4969 } else {
4970 /*
4971 * This is the channel's first sample, allocate space for and
4972 * store the new sample.
4973 */
4974 struct channel_state_sample *stored_sample;
4975
4976 stored_sample = zmalloc<channel_state_sample>();
4977 if (!stored_sample) {
4978 ret = -1;
4979 goto end_unlock;
4980 }
4981
4982 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4983 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4984 cds_lfht_add(state->channel_state_ht,
4985 hash_channel_key(&stored_sample->key),
4986 &stored_sample->channel_state_ht_node);
4987
4988 latest_session_consumed_total =
4989 previous_session_consumed_total +
4990 latest_sample.channel_total_consumed;
4991 }
4992
4993 channel_info->session_info->consumed_data_size =
4994 latest_session_consumed_total;
4995
4996 /* Find triggers associated with this channel. */
4997 cds_lfht_lookup(state->channel_triggers_ht,
4998 hash_channel_key(&latest_sample.key),
4999 match_channel_trigger_list,
5000 &latest_sample.key,
5001 &iter);
5002 node = cds_lfht_iter_get_node(&iter);
5003 if (caa_likely(!node)) {
5004 goto end_unlock;
5005 }
5006
5007 channel_creds = (typeof(channel_creds)) {
5008 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
5009 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
5010 };
5011
5012 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
5013 channel_triggers_ht_node);
5014 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
5015 node) {
5016 const struct lttng_condition *condition;
5017 struct lttng_trigger *trigger;
5018 struct notification_client_list *client_list = NULL;
5019 struct lttng_evaluation *evaluation = NULL;
5020 enum action_executor_status executor_status;
5021
5022 ret = 0;
5023 trigger = trigger_list_element->trigger;
5024 condition = lttng_trigger_get_const_condition(trigger);
5025 LTTNG_ASSERT(condition);
5026
5027 /*
5028 * Check if any client is subscribed to the result of this
5029 * evaluation.
5030 */
5031 client_list = get_client_list_from_condition(state, condition);
5032
5033 ret = evaluate_buffer_condition(condition, &evaluation, state,
5034 previous_sample_available ? &previous_sample : NULL,
5035 &latest_sample,
5036 previous_session_consumed_total,
5037 latest_session_consumed_total,
5038 channel_info);
5039 if (caa_unlikely(ret)) {
5040 goto put_list;
5041 }
5042
5043 if (caa_likely(!evaluation)) {
5044 goto put_list;
5045 }
5046
5047 /*
5048 * Ownership of `evaluation` transferred to the action executor
5049 * no matter the result.
5050 */
5051 executor_status = action_executor_enqueue_trigger(
5052 state->executor, trigger, evaluation,
5053 &channel_creds, client_list);
5054 evaluation = NULL;
5055 switch (executor_status) {
5056 case ACTION_EXECUTOR_STATUS_OK:
5057 break;
5058 case ACTION_EXECUTOR_STATUS_ERROR:
5059 case ACTION_EXECUTOR_STATUS_INVALID:
5060 /*
5061 * TODO Add trigger identification (name/id) when
5062 * it is added to the API.
5063 */
5064 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
5065 ret = -1;
5066 goto put_list;
5067 case ACTION_EXECUTOR_STATUS_OVERFLOW:
5068 /*
5069 * TODO Add trigger identification (name/id) when
5070 * it is added to the API.
5071 *
5072 * Not a fatal error.
5073 */
5074 WARN("No space left when enqueuing action associated with buffer-condition trigger");
5075 ret = 0;
5076 goto put_list;
5077 default:
5078 abort();
5079 }
5080
5081 put_list:
5082 notification_client_list_put(client_list);
5083 if (caa_unlikely(ret)) {
5084 break;
5085 }
5086 }
5087 end_unlock:
5088 rcu_read_unlock();
5089 end:
5090 return ret;
5091 }
This page took 0.137535 seconds and 4 git commands to generate.