Run clang-format on the whole tree
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
CommitLineData
f1e16794 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
f1e16794 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
f1e16794 5 *
f1e16794
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
c9e313bc
SM
9#include "consumer.hpp"
10#include "health-sessiond.hpp"
11#include "kernel-consumer.hpp"
28ab034a 12#include "lttng-sessiond.hpp"
c9e313bc
SM
13#include "notification-thread-commands.hpp"
14#include "session.hpp"
f1e16794 15
28ab034a
JG
16#include <common/common.hpp>
17#include <common/compat/string.hpp>
18#include <common/defaults.hpp>
19
20#include <inttypes.h>
21#include <stdio.h>
22#include <stdlib.h>
23#include <sys/stat.h>
24#include <unistd.h>
25
26static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset)
00e2e675
DG
27{
28 int ret;
ffe60014 29 char tmp_path[PATH_MAX];
2bba9e53 30 char *pathname = NULL;
00e2e675 31
a0377dfe 32 LTTNG_ASSERT(consumer);
00e2e675 33
ffe60014 34 /* Get the right path name destination */
a5ba6fdd 35 if (consumer->type == CONSUMER_DST_LOCAL ||
28ab034a
JG
36 (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
37 consumer->relay_minor_version >= 11)) {
d2956687 38 pathname = strdup(consumer->domain_subdir);
bb3c4e70 39 if (!pathname) {
d2956687 40 PERROR("Failed to copy domain subdirectory string %s",
28ab034a 41 consumer->domain_subdir);
bb3c4e70
MD
42 goto error;
43 }
5da88b0f 44 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687 45 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
28ab034a 46 pathname);
ffe60014 47 } else {
5da88b0f 48 /* Network output, relayd < 2.11. */
28ab034a
JG
49 ret = snprintf(tmp_path,
50 sizeof(tmp_path),
51 "%s%s",
52 consumer->dst.net.base_dir,
53 consumer->domain_subdir);
ffe60014 54 if (ret < 0) {
2bba9e53 55 PERROR("snprintf kernel metadata path");
ffe60014 56 goto error;
dba13f1d
JG
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
28ab034a
JG
59 sizeof(tmp_path),
60 ret,
61 consumer->dst.net.base_dir,
62 consumer->domain_subdir);
dba13f1d 63 goto error;
ffe60014 64 }
f5436bfc 65 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 66 if (!pathname) {
f5436bfc 67 PERROR("lttng_strndup");
bb3c4e70
MD
68 goto error;
69 }
5da88b0f 70 *consumer_path_offset = 0;
ffe60014
DG
71 DBG3("Kernel network consumer subdir path: %s", pathname);
72 }
73
2bba9e53
DG
74 return pathname;
75
76error:
77 free(pathname);
78 return NULL;
79}
80
81/*
82 * Sending a single channel to the consumer with command ADD_CHANNEL.
83 */
28ab034a
JG
84static int kernel_consumer_add_channel(struct consumer_socket *sock,
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
87 unsigned int monitor)
2bba9e53
DG
88{
89 int ret;
d2956687 90 char *pathname = NULL;
2bba9e53
DG
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
e9404c27 93 enum lttng_error_code status;
e32d7f27 94 struct ltt_session *session = NULL;
e9404c27 95 struct lttng_channel_extended *channel_attr_extended;
d2956687 96 bool is_local_trace;
5da88b0f 97 size_t consumer_path_offset = 0;
2bba9e53
DG
98
99 /* Safety net */
a0377dfe
FD
100 LTTNG_ASSERT(channel);
101 LTTNG_ASSERT(ksession);
102 LTTNG_ASSERT(ksession->consumer);
2bba9e53 103
e9404c27 104 consumer = ksession->consumer;
28ab034a
JG
105 channel_attr_extended =
106 (struct lttng_channel_extended *) channel->channel->attr.extended.ptr;
2bba9e53 107
28ab034a 108 DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name);
d2956687 109 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 110
5da88b0f 111 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
112 if (!pathname) {
113 ret = -1;
114 goto error;
115 }
2bba9e53 116
d2956687
JG
117 if (is_local_trace && ksession->current_trace_chunk) {
118 enum lttng_trace_chunk_status chunk_status;
119 char *pathname_index;
120
28ab034a 121 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
d2956687
JG
122 if (ret < 0) {
123 ERR("Failed to format channel index directory");
124 ret = -1;
125 goto error;
126 }
127
128 /*
129 * Create the index subdirectory which will take care
130 * of implicitly creating the channel's path.
131 */
28ab034a
JG
132 chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk,
133 pathname_index);
d2956687
JG
134 free(pathname_index);
135 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
136 ret = -1;
137 goto error;
138 }
139 }
140
00e2e675 141 /* Prep channel message structure */
638e7b4e 142 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
143 channel->key,
144 ksession->id,
145 &pathname[consumer_path_offset],
146 consumer->net_seq_index,
147 channel->channel->name,
148 channel->stream_count,
149 channel->channel->attr.output,
150 CONSUMER_CHANNEL_TYPE_DATA,
151 channel->channel->attr.tracefile_size,
152 channel->channel->attr.tracefile_count,
153 monitor,
154 channel->channel->attr.live_timer_interval,
155 ksession->is_live_session,
156 channel_attr_extended->monitor_timer_interval,
157 ksession->current_trace_chunk);
00e2e675 158
840cb59c 159 health_code_update();
ca03de58 160
00e2e675
DG
161 ret = consumer_send_channel(sock, &lkm);
162 if (ret < 0) {
163 goto error;
164 }
165
840cb59c 166 health_code_update();
e9404c27
JG
167 rcu_read_lock();
168 session = session_find_by_id(ksession->id);
a0377dfe 169 LTTNG_ASSERT(session);
3130a40c
JG
170 ASSERT_LOCKED(session->lock);
171 ASSERT_SESSION_LIST_LOCKED();
ca03de58 172
139a8d25 173 status = notification_thread_command_add_channel(the_notification_thread_handle,
28ab034a
JG
174 session->id,
175 channel->channel->name,
176 channel->key,
177 LTTNG_DOMAIN_KERNEL,
178 channel->channel->attr.subbuf_size *
179 channel->channel->attr.num_subbuf);
e9404c27
JG
180 rcu_read_unlock();
181 if (status != LTTNG_OK) {
182 ret = -1;
183 goto error;
184 }
753873bf
JR
185
186 channel->published_to_notification_thread = true;
187
00e2e675 188error:
e32d7f27
JG
189 if (session) {
190 session_put(session);
191 }
53efb85a 192 free(pathname);
00e2e675
DG
193 return ret;
194}
195
196/*
197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
198 *
199 * The consumer socket lock must be held by the caller.
00e2e675 200 */
f50f23d9 201int kernel_consumer_add_metadata(struct consumer_socket *sock,
28ab034a
JG
202 struct ltt_kernel_session *ksession,
203 unsigned int monitor)
00e2e675
DG
204{
205 int ret;
00e2e675 206 struct lttcomm_consumer_msg lkm;
a7d9a3e7 207 struct consumer_output *consumer;
e098433c
JG
208
209 rcu_read_lock();
00e2e675
DG
210
211 /* Safety net */
a0377dfe
FD
212 LTTNG_ASSERT(ksession);
213 LTTNG_ASSERT(ksession->consumer);
214 LTTNG_ASSERT(sock);
00e2e675 215
28ab034a 216 DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd);
00e2e675
DG
217
218 /* Get consumer output pointer */
e098433c 219 consumer = ksession->consumer;
00e2e675 220
00e2e675 221 /* Prep channel message structure */
d42266a4 222 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
223 ksession->metadata->key,
224 ksession->id,
225 "",
226 consumer->net_seq_index,
227 ksession->metadata->conf->name,
228 1,
229 ksession->metadata->conf->attr.output,
230 CONSUMER_CHANNEL_TYPE_METADATA,
231 ksession->metadata->conf->attr.tracefile_size,
232 ksession->metadata->conf->attr.tracefile_count,
233 monitor,
234 ksession->metadata->conf->attr.live_timer_interval,
235 ksession->is_live_session,
236 0,
237 ksession->current_trace_chunk);
00e2e675 238
840cb59c 239 health_code_update();
ca03de58 240
00e2e675
DG
241 ret = consumer_send_channel(sock, &lkm);
242 if (ret < 0) {
243 goto error;
244 }
245
840cb59c 246 health_code_update();
ca03de58 247
00e2e675 248 /* Prep stream message structure */
e098433c 249 consumer_init_add_stream_comm_msg(&lkm,
28ab034a
JG
250 ksession->metadata->key,
251 ksession->metadata_stream_fd,
252 0 /* CPU: 0 for metadata. */);
00e2e675 253
840cb59c 254 health_code_update();
ca03de58 255
00e2e675 256 /* Send stream and file descriptor */
28ab034a 257 ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1);
00e2e675
DG
258 if (ret < 0) {
259 goto error;
260 }
261
840cb59c 262 health_code_update();
ca03de58 263
00e2e675 264error:
e098433c 265 rcu_read_unlock();
00e2e675
DG
266 return ret;
267}
268
269/*
270 * Sending a single stream to the consumer with command ADD_STREAM.
271 */
28ab034a
JG
272static int kernel_consumer_add_stream(struct consumer_socket *sock,
273 struct ltt_kernel_channel *channel,
274 struct ltt_kernel_stream *stream,
275 struct ltt_kernel_session *session)
00e2e675
DG
276{
277 int ret;
00e2e675 278 struct lttcomm_consumer_msg lkm;
a7d9a3e7 279 struct consumer_output *consumer;
00e2e675 280
a0377dfe
FD
281 LTTNG_ASSERT(channel);
282 LTTNG_ASSERT(stream);
283 LTTNG_ASSERT(session);
284 LTTNG_ASSERT(session->consumer);
285 LTTNG_ASSERT(sock);
00e2e675
DG
286
287 DBG("Sending stream %d of channel %s to kernel consumer",
28ab034a
JG
288 stream->fd,
289 channel->channel->name);
00e2e675
DG
290
291 /* Get consumer output pointer */
a7d9a3e7 292 consumer = session->consumer;
00e2e675 293
00e2e675 294 /* Prep stream consumer message */
28ab034a 295 consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu);
00e2e675 296
840cb59c 297 health_code_update();
ca03de58 298
00e2e675 299 /* Send stream and file descriptor */
a7d9a3e7 300 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
301 if (ret < 0) {
302 goto error;
303 }
304
840cb59c 305 health_code_update();
ca03de58 306
00e2e675
DG
307error:
308 return ret;
309}
310
a4baae1b
JD
311/*
312 * Sending the notification that all streams were sent with STREAMS_SENT.
313 */
314int kernel_consumer_streams_sent(struct consumer_socket *sock,
28ab034a
JG
315 struct ltt_kernel_session *session,
316 uint64_t channel_key)
a4baae1b
JD
317{
318 int ret;
319 struct lttcomm_consumer_msg lkm;
320 struct consumer_output *consumer;
321
a0377dfe
FD
322 LTTNG_ASSERT(sock);
323 LTTNG_ASSERT(session);
a4baae1b
JD
324
325 DBG("Sending streams_sent");
326 /* Get consumer output pointer */
327 consumer = session->consumer;
328
329 /* Prep stream consumer message */
28ab034a
JG
330 consumer_init_streams_sent_comm_msg(
331 &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index);
a4baae1b
JD
332
333 health_code_update();
334
335 /* Send stream and file descriptor */
336 ret = consumer_send_msg(sock, &lkm);
337 if (ret < 0) {
338 goto error;
339 }
340
341error:
342 return ret;
343}
344
f1e16794
DG
345/*
346 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
347 *
348 * The consumer socket lock must be held by the caller.
f1e16794 349 */
1fc1b7c8 350int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
28ab034a
JG
351 struct ltt_kernel_channel *channel,
352 struct ltt_kernel_session *ksession,
353 unsigned int monitor)
f1e16794 354{
e99f9447 355 int ret = LTTNG_OK;
f1e16794 356 struct ltt_kernel_stream *stream;
00e2e675
DG
357
358 /* Safety net */
a0377dfe
FD
359 LTTNG_ASSERT(channel);
360 LTTNG_ASSERT(ksession);
361 LTTNG_ASSERT(ksession->consumer);
362 LTTNG_ASSERT(sock);
00e2e675 363
e098433c
JG
364 rcu_read_lock();
365
00e2e675 366 /* Bail out if consumer is disabled */
e098433c 367 if (!ksession->consumer->enabled) {
f73fabfd 368 ret = LTTNG_OK;
00e2e675
DG
369 goto error;
370 }
f1e16794 371
28ab034a 372 DBG("Sending streams of channel %s to kernel consumer", channel->channel->name);
f1e16794 373
e99f9447 374 if (!channel->sent_to_consumer) {
e098433c 375 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
376 if (ret < 0) {
377 goto error;
378 }
379 channel->sent_to_consumer = true;
f1e16794
DG
380 }
381
382 /* Send streams */
28ab034a 383 cds_list_for_each_entry (stream, &channel->stream_list.head, list) {
6986ab9b 384 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
385 continue;
386 }
00e2e675
DG
387
388 /* Add stream on the kernel consumer side. */
28ab034a 389 ret = kernel_consumer_add_stream(sock, channel, stream, ksession);
f1e16794 390 if (ret < 0) {
f1e16794
DG
391 goto error;
392 }
6986ab9b 393 stream->sent_to_consumer = true;
f1e16794
DG
394 }
395
f1e16794 396error:
e098433c 397 rcu_read_unlock();
f1e16794
DG
398 return ret;
399}
400
401/*
402 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
403 *
404 * The consumer socket lock must be held by the caller.
f1e16794 405 */
28ab034a 406int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session)
f1e16794 407{
2bba9e53 408 int ret, monitor = 0;
f1e16794 409 struct ltt_kernel_channel *chan;
f1e16794 410
00e2e675 411 /* Safety net */
a0377dfe
FD
412 LTTNG_ASSERT(session);
413 LTTNG_ASSERT(session->consumer);
414 LTTNG_ASSERT(sock);
f1e16794 415
00e2e675
DG
416 /* Bail out if consumer is disabled */
417 if (!session->consumer->enabled) {
f73fabfd 418 ret = LTTNG_OK;
00e2e675 419 goto error;
f1e16794
DG
420 }
421
2bba9e53
DG
422 /* Don't monitor the streams on the consumer if in flight recorder. */
423 if (session->output_traces) {
424 monitor = 1;
425 }
426
00e2e675
DG
427 DBG("Sending session stream to kernel consumer");
428
609af759 429 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 430 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 431 if (ret < 0) {
f1e16794
DG
432 goto error;
433 }
f1e16794
DG
434 }
435
00e2e675 436 /* Send channel and streams of it */
28ab034a
JG
437 cds_list_for_each_entry (chan, &session->channel_list.head, list) {
438 ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor);
f1e16794
DG
439 if (ret < 0) {
440 goto error;
441 }
601262d6
JD
442 if (monitor) {
443 /*
444 * Inform the relay that all the streams for the
445 * channel were sent.
446 */
e1f3997a 447 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
448 if (ret < 0) {
449 goto error;
450 }
451 }
f1e16794
DG
452 }
453
00e2e675 454 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 455
4ce9ff51 456 session->consumer_fds_sent = 1;
f1e16794
DG
457 return 0;
458
459error:
460 return ret;
461}
07b86b52
JD
462
463int kernel_consumer_destroy_channel(struct consumer_socket *socket,
28ab034a 464 struct ltt_kernel_channel *channel)
07b86b52
JD
465{
466 int ret;
467 struct lttcomm_consumer_msg msg;
468
a0377dfe
FD
469 LTTNG_ASSERT(channel);
470 LTTNG_ASSERT(socket);
07b86b52 471
e1f3997a 472 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 473
53efb85a 474 memset(&msg, 0, sizeof(msg));
07b86b52 475 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 476 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
477
478 pthread_mutex_lock(socket->lock);
479 health_code_update();
480
481 ret = consumer_send_msg(socket, &msg);
482 if (ret < 0) {
483 goto error;
484 }
485
486error:
487 health_code_update();
488 pthread_mutex_unlock(socket->lock);
489 return ret;
490}
491
492int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
28ab034a 493 struct ltt_kernel_metadata *metadata)
07b86b52
JD
494{
495 int ret;
496 struct lttcomm_consumer_msg msg;
497
a0377dfe
FD
498 LTTNG_ASSERT(metadata);
499 LTTNG_ASSERT(socket);
07b86b52 500
d40f0359 501 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 502
53efb85a 503 memset(&msg, 0, sizeof(msg));
07b86b52 504 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 505 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
506
507 pthread_mutex_lock(socket->lock);
508 health_code_update();
509
510 ret = consumer_send_msg(socket, &msg);
511 if (ret < 0) {
512 goto error;
513 }
514
515error:
516 health_code_update();
517 pthread_mutex_unlock(socket->lock);
518 return ret;
519}
This page took 0.0966630000000001 seconds and 4 git commands to generate.