Fix: sessiond: make the --without-lttng-ust version of launch_application_notificatio...
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
f1e16794 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
f1e16794 5 *
f1e16794
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
f1e16794
DG
9#include <stdio.h>
10#include <stdlib.h>
f1e16794
DG
11#include <sys/stat.h>
12#include <unistd.h>
e1f3997a 13#include <inttypes.h>
f1e16794
DG
14
15#include <common/common.h>
16#include <common/defaults.h>
f5436bfc 17#include <common/compat/string.h>
f1e16794 18
00e2e675 19#include "consumer.h"
8782cc74 20#include "health-sessiond.h"
f1e16794 21#include "kernel-consumer.h"
e9404c27
JG
22#include "notification-thread-commands.h"
23#include "session.h"
24#include "lttng-sessiond.h"
f1e16794 25
5da88b0f
MD
26static char *create_channel_path(struct consumer_output *consumer,
27 size_t *consumer_path_offset)
00e2e675
DG
28{
29 int ret;
ffe60014 30 char tmp_path[PATH_MAX];
2bba9e53 31 char *pathname = NULL;
00e2e675 32
2bba9e53 33 assert(consumer);
00e2e675 34
ffe60014 35 /* Get the right path name destination */
a5ba6fdd
JG
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET &&
38 consumer->relay_major_version == 2 &&
39 consumer->relay_minor_version >= 11)) {
d2956687 40 pathname = strdup(consumer->domain_subdir);
bb3c4e70 41 if (!pathname) {
d2956687
JG
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer->domain_subdir);
bb3c4e70
MD
44 goto error;
45 }
5da88b0f 46 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687
JG
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
48 pathname);
ffe60014 49 } else {
5da88b0f 50 /* Network output, relayd < 2.11. */
2b29c638
JD
51 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
52 consumer->dst.net.base_dir,
b178f53e 53 consumer->domain_subdir);
ffe60014 54 if (ret < 0) {
2bba9e53 55 PERROR("snprintf kernel metadata path");
ffe60014 56 goto error;
dba13f1d
JG
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path), ret,
60 consumer->dst.net.base_dir,
b178f53e 61 consumer->domain_subdir);
dba13f1d 62 goto error;
ffe60014 63 }
f5436bfc 64 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 65 if (!pathname) {
f5436bfc 66 PERROR("lttng_strndup");
bb3c4e70
MD
67 goto error;
68 }
5da88b0f 69 *consumer_path_offset = 0;
ffe60014
DG
70 DBG3("Kernel network consumer subdir path: %s", pathname);
71 }
72
2bba9e53
DG
73 return pathname;
74
75error:
76 free(pathname);
77 return NULL;
78}
79
80/*
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
82 */
ba27cd00 83static
2bba9e53 84int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
2bba9e53
DG
87 unsigned int monitor)
88{
89 int ret;
d2956687 90 char *pathname = NULL;
2bba9e53
DG
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
e9404c27 93 enum lttng_error_code status;
e32d7f27 94 struct ltt_session *session = NULL;
e9404c27 95 struct lttng_channel_extended *channel_attr_extended;
d2956687 96 bool is_local_trace;
5da88b0f 97 size_t consumer_path_offset = 0;
2bba9e53
DG
98
99 /* Safety net */
100 assert(channel);
e9404c27
JG
101 assert(ksession);
102 assert(ksession->consumer);
2bba9e53 103
e9404c27
JG
104 consumer = ksession->consumer;
105 channel_attr_extended = (struct lttng_channel_extended *)
106 channel->channel->attr.extended.ptr;
2bba9e53
DG
107
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel->channel->name);
d2956687 110 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 111
5da88b0f 112 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
2bba9e53 117
d2956687
JG
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
123 pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(
135 ksession->current_trace_chunk, pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
00e2e675 143 /* Prep channel message structure */
638e7b4e 144 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 145 channel->key,
e9404c27 146 ksession->id,
5da88b0f 147 &pathname[consumer_path_offset],
e9404c27
JG
148 ksession->uid,
149 ksession->gid,
ffe60014 150 consumer->net_seq_index,
c30aaa51 151 channel->channel->name,
ffe60014
DG
152 channel->stream_count,
153 channel->channel->attr.output,
1624d5b7
JD
154 CONSUMER_CHANNEL_TYPE_DATA,
155 channel->channel->attr.tracefile_size,
2bba9e53 156 channel->channel->attr.tracefile_count,
ecc48a90 157 monitor,
e9404c27 158 channel->channel->attr.live_timer_interval,
d2956687
JG
159 channel_attr_extended->monitor_timer_interval,
160 ksession->current_trace_chunk);
00e2e675 161
840cb59c 162 health_code_update();
ca03de58 163
00e2e675
DG
164 ret = consumer_send_channel(sock, &lkm);
165 if (ret < 0) {
166 goto error;
167 }
168
840cb59c 169 health_code_update();
e9404c27
JG
170 rcu_read_lock();
171 session = session_find_by_id(ksession->id);
172 assert(session);
e098433c
JG
173 assert(pthread_mutex_trylock(&session->lock));
174 assert(session_trylock_list());
ca03de58 175
e9404c27
JG
176 status = notification_thread_command_add_channel(
177 notification_thread_handle, session->name,
178 ksession->uid, ksession->gid,
e1f3997a 179 channel->channel->name, channel->key,
e9404c27
JG
180 LTTNG_DOMAIN_KERNEL,
181 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
182 rcu_read_unlock();
183 if (status != LTTNG_OK) {
184 ret = -1;
185 goto error;
186 }
753873bf
JR
187
188 channel->published_to_notification_thread = true;
189
00e2e675 190error:
e32d7f27
JG
191 if (session) {
192 session_put(session);
193 }
53efb85a 194 free(pathname);
00e2e675
DG
195 return ret;
196}
197
198/*
199 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
200 *
201 * The consumer socket lock must be held by the caller.
00e2e675 202 */
f50f23d9 203int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 204 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
205{
206 int ret;
00e2e675 207 struct lttcomm_consumer_msg lkm;
a7d9a3e7 208 struct consumer_output *consumer;
e098433c
JG
209
210 rcu_read_lock();
00e2e675
DG
211
212 /* Safety net */
e098433c
JG
213 assert(ksession);
214 assert(ksession->consumer);
f50f23d9 215 assert(sock);
00e2e675 216
e098433c
JG
217 DBG("Sending metadata %d to kernel consumer",
218 ksession->metadata_stream_fd);
00e2e675
DG
219
220 /* Get consumer output pointer */
e098433c 221 consumer = ksession->consumer;
00e2e675 222
00e2e675 223 /* Prep channel message structure */
638e7b4e 224 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
225 ksession->metadata->key,
226 ksession->id,
5da88b0f 227 "",
e098433c
JG
228 ksession->uid,
229 ksession->gid,
ffe60014 230 consumer->net_seq_index,
30079b6b 231 DEFAULT_METADATA_NAME,
ffe60014
DG
232 1,
233 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 234 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 235 0, 0,
d2956687 236 monitor, 0, 0, ksession->current_trace_chunk);
00e2e675 237
840cb59c 238 health_code_update();
ca03de58 239
00e2e675
DG
240 ret = consumer_send_channel(sock, &lkm);
241 if (ret < 0) {
242 goto error;
243 }
244
840cb59c 245 health_code_update();
ca03de58 246
00e2e675 247 /* Prep stream message structure */
e098433c
JG
248 consumer_init_add_stream_comm_msg(&lkm,
249 ksession->metadata->key,
250 ksession->metadata_stream_fd,
d2956687 251 0 /* CPU: 0 for metadata. */);
00e2e675 252
840cb59c 253 health_code_update();
ca03de58 254
00e2e675 255 /* Send stream and file descriptor */
a7d9a3e7 256 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 257 &ksession->metadata_stream_fd, 1);
00e2e675
DG
258 if (ret < 0) {
259 goto error;
260 }
261
840cb59c 262 health_code_update();
ca03de58 263
00e2e675 264error:
e098433c 265 rcu_read_unlock();
00e2e675
DG
266 return ret;
267}
268
269/*
270 * Sending a single stream to the consumer with command ADD_STREAM.
271 */
5b0e3ccb 272static
f50f23d9 273int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
274 struct ltt_kernel_channel *channel,
275 struct ltt_kernel_stream *stream,
d2956687 276 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
277{
278 int ret;
00e2e675 279 struct lttcomm_consumer_msg lkm;
a7d9a3e7 280 struct consumer_output *consumer;
00e2e675
DG
281
282 assert(channel);
283 assert(stream);
284 assert(session);
285 assert(session->consumer);
f50f23d9 286 assert(sock);
00e2e675
DG
287
288 DBG("Sending stream %d of channel %s to kernel consumer",
289 stream->fd, channel->channel->name);
290
291 /* Get consumer output pointer */
a7d9a3e7 292 consumer = session->consumer;
00e2e675 293
00e2e675 294 /* Prep stream consumer message */
e098433c 295 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 296 channel->key,
00e2e675 297 stream->fd,
d2956687 298 stream->cpu);
00e2e675 299
840cb59c 300 health_code_update();
ca03de58 301
00e2e675 302 /* Send stream and file descriptor */
a7d9a3e7 303 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
304 if (ret < 0) {
305 goto error;
306 }
307
840cb59c 308 health_code_update();
ca03de58 309
00e2e675
DG
310error:
311 return ret;
312}
313
a4baae1b
JD
314/*
315 * Sending the notification that all streams were sent with STREAMS_SENT.
316 */
317int kernel_consumer_streams_sent(struct consumer_socket *sock,
318 struct ltt_kernel_session *session, uint64_t channel_key)
319{
320 int ret;
321 struct lttcomm_consumer_msg lkm;
322 struct consumer_output *consumer;
323
324 assert(sock);
325 assert(session);
326
327 DBG("Sending streams_sent");
328 /* Get consumer output pointer */
329 consumer = session->consumer;
330
331 /* Prep stream consumer message */
332 consumer_init_streams_sent_comm_msg(&lkm,
333 LTTNG_CONSUMER_STREAMS_SENT,
334 channel_key, consumer->net_seq_index);
335
336 health_code_update();
337
338 /* Send stream and file descriptor */
339 ret = consumer_send_msg(sock, &lkm);
340 if (ret < 0) {
341 goto error;
342 }
343
344error:
345 return ret;
346}
347
f1e16794
DG
348/*
349 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
350 *
351 * The consumer socket lock must be held by the caller.
f1e16794 352 */
1fc1b7c8 353int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 354 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 355 unsigned int monitor)
f1e16794 356{
e99f9447 357 int ret = LTTNG_OK;
f1e16794 358 struct ltt_kernel_stream *stream;
00e2e675
DG
359
360 /* Safety net */
361 assert(channel);
e098433c
JG
362 assert(ksession);
363 assert(ksession->consumer);
f50f23d9 364 assert(sock);
00e2e675 365
e098433c
JG
366 rcu_read_lock();
367
00e2e675 368 /* Bail out if consumer is disabled */
e098433c 369 if (!ksession->consumer->enabled) {
f73fabfd 370 ret = LTTNG_OK;
00e2e675
DG
371 goto error;
372 }
f1e16794
DG
373
374 DBG("Sending streams of channel %s to kernel consumer",
375 channel->channel->name);
376
e99f9447 377 if (!channel->sent_to_consumer) {
e098433c 378 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
379 if (ret < 0) {
380 goto error;
381 }
382 channel->sent_to_consumer = true;
f1e16794
DG
383 }
384
385 /* Send streams */
386 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 387 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
388 continue;
389 }
00e2e675
DG
390
391 /* Add stream on the kernel consumer side. */
e098433c 392 ret = kernel_consumer_add_stream(sock, channel, stream,
d2956687 393 ksession, monitor);
f1e16794 394 if (ret < 0) {
f1e16794
DG
395 goto error;
396 }
6986ab9b 397 stream->sent_to_consumer = true;
f1e16794
DG
398 }
399
f1e16794 400error:
e098433c 401 rcu_read_unlock();
f1e16794
DG
402 return ret;
403}
404
405/*
406 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
407 *
408 * The consumer socket lock must be held by the caller.
f1e16794 409 */
f50f23d9
DG
410int kernel_consumer_send_session(struct consumer_socket *sock,
411 struct ltt_kernel_session *session)
f1e16794 412{
2bba9e53 413 int ret, monitor = 0;
f1e16794 414 struct ltt_kernel_channel *chan;
f1e16794 415
00e2e675
DG
416 /* Safety net */
417 assert(session);
418 assert(session->consumer);
f50f23d9 419 assert(sock);
f1e16794 420
00e2e675
DG
421 /* Bail out if consumer is disabled */
422 if (!session->consumer->enabled) {
f73fabfd 423 ret = LTTNG_OK;
00e2e675 424 goto error;
f1e16794
DG
425 }
426
2bba9e53
DG
427 /* Don't monitor the streams on the consumer if in flight recorder. */
428 if (session->output_traces) {
429 monitor = 1;
430 }
431
00e2e675
DG
432 DBG("Sending session stream to kernel consumer");
433
609af759 434 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 435 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 436 if (ret < 0) {
f1e16794
DG
437 goto error;
438 }
f1e16794
DG
439 }
440
00e2e675 441 /* Send channel and streams of it */
f1e16794 442 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 443 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 444 monitor);
f1e16794
DG
445 if (ret < 0) {
446 goto error;
447 }
601262d6
JD
448 if (monitor) {
449 /*
450 * Inform the relay that all the streams for the
451 * channel were sent.
452 */
e1f3997a 453 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
454 if (ret < 0) {
455 goto error;
456 }
457 }
f1e16794
DG
458 }
459
00e2e675 460 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 461
4ce9ff51 462 session->consumer_fds_sent = 1;
f1e16794
DG
463 return 0;
464
465error:
466 return ret;
467}
07b86b52
JD
468
469int kernel_consumer_destroy_channel(struct consumer_socket *socket,
470 struct ltt_kernel_channel *channel)
471{
472 int ret;
473 struct lttcomm_consumer_msg msg;
474
475 assert(channel);
476 assert(socket);
07b86b52 477
e1f3997a 478 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 479
53efb85a 480 memset(&msg, 0, sizeof(msg));
07b86b52 481 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 482 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
483
484 pthread_mutex_lock(socket->lock);
485 health_code_update();
486
487 ret = consumer_send_msg(socket, &msg);
488 if (ret < 0) {
489 goto error;
490 }
491
492error:
493 health_code_update();
494 pthread_mutex_unlock(socket->lock);
495 return ret;
496}
497
498int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
499 struct ltt_kernel_metadata *metadata)
500{
501 int ret;
502 struct lttcomm_consumer_msg msg;
503
504 assert(metadata);
505 assert(socket);
07b86b52 506
d40f0359 507 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 508
53efb85a 509 memset(&msg, 0, sizeof(msg));
07b86b52 510 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 511 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
512
513 pthread_mutex_lock(socket->lock);
514 health_code_update();
515
516 ret = consumer_send_msg(socket, &msg);
517 if (ret < 0) {
518 goto error;
519 }
520
521error:
522 health_code_update();
523 pthread_mutex_unlock(socket->lock);
524 return ret;
525}
This page took 0.078241 seconds and 4 git commands to generate.