notification: fetch capture payload on notification reception
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
f2b3ef9f
JG
8#include "lttng/action/action.h"
9#include "lttng/trigger/trigger-internal.h"
ab0ee2ca
JG
10#define _LGPL_SOURCE
11#include <urcu.h>
12#include <urcu/rculfhash.h>
13
ab0ee2ca
JG
14#include <common/defaults.h>
15#include <common/error.h>
16#include <common/futex.h>
17#include <common/unix.h>
18#include <common/dynamic-buffer.h>
19#include <common/hashtable/utils.h>
20#include <common/sessiond-comm/sessiond-comm.h>
21#include <common/macros.h>
22#include <lttng/condition/condition.h>
9b63a4aa 23#include <lttng/action/action-internal.h>
ab0ee2ca
JG
24#include <lttng/notification/notification-internal.h>
25#include <lttng/condition/condition-internal.h>
26#include <lttng/condition/buffer-usage-internal.h>
e8360425 27#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 28#include <lttng/condition/session-rotation-internal.h>
959e3c66 29#include <lttng/condition/event-rule-internal.h>
d02d7404 30#include <lttng/domain-internal.h>
ab0ee2ca 31#include <lttng/notification/channel-internal.h>
85c06c44 32#include <lttng/trigger/trigger-internal.h>
959e3c66 33#include <lttng/event-rule/event-rule-internal.h>
1da26331 34
ab0ee2ca
JG
35#include <time.h>
36#include <unistd.h>
37#include <assert.h>
38#include <inttypes.h>
d53ea4e4 39#include <fcntl.h>
ab0ee2ca 40
e98a976a 41#include "condition-internal.h"
1da26331
JG
42#include "notification-thread.h"
43#include "notification-thread-events.h"
44#include "notification-thread-commands.h"
45#include "lttng-sessiond.h"
46#include "kernel.h"
47
ab0ee2ca
JG
48#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
49#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
50
82b3cbf4
JR
51/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
52#define MAX_CAPTURE_SIZE (PIPE_BUF)
53
51eab943
JG
54enum lttng_object_type {
55 LTTNG_OBJECT_TYPE_UNKNOWN,
56 LTTNG_OBJECT_TYPE_NONE,
57 LTTNG_OBJECT_TYPE_CHANNEL,
58 LTTNG_OBJECT_TYPE_SESSION,
59};
60
ab0ee2ca 61struct lttng_trigger_list_element {
9e0bb80e 62 /* No ownership of the trigger object is assumed. */
f2b3ef9f 63 struct lttng_trigger *trigger;
ab0ee2ca
JG
64 struct cds_list_head node;
65};
66
67struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
ea9a44f0 69 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 70 struct cds_list_head list;
ea9a44f0 71 /* Node in the channel_triggers_ht */
ab0ee2ca 72 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
73 /* call_rcu delayed reclaim. */
74 struct rcu_head rcu_node;
ab0ee2ca
JG
75};
76
ea9a44f0
JG
77/*
78 * List of triggers applying to a given session.
79 *
80 * See:
81 * - lttng_session_trigger_list_create()
82 * - lttng_session_trigger_list_build()
83 * - lttng_session_trigger_list_destroy()
84 * - lttng_session_trigger_list_add()
85 */
86struct lttng_session_trigger_list {
87 /*
88 * Not owned by this; points to the session_info structure's
89 * session name.
90 */
91 const char *session_name;
92 /* List of struct lttng_trigger_list_element. */
93 struct cds_list_head list;
94 /* Node in the session_triggers_ht */
95 struct cds_lfht_node session_triggers_ht_node;
96 /*
97 * Weak reference to the notification system's session triggers
98 * hashtable.
99 *
100 * The session trigger list structure structure is owned by
101 * the session's session_info.
102 *
103 * The session_info is kept alive the the channel_infos holding a
104 * reference to it (reference counting). When those channels are
105 * destroyed (at runtime or on teardown), the reference they hold
106 * to the session_info are released. On destruction of session_info,
107 * session_info_destroy() will remove the list of triggers applying
108 * to this session from the notification system's state.
109 *
110 * This implies that the session_triggers_ht must be destroyed
111 * after the channels.
112 */
113 struct cds_lfht *session_triggers_ht;
114 /* Used for delayed RCU reclaim. */
115 struct rcu_head rcu_node;
116};
117
ab0ee2ca
JG
118struct lttng_trigger_ht_element {
119 struct lttng_trigger *trigger;
120 struct cds_lfht_node node;
242388e4 121 struct cds_lfht_node node_by_name_uid;
83b934ad
MD
122 /* call_rcu delayed reclaim. */
123 struct rcu_head rcu_node;
ab0ee2ca
JG
124};
125
126struct lttng_condition_list_element {
127 struct lttng_condition *condition;
128 struct cds_list_head node;
129};
130
ab0ee2ca
JG
131struct channel_state_sample {
132 struct channel_key key;
133 struct cds_lfht_node channel_state_ht_node;
134 uint64_t highest_usage;
135 uint64_t lowest_usage;
e8360425 136 uint64_t channel_total_consumed;
83b934ad
MD
137 /* call_rcu delayed reclaim. */
138 struct rcu_head rcu_node;
ab0ee2ca
JG
139};
140
e4db5ace 141static unsigned long hash_channel_key(struct channel_key *key);
ed327204 142static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 143 struct lttng_evaluation **evaluation,
e8360425
JD
144 const struct notification_thread_state *state,
145 const struct channel_state_sample *previous_sample,
146 const struct channel_state_sample *latest_sample,
147 uint64_t previous_session_consumed_total,
148 uint64_t latest_session_consumed_total,
149 struct channel_info *channel_info);
e4db5ace 150static
9b63a4aa
JG
151int send_evaluation_to_clients(const struct lttng_trigger *trigger,
152 const struct lttng_evaluation *evaluation,
e4db5ace
JR
153 struct notification_client_list *client_list,
154 struct notification_thread_state *state,
155 uid_t channel_uid, gid_t channel_gid);
156
8abe313a 157
ea9a44f0 158/* session_info API */
8abe313a
JG
159static
160void session_info_destroy(void *_data);
161static
162void session_info_get(struct session_info *session_info);
163static
164void session_info_put(struct session_info *session_info);
165static
166struct session_info *session_info_create(const char *name,
ea9a44f0 167 uid_t uid, gid_t gid,
1eee26c5
JG
168 struct lttng_session_trigger_list *trigger_list,
169 struct cds_lfht *sessions_ht);
8abe313a
JG
170static
171void session_info_add_channel(struct session_info *session_info,
172 struct channel_info *channel_info);
173static
174void session_info_remove_channel(struct session_info *session_info,
175 struct channel_info *channel_info);
176
ea9a44f0
JG
177/* lttng_session_trigger_list API */
178static
179struct lttng_session_trigger_list *lttng_session_trigger_list_create(
180 const char *session_name,
181 struct cds_lfht *session_triggers_ht);
182static
183struct lttng_session_trigger_list *lttng_session_trigger_list_build(
184 const struct notification_thread_state *state,
185 const char *session_name);
186static
187void lttng_session_trigger_list_destroy(
188 struct lttng_session_trigger_list *list);
189static
190int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
f2b3ef9f 191 struct lttng_trigger *trigger);
ea9a44f0 192
f2b3ef9f
JG
193static
194int client_handle_transmission_status(
195 struct notification_client *client,
196 enum client_transmission_status transmission_status,
197 struct notification_thread_state *state);
ea9a44f0 198
8b524060
FD
199static
200int handle_one_event_notifier_notification(
201 struct notification_thread_state *state,
202 int pipe, enum lttng_domain_type domain);
203
242388e4
JR
204static
205void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
206
ab0ee2ca 207static
ac1889bf 208int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
209{
210 /* This double-cast is intended to supress pointer-to-cast warning. */
ac1889bf
JG
211 const int socket = (int) (intptr_t) key;
212 const struct notification_client *client = caa_container_of(node,
213 struct notification_client, client_socket_ht_node);
ab0ee2ca 214
ac1889bf
JG
215 return client->socket == socket;
216}
217
218static
219int match_client_id(struct cds_lfht_node *node, const void *key)
220{
221 /* This double-cast is intended to supress pointer-to-cast warning. */
222 const notification_client_id id = *((notification_client_id *) key);
223 const struct notification_client *client = caa_container_of(
224 node, struct notification_client, client_id_ht_node);
ab0ee2ca 225
ac1889bf 226 return client->id == id;
ab0ee2ca
JG
227}
228
229static
230int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
231{
232 struct channel_key *channel_key = (struct channel_key *) key;
233 struct lttng_channel_trigger_list *trigger_list;
234
235 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
236 channel_triggers_ht_node);
237
238 return !!((channel_key->key == trigger_list->channel_key.key) &&
239 (channel_key->domain == trigger_list->channel_key.domain));
240}
241
51eab943
JG
242static
243int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
244{
245 const char *session_name = (const char *) key;
246 struct lttng_session_trigger_list *trigger_list;
247
248 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
249 session_triggers_ht_node);
250
251 return !!(strcmp(trigger_list->session_name, session_name) == 0);
252}
253
ab0ee2ca
JG
254static
255int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
256{
257 struct channel_key *channel_key = (struct channel_key *) key;
258 struct channel_state_sample *sample;
259
260 sample = caa_container_of(node, struct channel_state_sample,
261 channel_state_ht_node);
262
263 return !!((channel_key->key == sample->key.key) &&
264 (channel_key->domain == sample->key.domain));
265}
266
267static
268int match_channel_info(struct cds_lfht_node *node, const void *key)
269{
270 struct channel_key *channel_key = (struct channel_key *) key;
271 struct channel_info *channel_info;
272
273 channel_info = caa_container_of(node, struct channel_info,
274 channels_ht_node);
275
276 return !!((channel_key->key == channel_info->key.key) &&
277 (channel_key->domain == channel_info->key.domain));
278}
279
280static
242388e4 281int match_trigger(struct cds_lfht_node *node, const void *key)
ab0ee2ca 282{
242388e4
JR
283 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
284 struct lttng_trigger_ht_element *trigger_ht_element;
ab0ee2ca 285
242388e4 286 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
ab0ee2ca 287 node);
ab0ee2ca 288
242388e4 289 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
ab0ee2ca
JG
290}
291
e7c93cf9
JR
292static
293int match_trigger_token(struct cds_lfht_node *node, const void *key)
294{
295 const uint64_t *_key = key;
296 struct notification_trigger_tokens_ht_element *element;
297
298 element = caa_container_of(node,
299 struct notification_trigger_tokens_ht_element, node);
300 return *_key == element->token;
301}
302
ab0ee2ca
JG
303static
304int match_client_list_condition(struct cds_lfht_node *node, const void *key)
305{
306 struct lttng_condition *condition_key = (struct lttng_condition *) key;
307 struct notification_client_list *client_list;
f82f93a1 308 const struct lttng_condition *condition;
ab0ee2ca
JG
309
310 assert(condition_key);
311
312 client_list = caa_container_of(node, struct notification_client_list,
505b2d90 313 notification_trigger_clients_ht_node);
f82f93a1 314 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
315
316 return !!lttng_condition_is_equal(condition_key, condition);
317}
318
f82f93a1
JG
319static
320int match_session(struct cds_lfht_node *node, const void *key)
321{
322 const char *name = key;
323 struct session_info *session_info = caa_container_of(
324 node, struct session_info, sessions_ht_node);
325
326 return !strcmp(session_info->name, name);
327}
328
6900ae81
FD
329static
330const char *notification_command_type_str(
331 enum notification_thread_command_type type)
332{
333 switch (type) {
334 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
335 return "REGISTER_TRIGGER";
336 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
337 return "UNREGISTER_TRIGGER";
338 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
339 return "ADD_CHANNEL";
340 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
341 return "REMOVE_CHANNEL";
342 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
343 return "SESSION_ROTATION_ONGOING";
344 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
345 return "SESSION_ROTATION_COMPLETED";
346 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
347 return "ADD_TRACER_EVENT_SOURCE";
348 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
349 return "REMOVE_TRACER_EVENT_SOURCE";
350 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
351 return "LIST_TRIGGERS";
352 case NOTIFICATION_COMMAND_TYPE_QUIT:
353 return "QUIT";
354 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
355 return "CLIENT_COMMUNICATION_UPDATE";
356 default:
357 abort();
358 }
359}
360
242388e4
JR
361/*
362 * Match trigger based on name and credentials only.
363 * Name duplication is NOT allowed for the same uid.
364 */
365static
366int match_trigger_by_name_uid(struct cds_lfht_node *node,
367 const void *key)
368{
369 bool match = false;
370 const char *name;
371 const char *key_name;
372 enum lttng_trigger_status status;
373 const struct lttng_credentials *key_creds;
374 const struct lttng_credentials *node_creds;
375 const struct lttng_trigger *trigger_key =
376 (const struct lttng_trigger *) key;
377 const struct lttng_trigger_ht_element *trigger_ht_element =
378 caa_container_of(node,
379 struct lttng_trigger_ht_element,
380 node_by_name_uid);
381
382 status = lttng_trigger_get_name(trigger_ht_element->trigger, &name);
383 assert(status == LTTNG_TRIGGER_STATUS_OK);
384
385 status = lttng_trigger_get_name(trigger_key, &key_name);
386 assert(status == LTTNG_TRIGGER_STATUS_OK);
387
388 /* Compare the names. */
389 if (strcmp(name, key_name) != 0) {
390 goto end;
391 }
392
393 /* Compare the owners' UIDs. */
394 key_creds = lttng_trigger_get_credentials(trigger_key);
395 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
396
397 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
398
399end:
400 return match;
401}
402
403/*
404 * Hash trigger based on name and credentials only.
405 */
406static
407unsigned long hash_trigger_by_name_uid(const struct lttng_trigger *trigger)
408{
409 unsigned long hash = 0;
410 const struct lttng_credentials *trigger_creds;
411 const char *trigger_name;
412 enum lttng_trigger_status status;
413
414 status = lttng_trigger_get_name(trigger, &trigger_name);
415 if (status == LTTNG_TRIGGER_STATUS_OK) {
416 hash = hash_key_str(trigger_name, lttng_ht_seed);
417 }
418
419 trigger_creds = lttng_trigger_get_credentials(trigger);
420 hash ^= hash_key_ulong((void *) (unsigned long) LTTNG_OPTIONAL_GET(trigger_creds->uid),
421 lttng_ht_seed);
422
423 return hash;
424}
425
e4db5ace
JR
426static
427unsigned long hash_channel_key(struct channel_key *key)
428{
429 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
430 unsigned long domain_hash = hash_key_ulong(
431 (void *) (unsigned long) key->domain, lttng_ht_seed);
432
433 return key_hash ^ domain_hash;
434}
435
ac1889bf
JG
436static
437unsigned long hash_client_socket(int socket)
438{
439 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
440}
441
442static
443unsigned long hash_client_id(notification_client_id id)
444{
445 return hash_key_u64(&id, lttng_ht_seed);
446}
447
f82f93a1
JG
448/*
449 * Get the type of object to which a given condition applies. Bindings let
450 * the notification system evaluate a trigger's condition when a given
451 * object's state is updated.
452 *
453 * For instance, a condition bound to a channel will be evaluated everytime
454 * the channel's state is changed by a channel monitoring sample.
455 */
456static
457enum lttng_object_type get_condition_binding_object(
458 const struct lttng_condition *condition)
459{
460 switch (lttng_condition_get_type(condition)) {
461 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
463 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
e742b055 464 return LTTNG_OBJECT_TYPE_CHANNEL;
f82f93a1
JG
465 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
466 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
467 return LTTNG_OBJECT_TYPE_SESSION;
959e3c66
JR
468 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
469 return LTTNG_OBJECT_TYPE_NONE;
f82f93a1
JG
470 default:
471 return LTTNG_OBJECT_TYPE_UNKNOWN;
472 }
473}
474
83b934ad
MD
475static
476void free_channel_info_rcu(struct rcu_head *node)
477{
478 free(caa_container_of(node, struct channel_info, rcu_node));
479}
480
ab0ee2ca
JG
481static
482void channel_info_destroy(struct channel_info *channel_info)
483{
484 if (!channel_info) {
485 return;
486 }
487
8abe313a
JG
488 if (channel_info->session_info) {
489 session_info_remove_channel(channel_info->session_info,
490 channel_info);
491 session_info_put(channel_info->session_info);
ab0ee2ca 492 }
8abe313a
JG
493 if (channel_info->name) {
494 free(channel_info->name);
ab0ee2ca 495 }
83b934ad
MD
496 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
497}
498
499static
500void free_session_info_rcu(struct rcu_head *node)
501{
502 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
503}
504
8abe313a 505/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 506static
8abe313a 507void session_info_destroy(void *_data)
ab0ee2ca 508{
8abe313a 509 struct session_info *session_info = _data;
3b68d0a3 510 int ret;
ab0ee2ca 511
8abe313a
JG
512 assert(session_info);
513 if (session_info->channel_infos_ht) {
3b68d0a3
JR
514 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
515 if (ret) {
4c3f302b 516 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 517 }
8abe313a 518 }
ea9a44f0 519 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
520
521 rcu_read_lock();
522 cds_lfht_del(session_info->sessions_ht,
523 &session_info->sessions_ht_node);
524 rcu_read_unlock();
8abe313a 525 free(session_info->name);
83b934ad 526 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 527}
ab0ee2ca 528
8abe313a
JG
529static
530void session_info_get(struct session_info *session_info)
531{
532 if (!session_info) {
533 return;
534 }
535 lttng_ref_get(&session_info->ref);
536}
537
538static
539void session_info_put(struct session_info *session_info)
540{
541 if (!session_info) {
542 return;
ab0ee2ca 543 }
8abe313a
JG
544 lttng_ref_put(&session_info->ref);
545}
546
547static
ea9a44f0 548struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
549 struct lttng_session_trigger_list *trigger_list,
550 struct cds_lfht *sessions_ht)
8abe313a
JG
551{
552 struct session_info *session_info;
ab0ee2ca 553
8abe313a 554 assert(name);
ab0ee2ca 555
8abe313a
JG
556 session_info = zmalloc(sizeof(*session_info));
557 if (!session_info) {
558 goto end;
559 }
560 lttng_ref_init(&session_info->ref, session_info_destroy);
561
562 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
563 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
564 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
565 goto error;
566 }
8abe313a
JG
567
568 cds_lfht_node_init(&session_info->sessions_ht_node);
569 session_info->name = strdup(name);
570 if (!session_info->name) {
ab0ee2ca
JG
571 goto error;
572 }
8abe313a
JG
573 session_info->uid = uid;
574 session_info->gid = gid;
ea9a44f0 575 session_info->trigger_list = trigger_list;
1eee26c5 576 session_info->sessions_ht = sessions_ht;
8abe313a
JG
577end:
578 return session_info;
579error:
580 session_info_put(session_info);
581 return NULL;
582}
583
584static
585void session_info_add_channel(struct session_info *session_info,
586 struct channel_info *channel_info)
587{
588 rcu_read_lock();
589 cds_lfht_add(session_info->channel_infos_ht,
590 hash_channel_key(&channel_info->key),
591 &channel_info->session_info_channels_ht_node);
592 rcu_read_unlock();
593}
594
595static
596void session_info_remove_channel(struct session_info *session_info,
597 struct channel_info *channel_info)
598{
599 rcu_read_lock();
600 cds_lfht_del(session_info->channel_infos_ht,
601 &channel_info->session_info_channels_ht_node);
602 rcu_read_unlock();
603}
604
605static
606struct channel_info *channel_info_create(const char *channel_name,
607 struct channel_key *channel_key, uint64_t channel_capacity,
608 struct session_info *session_info)
609{
610 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
611
612 if (!channel_info) {
613 goto end;
614 }
615
ab0ee2ca 616 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
617 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
618 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
619 channel_info->capacity = channel_capacity;
620
621 channel_info->name = strdup(channel_name);
622 if (!channel_info->name) {
623 goto error;
624 }
625
626 /*
627 * Set the references between session and channel infos:
628 * - channel_info holds a strong reference to session_info
629 * - session_info holds a weak reference to channel_info
630 */
631 session_info_get(session_info);
632 session_info_add_channel(session_info, channel_info);
633 channel_info->session_info = session_info;
ab0ee2ca 634end:
8abe313a 635 return channel_info;
ab0ee2ca 636error:
8abe313a 637 channel_info_destroy(channel_info);
ab0ee2ca
JG
638 return NULL;
639}
640
f2b3ef9f 641LTTNG_HIDDEN
505b2d90
JG
642bool notification_client_list_get(struct notification_client_list *list)
643{
644 return urcu_ref_get_unless_zero(&list->ref);
645}
646
647static
648void free_notification_client_list_rcu(struct rcu_head *node)
649{
650 free(caa_container_of(node, struct notification_client_list,
651 rcu_node));
652}
653
654static
655void notification_client_list_release(struct urcu_ref *list_ref)
656{
657 struct notification_client_list *list =
658 container_of(list_ref, typeof(*list), ref);
659 struct notification_client_list_element *client_list_element, *tmp;
660
661 if (list->notification_trigger_clients_ht) {
662 rcu_read_lock();
663 cds_lfht_del(list->notification_trigger_clients_ht,
664 &list->notification_trigger_clients_ht_node);
665 rcu_read_unlock();
666 list->notification_trigger_clients_ht = NULL;
667 }
668 cds_list_for_each_entry_safe(client_list_element, tmp,
669 &list->list, node) {
670 free(client_list_element);
671 }
672 pthread_mutex_destroy(&list->lock);
673 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
674}
675
676static
677struct notification_client_list *notification_client_list_create(
678 const struct lttng_trigger *trigger)
679{
680 struct notification_client_list *client_list =
681 zmalloc(sizeof(*client_list));
682
683 if (!client_list) {
684 goto error;
685 }
686 pthread_mutex_init(&client_list->lock, NULL);
687 urcu_ref_init(&client_list->ref);
688 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
689 CDS_INIT_LIST_HEAD(&client_list->list);
690 client_list->trigger = trigger;
691error:
692 return client_list;
693}
694
695static
696void publish_notification_client_list(
697 struct notification_thread_state *state,
698 struct notification_client_list *list)
699{
700 const struct lttng_condition *condition =
701 lttng_trigger_get_const_condition(list->trigger);
702
703 assert(!list->notification_trigger_clients_ht);
f2b3ef9f 704 notification_client_list_get(list);
505b2d90
JG
705
706 list->notification_trigger_clients_ht =
707 state->notification_trigger_clients_ht;
708
709 rcu_read_lock();
710 cds_lfht_add(state->notification_trigger_clients_ht,
711 lttng_condition_hash(condition),
712 &list->notification_trigger_clients_ht_node);
713 rcu_read_unlock();
714}
715
f2b3ef9f 716LTTNG_HIDDEN
505b2d90
JG
717void notification_client_list_put(struct notification_client_list *list)
718{
719 if (!list) {
720 return;
721 }
722 return urcu_ref_put(&list->ref, notification_client_list_release);
723}
724
725/* Provides a reference to the returned list. */
ed327204
JG
726static
727struct notification_client_list *get_client_list_from_condition(
728 struct notification_thread_state *state,
729 const struct lttng_condition *condition)
730{
731 struct cds_lfht_node *node;
732 struct cds_lfht_iter iter;
505b2d90 733 struct notification_client_list *list = NULL;
ed327204 734
505b2d90 735 rcu_read_lock();
ed327204
JG
736 cds_lfht_lookup(state->notification_trigger_clients_ht,
737 lttng_condition_hash(condition),
738 match_client_list_condition,
739 condition,
740 &iter);
741 node = cds_lfht_iter_get_node(&iter);
505b2d90
JG
742 if (node) {
743 list = container_of(node, struct notification_client_list,
744 notification_trigger_clients_ht_node);
745 list = notification_client_list_get(list) ? list : NULL;
746 }
ed327204 747
505b2d90
JG
748 rcu_read_unlock();
749 return list;
ed327204
JG
750}
751
e4db5ace 752static
f82f93a1
JG
753int evaluate_channel_condition_for_client(
754 const struct lttng_condition *condition,
755 struct notification_thread_state *state,
756 struct lttng_evaluation **evaluation,
757 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
758{
759 int ret;
760 struct cds_lfht_iter iter;
761 struct cds_lfht_node *node;
762 struct channel_info *channel_info = NULL;
763 struct channel_key *channel_key = NULL;
764 struct channel_state_sample *last_sample = NULL;
765 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 766
505b2d90
JG
767 rcu_read_lock();
768
f82f93a1 769 /* Find the channel associated with the condition. */
e4db5ace 770 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 771 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
772 struct lttng_trigger_list_element *element;
773
774 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 775 const struct lttng_condition *current_condition =
f82f93a1 776 lttng_trigger_get_const_condition(
e4db5ace
JR
777 element->trigger);
778
779 assert(current_condition);
780 if (!lttng_condition_is_equal(condition,
2ae99f0b 781 current_condition)) {
e4db5ace
JR
782 continue;
783 }
784
785 /* Found the trigger, save the channel key. */
786 channel_key = &channel_trigger_list->channel_key;
787 break;
788 }
789 if (channel_key) {
790 /* The channel key was found stop iteration. */
791 break;
792 }
793 }
794
795 if (!channel_key){
796 /* No channel found; normal exit. */
f82f93a1 797 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
798 ret = 0;
799 goto end;
800 }
801
802 /* Fetch channel info for the matching channel. */
803 cds_lfht_lookup(state->channels_ht,
804 hash_channel_key(channel_key),
805 match_channel_info,
806 channel_key,
807 &iter);
808 node = cds_lfht_iter_get_node(&iter);
809 assert(node);
810 channel_info = caa_container_of(node, struct channel_info,
811 channels_ht_node);
812
813 /* Retrieve the channel's last sample, if it exists. */
814 cds_lfht_lookup(state->channel_state_ht,
815 hash_channel_key(channel_key),
816 match_channel_state_sample,
817 channel_key,
818 &iter);
819 node = cds_lfht_iter_get_node(&iter);
820 if (node) {
821 last_sample = caa_container_of(node,
822 struct channel_state_sample,
823 channel_state_ht_node);
824 } else {
825 /* Nothing to evaluate, no sample was ever taken. Normal exit */
826 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
827 ret = 0;
828 goto end;
829 }
830
f82f93a1 831 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
832 NULL, last_sample,
833 0, channel_info->session_info->consumed_data_size,
834 channel_info);
e4db5ace 835 if (ret) {
066a3c82 836 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
837 goto end;
838 }
839
f82f93a1
JG
840 *session_uid = channel_info->session_info->uid;
841 *session_gid = channel_info->session_info->gid;
842end:
505b2d90 843 rcu_read_unlock();
f82f93a1
JG
844 return ret;
845}
846
847static
848const char *get_condition_session_name(const struct lttng_condition *condition)
849{
850 const char *session_name = NULL;
851 enum lttng_condition_status status;
852
853 switch (lttng_condition_get_type(condition)) {
854 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
855 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
856 status = lttng_condition_buffer_usage_get_session_name(
857 condition, &session_name);
858 break;
859 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
860 status = lttng_condition_session_consumed_size_get_session_name(
861 condition, &session_name);
862 break;
863 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
864 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
865 status = lttng_condition_session_rotation_get_session_name(
866 condition, &session_name);
867 break;
868 default:
869 abort();
870 }
871 if (status != LTTNG_CONDITION_STATUS_OK) {
872 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
873 goto end;
874 }
875end:
876 return session_name;
877}
878
f82f93a1
JG
879static
880int evaluate_session_condition_for_client(
881 const struct lttng_condition *condition,
882 struct notification_thread_state *state,
883 struct lttng_evaluation **evaluation,
884 uid_t *session_uid, gid_t *session_gid)
885{
886 int ret;
887 struct cds_lfht_iter iter;
888 struct cds_lfht_node *node;
889 const char *session_name;
890 struct session_info *session_info = NULL;
891
505b2d90 892 rcu_read_lock();
f82f93a1
JG
893 session_name = get_condition_session_name(condition);
894
895 /* Find the session associated with the trigger. */
896 cds_lfht_lookup(state->sessions_ht,
897 hash_key_str(session_name, lttng_ht_seed),
898 match_session,
899 session_name,
900 &iter);
901 node = cds_lfht_iter_get_node(&iter);
902 if (!node) {
903 DBG("[notification-thread] No known session matching name \"%s\"",
904 session_name);
905 ret = 0;
906 goto end;
907 }
908
909 session_info = caa_container_of(node, struct session_info,
910 sessions_ht_node);
911 session_info_get(session_info);
912
913 /*
914 * Evaluation is performed in-line here since only one type of
915 * session-bound condition is handled for the moment.
916 */
917 switch (lttng_condition_get_type(condition)) {
918 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
919 if (!session_info->rotation.ongoing) {
be558f88 920 ret = 0;
f82f93a1
JG
921 goto end_session_put;
922 }
923
924 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
925 session_info->rotation.id);
926 if (!*evaluation) {
927 /* Fatal error. */
928 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
929 session_info->name);
930 ret = -1;
931 goto end_session_put;
932 }
933 ret = 0;
934 break;
935 default:
936 ret = 0;
937 goto end_session_put;
938 }
939
940 *session_uid = session_info->uid;
941 *session_gid = session_info->gid;
942
943end_session_put:
944 session_info_put(session_info);
945end:
505b2d90 946 rcu_read_unlock();
f82f93a1
JG
947 return ret;
948}
949
f82f93a1
JG
950static
951int evaluate_condition_for_client(const struct lttng_trigger *trigger,
952 const struct lttng_condition *condition,
953 struct notification_client *client,
954 struct notification_thread_state *state)
955{
956 int ret;
957 struct lttng_evaluation *evaluation = NULL;
505b2d90
JG
958 struct notification_client_list client_list = {
959 .lock = PTHREAD_MUTEX_INITIALIZER,
960 };
f82f93a1
JG
961 struct notification_client_list_element client_list_element = { 0 };
962 uid_t object_uid = 0;
963 gid_t object_gid = 0;
964
965 assert(trigger);
966 assert(condition);
967 assert(client);
968 assert(state);
969
970 switch (get_condition_binding_object(condition)) {
971 case LTTNG_OBJECT_TYPE_SESSION:
972 ret = evaluate_session_condition_for_client(condition, state,
973 &evaluation, &object_uid, &object_gid);
974 break;
975 case LTTNG_OBJECT_TYPE_CHANNEL:
976 ret = evaluate_channel_condition_for_client(condition, state,
977 &evaluation, &object_uid, &object_gid);
978 break;
979 case LTTNG_OBJECT_TYPE_NONE:
7e802d0a 980 DBG("[notification-thread] Newly subscribed-to condition not bound to object, nothing to evaluate");
f82f93a1
JG
981 ret = 0;
982 goto end;
983 case LTTNG_OBJECT_TYPE_UNKNOWN:
984 default:
985 ret = -1;
986 goto end;
987 }
af0c318d
JG
988 if (ret) {
989 /* Fatal error. */
990 goto end;
991 }
e4db5ace
JR
992 if (!evaluation) {
993 /* Evaluation yielded nothing. Normal exit. */
994 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
995 ret = 0;
996 goto end;
997 }
998
999 /*
1000 * Create a temporary client list with the client currently
1001 * subscribing.
1002 */
505b2d90 1003 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
e4db5ace
JR
1004 CDS_INIT_LIST_HEAD(&client_list.list);
1005 client_list.trigger = trigger;
1006
1007 CDS_INIT_LIST_HEAD(&client_list_element.node);
1008 client_list_element.client = client;
1009 cds_list_add(&client_list_element.node, &client_list.list);
1010
1011 /* Send evaluation result to the newly-subscribed client. */
1012 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1013 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 1014 state, object_uid, object_gid);
e4db5ace
JR
1015
1016end:
1017 return ret;
1018}
1019
ab0ee2ca
JG
1020static
1021int notification_thread_client_subscribe(struct notification_client *client,
1022 struct lttng_condition *condition,
1023 struct notification_thread_state *state,
1024 enum lttng_notification_channel_status *_status)
1025{
1026 int ret = 0;
505b2d90 1027 struct notification_client_list *client_list = NULL;
ab0ee2ca
JG
1028 struct lttng_condition_list_element *condition_list_element = NULL;
1029 struct notification_client_list_element *client_list_element = NULL;
1030 enum lttng_notification_channel_status status =
1031 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1032
1033 /*
1034 * Ensure that the client has not already subscribed to this condition
1035 * before.
1036 */
1037 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1038 if (lttng_condition_is_equal(condition_list_element->condition,
1039 condition)) {
1040 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1041 goto end;
1042 }
1043 }
1044
1045 condition_list_element = zmalloc(sizeof(*condition_list_element));
1046 if (!condition_list_element) {
1047 ret = -1;
1048 goto error;
1049 }
1050 client_list_element = zmalloc(sizeof(*client_list_element));
1051 if (!client_list_element) {
1052 ret = -1;
1053 goto error;
1054 }
1055
ab0ee2ca
JG
1056 /*
1057 * Add the newly-subscribed condition to the client's subscription list.
1058 */
1059 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1060 condition_list_element->condition = condition;
1061 cds_list_add(&condition_list_element->node, &client->condition_list);
1062
ed327204
JG
1063 client_list = get_client_list_from_condition(state, condition);
1064 if (!client_list) {
2ae99f0b
JG
1065 /*
1066 * No notification-emiting trigger registered with this
1067 * condition. We don't evaluate the condition right away
1068 * since this trigger is not registered yet.
1069 */
4fb43b68 1070 free(client_list_element);
505b2d90 1071 goto end;
ab0ee2ca
JG
1072 }
1073
2ae99f0b
JG
1074 /*
1075 * The condition to which the client just subscribed is evaluated
1076 * at this point so that conditions that are already TRUE result
1077 * in a notification being sent out.
505b2d90
JG
1078 *
1079 * The client_list's trigger is used without locking the list itself.
1080 * This is correct since the list doesn't own the trigger and the
1081 * object is immutable.
2ae99f0b 1082 */
e4db5ace
JR
1083 if (evaluate_condition_for_client(client_list->trigger, condition,
1084 client, state)) {
1085 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1086 ret = -1;
4bd5b4af 1087 free(client_list_element);
505b2d90 1088 goto end;
e4db5ace
JR
1089 }
1090
1091 /*
1092 * Add the client to the list of clients interested in a given trigger
1093 * if a "notification" trigger with a corresponding condition was
1094 * added prior.
1095 */
ab0ee2ca
JG
1096 client_list_element->client = client;
1097 CDS_INIT_LIST_HEAD(&client_list_element->node);
505b2d90
JG
1098
1099 pthread_mutex_lock(&client_list->lock);
ab0ee2ca 1100 cds_list_add(&client_list_element->node, &client_list->list);
505b2d90 1101 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca
JG
1102end:
1103 if (_status) {
1104 *_status = status;
1105 }
505b2d90
JG
1106 if (client_list) {
1107 notification_client_list_put(client_list);
1108 }
ab0ee2ca
JG
1109 return ret;
1110error:
1111 free(condition_list_element);
1112 free(client_list_element);
1113 return ret;
1114}
1115
1116static
1117int notification_thread_client_unsubscribe(
1118 struct notification_client *client,
1119 struct lttng_condition *condition,
1120 struct notification_thread_state *state,
1121 enum lttng_notification_channel_status *_status)
1122{
ab0ee2ca
JG
1123 struct notification_client_list *client_list;
1124 struct lttng_condition_list_element *condition_list_element,
1125 *condition_tmp;
1126 struct notification_client_list_element *client_list_element,
1127 *client_tmp;
1128 bool condition_found = false;
1129 enum lttng_notification_channel_status status =
1130 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1131
1132 /* Remove the condition from the client's condition list. */
1133 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1134 &client->condition_list, node) {
1135 if (!lttng_condition_is_equal(condition_list_element->condition,
1136 condition)) {
1137 continue;
1138 }
1139
1140 cds_list_del(&condition_list_element->node);
1141 /*
1142 * The caller may be iterating on the client's conditions to
1143 * tear down a client's connection. In this case, the condition
1144 * will be destroyed at the end.
1145 */
1146 if (condition != condition_list_element->condition) {
1147 lttng_condition_destroy(
1148 condition_list_element->condition);
1149 }
1150 free(condition_list_element);
1151 condition_found = true;
1152 break;
1153 }
1154
1155 if (!condition_found) {
1156 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1157 goto end;
1158 }
1159
1160 /*
1161 * Remove the client from the list of clients interested the trigger
1162 * matching the condition.
1163 */
ed327204
JG
1164 client_list = get_client_list_from_condition(state, condition);
1165 if (!client_list) {
505b2d90 1166 goto end;
ab0ee2ca
JG
1167 }
1168
505b2d90 1169 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
1170 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1171 &client_list->list, node) {
505b2d90 1172 if (client_list_element->client->id != client->id) {
ab0ee2ca
JG
1173 continue;
1174 }
1175 cds_list_del(&client_list_element->node);
1176 free(client_list_element);
1177 break;
1178 }
505b2d90
JG
1179 pthread_mutex_unlock(&client_list->lock);
1180 notification_client_list_put(client_list);
1181 client_list = NULL;
ab0ee2ca
JG
1182end:
1183 lttng_condition_destroy(condition);
1184 if (_status) {
1185 *_status = status;
1186 }
1187 return 0;
1188}
1189
83b934ad
MD
1190static
1191void free_notification_client_rcu(struct rcu_head *node)
1192{
1193 free(caa_container_of(node, struct notification_client, rcu_node));
1194}
1195
ab0ee2ca
JG
1196static
1197void notification_client_destroy(struct notification_client *client,
1198 struct notification_thread_state *state)
1199{
ab0ee2ca
JG
1200 if (!client) {
1201 return;
1202 }
1203
505b2d90
JG
1204 /*
1205 * The client object is not reachable by other threads, no need to lock
1206 * the client here.
1207 */
ab0ee2ca
JG
1208 if (client->socket >= 0) {
1209 (void) lttcomm_close_unix_sock(client->socket);
ac1889bf 1210 client->socket = -1;
ab0ee2ca 1211 }
505b2d90 1212 client->communication.active = false;
882093ee
JR
1213 lttng_payload_reset(&client->communication.inbound.payload);
1214 lttng_payload_reset(&client->communication.outbound.payload);
505b2d90 1215 pthread_mutex_destroy(&client->lock);
83b934ad 1216 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1217}
1218
1219/*
1220 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1221 * client pointer).
1222 */
1223static
1224struct notification_client *get_client_from_socket(int socket,
1225 struct notification_thread_state *state)
1226{
1227 struct cds_lfht_iter iter;
1228 struct cds_lfht_node *node;
1229 struct notification_client *client = NULL;
1230
1231 cds_lfht_lookup(state->client_socket_ht,
ac1889bf
JG
1232 hash_client_socket(socket),
1233 match_client_socket,
ab0ee2ca
JG
1234 (void *) (unsigned long) socket,
1235 &iter);
1236 node = cds_lfht_iter_get_node(&iter);
1237 if (!node) {
1238 goto end;
1239 }
1240
1241 client = caa_container_of(node, struct notification_client,
1242 client_socket_ht_node);
1243end:
1244 return client;
1245}
1246
f2b3ef9f
JG
1247/*
1248 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1249 * client pointer).
1250 */
1251static
1252struct notification_client *get_client_from_id(notification_client_id id,
1253 struct notification_thread_state *state)
1254{
1255 struct cds_lfht_iter iter;
1256 struct cds_lfht_node *node;
1257 struct notification_client *client = NULL;
1258
1259 cds_lfht_lookup(state->client_id_ht,
1260 hash_client_id(id),
1261 match_client_id,
1262 &id,
1263 &iter);
1264 node = cds_lfht_iter_get_node(&iter);
1265 if (!node) {
1266 goto end;
1267 }
1268
1269 client = caa_container_of(node, struct notification_client,
1270 client_id_ht_node);
1271end:
1272 return client;
1273}
1274
ab0ee2ca 1275static
e8360425 1276bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1277 const struct lttng_condition *condition,
1278 const struct channel_info *channel_info)
ab0ee2ca
JG
1279{
1280 enum lttng_condition_status status;
e8360425
JD
1281 enum lttng_domain_type condition_domain;
1282 const char *condition_session_name = NULL;
1283 const char *condition_channel_name = NULL;
ab0ee2ca 1284
e8360425
JD
1285 status = lttng_condition_buffer_usage_get_domain_type(condition,
1286 &condition_domain);
1287 assert(status == LTTNG_CONDITION_STATUS_OK);
1288 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1289 goto fail;
1290 }
1291
e8360425
JD
1292 status = lttng_condition_buffer_usage_get_session_name(
1293 condition, &condition_session_name);
1294 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1295
1296 status = lttng_condition_buffer_usage_get_channel_name(
1297 condition, &condition_channel_name);
1298 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1299
1300 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1301 goto fail;
1302 }
1303 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1304 goto fail;
1305 }
1306
e8360425
JD
1307 return true;
1308fail:
1309 return false;
1310}
1311
1312static
1313bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1314 const struct lttng_condition *condition,
1315 const struct channel_info *channel_info)
e8360425
JD
1316{
1317 enum lttng_condition_status status;
1318 const char *condition_session_name = NULL;
1319
1320 status = lttng_condition_session_consumed_size_get_session_name(
1321 condition, &condition_session_name);
1322 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1323
1324 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1325 goto fail;
1326 }
1327
e8360425
JD
1328 return true;
1329fail:
1330 return false;
1331}
ab0ee2ca 1332
51eab943
JG
1333static
1334bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1335 const struct channel_info *channel_info)
1336{
1337 const struct lttng_condition *condition;
e8360425 1338 bool trigger_applies;
ab0ee2ca 1339
51eab943 1340 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1341 if (!condition) {
ab0ee2ca
JG
1342 goto fail;
1343 }
e8360425
JD
1344
1345 switch (lttng_condition_get_type(condition)) {
1346 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1347 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1348 trigger_applies = buffer_usage_condition_applies_to_channel(
1349 condition, channel_info);
1350 break;
1351 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1352 trigger_applies = session_consumed_size_condition_applies_to_channel(
1353 condition, channel_info);
1354 break;
1355 default:
ab0ee2ca
JG
1356 goto fail;
1357 }
1358
e8360425 1359 return trigger_applies;
ab0ee2ca
JG
1360fail:
1361 return false;
1362}
1363
1364static
1365bool trigger_applies_to_client(struct lttng_trigger *trigger,
1366 struct notification_client *client)
1367{
1368 bool applies = false;
1369 struct lttng_condition_list_element *condition_list_element;
1370
1371 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1372 node) {
1373 applies = lttng_condition_is_equal(
1374 condition_list_element->condition,
1375 lttng_trigger_get_condition(trigger));
1376 if (applies) {
1377 break;
1378 }
1379 }
1380 return applies;
1381}
1382
ed327204
JG
1383/* Must be called with RCU read lock held. */
1384static
1385struct lttng_session_trigger_list *get_session_trigger_list(
1386 struct notification_thread_state *state,
1387 const char *session_name)
1388{
1389 struct lttng_session_trigger_list *list = NULL;
1390 struct cds_lfht_node *node;
1391 struct cds_lfht_iter iter;
1392
1393 cds_lfht_lookup(state->session_triggers_ht,
1394 hash_key_str(session_name, lttng_ht_seed),
1395 match_session_trigger_list,
1396 session_name,
1397 &iter);
1398 node = cds_lfht_iter_get_node(&iter);
1399 if (!node) {
1400 /*
1401 * Not an error, the list of triggers applying to that session
1402 * will be initialized when the session is created.
1403 */
1404 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1405 session_name);
1406 goto end;
1407 }
1408
e742b055 1409 list = caa_container_of(node,
ed327204
JG
1410 struct lttng_session_trigger_list,
1411 session_triggers_ht_node);
1412end:
1413 return list;
1414}
1415
ea9a44f0
JG
1416/*
1417 * Allocate an empty lttng_session_trigger_list for the session named
1418 * 'session_name'.
1419 *
1420 * No ownership of 'session_name' is assumed by the session trigger list.
1421 * It is the caller's responsability to ensure the session name is alive
1422 * for as long as this list is.
1423 */
1424static
1425struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1426 const char *session_name,
1427 struct cds_lfht *session_triggers_ht)
1428{
1429 struct lttng_session_trigger_list *list;
1430
1431 list = zmalloc(sizeof(*list));
1432 if (!list) {
1433 goto end;
1434 }
1435 list->session_name = session_name;
1436 CDS_INIT_LIST_HEAD(&list->list);
1437 cds_lfht_node_init(&list->session_triggers_ht_node);
1438 list->session_triggers_ht = session_triggers_ht;
1439
1440 rcu_read_lock();
1441 /* Publish the list through the session_triggers_ht. */
1442 cds_lfht_add(session_triggers_ht,
1443 hash_key_str(session_name, lttng_ht_seed),
1444 &list->session_triggers_ht_node);
1445 rcu_read_unlock();
1446end:
1447 return list;
1448}
1449
1450static
1451void free_session_trigger_list_rcu(struct rcu_head *node)
1452{
1453 free(caa_container_of(node, struct lttng_session_trigger_list,
1454 rcu_node));
1455}
1456
1457static
1458void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1459{
1460 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1461
1462 /* Empty the list element by element, and then free the list itself. */
1463 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1464 &list->list, node) {
1465 cds_list_del(&trigger_list_element->node);
1466 free(trigger_list_element);
1467 }
1468 rcu_read_lock();
1469 /* Unpublish the list from the session_triggers_ht. */
1470 cds_lfht_del(list->session_triggers_ht,
1471 &list->session_triggers_ht_node);
1472 rcu_read_unlock();
1473 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1474}
1475
1476static
1477int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
f2b3ef9f 1478 struct lttng_trigger *trigger)
ea9a44f0
JG
1479{
1480 int ret = 0;
1481 struct lttng_trigger_list_element *new_element =
1482 zmalloc(sizeof(*new_element));
1483
1484 if (!new_element) {
1485 ret = -1;
1486 goto end;
1487 }
1488 CDS_INIT_LIST_HEAD(&new_element->node);
1489 new_element->trigger = trigger;
1490 cds_list_add(&new_element->node, &list->list);
1491end:
1492 return ret;
1493}
1494
1495static
1496bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1497 const char *session_name)
1498{
1499 bool applies = false;
1500 const struct lttng_condition *condition;
1501
1502 condition = lttng_trigger_get_const_condition(trigger);
1503 switch (lttng_condition_get_type(condition)) {
1504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1505 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1506 {
1507 enum lttng_condition_status condition_status;
1508 const char *condition_session_name;
1509
1510 condition_status = lttng_condition_session_rotation_get_session_name(
1511 condition, &condition_session_name);
1512 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1513 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1514 goto end;
1515 }
1516
1517 assert(condition_session_name);
1518 applies = !strcmp(condition_session_name, session_name);
1519 break;
1520 }
1521 default:
1522 goto end;
1523 }
1524end:
1525 return applies;
1526}
1527
1528/*
1529 * Allocate and initialize an lttng_session_trigger_list which contains
1530 * all triggers that apply to the session named 'session_name'.
1531 *
1532 * No ownership of 'session_name' is assumed by the session trigger list.
1533 * It is the caller's responsability to ensure the session name is alive
1534 * for as long as this list is.
1535 */
1536static
1537struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1538 const struct notification_thread_state *state,
1539 const char *session_name)
1540{
1541 int trigger_count = 0;
1542 struct lttng_session_trigger_list *session_trigger_list = NULL;
1543 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1544 struct cds_lfht_iter iter;
1545
1546 session_trigger_list = lttng_session_trigger_list_create(session_name,
1547 state->session_triggers_ht);
1548
1549 /* Add all triggers applying to the session named 'session_name'. */
1550 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1551 node) {
1552 int ret;
1553
1554 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1555 session_name)) {
1556 continue;
1557 }
1558
1559 ret = lttng_session_trigger_list_add(session_trigger_list,
1560 trigger_ht_element->trigger);
1561 if (ret) {
1562 goto error;
1563 }
1564
1565 trigger_count++;
1566 }
1567
1568 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1569 trigger_count);
1570 return session_trigger_list;
1571error:
1572 lttng_session_trigger_list_destroy(session_trigger_list);
1573 return NULL;
1574}
1575
8abe313a
JG
1576static
1577struct session_info *find_or_create_session_info(
1578 struct notification_thread_state *state,
1579 const char *name, uid_t uid, gid_t gid)
1580{
1581 struct session_info *session = NULL;
1582 struct cds_lfht_node *node;
1583 struct cds_lfht_iter iter;
ea9a44f0 1584 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1585
1586 rcu_read_lock();
1587 cds_lfht_lookup(state->sessions_ht,
1588 hash_key_str(name, lttng_ht_seed),
1589 match_session,
1590 name,
1591 &iter);
1592 node = cds_lfht_iter_get_node(&iter);
1593 if (node) {
1594 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1595 name, uid, gid);
1596 session = caa_container_of(node, struct session_info,
1597 sessions_ht_node);
1598 assert(session->uid == uid);
1599 assert(session->gid == gid);
1eee26c5
JG
1600 session_info_get(session);
1601 goto end;
ea9a44f0
JG
1602 }
1603
1604 trigger_list = lttng_session_trigger_list_build(state, name);
1605 if (!trigger_list) {
1606 goto error;
8abe313a
JG
1607 }
1608
1eee26c5
JG
1609 session = session_info_create(name, uid, gid, trigger_list,
1610 state->sessions_ht);
8abe313a
JG
1611 if (!session) {
1612 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1613 name, uid, gid);
b248644d 1614 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1615 goto error;
8abe313a 1616 }
ea9a44f0 1617 trigger_list = NULL;
577983cb
JG
1618
1619 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1620 &session->sessions_ht_node);
1eee26c5 1621end:
8abe313a
JG
1622 rcu_read_unlock();
1623 return session;
ea9a44f0
JG
1624error:
1625 rcu_read_unlock();
1626 session_info_put(session);
1627 return NULL;
8abe313a
JG
1628}
1629
ab0ee2ca
JG
1630static
1631int handle_notification_thread_command_add_channel(
8abe313a
JG
1632 struct notification_thread_state *state,
1633 const char *session_name, uid_t session_uid, gid_t session_gid,
1634 const char *channel_name, enum lttng_domain_type channel_domain,
1635 uint64_t channel_key_int, uint64_t channel_capacity,
1636 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1637{
1638 struct cds_list_head trigger_list;
8abe313a
JG
1639 struct channel_info *new_channel_info = NULL;
1640 struct channel_key channel_key = {
1641 .key = channel_key_int,
1642 .domain = channel_domain,
1643 };
ab0ee2ca
JG
1644 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1645 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1646 int trigger_count = 0;
1647 struct cds_lfht_iter iter;
8abe313a 1648 struct session_info *session_info = NULL;
ab0ee2ca
JG
1649
1650 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a 1651 channel_name, session_name, channel_key_int,
526200e4 1652 lttng_domain_type_str(channel_domain));
ab0ee2ca
JG
1653
1654 CDS_INIT_LIST_HEAD(&trigger_list);
1655
8abe313a
JG
1656 session_info = find_or_create_session_info(state, session_name,
1657 session_uid, session_gid);
1658 if (!session_info) {
a9577b76 1659 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1660 goto error;
1661 }
1662
8abe313a
JG
1663 new_channel_info = channel_info_create(channel_name, &channel_key,
1664 channel_capacity, session_info);
1665 if (!new_channel_info) {
1666 goto error;
1667 }
ab0ee2ca 1668
83b934ad 1669 rcu_read_lock();
ab0ee2ca
JG
1670 /* Build a list of all triggers applying to the new channel. */
1671 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1672 node) {
1673 struct lttng_trigger_list_element *new_element;
1674
1675 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1676 new_channel_info)) {
ab0ee2ca
JG
1677 continue;
1678 }
1679
1680 new_element = zmalloc(sizeof(*new_element));
1681 if (!new_element) {
83b934ad 1682 rcu_read_unlock();
ab0ee2ca
JG
1683 goto error;
1684 }
1685 CDS_INIT_LIST_HEAD(&new_element->node);
1686 new_element->trigger = trigger_ht_element->trigger;
1687 cds_list_add(&new_element->node, &trigger_list);
1688 trigger_count++;
1689 }
83b934ad 1690 rcu_read_unlock();
ab0ee2ca
JG
1691
1692 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1693 trigger_count);
1694 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1695 if (!channel_trigger_list) {
1696 goto error;
1697 }
8abe313a 1698 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1699 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1700 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1701 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1702
1703 rcu_read_lock();
1704 /* Add channel to the channel_ht which owns the channel_infos. */
1705 cds_lfht_add(state->channels_ht,
8abe313a 1706 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1707 &new_channel_info->channels_ht_node);
1708 /*
1709 * Add the list of triggers associated with this channel to the
1710 * channel_triggers_ht.
1711 */
1712 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1713 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1714 &channel_trigger_list->channel_triggers_ht_node);
1715 rcu_read_unlock();
1eee26c5 1716 session_info_put(session_info);
ab0ee2ca
JG
1717 *cmd_result = LTTNG_OK;
1718 return 0;
1719error:
ab0ee2ca 1720 channel_info_destroy(new_channel_info);
8abe313a 1721 session_info_put(session_info);
ab0ee2ca
JG
1722 return 1;
1723}
1724
83b934ad
MD
1725static
1726void free_channel_trigger_list_rcu(struct rcu_head *node)
1727{
1728 free(caa_container_of(node, struct lttng_channel_trigger_list,
1729 rcu_node));
1730}
1731
1732static
1733void free_channel_state_sample_rcu(struct rcu_head *node)
1734{
1735 free(caa_container_of(node, struct channel_state_sample,
1736 rcu_node));
1737}
1738
ab0ee2ca
JG
1739static
1740int handle_notification_thread_command_remove_channel(
1741 struct notification_thread_state *state,
1742 uint64_t channel_key, enum lttng_domain_type domain,
1743 enum lttng_error_code *cmd_result)
1744{
1745 struct cds_lfht_node *node;
1746 struct cds_lfht_iter iter;
1747 struct lttng_channel_trigger_list *trigger_list;
1748 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1749 struct channel_key key = { .key = channel_key, .domain = domain };
1750 struct channel_info *channel_info;
1751
1752 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
526200e4 1753 channel_key, lttng_domain_type_str(domain));
ab0ee2ca
JG
1754
1755 rcu_read_lock();
1756
1757 cds_lfht_lookup(state->channel_triggers_ht,
1758 hash_channel_key(&key),
1759 match_channel_trigger_list,
1760 &key,
1761 &iter);
1762 node = cds_lfht_iter_get_node(&iter);
1763 /*
1764 * There is a severe internal error if we are being asked to remove a
1765 * channel that doesn't exist.
1766 */
1767 if (!node) {
1768 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1769 goto end;
1770 }
1771
1772 /* Free the list of triggers associated with this channel. */
1773 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1774 channel_triggers_ht_node);
1775 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1776 &trigger_list->list, node) {
1777 cds_list_del(&trigger_list_element->node);
1778 free(trigger_list_element);
1779 }
1780 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1781 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1782
1783 /* Free sampled channel state. */
1784 cds_lfht_lookup(state->channel_state_ht,
1785 hash_channel_key(&key),
1786 match_channel_state_sample,
1787 &key,
1788 &iter);
1789 node = cds_lfht_iter_get_node(&iter);
1790 /*
1791 * This is expected to be NULL if the channel is destroyed before we
1792 * received a sample.
1793 */
1794 if (node) {
1795 struct channel_state_sample *sample = caa_container_of(node,
1796 struct channel_state_sample,
1797 channel_state_ht_node);
1798
1799 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1800 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1801 }
1802
1803 /* Remove the channel from the channels_ht and free it. */
1804 cds_lfht_lookup(state->channels_ht,
1805 hash_channel_key(&key),
1806 match_channel_info,
1807 &key,
1808 &iter);
1809 node = cds_lfht_iter_get_node(&iter);
1810 assert(node);
1811 channel_info = caa_container_of(node, struct channel_info,
1812 channels_ht_node);
1813 cds_lfht_del(state->channels_ht, node);
1814 channel_info_destroy(channel_info);
1815end:
1816 rcu_read_unlock();
1817 *cmd_result = LTTNG_OK;
1818 return 0;
1819}
1820
0ca52944 1821static
ed327204 1822int handle_notification_thread_command_session_rotation(
0ca52944 1823 struct notification_thread_state *state,
ed327204
JG
1824 enum notification_thread_command_type cmd_type,
1825 const char *session_name, uid_t session_uid, gid_t session_gid,
1826 uint64_t trace_archive_chunk_id,
1827 struct lttng_trace_archive_location *location,
1828 enum lttng_error_code *_cmd_result)
0ca52944 1829{
ed327204
JG
1830 int ret = 0;
1831 enum lttng_error_code cmd_result = LTTNG_OK;
1832 struct lttng_session_trigger_list *trigger_list;
1833 struct lttng_trigger_list_element *trigger_list_element;
1834 struct session_info *session_info;
f2b3ef9f 1835 const struct lttng_credentials session_creds = {
ff588497
JR
1836 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1837 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
f2b3ef9f 1838 };
0ca52944 1839
ed327204
JG
1840 rcu_read_lock();
1841
1842 session_info = find_or_create_session_info(state, session_name,
1843 session_uid, session_gid);
1844 if (!session_info) {
a9577b76 1845 /* Allocation error or an internal error occurred. */
ed327204
JG
1846 ret = -1;
1847 cmd_result = LTTNG_ERR_NOMEM;
1848 goto end;
1849 }
1850
1851 session_info->rotation.ongoing =
1852 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1853 session_info->rotation.id = trace_archive_chunk_id;
1854 trigger_list = get_session_trigger_list(state, session_name);
1855 if (!trigger_list) {
1856 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1857 session_name);
1858 goto end;
1859 }
1860
1861 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1862 node) {
1863 const struct lttng_condition *condition;
f2b3ef9f 1864 struct lttng_trigger *trigger;
ed327204
JG
1865 struct notification_client_list *client_list;
1866 struct lttng_evaluation *evaluation = NULL;
1867 enum lttng_condition_type condition_type;
f2b3ef9f 1868 enum action_executor_status executor_status;
ed327204
JG
1869
1870 trigger = trigger_list_element->trigger;
1871 condition = lttng_trigger_get_const_condition(trigger);
1872 assert(condition);
1873 condition_type = lttng_condition_get_type(condition);
1874
1875 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1876 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1877 continue;
1878 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1879 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1880 continue;
1881 }
1882
ed327204 1883 client_list = get_client_list_from_condition(state, condition);
ed327204
JG
1884 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1885 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1886 trace_archive_chunk_id);
1887 } else {
1888 evaluation = lttng_evaluation_session_rotation_completed_create(
1889 trace_archive_chunk_id, location);
1890 }
1891
1892 if (!evaluation) {
1893 /* Internal error */
1894 ret = -1;
1895 cmd_result = LTTNG_ERR_UNK;
505b2d90 1896 goto put_list;
ed327204
JG
1897 }
1898
f2b3ef9f
JG
1899 /*
1900 * Ownership of `evaluation` transferred to the action executor
1901 * no matter the result.
1902 */
1903 executor_status = action_executor_enqueue(state->executor,
1904 trigger, evaluation, &session_creds,
1905 client_list);
1906 evaluation = NULL;
1907 switch (executor_status) {
1908 case ACTION_EXECUTOR_STATUS_OK:
1909 break;
1910 case ACTION_EXECUTOR_STATUS_ERROR:
1911 case ACTION_EXECUTOR_STATUS_INVALID:
1912 /*
1913 * TODO Add trigger identification (name/id) when
1914 * it is added to the API.
1915 */
1916 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1917 ret = -1;
1918 goto put_list;
1919 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1920 /*
1921 * TODO Add trigger identification (name/id) when
1922 * it is added to the API.
1923 *
1924 * Not a fatal error.
1925 */
1926 WARN("No space left when enqueuing action associated with session rotation trigger");
1927 ret = 0;
1928 goto put_list;
1929 default:
1930 abort();
1931 }
1932
505b2d90
JG
1933put_list:
1934 notification_client_list_put(client_list);
ed327204 1935 if (caa_unlikely(ret)) {
505b2d90 1936 break;
ed327204
JG
1937 }
1938 }
1939end:
1940 session_info_put(session_info);
1941 *_cmd_result = cmd_result;
1942 rcu_read_unlock();
1943 return ret;
0ca52944
JG
1944}
1945
d02d7404
JR
1946static
1947int handle_notification_thread_command_add_tracer_event_source(
1948 struct notification_thread_state *state,
1949 int tracer_event_source_fd,
1950 enum lttng_domain_type domain_type,
1951 enum lttng_error_code *_cmd_result)
1952{
1953 int ret = 0;
1954 enum lttng_error_code cmd_result = LTTNG_OK;
1955 struct notification_event_tracer_event_source_element *element = NULL;
1956
1957 element = zmalloc(sizeof(*element));
1958 if (!element) {
1959 cmd_result = LTTNG_ERR_NOMEM;
1960 ret = -1;
1961 goto end;
1962 }
1963
d02d7404
JR
1964 element->fd = tracer_event_source_fd;
1965 element->domain = domain_type;
1966
1967 cds_list_add(&element->node, &state->tracer_event_sources_list);
1968
1969 DBG3("[notification-thread] Adding tracer event source fd to poll set: tracer_event_source_fd = %d, domain = '%s'",
1970 tracer_event_source_fd,
1971 lttng_domain_type_str(domain_type));
1972
1973 /* Adding the read side pipe to the event poll. */
1974 ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
1975 if (ret < 0) {
1976 ERR("[notification-thread] Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
1977 tracer_event_source_fd,
1978 lttng_domain_type_str(element->domain));
1979 cds_list_del(&element->node);
1980 free(element);
1981 goto end;
1982 }
1983
1984 element->is_fd_in_poll_set = true;
1985
1986end:
1987 *_cmd_result = cmd_result;
1988 return ret;
1989}
1990
8b524060
FD
1991static
1992int drain_event_notifier_notification_pipe(
1993 struct notification_thread_state *state,
1994 int pipe, enum lttng_domain_type domain)
1995{
1996 struct lttng_poll_event events = {0};
1997 int ret;
1998
1999 ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
2000 if (ret < 0) {
2001 ERR("[notification-thread] Error creating lttng_poll_event");
2002 goto end;
2003 }
2004
2005 ret = lttng_poll_add(&events, pipe, LPOLLIN);
2006 if (ret < 0) {
2007 ERR("[notification-thread] Error adding fd event notifier notification pipe to lttng_poll_event: fd = %d",
2008 pipe);
2009 goto end;
2010 }
2011
2012 while (true) {
2013 /*
2014 * Continue to consume notifications as long as there are new
2015 * ones coming in. The tracer has been asked to stop producing
2016 * them.
2017 *
2018 * LPOLLIN is explicitly checked since LPOLLHUP is implicitly
2019 * monitored (on Linux, at least) and will be returned when
2020 * the pipe is closed but empty.
2021 */
2022 ret = lttng_poll_wait_interruptible(&events, 0);
a17b133b 2023 if (ret == 0 || (LTTNG_POLL_GETEV(&events, 0) & LPOLLIN) == 0) {
8b524060
FD
2024 /* No more notification to be read on this pipe. */
2025 ret = 0;
2026 goto end;
2027 } else if (ret < 0) {
2028 PERROR("Failed on lttng_poll_wait_interruptible() call");
2029 ret = -1;
2030 goto end;
2031 }
2032
2033 ret = handle_one_event_notifier_notification(state, pipe, domain);
2034 if (ret) {
2035 ERR("[notification-thread] Error consuming an event notifier notification from pipe: fd = %d",
2036 pipe);
2037 }
2038 }
2039end:
2040 lttng_poll_clean(&events);
2041 return ret;
2042}
2043
d02d7404
JR
2044static
2045int handle_notification_thread_command_remove_tracer_event_source(
2046 struct notification_thread_state *state,
2047 int tracer_event_source_fd,
2048 enum lttng_error_code *_cmd_result)
2049{
2050 int ret = 0;
2fb5e9b7 2051 bool found = false;
d02d7404
JR
2052 enum lttng_error_code cmd_result = LTTNG_OK;
2053 struct notification_event_tracer_event_source_element *source_element = NULL, *tmp;
2054
2055 cds_list_for_each_entry_safe(source_element, tmp,
2056 &state->tracer_event_sources_list, node) {
2057 if (source_element->fd != tracer_event_source_fd) {
2058 continue;
2059 }
2060
2061 DBG("[notification-thread] Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2062 tracer_event_source_fd,
2063 lttng_domain_type_str(source_element->domain));
2064 cds_list_del(&source_element->node);
2fb5e9b7 2065 found = true;
d02d7404
JR
2066 break;
2067 }
2068
2fb5e9b7
JG
2069 if (!found) {
2070 /*
2071 * This is temporarily allowed since the poll activity set is
2072 * not properly cleaned-up for the moment. This is adressed in
2073 * an upcoming fix.
2074 */
2075 source_element = NULL;
2076 goto end;
2077 }
d02d7404
JR
2078
2079 if (!source_element->is_fd_in_poll_set) {
2080 /* Skip the poll set removal. */
2081 goto end;
2082 }
2083
2084 DBG3("[notification-thread] Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2085 tracer_event_source_fd,
2086 lttng_domain_type_str(source_element->domain));
2087
2088 /* Removing the fd from the event poll set. */
2089 ret = lttng_poll_del(&state->events, tracer_event_source_fd);
2090 if (ret < 0) {
2091 ERR("[notification-thread] Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
2092 tracer_event_source_fd,
2093 lttng_domain_type_str(source_element->domain));
2094 cmd_result = LTTNG_ERR_FATAL;
2095 goto end;
2096 }
2097
173900f6
JG
2098 source_element->is_fd_in_poll_set = false;
2099
8b524060
FD
2100 ret = drain_event_notifier_notification_pipe(state, tracer_event_source_fd,
2101 source_element->domain);
2102 if (ret) {
2103 ERR("[notification-thread] Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s",
2104 tracer_event_source_fd,
2105 lttng_domain_type_str(source_element->domain));
2106 cmd_result = LTTNG_ERR_FATAL;
2107 goto end;
2108 }
2109
2110 /*
2111 * The drain_event_notifier_notification_pipe() call might have read
2112 * data from an fd that we received in event in the latest _poll_wait()
2113 * call. Make sure the thread call poll_wait() again to ensure we have
2114 * a clean state.
2115 */
2116 state->restart_poll = true;
2117
d02d7404
JR
2118end:
2119 free(source_element);
2120 *_cmd_result = cmd_result;
2121 return ret;
2122}
2123
2124int handle_notification_thread_remove_tracer_event_source_no_result(
2125 struct notification_thread_state *state,
2126 int tracer_event_source_fd)
2127{
2128 int ret;
2129 enum lttng_error_code cmd_result;
2130
2131 ret = handle_notification_thread_command_remove_tracer_event_source(
2132 state, tracer_event_source_fd, &cmd_result);
2133 (void) cmd_result;
2134 return ret;
2135}
2136
fbc9f37d
JR
2137static int handle_notification_thread_command_list_triggers(
2138 struct notification_thread_handle *handle,
2139 struct notification_thread_state *state,
2140 uid_t client_uid,
2141 struct lttng_triggers **triggers,
2142 enum lttng_error_code *_cmd_result)
2143{
2144 int ret = 0;
2145 enum lttng_error_code cmd_result = LTTNG_OK;
2146 struct cds_lfht_iter iter;
2147 struct lttng_trigger_ht_element *trigger_ht_element;
2148 struct lttng_triggers *local_triggers = NULL;
2149 const struct lttng_credentials *creds;
2150
2151 rcu_read_lock();
2152
2153 local_triggers = lttng_triggers_create();
2154 if (!local_triggers) {
2155 /* Not a fatal error. */
2156 cmd_result = LTTNG_ERR_NOMEM;
2157 goto end;
2158 }
2159
2160 cds_lfht_for_each_entry(state->triggers_ht, &iter,
2161 trigger_ht_element, node) {
2162 /*
2163 * Only return the triggers to which the client has access.
2164 * The root user has visibility over all triggers.
2165 */
2166 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2167 if (client_uid != lttng_credentials_get_uid(creds) && client_uid != 0) {
2168 continue;
2169 }
2170
2171 ret = lttng_triggers_add(local_triggers,
2172 trigger_ht_element->trigger);
2173 if (ret < 0) {
2174 /* Not a fatal error. */
2175 ret = 0;
2176 cmd_result = LTTNG_ERR_NOMEM;
2177 goto end;
2178 }
2179 }
2180
2181 /* Transferring ownership to the caller. */
2182 *triggers = local_triggers;
2183 local_triggers = NULL;
2184
2185end:
2186 rcu_read_unlock();
2187 lttng_triggers_destroy(local_triggers);
2188 *_cmd_result = cmd_result;
2189 return ret;
2190}
2191
1da26331 2192static
959e3c66 2193bool condition_is_supported(struct lttng_condition *condition)
1da26331 2194{
959e3c66 2195 bool is_supported;
1da26331
JG
2196
2197 switch (lttng_condition_get_type(condition)) {
2198 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2199 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2200 {
959e3c66 2201 int ret;
1da26331
JG
2202 enum lttng_domain_type domain;
2203
2204 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2205 &domain);
959e3c66 2206 assert(ret == 0);
1da26331
JG
2207
2208 if (domain != LTTNG_DOMAIN_KERNEL) {
959e3c66 2209 is_supported = true;
1da26331
JG
2210 goto end;
2211 }
2212
2213 /*
2214 * Older kernel tracers don't expose the API to monitor their
2215 * buffers. Therefore, we reject triggers that require that
2216 * mechanism to be available to be evaluated.
959e3c66
JR
2217 *
2218 * Assume unsupported on error.
1da26331 2219 */
959e3c66
JR
2220 is_supported = kernel_supports_ring_buffer_snapshot_sample_positions() == 1;
2221 break;
2222 }
2223 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2224 {
2225 const struct lttng_event_rule *event_rule;
2226 enum lttng_domain_type domain;
2227 const enum lttng_condition_status status =
2228 lttng_condition_event_rule_get_rule(
2229 condition, &event_rule);
2230
2231 assert(status == LTTNG_CONDITION_STATUS_OK);
2232
2233 domain = lttng_event_rule_get_domain_type(event_rule);
2234 if (domain != LTTNG_DOMAIN_KERNEL) {
2235 is_supported = true;
2236 goto end;
2237 }
2238
2239 /*
2240 * Older kernel tracers can't emit notification. Therefore, we
2241 * reject triggers that require that mechanism to be available
2242 * to be evaluated.
2243 *
2244 * Assume unsupported on error.
2245 */
2246 is_supported = kernel_supports_event_notifiers() == 1;
1da26331
JG
2247 break;
2248 }
2249 default:
959e3c66 2250 is_supported = true;
1da26331
JG
2251 }
2252end:
959e3c66 2253 return is_supported;
1da26331
JG
2254}
2255
51eab943
JG
2256/* Must be called with RCU read lock held. */
2257static
f2b3ef9f 2258int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
51eab943
JG
2259 struct notification_thread_state *state)
2260{
2261 int ret = 0;
51eab943
JG
2262 const struct lttng_condition *condition;
2263 const char *session_name;
2264 struct lttng_session_trigger_list *trigger_list;
2265
2266 condition = lttng_trigger_get_const_condition(trigger);
2267 switch (lttng_condition_get_type(condition)) {
2268 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2269 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2270 {
2271 enum lttng_condition_status status;
2272
2273 status = lttng_condition_session_rotation_get_session_name(
2274 condition, &session_name);
2275 if (status != LTTNG_CONDITION_STATUS_OK) {
2276 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2277 ret = -1;
2278 goto end;
2279 }
2280 break;
2281 }
2282 default:
2283 ret = -1;
2284 goto end;
2285 }
2286
ed327204
JG
2287 trigger_list = get_session_trigger_list(state, session_name);
2288 if (!trigger_list) {
51eab943
JG
2289 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2290 session_name);
2291 goto end;
51eab943 2292
ed327204 2293 }
51eab943
JG
2294
2295 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2296 session_name);
2297 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2298end:
2299 return ret;
2300}
2301
2302/* Must be called with RCU read lock held. */
2303static
f2b3ef9f 2304int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
51eab943
JG
2305 struct notification_thread_state *state)
2306{
2307 int ret = 0;
2308 struct cds_lfht_node *node;
2309 struct cds_lfht_iter iter;
2310 struct channel_info *channel;
2311
2312 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2313 channels_ht_node) {
2314 struct lttng_trigger_list_element *trigger_list_element;
2315 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 2316 struct cds_lfht_iter lookup_iter;
51eab943
JG
2317
2318 if (!trigger_applies_to_channel(trigger, channel)) {
2319 continue;
2320 }
2321
2322 cds_lfht_lookup(state->channel_triggers_ht,
2323 hash_channel_key(&channel->key),
2324 match_channel_trigger_list,
2325 &channel->key,
a5d64ae7
MD
2326 &lookup_iter);
2327 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
2328 assert(node);
2329 trigger_list = caa_container_of(node,
2330 struct lttng_channel_trigger_list,
2331 channel_triggers_ht_node);
2332
2333 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2334 if (!trigger_list_element) {
2335 ret = -1;
2336 goto end;
2337 }
2338 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2339 trigger_list_element->trigger = trigger;
2340 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2341 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2342 channel->name);
2343 }
2344end:
2345 return ret;
2346}
2347
f2b3ef9f
JG
2348static
2349bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2350{
2351 bool is_notify = false;
2352 unsigned int i, count;
2353 enum lttng_action_status action_status;
2354 const struct lttng_action *action =
2355 lttng_trigger_get_const_action(trigger);
2356 enum lttng_action_type action_type;
2357
2358 assert(action);
17182cfd 2359 action_type = lttng_action_get_type(action);
f2b3ef9f
JG
2360 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2361 is_notify = true;
2362 goto end;
2363 } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
2364 goto end;
2365 }
2366
2367 action_status = lttng_action_group_get_count(action, &count);
2368 assert(action_status == LTTNG_ACTION_STATUS_OK);
2369
2370 for (i = 0; i < count; i++) {
2371 const struct lttng_action *inner_action =
2372 lttng_action_group_get_at_index(
2373 action, i);
2374
17182cfd 2375 action_type = lttng_action_get_type(inner_action);
f2b3ef9f
JG
2376 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2377 is_notify = true;
2378 goto end;
2379 }
2380 }
2381
2382end:
2383 return is_notify;
2384}
2385
242388e4
JR
2386static bool trigger_name_taken(struct notification_thread_state *state,
2387 const struct lttng_trigger *trigger)
2388{
2389 struct cds_lfht_iter iter;
2390
2391 /*
2392 * No duplicata is allowed in the triggers_by_name_uid_ht.
2393 * The match is done against the trigger name and uid.
2394 */
2395 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2396 hash_trigger_by_name_uid(trigger),
2397 match_trigger_by_name_uid,
2398 trigger,
2399 &iter);
2400 return !!cds_lfht_iter_get_node(&iter);
2401}
2402
2403static
2404enum lttng_error_code generate_trigger_name(
2405 struct notification_thread_state *state,
2406 struct lttng_trigger *trigger, const char **name)
2407{
2408 enum lttng_error_code ret_code = LTTNG_OK;
2409 bool taken = false;
2410 enum lttng_trigger_status status;
2411
2412 do {
2413 const int ret = lttng_trigger_generate_name(trigger,
2414 state->trigger_id.name_offset++);
2415 if (ret) {
2416 /* The only reason this can fail right now. */
2417 ret_code = LTTNG_ERR_NOMEM;
2418 break;
2419 }
2420
2421 status = lttng_trigger_get_name(trigger, name);
2422 assert(status == LTTNG_TRIGGER_STATUS_OK);
2423
2424 taken = trigger_name_taken(state, trigger);
2425 } while (taken || state->trigger_id.name_offset == UINT64_MAX);
2426
2427 return ret_code;
2428}
2429
ab0ee2ca 2430/*
f2b3ef9f 2431 * FIXME A client's credentials are not checked when registering a trigger.
ab0ee2ca 2432 *
1da26331 2433 * The effects of this are benign since:
ab0ee2ca 2434 * - The client will succeed in registering the trigger, as it is valid,
51eab943 2435 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
2436 * - The notifications will not be sent since the client's credentials
2437 * are checked against the channel at that moment.
1da26331
JG
2438 *
2439 * If this function returns a non-zero value, it means something is
50ca7858 2440 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
2441 *
2442 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2443 * error code.
ab0ee2ca
JG
2444 */
2445static
2446int handle_notification_thread_command_register_trigger(
8abe313a
JG
2447 struct notification_thread_state *state,
2448 struct lttng_trigger *trigger,
2449 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
2450{
2451 int ret = 0;
2452 struct lttng_condition *condition;
2453 struct notification_client *client;
2454 struct notification_client_list *client_list = NULL;
2455 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
f2b3ef9f 2456 struct notification_client_list_element *client_list_element;
e7c93cf9 2457 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
ab0ee2ca
JG
2458 struct cds_lfht_node *node;
2459 struct cds_lfht_iter iter;
242388e4 2460 const char* trigger_name;
ab0ee2ca 2461 bool free_trigger = true;
f2b3ef9f
JG
2462 struct lttng_evaluation *evaluation = NULL;
2463 struct lttng_credentials object_creds;
ff588497
JR
2464 uid_t object_uid;
2465 gid_t object_gid;
f2b3ef9f 2466 enum action_executor_status executor_status;
e6887944
JR
2467 const uint64_t trigger_tracer_token =
2468 state->trigger_id.next_tracer_token++;
ab0ee2ca
JG
2469
2470 rcu_read_lock();
2471
e6887944
JR
2472 /* Set the trigger's tracer token. */
2473 lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
2474
242388e4
JR
2475 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2476 LTTNG_TRIGGER_STATUS_UNSET) {
2477 const enum lttng_error_code ret_code = generate_trigger_name(
2478 state, trigger, &trigger_name);
2479
2480 if (ret_code != LTTNG_OK) {
2481 /* Fatal error. */
2482 ret = -1;
2483 *cmd_result = ret_code;
2484 goto error;
2485 }
2486 } else if (trigger_name_taken(state, trigger)) {
2487 /* Not a fatal error. */
2488 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2489 ret = 0;
2490 goto error;
2491 }
2492
ab0ee2ca 2493 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2494 assert(condition);
2495
959e3c66
JR
2496 /* Some conditions require tracers to implement a minimal ABI version. */
2497 if (!condition_is_supported(condition)) {
1da26331
JG
2498 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2499 goto error;
1da26331
JG
2500 }
2501
ab0ee2ca
JG
2502 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2503 if (!trigger_ht_element) {
2504 ret = -1;
2505 goto error;
2506 }
2507
2508 /* Add trigger to the trigger_ht. */
2509 cds_lfht_node_init(&trigger_ht_element->node);
242388e4 2510 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
ab0ee2ca
JG
2511 trigger_ht_element->trigger = trigger;
2512
2513 node = cds_lfht_add_unique(state->triggers_ht,
2514 lttng_condition_hash(condition),
242388e4
JR
2515 match_trigger,
2516 trigger,
ab0ee2ca
JG
2517 &trigger_ht_element->node);
2518 if (node != &trigger_ht_element->node) {
2519 /* Not a fatal error, simply report it to the client. */
2520 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2521 goto error_free_ht_element;
2522 }
2523
242388e4
JR
2524 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2525 hash_trigger_by_name_uid(trigger),
2526 match_trigger_by_name_uid,
2527 trigger,
2528 &trigger_ht_element->node_by_name_uid);
2529 if (node != &trigger_ht_element->node_by_name_uid) {
2530 /* Not a fatal error, simply report it to the client. */
2531 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2532 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2533 goto error_free_ht_element;
2534 }
2535
e7c93cf9
JR
2536 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2537 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2538 if (!trigger_tokens_ht_element) {
2539 /* Fatal error. */
2540 ret = -1;
2541 cds_lfht_del(state->triggers_ht,
2542 &trigger_ht_element->node);
2543 cds_lfht_del(state->triggers_by_name_uid_ht,
2544 &trigger_ht_element->node_by_name_uid);
2545 goto error_free_ht_element;
2546 }
2547
2548 /* Add trigger token to the trigger_tokens_ht. */
2549 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2550 trigger_tokens_ht_element->token =
2551 LTTNG_OPTIONAL_GET(trigger->tracer_token);
2552 trigger_tokens_ht_element->trigger = trigger;
2553
2554 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2555 hash_key_u64(&trigger_tokens_ht_element->token,
2556 lttng_ht_seed),
2557 match_trigger_token,
2558 &trigger_tokens_ht_element->token,
2559 &trigger_tokens_ht_element->node);
2560 if (node != &trigger_tokens_ht_element->node) {
2561 /* Internal corruption, fatal error. */
2562 ret = -1;
2563 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2564 cds_lfht_del(state->triggers_ht,
2565 &trigger_ht_element->node);
2566 cds_lfht_del(state->triggers_by_name_uid_ht,
2567 &trigger_ht_element->node_by_name_uid);
2568 goto error_free_ht_element;
2569 }
2570 }
2571
ab0ee2ca
JG
2572 /*
2573 * Ownership of the trigger and of its wrapper was transfered to
e7c93cf9 2574 * the triggers_ht. Same for token ht element if necessary.
ab0ee2ca 2575 */
e7c93cf9 2576 trigger_tokens_ht_element = NULL;
ab0ee2ca
JG
2577 trigger_ht_element = NULL;
2578 free_trigger = false;
2579
2580 /*
2581 * The rest only applies to triggers that have a "notify" action.
2582 * It is not skipped as this is the only action type currently
2583 * supported.
2584 */
f2b3ef9f
JG
2585 if (is_trigger_action_notify(trigger)) {
2586 client_list = notification_client_list_create(trigger);
2587 if (!client_list) {
2588 ret = -1;
2589 goto error_free_ht_element;
ab0ee2ca
JG
2590 }
2591
f2b3ef9f
JG
2592 /* Build a list of clients to which this new trigger applies. */
2593 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
2594 client_socket_ht_node) {
2595 if (!trigger_applies_to_client(trigger, client)) {
2596 continue;
2597 }
2598
2599 client_list_element =
2600 zmalloc(sizeof(*client_list_element));
2601 if (!client_list_element) {
2602 ret = -1;
2603 goto error_put_client_list;
2604 }
2605
2606 CDS_INIT_LIST_HEAD(&client_list_element->node);
2607 client_list_element->client = client;
2608 cds_list_add(&client_list_element->node,
2609 &client_list->list);
ab0ee2ca 2610 }
f2b3ef9f
JG
2611
2612 /*
2613 * Client list ownership transferred to the
2614 * notification_trigger_clients_ht.
2615 */
2616 publish_notification_client_list(state, client_list);
ab0ee2ca
JG
2617 }
2618
f82f93a1 2619 switch (get_condition_binding_object(condition)) {
51eab943
JG
2620 case LTTNG_OBJECT_TYPE_SESSION:
2621 /* Add the trigger to the list if it matches a known session. */
2622 ret = bind_trigger_to_matching_session(trigger, state);
2623 if (ret) {
505b2d90 2624 goto error_put_client_list;
ab0ee2ca 2625 }
f82f93a1 2626 break;
51eab943
JG
2627 case LTTNG_OBJECT_TYPE_CHANNEL:
2628 /*
2629 * Add the trigger to list of triggers bound to the channels
2630 * currently known.
2631 */
2632 ret = bind_trigger_to_matching_channels(trigger, state);
2633 if (ret) {
505b2d90 2634 goto error_put_client_list;
ab0ee2ca 2635 }
51eab943
JG
2636 break;
2637 case LTTNG_OBJECT_TYPE_NONE:
2638 break;
2639 default:
f2b3ef9f 2640 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
51eab943 2641 ret = -1;
505b2d90 2642 goto error_put_client_list;
ab0ee2ca
JG
2643 }
2644
2ae99f0b 2645 /*
f2b3ef9f
JG
2646 * The new trigger's condition must be evaluated against the current
2647 * state.
2648 *
2649 * In the case of `notify` action, nothing preventing clients from
2650 * subscribing to a condition before the corresponding trigger is
2651 * registered, we have to evaluate this new condition right away.
2ae99f0b
JG
2652 *
2653 * At some point, we were waiting for the next "evaluation" (e.g. on
2654 * reception of a channel sample) to evaluate this new condition, but
2655 * that was broken.
2656 *
2657 * The reason it was broken is that waiting for the next sample
2658 * does not allow us to properly handle transitions for edge-triggered
2659 * conditions.
2660 *
2661 * Consider this example: when we handle a new channel sample, we
2662 * evaluate each conditions twice: once with the previous state, and
2663 * again with the newest state. We then use those two results to
2664 * determine whether a state change happened: a condition was false and
2665 * became true. If a state change happened, we have to notify clients.
2666 *
2667 * Now, if a client subscribes to a given notification and registers
2668 * a trigger *after* that subscription, we have to make sure the
2669 * condition is evaluated at this point while considering only the
2670 * current state. Otherwise, the next evaluation cycle may only see
2671 * that the evaluations remain the same (true for samples n-1 and n) and
2672 * the client will never know that the condition has been met.
2673 */
f2b3ef9f
JG
2674 switch (get_condition_binding_object(condition)) {
2675 case LTTNG_OBJECT_TYPE_SESSION:
2676 ret = evaluate_session_condition_for_client(condition, state,
ff588497
JR
2677 &evaluation, &object_uid,
2678 &object_gid);
bc8daafb
JG
2679 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2680 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
b42ada90 2681 break;
f2b3ef9f
JG
2682 case LTTNG_OBJECT_TYPE_CHANNEL:
2683 ret = evaluate_channel_condition_for_client(condition, state,
ff588497
JR
2684 &evaluation, &object_uid,
2685 &object_gid);
bc8daafb
JG
2686 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2687 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
f2b3ef9f
JG
2688 break;
2689 case LTTNG_OBJECT_TYPE_NONE:
2690 ret = 0;
242388e4 2691 break;
f2b3ef9f
JG
2692 case LTTNG_OBJECT_TYPE_UNKNOWN:
2693 default:
2694 ret = -1;
242388e4 2695 break;
f2b3ef9f
JG
2696 }
2697
2698 if (ret) {
2699 /* Fatal error. */
2700 goto error_put_client_list;
2701 }
2702
2703 DBG("Newly registered trigger's condition evaluated to %s",
2704 evaluation ? "true" : "false");
2705 if (!evaluation) {
2706 /* Evaluation yielded nothing. Normal exit. */
2707 ret = 0;
242388e4 2708 goto end;
2ae99f0b
JG
2709 }
2710
2711 /*
f2b3ef9f
JG
2712 * Ownership of `evaluation` transferred to the action executor
2713 * no matter the result.
2ae99f0b 2714 */
f2b3ef9f
JG
2715 executor_status = action_executor_enqueue(state->executor, trigger,
2716 evaluation, &object_creds, client_list);
2717 evaluation = NULL;
2718 switch (executor_status) {
2719 case ACTION_EXECUTOR_STATUS_OK:
2720 break;
2721 case ACTION_EXECUTOR_STATUS_ERROR:
2722 case ACTION_EXECUTOR_STATUS_INVALID:
2723 /*
2724 * TODO Add trigger identification (name/id) when
2725 * it is added to the API.
2726 */
2727 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2728 ret = -1;
2729 goto error_put_client_list;
2730 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2731 /*
2732 * TODO Add trigger identification (name/id) when
2733 * it is added to the API.
2734 *
2735 * Not a fatal error.
2736 */
2737 WARN("No space left when enqueuing action associated to newly registered trigger");
2738 ret = 0;
242388e4 2739 goto end;
f2b3ef9f
JG
2740 default:
2741 abort();
2742 }
2ae99f0b 2743
242388e4 2744end:
ab0ee2ca 2745 *cmd_result = LTTNG_OK;
e6887944
JR
2746 DBG("Registered trigger: name = `%s`, tracer token = %" PRIu64,
2747 trigger_name, trigger_tracer_token);
505b2d90
JG
2748
2749error_put_client_list:
2750 notification_client_list_put(client_list);
2751
ab0ee2ca 2752error_free_ht_element:
242388e4
JR
2753 if (trigger_ht_element) {
2754 /* Delayed removal due to RCU constraint on delete. */
2755 call_rcu(&trigger_ht_element->rcu_node,
2756 free_lttng_trigger_ht_element_rcu);
2757 }
2758
e7c93cf9 2759 free(trigger_tokens_ht_element);
ab0ee2ca
JG
2760error:
2761 if (free_trigger) {
ab0ee2ca
JG
2762 lttng_trigger_destroy(trigger);
2763 }
2764 rcu_read_unlock();
2765 return ret;
2766}
2767
83b934ad
MD
2768static
2769void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2770{
2771 free(caa_container_of(node, struct lttng_trigger_ht_element,
2772 rcu_node));
2773}
2774
e7c93cf9
JR
2775static
2776void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2777{
2778 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2779 rcu_node));
2780}
2781
cc2295b5 2782static
ab0ee2ca
JG
2783int handle_notification_thread_command_unregister_trigger(
2784 struct notification_thread_state *state,
ac16173e 2785 const struct lttng_trigger *trigger,
ab0ee2ca
JG
2786 enum lttng_error_code *_cmd_reply)
2787{
2788 struct cds_lfht_iter iter;
ed327204 2789 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2790 struct lttng_channel_trigger_list *trigger_list;
2791 struct notification_client_list *client_list;
ab0ee2ca 2792 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
ac16173e 2793 const struct lttng_condition *condition = lttng_trigger_get_const_condition(
ab0ee2ca 2794 trigger);
ab0ee2ca
JG
2795 enum lttng_error_code cmd_reply;
2796
2797 rcu_read_lock();
2798
2799 cds_lfht_lookup(state->triggers_ht,
2800 lttng_condition_hash(condition),
242388e4
JR
2801 match_trigger,
2802 trigger,
ab0ee2ca
JG
2803 &iter);
2804 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2805 if (!triggers_ht_node) {
2806 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2807 goto end;
2808 } else {
2809 cmd_reply = LTTNG_OK;
2810 }
2811
2812 /* Remove trigger from channel_triggers_ht. */
2813 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2814 channel_triggers_ht_node) {
2815 struct lttng_trigger_list_element *trigger_element, *tmp;
2816
2817 cds_list_for_each_entry_safe(trigger_element, tmp,
2818 &trigger_list->list, node) {
e8f7fa11 2819 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
ab0ee2ca
JG
2820 continue;
2821 }
2822
2823 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2824 cds_list_del(&trigger_element->node);
e4db5ace
JR
2825 /* A trigger can only appear once per channel */
2826 break;
ab0ee2ca
JG
2827 }
2828 }
2829
e7c93cf9
JR
2830 if (lttng_condition_get_type(condition) ==
2831 LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2832 struct notification_trigger_tokens_ht_element
2833 *trigger_tokens_ht_element;
2834
2835 cds_lfht_for_each_entry (state->trigger_tokens_ht, &iter,
2836 trigger_tokens_ht_element, node) {
2837 if (!lttng_trigger_is_equal(trigger,
2838 trigger_tokens_ht_element->trigger)) {
2839 continue;
2840 }
2841
2842 DBG("[notification-thread] Removed trigger from tokens_ht");
2843 cds_lfht_del(state->trigger_tokens_ht,
2844 &trigger_tokens_ht_element->node);
2845 call_rcu(&trigger_tokens_ht_element->rcu_node,
2846 free_notification_trigger_tokens_ht_element_rcu);
2847
2848 break;
2849 }
2850 }
2851
51367634
JR
2852 if (is_trigger_action_notify(trigger)) {
2853 /*
2854 * Remove and release the client list from
2855 * notification_trigger_clients_ht.
2856 */
2857 client_list = get_client_list_from_condition(state, condition);
2858 assert(client_list);
ed327204 2859
51367634
JR
2860 /* Put new reference and the hashtable's reference. */
2861 notification_client_list_put(client_list);
2862 notification_client_list_put(client_list);
2863 client_list = NULL;
2864 }
ab0ee2ca
JG
2865
2866 /* Remove trigger from triggers_ht. */
2867 trigger_ht_element = caa_container_of(triggers_ht_node,
2868 struct lttng_trigger_ht_element, node);
242388e4 2869 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
ab0ee2ca
JG
2870 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2871
7ca172c1 2872 /* Release the ownership of the trigger. */
ab0ee2ca 2873 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2874 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2875end:
2876 rcu_read_unlock();
2877 if (_cmd_reply) {
2878 *_cmd_reply = cmd_reply;
2879 }
2880 return 0;
2881}
2882
2883/* Returns 0 on success, 1 on exit requested, negative value on error. */
2884int handle_notification_thread_command(
2885 struct notification_thread_handle *handle,
2886 struct notification_thread_state *state)
2887{
2888 int ret;
2889 uint64_t counter;
2890 struct notification_thread_command *cmd;
2891
8abe313a 2892 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2893 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2894 sizeof(counter));
18aeca05 2895 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2896 goto error;
2897 }
2898
2899 pthread_mutex_lock(&handle->cmd_queue.lock);
2900 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2901 struct notification_thread_command, cmd_list_node);
97a71ea6
JR
2902 cds_list_del(&cmd->cmd_list_node);
2903 pthread_mutex_unlock(&handle->cmd_queue.lock);
6900ae81
FD
2904
2905 DBG("[notification-thread] Received `%s` command",
2906 notification_command_type_str(cmd->type));
ab0ee2ca
JG
2907 switch (cmd->type) {
2908 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
ac16173e
JG
2909 ret = handle_notification_thread_command_register_trigger(state,
2910 cmd->parameters.register_trigger.trigger,
ab0ee2ca
JG
2911 &cmd->reply_code);
2912 break;
2913 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
ab0ee2ca 2914 ret = handle_notification_thread_command_unregister_trigger(
ac16173e
JG
2915 state,
2916 cmd->parameters.unregister_trigger.trigger,
ab0ee2ca
JG
2917 &cmd->reply_code);
2918 break;
2919 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
ab0ee2ca 2920 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2921 state,
2922 cmd->parameters.add_channel.session.name,
2923 cmd->parameters.add_channel.session.uid,
2924 cmd->parameters.add_channel.session.gid,
2925 cmd->parameters.add_channel.channel.name,
2926 cmd->parameters.add_channel.channel.domain,
2927 cmd->parameters.add_channel.channel.key,
2928 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2929 &cmd->reply_code);
2930 break;
2931 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
ab0ee2ca
JG
2932 ret = handle_notification_thread_command_remove_channel(
2933 state, cmd->parameters.remove_channel.key,
2934 cmd->parameters.remove_channel.domain,
2935 &cmd->reply_code);
2936 break;
0ca52944 2937 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2938 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204 2939 ret = handle_notification_thread_command_session_rotation(
0ca52944 2940 state,
ed327204
JG
2941 cmd->type,
2942 cmd->parameters.session_rotation.session_name,
2943 cmd->parameters.session_rotation.uid,
2944 cmd->parameters.session_rotation.gid,
2945 cmd->parameters.session_rotation.trace_archive_chunk_id,
2946 cmd->parameters.session_rotation.location,
0ca52944
JG
2947 &cmd->reply_code);
2948 break;
d02d7404
JR
2949 case NOTIFICATION_COMMAND_TYPE_ADD_TRACER_EVENT_SOURCE:
2950 ret = handle_notification_thread_command_add_tracer_event_source(
2951 state,
2952 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2953 cmd->parameters.tracer_event_source.domain,
2954 &cmd->reply_code);
2955 break;
2956 case NOTIFICATION_COMMAND_TYPE_REMOVE_TRACER_EVENT_SOURCE:
2957 ret = handle_notification_thread_command_remove_tracer_event_source(
2958 state,
2959 cmd->parameters.tracer_event_source.tracer_event_source_fd,
2960 &cmd->reply_code);
2961 break;
fbc9f37d
JR
2962 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2963 {
2964 struct lttng_triggers *triggers = NULL;
2965
2966 ret = handle_notification_thread_command_list_triggers(
2967 handle,
2968 state,
2969 cmd->parameters.list_triggers.uid,
2970 &triggers,
2971 &cmd->reply_code);
2972 cmd->reply.list_triggers.triggers = triggers;
2973 ret = 0;
2974 break;
2975 }
ab0ee2ca 2976 case NOTIFICATION_COMMAND_TYPE_QUIT:
ab0ee2ca
JG
2977 cmd->reply_code = LTTNG_OK;
2978 ret = 1;
2979 goto end;
f2b3ef9f
JG
2980 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2981 {
2982 const enum client_transmission_status client_status =
2983 cmd->parameters.client_communication_update
2984 .status;
2985 const notification_client_id client_id =
2986 cmd->parameters.client_communication_update.id;
2987 struct notification_client *client;
2988
2989 rcu_read_lock();
2990 client = get_client_from_id(client_id, state);
2991
2992 if (!client) {
2993 /*
2994 * Client error was probably already picked-up by the
2995 * notification thread or it has disconnected
2996 * gracefully while this command was queued.
2997 */
2998 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2999 client_id);
3000 ret = 0;
3001 } else {
f2b3ef9f
JG
3002 ret = client_handle_transmission_status(
3003 client, client_status, state);
f2b3ef9f
JG
3004 }
3005 rcu_read_unlock();
3006 break;
3007 }
ab0ee2ca
JG
3008 default:
3009 ERR("[notification-thread] Unknown internal command received");
3010 goto error_unlock;
3011 }
3012
3013 if (ret) {
3014 goto error_unlock;
3015 }
3016end:
0ab399e0
JG
3017 if (cmd->is_async) {
3018 free(cmd);
3019 cmd = NULL;
3020 } else {
3021 lttng_waiter_wake_up(&cmd->reply_waiter);
3022 }
ab0ee2ca
JG
3023 return ret;
3024error_unlock:
3025 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 3026 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
3027 cmd->reply_code = LTTNG_ERR_FATAL;
3028error:
3029 /* Indicate a fatal error to the caller. */
3030 return -1;
3031}
3032
ab0ee2ca
JG
3033static
3034int socket_set_non_blocking(int socket)
3035{
3036 int ret, flags;
3037
3038 /* Set the pipe as non-blocking. */
3039 ret = fcntl(socket, F_GETFL, 0);
3040 if (ret == -1) {
3041 PERROR("fcntl get socket flags");
3042 goto end;
3043 }
3044 flags = ret;
3045
3046 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
3047 if (ret == -1) {
3048 PERROR("fcntl set O_NONBLOCK socket flag");
3049 goto end;
3050 }
3051 DBG("Client socket (fd = %i) set as non-blocking", socket);
3052end:
3053 return ret;
3054}
3055
3056static
14fa22f8 3057int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
3058{
3059 int ret;
3060
882093ee
JR
3061
3062 lttng_payload_clear(&client->communication.inbound.payload);
ab0ee2ca
JG
3063
3064 client->communication.inbound.bytes_to_receive =
3065 sizeof(struct lttng_notification_channel_message);
3066 client->communication.inbound.msg_type =
3067 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
3068 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3069 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8 3070 ret = lttng_dynamic_buffer_set_size(
882093ee 3071 &client->communication.inbound.payload.buffer,
14fa22f8 3072 client->communication.inbound.bytes_to_receive);
882093ee 3073
14fa22f8 3074 return ret;
ab0ee2ca
JG
3075}
3076
3077int handle_notification_thread_client_connect(
3078 struct notification_thread_state *state)
3079{
3080 int ret;
3081 struct notification_client *client;
3082
3083 DBG("[notification-thread] Handling new notification channel client connection");
3084
3085 client = zmalloc(sizeof(*client));
3086 if (!client) {
3087 /* Fatal error. */
3088 ret = -1;
3089 goto error;
3090 }
6c24d3fd 3091
505b2d90 3092 pthread_mutex_init(&client->lock, NULL);
ac1889bf 3093 client->id = state->next_notification_client_id++;
ab0ee2ca 3094 CDS_INIT_LIST_HEAD(&client->condition_list);
882093ee
JR
3095 lttng_payload_init(&client->communication.inbound.payload);
3096 lttng_payload_init(&client->communication.outbound.payload);
01ea340e 3097 client->communication.inbound.expect_creds = true;
505b2d90 3098
14fa22f8
JG
3099 ret = client_reset_inbound_state(client);
3100 if (ret) {
3101 ERR("[notification-thread] Failed to reset client communication's inbound state");
e742b055 3102 ret = 0;
14fa22f8
JG
3103 goto error;
3104 }
ab0ee2ca
JG
3105
3106 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3107 if (ret < 0) {
3108 ERR("[notification-thread] Failed to accept new notification channel client connection");
3109 ret = 0;
3110 goto error;
3111 }
3112
3113 client->socket = ret;
3114
3115 ret = socket_set_non_blocking(client->socket);
3116 if (ret) {
3117 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3118 goto error;
3119 }
3120
3121 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3122 if (ret < 0) {
3123 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3124 ret = 0;
3125 goto error;
3126 }
3127
3128 ret = lttng_poll_add(&state->events, client->socket,
3129 LPOLLIN | LPOLLERR |
3130 LPOLLHUP | LPOLLRDHUP);
3131 if (ret < 0) {
3132 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3133 ret = 0;
3134 goto error;
3135 }
3136 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3137 client->socket);
3138
ab0ee2ca
JG
3139 rcu_read_lock();
3140 cds_lfht_add(state->client_socket_ht,
3141 hash_client_socket(client->socket),
3142 &client->client_socket_ht_node);
ac1889bf
JG
3143 cds_lfht_add(state->client_id_ht,
3144 hash_client_id(client->id),
3145 &client->client_id_ht_node);
ab0ee2ca
JG
3146 rcu_read_unlock();
3147
3148 return ret;
6c24d3fd 3149
ab0ee2ca
JG
3150error:
3151 notification_client_destroy(client, state);
3152 return ret;
3153}
3154
6c24d3fd
JG
3155/*
3156 * RCU read-lock must be held by the caller.
3157 * Client lock must _not_ be held by the caller.
3158 */
505b2d90
JG
3159static
3160int notification_thread_client_disconnect(
3161 struct notification_client *client,
ab0ee2ca 3162 struct notification_thread_state *state)
505b2d90
JG
3163{
3164 int ret;
3165 struct lttng_condition_list_element *condition_list_element, *tmp;
3166
3167 /* Acquire the client lock to disable its communication atomically. */
6c24d3fd 3168 pthread_mutex_lock(&client->lock);
505b2d90 3169 client->communication.active = false;
6c24d3fd
JG
3170 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3171 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3172 pthread_mutex_unlock(&client->lock);
3173
505b2d90
JG
3174 ret = lttng_poll_del(&state->events, client->socket);
3175 if (ret) {
3176 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3177 client->socket);
3178 }
3179
505b2d90
JG
3180 /* Release all conditions to which the client was subscribed. */
3181 cds_list_for_each_entry_safe(condition_list_element, tmp,
3182 &client->condition_list, node) {
3183 (void) notification_thread_client_unsubscribe(client,
3184 condition_list_element->condition, state, NULL);
3185 }
3186
3187 /*
3188 * Client no longer accessible to other threads (through the
3189 * client lists).
3190 */
3191 notification_client_destroy(client, state);
3192 return ret;
3193}
3194
3195int handle_notification_thread_client_disconnect(
3196 int client_socket, struct notification_thread_state *state)
ab0ee2ca
JG
3197{
3198 int ret = 0;
3199 struct notification_client *client;
3200
3201 rcu_read_lock();
3202 DBG("[notification-thread] Closing client connection (socket fd = %i)",
3203 client_socket);
3204 client = get_client_from_socket(client_socket, state);
3205 if (!client) {
3206 /* Internal state corruption, fatal error. */
3207 ERR("[notification-thread] Unable to find client (socket fd = %i)",
3208 client_socket);
3209 ret = -1;
3210 goto end;
3211 }
3212
505b2d90 3213 ret = notification_thread_client_disconnect(client, state);
ab0ee2ca
JG
3214end:
3215 rcu_read_unlock();
3216 return ret;
3217}
3218
3219int handle_notification_thread_client_disconnect_all(
3220 struct notification_thread_state *state)
3221{
3222 struct cds_lfht_iter iter;
3223 struct notification_client *client;
3224 bool error_encoutered = false;
3225
3226 rcu_read_lock();
3227 DBG("[notification-thread] Closing all client connections");
3228 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
505b2d90 3229 client_socket_ht_node) {
ab0ee2ca
JG
3230 int ret;
3231
505b2d90
JG
3232 ret = notification_thread_client_disconnect(
3233 client, state);
ab0ee2ca
JG
3234 if (ret) {
3235 error_encoutered = true;
3236 }
3237 }
3238 rcu_read_unlock();
3239 return error_encoutered ? 1 : 0;
3240}
3241
3242int handle_notification_thread_trigger_unregister_all(
3243 struct notification_thread_state *state)
3244{
4149ace8 3245 bool error_occurred = false;
ab0ee2ca
JG
3246 struct cds_lfht_iter iter;
3247 struct lttng_trigger_ht_element *trigger_ht_element;
3248
763de384 3249 rcu_read_lock();
ab0ee2ca
JG
3250 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3251 node) {
3252 int ret = handle_notification_thread_command_unregister_trigger(
3253 state, trigger_ht_element->trigger, NULL);
3254 if (ret) {
4149ace8 3255 error_occurred = true;
ab0ee2ca
JG
3256 }
3257 }
763de384 3258 rcu_read_unlock();
4149ace8 3259 return error_occurred ? -1 : 0;
ab0ee2ca
JG
3260}
3261
dd877ea6
JG
3262static
3263int client_handle_transmission_status(
3264 struct notification_client *client,
3265 enum client_transmission_status transmission_status,
3266 struct notification_thread_state *state)
3267{
3268 int ret = 0;
3269
3270 switch (transmission_status) {
3271 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3272 ret = lttng_poll_mod(&state->events, client->socket,
3273 CLIENT_POLL_MASK_IN);
3274 if (ret) {
3275 goto end;
3276 }
3277
dd877ea6
JG
3278 break;
3279 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3280 /*
3281 * We want to be notified whenever there is buffer space
3282 * available to send the rest of the payload.
3283 */
3284 ret = lttng_poll_mod(&state->events, client->socket,
3285 CLIENT_POLL_MASK_IN_OUT);
3286 if (ret) {
3287 goto end;
3288 }
3289 break;
3290 case CLIENT_TRANSMISSION_STATUS_FAIL:
3291 ret = notification_thread_client_disconnect(client, state);
3292 if (ret) {
3293 goto end;
3294 }
3295 break;
3296 case CLIENT_TRANSMISSION_STATUS_ERROR:
3297 ret = -1;
3298 goto end;
3299 default:
3300 abort();
3301 }
3302end:
3303 return ret;
3304}
3305
505b2d90 3306/* Client lock must be acquired by caller. */
ab0ee2ca 3307static
dd877ea6 3308enum client_transmission_status client_flush_outgoing_queue(
f2b3ef9f 3309 struct notification_client *client)
ab0ee2ca
JG
3310{
3311 ssize_t ret;
3312 size_t to_send_count;
dd877ea6 3313 enum client_transmission_status status;
882093ee
JR
3314 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3315 &client->communication.outbound.payload, 0, -1);
3316 const int fds_to_send_count =
3317 lttng_payload_view_get_fd_handle_count(&pv);
ab0ee2ca 3318
505b2d90
JG
3319 ASSERT_LOCKED(client->lock);
3320
f2b3ef9f
JG
3321 if (!client->communication.active) {
3322 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3323 goto end;
3324 }
3325
882093ee
JR
3326 if (pv.buffer.size == 0) {
3327 /*
3328 * If both data and fds are equal to zero, we are in an invalid
3329 * state.
3330 */
3331 assert(fds_to_send_count != 0);
3332 goto send_fds;
3333 }
3334
3335 /* Send data. */
3336 to_send_count = pv.buffer.size;
ab0ee2ca
JG
3337 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3338 client->socket);
3339
3340 ret = lttcomm_send_unix_sock_non_block(client->socket,
882093ee 3341 pv.buffer.data,
ab0ee2ca 3342 to_send_count);
44c180ca 3343 if ((ret >= 0 && ret < to_send_count)) {
ab0ee2ca
JG
3344 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3345 client->socket);
3346 to_send_count -= max(ret, 0);
3347
6f110534 3348 memmove(client->communication.outbound.payload.buffer.data,
882093ee
JR
3349 pv.buffer.data +
3350 pv.buffer.size - to_send_count,
ab0ee2ca
JG
3351 to_send_count);
3352 ret = lttng_dynamic_buffer_set_size(
882093ee 3353 &client->communication.outbound.payload.buffer,
ab0ee2ca
JG
3354 to_send_count);
3355 if (ret) {
3356 goto error;
3357 }
6c24d3fd 3358
dd877ea6 3359 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
882093ee 3360 goto end;
ab0ee2ca 3361 } else if (ret < 0) {
6c24d3fd 3362 /* Generic error, disable the client's communication. */
dd877ea6 3363 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
ab0ee2ca 3364 client->socket);
6c24d3fd 3365 client->communication.active = false;
dd877ea6 3366 status = CLIENT_TRANSMISSION_STATUS_FAIL;
882093ee 3367 goto end;
ab0ee2ca 3368 } else {
882093ee
JR
3369 /*
3370 * No error and flushed the queue completely.
3371 *
3372 * The payload buffer size is used later to
3373 * check if there is notifications queued. So albeit that the
3374 * direct caller knows that the transmission is complete, we
3375 * need to set the buffer size to zero.
3376 */
ab0ee2ca 3377 ret = lttng_dynamic_buffer_set_size(
882093ee 3378 &client->communication.outbound.payload.buffer, 0);
ab0ee2ca
JG
3379 if (ret) {
3380 goto error;
3381 }
882093ee 3382 }
6c24d3fd 3383
882093ee
JR
3384send_fds:
3385 /* No fds to send, transmission is complete. */
3386 if (fds_to_send_count == 0) {
dd877ea6 3387 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
882093ee 3388 goto end;
dd877ea6 3389 }
882093ee
JR
3390
3391 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3392 client->socket, &pv);
3393 if (ret < 0) {
3394 /* Generic error, disable the client's communication. */
3395 ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3396 client->socket);
3397 client->communication.active = false;
3398 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3399 goto end;
3400 } else if (ret == 0) {
3401 /* Nothing could be sent. */
3402 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3403 } else {
3404 /* Fd passing is an all or nothing kind of thing. */
3405 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3406 /*
3407 * The payload _fd_array count is used later to
3408 * check if there is notifications queued. So although the
3409 * direct caller knows that the transmission is complete, we
3410 * need to clear the _fd_array for the queuing check.
3411 */
3412 lttng_dynamic_pointer_array_clear(
3413 &client->communication.outbound.payload
3414 ._fd_handles);
3415 }
3416
f2b3ef9f 3417end:
882093ee
JR
3418 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3419 client->communication.outbound.queued_command_reply = false;
3420 client->communication.outbound.dropped_notification = false;
3421 lttng_payload_clear(&client->communication.outbound.payload);
3422 }
3423
f2b3ef9f 3424 return status;
ab0ee2ca 3425error:
f2b3ef9f 3426 return CLIENT_TRANSMISSION_STATUS_ERROR;
ab0ee2ca
JG
3427}
3428
882093ee
JR
3429static
3430bool client_has_outbound_data_left(
3431 const struct notification_client *client)
3432{
3433 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3434 &client->communication.outbound.payload, 0, -1);
3435 const bool has_data = pv.buffer.size != 0;
3436 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3437
3438 return has_data || has_fds;
3439}
3440
6c24d3fd 3441/* Client lock must _not_ be held by the caller. */
ab0ee2ca
JG
3442static
3443int client_send_command_reply(struct notification_client *client,
3444 struct notification_thread_state *state,
3445 enum lttng_notification_channel_status status)
3446{
3447 int ret;
3448 struct lttng_notification_channel_command_reply reply = {
3449 .status = (int8_t) status,
3450 };
3451 struct lttng_notification_channel_message msg = {
3452 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3453 .size = sizeof(reply),
3454 };
3455 char buffer[sizeof(msg) + sizeof(reply)];
f2b3ef9f 3456 enum client_transmission_status transmission_status;
ab0ee2ca 3457
6c24d3fd
JG
3458 memcpy(buffer, &msg, sizeof(msg));
3459 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3460 DBG("[notification-thread] Send command reply (%i)", (int) status);
505b2d90 3461
6c24d3fd 3462 pthread_mutex_lock(&client->lock);
ab0ee2ca
JG
3463 if (client->communication.outbound.queued_command_reply) {
3464 /* Protocol error. */
6c24d3fd 3465 goto error_unlock;
ab0ee2ca
JG
3466 }
3467
ab0ee2ca
JG
3468 /* Enqueue buffer to outgoing queue and flush it. */
3469 ret = lttng_dynamic_buffer_append(
882093ee 3470 &client->communication.outbound.payload.buffer,
ab0ee2ca
JG
3471 buffer, sizeof(buffer));
3472 if (ret) {
6c24d3fd 3473 goto error_unlock;
ab0ee2ca
JG
3474 }
3475
f2b3ef9f 3476 transmission_status = client_flush_outgoing_queue(client);
882093ee
JR
3477
3478 if (client_has_outbound_data_left(client)) {
6c24d3fd
JG
3479 /* Queue could not be emptied. */
3480 client->communication.outbound.queued_command_reply = true;
3481 }
3482
3483 pthread_mutex_unlock(&client->lock);
f2b3ef9f
JG
3484 ret = client_handle_transmission_status(
3485 client, transmission_status, state);
ab0ee2ca
JG
3486 if (ret) {
3487 goto error;
3488 }
3489
ab0ee2ca 3490 return 0;
6c24d3fd
JG
3491error_unlock:
3492 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3493error:
3494 return -1;
3495}
3496
505b2d90
JG
3497static
3498int client_handle_message_unknown(struct notification_client *client,
3499 struct notification_thread_state *state)
3500{
3501 int ret;
505b2d90
JG
3502 /*
3503 * Receiving message header. The function will be called again
3504 * once the rest of the message as been received and can be
3505 * interpreted.
3506 */
3507 const struct lttng_notification_channel_message *msg;
3508
882093ee 3509 assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
505b2d90 3510 msg = (const struct lttng_notification_channel_message *)
882093ee 3511 client->communication.inbound.payload.buffer.data;
505b2d90
JG
3512
3513 if (msg->size == 0 ||
3514 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3515 ERR("[notification-thread] Invalid notification channel message: length = %u",
3516 msg->size);
3517 ret = -1;
3518 goto end;
3519 }
3520
3521 switch (msg->type) {
3522 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3523 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3524 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3525 break;
3526 default:
3527 ret = -1;
3528 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3529 goto end;
3530 }
3531
3532 client->communication.inbound.bytes_to_receive = msg->size;
882093ee 3533 client->communication.inbound.fds_to_receive = msg->fds;
505b2d90
JG
3534 client->communication.inbound.msg_type =
3535 (enum lttng_notification_channel_message_type) msg->type;
3536 ret = lttng_dynamic_buffer_set_size(
882093ee
JR
3537 &client->communication.inbound.payload.buffer, msg->size);
3538
3539 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3540 msg = NULL;
505b2d90 3541end:
505b2d90
JG
3542 return ret;
3543}
3544
3545static
3546int client_handle_message_handshake(struct notification_client *client,
3547 struct notification_thread_state *state)
3548{
3549 int ret;
3550 struct lttng_notification_channel_command_handshake *handshake_client;
3551 const struct lttng_notification_channel_command_handshake handshake_reply = {
3552 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3553 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3554 };
3555 const struct lttng_notification_channel_message msg_header = {
3556 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3557 .size = sizeof(handshake_reply),
3558 };
3559 enum lttng_notification_channel_status status =
3560 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3561 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
505b2d90
JG
3562
3563 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3564 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3565 sizeof(handshake_reply));
3566
3567 handshake_client =
3568 (struct lttng_notification_channel_command_handshake *)
882093ee 3569 client->communication.inbound.payload.buffer
505b2d90
JG
3570 .data;
3571 client->major = handshake_client->major;
3572 client->minor = handshake_client->minor;
3573 if (!client->communication.inbound.creds_received) {
3574 ERR("[notification-thread] No credentials received from client");
3575 ret = -1;
3576 goto end;
3577 }
3578
3579 client->uid = LTTNG_SOCK_GET_UID_CRED(
3580 &client->communication.inbound.creds);
3581 client->gid = LTTNG_SOCK_GET_GID_CRED(
3582 &client->communication.inbound.creds);
3583 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3584 client->uid, client->gid, (int) client->major,
3585 (int) client->minor);
3586
3587 if (handshake_client->major !=
3588 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3589 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3590 }
3591
6c24d3fd
JG
3592 pthread_mutex_lock(&client->lock);
3593 /* Outgoing queue will be flushed when the command reply is sent. */
505b2d90 3594 ret = lttng_dynamic_buffer_append(
882093ee 3595 &client->communication.outbound.payload.buffer, send_buffer,
505b2d90
JG
3596 sizeof(send_buffer));
3597 if (ret) {
3598 ERR("[notification-thread] Failed to send protocol version to notification channel client");
6c24d3fd 3599 goto end_unlock;
505b2d90
JG
3600 }
3601
3602 client->validated = true;
3603 client->communication.active = true;
6c24d3fd 3604 pthread_mutex_unlock(&client->lock);
505b2d90 3605
6c24d3fd
JG
3606 /* Set reception state to receive the next message header. */
3607 ret = client_reset_inbound_state(client);
505b2d90 3608 if (ret) {
6c24d3fd 3609 ERR("[notification-thread] Failed to reset client communication's inbound state");
505b2d90
JG
3610 goto end;
3611 }
3612
6c24d3fd 3613 /* Flushes the outgoing queue. */
505b2d90
JG
3614 ret = client_send_command_reply(client, state, status);
3615 if (ret) {
3616 ERR("[notification-thread] Failed to send reply to notification channel client");
3617 goto end;
3618 }
3619
6c24d3fd
JG
3620 goto end;
3621end_unlock:
505b2d90 3622 pthread_mutex_unlock(&client->lock);
6c24d3fd 3623end:
505b2d90
JG
3624 return ret;
3625}
3626
3627static
3628int client_handle_message_subscription(
3629 struct notification_client *client,
3630 enum lttng_notification_channel_message_type msg_type,
3631 struct notification_thread_state *state)
3632{
3633 int ret;
3634 struct lttng_condition *condition;
3635 enum lttng_notification_channel_status status =
3636 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3637 struct lttng_payload_view condition_view =
882093ee
JR
3638 lttng_payload_view_from_payload(
3639 &client->communication.inbound.payload,
505b2d90
JG
3640 0, -1);
3641 size_t expected_condition_size;
3642
6c24d3fd
JG
3643 /*
3644 * No need to lock client to sample the inbound state as the only
3645 * other thread accessing clients (action executor) only uses the
3646 * outbound state.
3647 */
882093ee 3648 expected_condition_size = client->communication.inbound.payload.buffer.size;
505b2d90
JG
3649 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3650 if (ret != expected_condition_size) {
3651 ERR("[notification-thread] Malformed condition received from client");
3652 goto end;
3653 }
3654
3655 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3656 ret = notification_thread_client_subscribe(
3657 client, condition, state, &status);
3658 } else {
3659 ret = notification_thread_client_unsubscribe(
3660 client, condition, state, &status);
3661 }
505b2d90 3662
505b2d90 3663 if (ret) {
6c24d3fd 3664 goto end;
505b2d90
JG
3665 }
3666
3667 /* Set reception state to receive the next message header. */
3668 ret = client_reset_inbound_state(client);
3669 if (ret) {
3670 ERR("[notification-thread] Failed to reset client communication's inbound state");
6c24d3fd
JG
3671 goto end;
3672 }
3673
3674 ret = client_send_command_reply(client, state, status);
3675 if (ret) {
3676 ERR("[notification-thread] Failed to send reply to notification channel client");
3677 goto end;
505b2d90
JG
3678 }
3679
505b2d90
JG
3680end:
3681 return ret;
3682}
3683
ab0ee2ca
JG
3684static
3685int client_dispatch_message(struct notification_client *client,
3686 struct notification_thread_state *state)
3687{
3688 int ret = 0;
3689
3690 if (client->communication.inbound.msg_type !=
3691 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3692 client->communication.inbound.msg_type !=
3693 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3694 !client->validated) {
3695 WARN("[notification-thread] client attempted a command before handshake");
3696 ret = -1;
3697 goto end;
3698 }
3699
3700 switch (client->communication.inbound.msg_type) {
3701 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3702 {
505b2d90 3703 ret = client_handle_message_unknown(client, state);
ab0ee2ca
JG
3704 break;
3705 }
3706 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3707 {
505b2d90 3708 ret = client_handle_message_handshake(client, state);
ab0ee2ca
JG
3709 break;
3710 }
3711 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3712 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3713 {
505b2d90
JG
3714 ret = client_handle_message_subscription(client,
3715 client->communication.inbound.msg_type, state);
ab0ee2ca
JG
3716 break;
3717 }
3718 default:
3719 abort();
3720 }
3721end:
3722 return ret;
3723}
3724
3725/* Incoming data from client. */
3726int handle_notification_thread_client_in(
3727 struct notification_thread_state *state, int socket)
3728{
14fa22f8 3729 int ret = 0;
ab0ee2ca
JG
3730 struct notification_client *client;
3731 ssize_t recv_ret;
3732 size_t offset;
3733
98b5ff34 3734 rcu_read_lock();
ab0ee2ca
JG
3735 client = get_client_from_socket(socket, state);
3736 if (!client) {
3737 /* Internal error, abort. */
3738 ret = -1;
3739 goto end;
3740 }
3741
882093ee 3742 offset = client->communication.inbound.payload.buffer.size -
14fa22f8 3743 client->communication.inbound.bytes_to_receive;
01ea340e 3744 if (client->communication.inbound.expect_creds) {
ab0ee2ca 3745 recv_ret = lttcomm_recv_creds_unix_sock(socket,
882093ee 3746 client->communication.inbound.payload.buffer.data + offset,
ab0ee2ca
JG
3747 client->communication.inbound.bytes_to_receive,
3748 &client->communication.inbound.creds);
3749 if (recv_ret > 0) {
01ea340e 3750 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
3751 client->communication.inbound.creds_received = true;
3752 }
3753 } else {
3754 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
882093ee 3755 client->communication.inbound.payload.buffer.data + offset,
ab0ee2ca
JG
3756 client->communication.inbound.bytes_to_receive);
3757 }
505b2d90
JG
3758 if (recv_ret >= 0) {
3759 client->communication.inbound.bytes_to_receive -= recv_ret;
882093ee
JR
3760 } else {
3761 goto error_disconnect_client;
505b2d90 3762 }
6c24d3fd 3763
882093ee
JR
3764 if (client->communication.inbound.bytes_to_receive != 0) {
3765 /* Message incomplete wait for more data. */
3766 ret = 0;
3767 goto end;
ab0ee2ca
JG
3768 }
3769
882093ee
JR
3770 assert(client->communication.inbound.bytes_to_receive == 0);
3771
3772 /* Receive fds. */
3773 if (client->communication.inbound.fds_to_receive != 0) {
3774 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
3775 client->socket,
3776 client->communication.inbound.fds_to_receive,
3777 &client->communication.inbound.payload);
3778 if (ret > 0) {
ab0ee2ca 3779 /*
882093ee
JR
3780 * Fds received. non blocking fds passing is all
3781 * or nothing.
ab0ee2ca 3782 */
882093ee
JR
3783 ssize_t expected_size;
3784
3785 expected_size = sizeof(int) *
3786 client->communication.inbound
3787 .fds_to_receive;
3788 assert(ret == expected_size);
3789 client->communication.inbound.fds_to_receive = 0;
3790 } else if (ret == 0) {
3791 /* Received nothing. */
3792 ret = 0;
3793 goto end;
3794 } else {
ab0ee2ca
JG
3795 goto error_disconnect_client;
3796 }
ab0ee2ca 3797 }
882093ee
JR
3798
3799 /* At this point the message is complete.*/
3800 assert(client->communication.inbound.bytes_to_receive == 0 &&
3801 client->communication.inbound.fds_to_receive == 0);
3802 ret = client_dispatch_message(client, state);
3803 if (ret) {
3804 /*
3805 * Only returns an error if this client must be
3806 * disconnected.
3807 */
3808 goto error_disconnect_client;
3809 }
3810
ab0ee2ca 3811end:
98b5ff34 3812 rcu_read_unlock();
ab0ee2ca 3813 return ret;
882093ee 3814
ab0ee2ca 3815error_disconnect_client:
505b2d90 3816 ret = notification_thread_client_disconnect(client, state);
98b5ff34 3817 goto end;
ab0ee2ca
JG
3818}
3819
3820/* Client ready to receive outgoing data. */
3821int handle_notification_thread_client_out(
3822 struct notification_thread_state *state, int socket)
3823{
3824 int ret;
3825 struct notification_client *client;
f2b3ef9f 3826 enum client_transmission_status transmission_status;
ab0ee2ca 3827
98b5ff34 3828 rcu_read_lock();
ab0ee2ca
JG
3829 client = get_client_from_socket(socket, state);
3830 if (!client) {
3831 /* Internal error, abort. */
3832 ret = -1;
3833 goto end;
3834 }
3835
505b2d90 3836 pthread_mutex_lock(&client->lock);
f2b3ef9f 3837 transmission_status = client_flush_outgoing_queue(client);
6c24d3fd
JG
3838 pthread_mutex_unlock(&client->lock);
3839
f2b3ef9f
JG
3840 ret = client_handle_transmission_status(
3841 client, transmission_status, state);
ab0ee2ca
JG
3842 if (ret) {
3843 goto end;
3844 }
3845end:
98b5ff34 3846 rcu_read_unlock();
ab0ee2ca
JG
3847 return ret;
3848}
3849
3850static
e8360425
JD
3851bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3852 const struct channel_state_sample *sample,
3853 uint64_t buffer_capacity)
ab0ee2ca
JG
3854{
3855 bool result = false;
3856 uint64_t threshold;
3857 enum lttng_condition_type condition_type;
e8360425 3858 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
3859 condition, struct lttng_condition_buffer_usage,
3860 parent);
3861
ab0ee2ca
JG
3862 if (use_condition->threshold_bytes.set) {
3863 threshold = use_condition->threshold_bytes.value;
3864 } else {
3865 /*
3866 * Threshold was expressed as a ratio.
3867 *
3868 * TODO the threshold (in bytes) of conditions expressed
3869 * as a ratio of total buffer size could be cached to
3870 * forego this double-multiplication or it could be performed
3871 * as fixed-point math.
3872 *
d49487dc 3873 * Note that caching should accommodates the case where the
ab0ee2ca
JG
3874 * condition applies to multiple channels (i.e. don't assume
3875 * that all channels matching my_chann* have the same size...)
3876 */
3877 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3878 (double) buffer_capacity);
3879 }
3880
3881 condition_type = lttng_condition_get_type(condition);
3882 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3883 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3884 threshold, sample->highest_usage);
3885
3886 /*
3887 * The low condition should only be triggered once _all_ of the
3888 * streams in a channel have gone below the "low" threshold.
3889 */
3890 if (sample->highest_usage <= threshold) {
3891 result = true;
3892 }
3893 } else {
3894 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3895 threshold, sample->highest_usage);
3896
3897 /*
3898 * For high buffer usage scenarios, we want to trigger whenever
3899 * _any_ of the streams has reached the "high" threshold.
3900 */
3901 if (sample->highest_usage >= threshold) {
3902 result = true;
3903 }
3904 }
e8360425 3905
ab0ee2ca
JG
3906 return result;
3907}
3908
3909static
e8360425
JD
3910bool evaluate_session_consumed_size_condition(
3911 const struct lttng_condition *condition,
3912 uint64_t session_consumed_size)
3913{
3914 uint64_t threshold;
3915 const struct lttng_condition_session_consumed_size *size_condition =
3916 container_of(condition,
3917 struct lttng_condition_session_consumed_size,
3918 parent);
3919
3920 threshold = size_condition->consumed_threshold_bytes.value;
3921 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3922 threshold, session_consumed_size);
3923 return session_consumed_size >= threshold;
3924}
3925
3926static
ed327204 3927int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3928 struct lttng_evaluation **evaluation,
e8360425
JD
3929 const struct notification_thread_state *state,
3930 const struct channel_state_sample *previous_sample,
3931 const struct channel_state_sample *latest_sample,
3932 uint64_t previous_session_consumed_total,
3933 uint64_t latest_session_consumed_total,
3934 struct channel_info *channel_info)
ab0ee2ca
JG
3935{
3936 int ret = 0;
3937 enum lttng_condition_type condition_type;
e8360425
JD
3938 const bool previous_sample_available = !!previous_sample;
3939 bool previous_sample_result = false;
ab0ee2ca
JG
3940 bool latest_sample_result;
3941
3942 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3943
e8360425
JD
3944 switch (condition_type) {
3945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3946 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3947 if (caa_likely(previous_sample_available)) {
3948 previous_sample_result =
3949 evaluate_buffer_usage_condition(condition,
3950 previous_sample, channel_info->capacity);
3951 }
3952 latest_sample_result = evaluate_buffer_usage_condition(
3953 condition, latest_sample,
3954 channel_info->capacity);
3955 break;
3956 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3957 if (caa_likely(previous_sample_available)) {
3958 previous_sample_result =
3959 evaluate_session_consumed_size_condition(
3960 condition,
3961 previous_session_consumed_total);
3962 }
3963 latest_sample_result =
3964 evaluate_session_consumed_size_condition(
3965 condition,
3966 latest_session_consumed_total);
3967 break;
3968 default:
3969 /* Unknown condition type; internal error. */
3970 abort();
3971 }
ab0ee2ca
JG
3972
3973 if (!latest_sample_result ||
3974 (previous_sample_result == latest_sample_result)) {
3975 /*
3976 * Only trigger on a condition evaluation transition.
3977 *
3978 * NOTE: This edge-triggered logic may not be appropriate for
3979 * future condition types.
3980 */
3981 goto end;
3982 }
3983
e8360425
JD
3984 if (!evaluation || !latest_sample_result) {
3985 goto end;
3986 }
3987
3988 switch (condition_type) {
3989 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3990 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3991 *evaluation = lttng_evaluation_buffer_usage_create(
3992 condition_type,
3993 latest_sample->highest_usage,
e8360425
JD
3994 channel_info->capacity);
3995 break;
3996 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3997 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3998 latest_session_consumed_total);
3999 break;
4000 default:
4001 abort();
4002 }
4003
4004 if (!*evaluation) {
4005 ret = -1;
4006 goto end;
ab0ee2ca
JG
4007 }
4008end:
4009 return ret;
4010}
4011
4012static
f2b3ef9f 4013int client_notification_overflow(struct notification_client *client)
ab0ee2ca 4014{
f2b3ef9f
JG
4015 int ret = 0;
4016 const struct lttng_notification_channel_message msg = {
ab0ee2ca 4017 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
ab0ee2ca
JG
4018 };
4019
505b2d90
JG
4020 ASSERT_LOCKED(client->lock);
4021
f2b3ef9f
JG
4022 DBG("Dropping notification addressed to client (socket fd = %i)",
4023 client->socket);
4024 if (client->communication.outbound.dropped_notification) {
4025 /*
4026 * The client already has a "notification dropped" message
4027 * in its outgoing queue. Nothing to do since all
4028 * of those messages are coalesced.
4029 */
4030 goto end;
4031 }
4032
4033 client->communication.outbound.dropped_notification = true;
ab0ee2ca 4034 ret = lttng_dynamic_buffer_append(
882093ee 4035 &client->communication.outbound.payload.buffer, &msg,
ab0ee2ca 4036 sizeof(msg));
f2b3ef9f
JG
4037 if (ret) {
4038 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
4039 client->socket);
4040 }
4041end:
ab0ee2ca
JG
4042 return ret;
4043}
4044
f2b3ef9f
JG
4045static int client_handle_transmission_status_wrapper(
4046 struct notification_client *client,
4047 enum client_transmission_status status,
4048 void *user_data)
4049{
4050 return client_handle_transmission_status(client, status,
4051 (struct notification_thread_state *) user_data);
4052}
4053
4054static
4055int send_evaluation_to_clients(const struct lttng_trigger *trigger,
4056 const struct lttng_evaluation *evaluation,
4057 struct notification_client_list* client_list,
4058 struct notification_thread_state *state,
4059 uid_t object_uid, gid_t object_gid)
4060{
ff588497
JR
4061 const struct lttng_credentials creds = {
4062 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
4063 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
4064 };
4065
f2b3ef9f
JG
4066 return notification_client_list_send_evaluation(client_list,
4067 lttng_trigger_get_const_condition(trigger), evaluation,
4068 lttng_trigger_get_credentials(trigger),
ff588497 4069 &creds,
f2b3ef9f
JG
4070 client_handle_transmission_status_wrapper, state);
4071}
4072
f37d0f86
JG
4073/*
4074 * Permission checks relative to notification channel clients are performed
4075 * here. Notice how object, client, and trigger credentials are involved in
4076 * this check.
4077 *
4078 * The `object` credentials are the credentials associated with the "subject"
4079 * of a condition. For instance, a `rotation completed` condition applies
4080 * to a session. When that condition is met, it will produce an evaluation
4081 * against a session. Hence, in this case, the `object` credentials are the
4082 * credentials of the "subject" session.
4083 *
4084 * The `trigger` credentials are the credentials of the user that registered the
4085 * trigger.
4086 *
4087 * The `client` credentials are the credentials of the user that created a given
4088 * notification channel.
4089 *
4090 * In terms of visibility, it is expected that non-privilieged users can only
4091 * register triggers against "their" objects (their own sessions and
4092 * applications they are allowed to interact with). They can then open a
4093 * notification channel and subscribe to notifications associated with those
4094 * triggers.
4095 *
4096 * As for privilieged users, they can register triggers against the objects of
4097 * other users. They can then subscribe to the notifications associated to their
4098 * triggers. Privilieged users _can't_ subscribe to the notifications of
4099 * triggers owned by other users; they must create their own triggers.
4100 *
4101 * This is more a concern of usability than security. It would be difficult for
4102 * a root user reliably subscribe to a specific set of conditions without
4103 * interference from external users (those could, for instance, unregister
4104 * their triggers).
4105 */
f2b3ef9f
JG
4106LTTNG_HIDDEN
4107int notification_client_list_send_evaluation(
4108 struct notification_client_list *client_list,
4109 const struct lttng_condition *condition,
9b63a4aa 4110 const struct lttng_evaluation *evaluation,
f2b3ef9f
JG
4111 const struct lttng_credentials *trigger_creds,
4112 const struct lttng_credentials *source_object_creds,
4113 report_client_transmission_result_cb client_report,
4114 void *user_data)
ab0ee2ca
JG
4115{
4116 int ret = 0;
c0a66c84 4117 struct lttng_payload msg_payload;
ab0ee2ca 4118 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa 4119 const struct lttng_notification notification = {
f2b3ef9f 4120 .condition = (struct lttng_condition *) condition,
9b63a4aa
JG
4121 .evaluation = (struct lttng_evaluation *) evaluation,
4122 };
3647288f
JG
4123 struct lttng_notification_channel_message msg_header = {
4124 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4125 };
ab0ee2ca 4126
c0a66c84 4127 lttng_payload_init(&msg_payload);
ab0ee2ca 4128
c0a66c84 4129 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3647288f 4130 sizeof(msg_header));
ab0ee2ca
JG
4131 if (ret) {
4132 goto end;
4133 }
4134
c0a66c84 4135 ret = lttng_notification_serialize(&notification, &msg_payload);
ab0ee2ca 4136 if (ret) {
ab0ee2ca
JG
4137 ERR("[notification-thread] Failed to serialize notification");
4138 ret = -1;
4139 goto end;
4140 }
4141
3647288f 4142 /* Update payload size. */
505b2d90
JG
4143 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4144 ->size = (uint32_t)(
4145 msg_payload.buffer.size - sizeof(msg_header));
3647288f 4146
882093ee
JR
4147 /* Update the payload number of fds. */
4148 {
4149 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4150 &msg_payload, 0, -1);
4151
4152 ((struct lttng_notification_channel_message *)
4153 msg_payload.buffer.data)->fds = (uint32_t)
4154 lttng_payload_view_get_fd_handle_count(&pv);
4155 }
4156
505b2d90 4157 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
4158 cds_list_for_each_entry_safe(client_list_element, tmp,
4159 &client_list->list, node) {
f2b3ef9f 4160 enum client_transmission_status transmission_status;
ab0ee2ca
JG
4161 struct notification_client *client =
4162 client_list_element->client;
4163
505b2d90
JG
4164 ret = 0;
4165 pthread_mutex_lock(&client->lock);
6c24d3fd
JG
4166 if (!client->communication.active) {
4167 /*
4168 * Skip inactive client (protocol error or
4169 * disconnecting).
4170 */
4171 DBG("Skipping client at it is marked as inactive");
4172 goto skip_client;
4173 }
4174
f2b3ef9f 4175 if (source_object_creds) {
ff588497
JR
4176 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4177 client->gid != lttng_credentials_get_gid(source_object_creds) &&
f2b3ef9f
JG
4178 client->uid != 0) {
4179 /*
4180 * Client is not allowed to monitor this
4181 * object.
4182 */
4183 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
6c24d3fd 4184 goto skip_client;
f2b3ef9f 4185 }
90843f49
JR
4186 }
4187
ff588497 4188 if (client->uid != lttng_credentials_get_uid(trigger_creds) && client->gid != lttng_credentials_get_gid(trigger_creds)) {
90843f49 4189 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
6c24d3fd 4190 goto skip_client;
ab0ee2ca
JG
4191 }
4192
4193 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
c0a66c84 4194 client->socket, msg_payload.buffer.size);
882093ee
JR
4195
4196 if (client_has_outbound_data_left(client)) {
ab0ee2ca
JG
4197 /*
4198 * Outgoing data is already buffered for this client;
4199 * drop the notification and enqueue a "dropped
4200 * notification" message if this is the first dropped
4201 * notification since the socket spilled-over to the
4202 * queue.
4203 */
f2b3ef9f
JG
4204 ret = client_notification_overflow(client);
4205 if (ret) {
6c24d3fd
JG
4206 /* Fatal error. */
4207 goto skip_client;
ab0ee2ca 4208 }
ab0ee2ca
JG
4209 }
4210
882093ee 4211 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
ab0ee2ca 4212 if (ret) {
6c24d3fd
JG
4213 /* Fatal error. */
4214 goto skip_client;
ab0ee2ca
JG
4215 }
4216
f2b3ef9f 4217 transmission_status = client_flush_outgoing_queue(client);
6c24d3fd 4218 pthread_mutex_unlock(&client->lock);
f2b3ef9f 4219 ret = client_report(client, transmission_status, user_data);
ab0ee2ca 4220 if (ret) {
6c24d3fd
JG
4221 /* Fatal error. */
4222 goto end_unlock_list;
505b2d90 4223 }
6c24d3fd
JG
4224
4225 continue;
4226
4227skip_client:
505b2d90
JG
4228 pthread_mutex_unlock(&client->lock);
4229 if (ret) {
6c24d3fd 4230 /* Fatal error. */
505b2d90 4231 goto end_unlock_list;
ab0ee2ca
JG
4232 }
4233 }
4234 ret = 0;
505b2d90
JG
4235
4236end_unlock_list:
4237 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca 4238end:
c0a66c84 4239 lttng_payload_reset(&msg_payload);
ab0ee2ca
JG
4240 return ret;
4241}
4242
8b524060
FD
4243static
4244struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
b9a8d78f 4245 int notification_pipe_read_fd, enum lttng_domain_type domain)
94078603
JR
4246{
4247 int ret;
b9a8d78f
FD
4248 uint64_t token;
4249 struct lttng_event_notifier_notification *notification = NULL;
82b3cbf4
JR
4250 char *capture_buffer = NULL;
4251 size_t capture_buffer_size;
94078603
JR
4252 void *reception_buffer;
4253 size_t reception_size;
4254
fc4b93fa 4255 struct lttng_ust_abi_event_notifier_notification ust_notification;
b9a8d78f 4256 struct lttng_kernel_event_notifier_notification kernel_notification;
94078603 4257
b9a8d78f 4258 /* Init lttng_event_notifier_notification */
94078603
JR
4259 switch(domain) {
4260 case LTTNG_DOMAIN_UST:
4261 reception_buffer = (void *) &ust_notification;
4262 reception_size = sizeof(ust_notification);
94078603
JR
4263 break;
4264 case LTTNG_DOMAIN_KERNEL:
4265 reception_buffer = (void *) &kernel_notification;
4266 reception_size = sizeof(kernel_notification);
94078603
JR
4267 break;
4268 default:
4269 abort();
4270 }
4271
4272 /*
4273 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4274 * ensuring that read/write of tracer notifications are atomic.
4275 */
4276 ret = lttng_read(notification_pipe_read_fd, reception_buffer,
4277 reception_size);
4278 if (ret != reception_size) {
4279 PERROR("Failed to read from event source notification pipe: fd = %d, size to read = %zu, ret = %d",
4280 notification_pipe_read_fd, reception_size, ret);
4281 ret = -1;
4282 goto end;
4283 }
4284
4285 switch(domain) {
4286 case LTTNG_DOMAIN_UST:
b9a8d78f 4287 token = ust_notification.token;
82b3cbf4 4288 capture_buffer_size = ust_notification.capture_buf_size;
94078603
JR
4289 break;
4290 case LTTNG_DOMAIN_KERNEL:
b9a8d78f 4291 token = kernel_notification.token;
82b3cbf4 4292 capture_buffer_size = 0;
94078603
JR
4293 break;
4294 default:
4295 abort();
4296 }
4297
82b3cbf4
JR
4298 if (capture_buffer_size == 0) {
4299 capture_buffer = NULL;
4300 goto skip_capture;
4301 }
4302
4303 if (capture_buffer_size > MAX_CAPTURE_SIZE) {
4304 ERR("[notification-thread] Event notifier has a capture payload size which exceeds the maximum allowed size: capture_payload_size = %zu bytes, max allowed size = %d bytes",
4305 capture_buffer_size, MAX_CAPTURE_SIZE);
4306 goto end;
4307 }
4308
4309 capture_buffer = zmalloc(capture_buffer_size);
4310 if (!capture_buffer) {
4311 ERR("[notification-thread] Failed to allocate capture buffer");
4312 goto end;
4313 }
4314
4315 /* Fetch additional payload (capture). */
4316 ret = lttng_read(notification_pipe_read_fd, capture_buffer, capture_buffer_size);
4317 if (ret != capture_buffer_size) {
4318 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4319 notification_pipe_read_fd);
4320 goto end;
4321 }
4322
4323skip_capture:
4324 notification = lttng_event_notifier_notification_create(token, domain,
4325 capture_buffer, capture_buffer_size);
4326 if (notification == NULL) {
4327 goto end;
4328 }
4329
4330 /*
4331 * Ownership transfered to the lttng_event_notifier_notification object.
4332 */
4333 capture_buffer = NULL;
4334
b9a8d78f 4335end:
82b3cbf4 4336 free(capture_buffer);
b9a8d78f
FD
4337 return notification;
4338}
4339
8b524060
FD
4340static
4341int dispatch_one_event_notifier_notification(struct notification_thread_state *state,
4342 struct lttng_event_notifier_notification *notification)
b9a8d78f 4343{
b9a8d78f
FD
4344 struct cds_lfht_node *node;
4345 struct cds_lfht_iter iter;
4346 struct notification_trigger_tokens_ht_element *element;
8b524060 4347 enum lttng_trigger_status trigger_status;
b9a8d78f 4348 struct lttng_evaluation *evaluation = NULL;
b9a8d78f
FD
4349 enum action_executor_status executor_status;
4350 struct notification_client_list *client_list = NULL;
4351 const char *trigger_name;
8b524060 4352 int ret;
b9a8d78f 4353
94078603
JR
4354 /* Find triggers associated with this token. */
4355 rcu_read_lock();
4356 cds_lfht_lookup(state->trigger_tokens_ht,
b9a8d78f
FD
4357 hash_key_u64(&notification->tracer_token, lttng_ht_seed),
4358 match_trigger_token, &notification->tracer_token, &iter);
94078603 4359 node = cds_lfht_iter_get_node(&iter);
f1446131 4360 if (caa_unlikely(!node)) {
94078603 4361 /*
f1446131
JR
4362 * This is not an error, slow consumption of the tracer
4363 * notifications can lead to situations where a trigger is
4364 * removed but we still get tracer notifications matching a
4365 * trigger that no longer exists.
94078603
JR
4366 */
4367 ret = 0;
4368 goto end_unlock;
4369 }
4370
4371 element = caa_container_of(node,
4372 struct notification_trigger_tokens_ht_element,
4373 node);
4374
f1446131
JR
4375 if (!lttng_trigger_should_fire(element->trigger)) {
4376 ret = 0;
4377 goto end_unlock;
4378 }
94078603 4379
f1446131 4380 lttng_trigger_fire(element->trigger);
94078603 4381
b9a8d78f
FD
4382 trigger_status = lttng_trigger_get_name(element->trigger, &trigger_name);
4383 assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
4384
4385 evaluation = lttng_evaluation_event_rule_create(
4386 trigger_name);
f1446131 4387 if (evaluation == NULL) {
b9a8d78f 4388 ERR("[notification-thread] Failed to create event rule hit evaluation while creating and enqueuing action executor job");
f1446131
JR
4389 ret = -1;
4390 goto end_unlock;
4391 }
4392
4393 client_list = get_client_list_from_condition(state,
4394 lttng_trigger_get_const_condition(element->trigger));
4395 executor_status = action_executor_enqueue(state->executor,
4396 element->trigger, evaluation, NULL, client_list);
4397 switch (executor_status) {
4398 case ACTION_EXECUTOR_STATUS_OK:
4399 ret = 0;
4400 break;
4401 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4402 {
4403 struct notification_client_list_element *client_list_element,
4404 *tmp;
4405
4406 /*
4407 * Not a fatal error; this is expected and simply means the
4408 * executor has too much work queued already.
4409 */
4410 ret = 0;
4411
4412 /* No clients subscribed to notifications for this trigger. */
4413 if (!client_list) {
4414 break;
4415 }
4416
4417 /* Warn clients that a notification (or more) was dropped. */
4418 pthread_mutex_lock(&client_list->lock);
4419 cds_list_for_each_entry_safe(client_list_element, tmp,
4420 &client_list->list, node) {
4421 enum client_transmission_status transmission_status;
4422 struct notification_client *client =
4423 client_list_element->client;
4424
4425 pthread_mutex_lock(&client->lock);
4426 ret = client_notification_overflow(client);
4427 if (ret) {
4428 /* Fatal error. */
4429 goto next_client;
4430 }
4431
4432 transmission_status =
4433 client_flush_outgoing_queue(client);
4434 ret = client_handle_transmission_status(
4435 client, transmission_status, state);
4436 if (ret) {
4437 /* Fatal error. */
4438 goto next_client;
4439 }
4440next_client:
4441 pthread_mutex_unlock(&client->lock);
4442 if (ret) {
4443 break;
4444 }
4445 }
4446
4447 pthread_mutex_unlock(&client_list->lock);
4448 break;
4449 }
4450 case ACTION_EXECUTOR_STATUS_ERROR:
4451 /* Fatal error, shut down everything. */
4452 ERR("Fatal error encoutered while enqueuing action to the action executor");
4453 ret = -1;
4454 goto end_unlock;
4455 default:
4456 /* Unhandled error. */
4457 abort();
4458 }
94078603
JR
4459
4460end_unlock:
f1446131 4461 notification_client_list_put(client_list);
94078603 4462 rcu_read_unlock();
8b524060
FD
4463 return ret;
4464}
4465
4466static
4467int handle_one_event_notifier_notification(
4468 struct notification_thread_state *state,
4469 int pipe, enum lttng_domain_type domain)
4470{
82b3cbf4 4471 int ret = 0;
8b524060
FD
4472 struct lttng_event_notifier_notification *notification = NULL;
4473
4474 notification = recv_one_event_notifier_notification(pipe, domain);
4475 if (notification == NULL) {
82b3cbf4 4476 /* Reception failed, don't consider it fatal. */
8b524060
FD
4477 ERR("[notification-thread] Error receiving an event notifier notification from tracer: fd = %i, domain = %s",
4478 pipe, lttng_domain_type_str(domain));
8b524060
FD
4479 goto end;
4480 }
4481
4482 ret = dispatch_one_event_notifier_notification(state, notification);
4483 if (ret) {
4484 ERR("[notification-thread] Error dispatching an event notifier notification from tracer: fd = %i, domain = %s",
4485 pipe, lttng_domain_type_str(domain));
4486 goto end;
4487 }
4488
94078603 4489end:
8b524060 4490 lttng_event_notifier_notification_destroy(notification);
94078603
JR
4491 return ret;
4492}
4493
8b524060
FD
4494int handle_notification_thread_event_notification(struct notification_thread_state *state,
4495 int pipe, enum lttng_domain_type domain)
4496{
4497 return handle_one_event_notifier_notification(state, pipe, domain);
4498}
4499
ab0ee2ca
JG
4500int handle_notification_thread_channel_sample(
4501 struct notification_thread_state *state, int pipe,
4502 enum lttng_domain_type domain)
4503{
4504 int ret = 0;
4505 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
4506 struct channel_info *channel_info;
4507 struct cds_lfht_node *node;
4508 struct cds_lfht_iter iter;
4509 struct lttng_channel_trigger_list *trigger_list;
4510 struct lttng_trigger_list_element *trigger_list_element;
4511 bool previous_sample_available = false;
e8360425
JD
4512 struct channel_state_sample previous_sample, latest_sample;
4513 uint64_t previous_session_consumed_total, latest_session_consumed_total;
f2b3ef9f 4514 struct lttng_credentials channel_creds;
ab0ee2ca
JG
4515
4516 /*
4517 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4518 * ensuring that read/write of sampling messages are atomic.
4519 */
7c2551ef 4520 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
4521 if (ret != sizeof(sample_msg)) {
4522 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4523 pipe);
4524 ret = -1;
4525 goto end;
4526 }
4527
4528 ret = 0;
4529 latest_sample.key.key = sample_msg.key;
4530 latest_sample.key.domain = domain;
4531 latest_sample.highest_usage = sample_msg.highest;
4532 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 4533 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
4534
4535 rcu_read_lock();
4536
4537 /* Retrieve the channel's informations */
4538 cds_lfht_lookup(state->channels_ht,
4539 hash_channel_key(&latest_sample.key),
4540 match_channel_info,
4541 &latest_sample.key,
4542 &iter);
4543 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4544 if (caa_unlikely(!node)) {
ab0ee2ca
JG
4545 /*
4546 * Not an error since the consumer can push a sample to the pipe
4547 * and the rest of the session daemon could notify us of the
4548 * channel's destruction before we get a chance to process that
4549 * sample.
4550 */
4551 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4552 latest_sample.key.key,
526200e4 4553 lttng_domain_type_str(domain));
ab0ee2ca
JG
4554 goto end_unlock;
4555 }
4556 channel_info = caa_container_of(node, struct channel_info,
4557 channels_ht_node);
e8360425 4558 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 4559 channel_info->name,
ab0ee2ca 4560 latest_sample.key.key,
8abe313a 4561 channel_info->session_info->name,
ab0ee2ca 4562 latest_sample.highest_usage,
e8360425
JD
4563 latest_sample.lowest_usage,
4564 latest_sample.channel_total_consumed);
4565
4566 previous_session_consumed_total =
4567 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
4568
4569 /* Retrieve the channel's last sample, if it exists, and update it. */
4570 cds_lfht_lookup(state->channel_state_ht,
4571 hash_channel_key(&latest_sample.key),
4572 match_channel_state_sample,
4573 &latest_sample.key,
4574 &iter);
4575 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4576 if (caa_likely(node)) {
ab0ee2ca
JG
4577 struct channel_state_sample *stored_sample;
4578
4579 /* Update the sample stored. */
4580 stored_sample = caa_container_of(node,
4581 struct channel_state_sample,
4582 channel_state_ht_node);
e8360425 4583
ab0ee2ca
JG
4584 memcpy(&previous_sample, stored_sample,
4585 sizeof(previous_sample));
4586 stored_sample->highest_usage = latest_sample.highest_usage;
4587 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 4588 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 4589 previous_sample_available = true;
e8360425
JD
4590
4591 latest_session_consumed_total =
4592 previous_session_consumed_total +
4593 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
4594 } else {
4595 /*
4596 * This is the channel's first sample, allocate space for and
4597 * store the new sample.
4598 */
4599 struct channel_state_sample *stored_sample;
4600
4601 stored_sample = zmalloc(sizeof(*stored_sample));
4602 if (!stored_sample) {
4603 ret = -1;
4604 goto end_unlock;
4605 }
4606
4607 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4608 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4609 cds_lfht_add(state->channel_state_ht,
4610 hash_channel_key(&stored_sample->key),
4611 &stored_sample->channel_state_ht_node);
e8360425
JD
4612
4613 latest_session_consumed_total =
4614 previous_session_consumed_total +
4615 latest_sample.channel_total_consumed;
ab0ee2ca
JG
4616 }
4617
e8360425
JD
4618 channel_info->session_info->consumed_data_size =
4619 latest_session_consumed_total;
4620
ab0ee2ca
JG
4621 /* Find triggers associated with this channel. */
4622 cds_lfht_lookup(state->channel_triggers_ht,
4623 hash_channel_key(&latest_sample.key),
4624 match_channel_trigger_list,
4625 &latest_sample.key,
4626 &iter);
4627 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4628 if (caa_likely(!node)) {
ab0ee2ca
JG
4629 goto end_unlock;
4630 }
4631
f2b3ef9f 4632 channel_creds = (typeof(channel_creds)) {
ff588497
JR
4633 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
4634 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
f2b3ef9f
JG
4635 };
4636
ab0ee2ca
JG
4637 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4638 channel_triggers_ht_node);
4639 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
e742b055 4640 node) {
9b63a4aa 4641 const struct lttng_condition *condition;
f2b3ef9f 4642 struct lttng_trigger *trigger;
505b2d90 4643 struct notification_client_list *client_list = NULL;
ab0ee2ca 4644 struct lttng_evaluation *evaluation = NULL;
f2b3ef9f 4645 enum action_executor_status executor_status;
ab0ee2ca 4646
505b2d90 4647 ret = 0;
ab0ee2ca 4648 trigger = trigger_list_element->trigger;
9b63a4aa 4649 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 4650 assert(condition);
ab0ee2ca
JG
4651
4652 /*
4653 * Check if any client is subscribed to the result of this
4654 * evaluation.
4655 */
ed327204 4656 client_list = get_client_list_from_condition(state, condition);
ab0ee2ca 4657
ed327204 4658 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 4659 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
4660 &latest_sample,
4661 previous_session_consumed_total,
4662 latest_session_consumed_total,
4663 channel_info);
4664 if (caa_unlikely(ret)) {
505b2d90 4665 goto put_list;
ab0ee2ca
JG
4666 }
4667
e8360425 4668 if (caa_likely(!evaluation)) {
505b2d90 4669 goto put_list;
ab0ee2ca
JG
4670 }
4671
f8522f5c
JR
4672 if (!lttng_trigger_should_fire(trigger)) {
4673 goto put_list;
4674 }
4675
4676 lttng_trigger_fire(trigger);
4677
f2b3ef9f
JG
4678 /*
4679 * Ownership of `evaluation` transferred to the action executor
4680 * no matter the result.
4681 */
4682 executor_status = action_executor_enqueue(state->executor,
4683 trigger, evaluation, &channel_creds,
4684 client_list);
4685 evaluation = NULL;
4686 switch (executor_status) {
4687 case ACTION_EXECUTOR_STATUS_OK:
4688 break;
4689 case ACTION_EXECUTOR_STATUS_ERROR:
4690 case ACTION_EXECUTOR_STATUS_INVALID:
4691 /*
4692 * TODO Add trigger identification (name/id) when
4693 * it is added to the API.
4694 */
4695 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
4696 ret = -1;
4697 goto put_list;
4698 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4699 /*
4700 * TODO Add trigger identification (name/id) when
4701 * it is added to the API.
4702 *
4703 * Not a fatal error.
4704 */
4705 WARN("No space left when enqueuing action associated with buffer-condition trigger");
4706 ret = 0;
4707 goto put_list;
4708 default:
4709 abort();
4710 }
4711
505b2d90
JG
4712put_list:
4713 notification_client_list_put(client_list);
e0b5f87b 4714 if (caa_unlikely(ret)) {
505b2d90 4715 break;
ab0ee2ca
JG
4716 }
4717 }
4718end_unlock:
4719 rcu_read_unlock();
4720end:
4721 return ret;
4722}
This page took 0.27553 seconds and 4 git commands to generate.