consumerd: send a buffer static sample on flush command
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
CommitLineData
f1e16794 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
f1e16794 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
f1e16794 5 *
f1e16794
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
f1e16794
DG
9#include <stdio.h>
10#include <stdlib.h>
f1e16794
DG
11#include <sys/stat.h>
12#include <unistd.h>
e1f3997a 13#include <inttypes.h>
f1e16794 14
c9e313bc
SM
15#include <common/common.hpp>
16#include <common/defaults.hpp>
17#include <common/compat/string.hpp>
f1e16794 18
c9e313bc
SM
19#include "consumer.hpp"
20#include "health-sessiond.hpp"
21#include "kernel-consumer.hpp"
22#include "notification-thread-commands.hpp"
23#include "session.hpp"
24#include "lttng-sessiond.hpp"
f1e16794 25
5da88b0f
MD
26static char *create_channel_path(struct consumer_output *consumer,
27 size_t *consumer_path_offset)
00e2e675
DG
28{
29 int ret;
ffe60014 30 char tmp_path[PATH_MAX];
2bba9e53 31 char *pathname = NULL;
00e2e675 32
a0377dfe 33 LTTNG_ASSERT(consumer);
00e2e675 34
ffe60014 35 /* Get the right path name destination */
a5ba6fdd
JG
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET &&
38 consumer->relay_major_version == 2 &&
39 consumer->relay_minor_version >= 11)) {
d2956687 40 pathname = strdup(consumer->domain_subdir);
bb3c4e70 41 if (!pathname) {
d2956687
JG
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer->domain_subdir);
bb3c4e70
MD
44 goto error;
45 }
5da88b0f 46 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687
JG
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
48 pathname);
ffe60014 49 } else {
5da88b0f 50 /* Network output, relayd < 2.11. */
2b29c638
JD
51 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
52 consumer->dst.net.base_dir,
b178f53e 53 consumer->domain_subdir);
ffe60014 54 if (ret < 0) {
2bba9e53 55 PERROR("snprintf kernel metadata path");
ffe60014 56 goto error;
dba13f1d
JG
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path), ret,
60 consumer->dst.net.base_dir,
b178f53e 61 consumer->domain_subdir);
dba13f1d 62 goto error;
ffe60014 63 }
f5436bfc 64 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 65 if (!pathname) {
f5436bfc 66 PERROR("lttng_strndup");
bb3c4e70
MD
67 goto error;
68 }
5da88b0f 69 *consumer_path_offset = 0;
ffe60014
DG
70 DBG3("Kernel network consumer subdir path: %s", pathname);
71 }
72
2bba9e53
DG
73 return pathname;
74
75error:
76 free(pathname);
77 return NULL;
78}
79
80/*
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
82 */
ba27cd00 83static
2bba9e53 84int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
2bba9e53
DG
87 unsigned int monitor)
88{
89 int ret;
d2956687 90 char *pathname = NULL;
2bba9e53
DG
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
e9404c27 93 enum lttng_error_code status;
e32d7f27 94 struct ltt_session *session = NULL;
e9404c27 95 struct lttng_channel_extended *channel_attr_extended;
d2956687 96 bool is_local_trace;
5da88b0f 97 size_t consumer_path_offset = 0;
2bba9e53
DG
98
99 /* Safety net */
a0377dfe
FD
100 LTTNG_ASSERT(channel);
101 LTTNG_ASSERT(ksession);
102 LTTNG_ASSERT(ksession->consumer);
2bba9e53 103
e9404c27
JG
104 consumer = ksession->consumer;
105 channel_attr_extended = (struct lttng_channel_extended *)
106 channel->channel->attr.extended.ptr;
2bba9e53
DG
107
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel->channel->name);
d2956687 110 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 111
5da88b0f 112 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
2bba9e53 117
d2956687
JG
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
123 pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(
135 ksession->current_trace_chunk, pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
00e2e675 143 /* Prep channel message structure */
638e7b4e 144 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 145 channel->key,
e9404c27 146 ksession->id,
5da88b0f 147 &pathname[consumer_path_offset],
ffe60014 148 consumer->net_seq_index,
c30aaa51 149 channel->channel->name,
ffe60014
DG
150 channel->stream_count,
151 channel->channel->attr.output,
1624d5b7
JD
152 CONSUMER_CHANNEL_TYPE_DATA,
153 channel->channel->attr.tracefile_size,
2bba9e53 154 channel->channel->attr.tracefile_count,
ecc48a90 155 monitor,
e9404c27 156 channel->channel->attr.live_timer_interval,
a2814ea7 157 ksession->is_live_session,
d2956687
JG
158 channel_attr_extended->monitor_timer_interval,
159 ksession->current_trace_chunk);
00e2e675 160
840cb59c 161 health_code_update();
ca03de58 162
00e2e675
DG
163 ret = consumer_send_channel(sock, &lkm);
164 if (ret < 0) {
165 goto error;
166 }
167
840cb59c 168 health_code_update();
e9404c27
JG
169 rcu_read_lock();
170 session = session_find_by_id(ksession->id);
a0377dfe 171 LTTNG_ASSERT(session);
3130a40c
JG
172 ASSERT_LOCKED(session->lock);
173 ASSERT_SESSION_LIST_LOCKED();
ca03de58 174
e9404c27 175 status = notification_thread_command_add_channel(
412d7227
SM
176 the_notification_thread_handle, session->name,
177 ksession->uid, ksession->gid, channel->channel->name,
178 channel->key, LTTNG_DOMAIN_KERNEL,
179 channel->channel->attr.subbuf_size *
180 channel->channel->attr.num_subbuf);
e9404c27
JG
181 rcu_read_unlock();
182 if (status != LTTNG_OK) {
183 ret = -1;
184 goto error;
185 }
753873bf
JR
186
187 channel->published_to_notification_thread = true;
188
00e2e675 189error:
e32d7f27
JG
190 if (session) {
191 session_put(session);
192 }
53efb85a 193 free(pathname);
00e2e675
DG
194 return ret;
195}
196
197/*
198 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
199 *
200 * The consumer socket lock must be held by the caller.
00e2e675 201 */
f50f23d9 202int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 203 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
204{
205 int ret;
00e2e675 206 struct lttcomm_consumer_msg lkm;
a7d9a3e7 207 struct consumer_output *consumer;
e098433c
JG
208
209 rcu_read_lock();
00e2e675
DG
210
211 /* Safety net */
a0377dfe
FD
212 LTTNG_ASSERT(ksession);
213 LTTNG_ASSERT(ksession->consumer);
214 LTTNG_ASSERT(sock);
00e2e675 215
e098433c
JG
216 DBG("Sending metadata %d to kernel consumer",
217 ksession->metadata_stream_fd);
00e2e675
DG
218
219 /* Get consumer output pointer */
e098433c 220 consumer = ksession->consumer;
00e2e675 221
00e2e675 222 /* Prep channel message structure */
d42266a4
JG
223 consumer_init_add_channel_comm_msg(&lkm,
224 ksession->metadata->key,
225 ksession->id,
226 "",
d42266a4
JG
227 consumer->net_seq_index,
228 ksession->metadata->conf->name,
229 1,
230 ksession->metadata->conf->attr.output,
231 CONSUMER_CHANNEL_TYPE_METADATA,
232 ksession->metadata->conf->attr.tracefile_size,
233 ksession->metadata->conf->attr.tracefile_count,
234 monitor,
235 ksession->metadata->conf->attr.live_timer_interval,
236 ksession->is_live_session,
237 0,
a2814ea7 238 ksession->current_trace_chunk);
00e2e675 239
840cb59c 240 health_code_update();
ca03de58 241
00e2e675
DG
242 ret = consumer_send_channel(sock, &lkm);
243 if (ret < 0) {
244 goto error;
245 }
246
840cb59c 247 health_code_update();
ca03de58 248
00e2e675 249 /* Prep stream message structure */
e098433c
JG
250 consumer_init_add_stream_comm_msg(&lkm,
251 ksession->metadata->key,
252 ksession->metadata_stream_fd,
d2956687 253 0 /* CPU: 0 for metadata. */);
00e2e675 254
840cb59c 255 health_code_update();
ca03de58 256
00e2e675 257 /* Send stream and file descriptor */
a7d9a3e7 258 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 259 &ksession->metadata_stream_fd, 1);
00e2e675
DG
260 if (ret < 0) {
261 goto error;
262 }
263
840cb59c 264 health_code_update();
ca03de58 265
00e2e675 266error:
e098433c 267 rcu_read_unlock();
00e2e675
DG
268 return ret;
269}
270
271/*
272 * Sending a single stream to the consumer with command ADD_STREAM.
273 */
5b0e3ccb 274static
f50f23d9 275int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
276 struct ltt_kernel_channel *channel,
277 struct ltt_kernel_stream *stream,
f46376a1 278 struct ltt_kernel_session *session)
00e2e675
DG
279{
280 int ret;
00e2e675 281 struct lttcomm_consumer_msg lkm;
a7d9a3e7 282 struct consumer_output *consumer;
00e2e675 283
a0377dfe
FD
284 LTTNG_ASSERT(channel);
285 LTTNG_ASSERT(stream);
286 LTTNG_ASSERT(session);
287 LTTNG_ASSERT(session->consumer);
288 LTTNG_ASSERT(sock);
00e2e675
DG
289
290 DBG("Sending stream %d of channel %s to kernel consumer",
291 stream->fd, channel->channel->name);
292
293 /* Get consumer output pointer */
a7d9a3e7 294 consumer = session->consumer;
00e2e675 295
00e2e675 296 /* Prep stream consumer message */
e098433c 297 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 298 channel->key,
00e2e675 299 stream->fd,
d2956687 300 stream->cpu);
00e2e675 301
840cb59c 302 health_code_update();
ca03de58 303
00e2e675 304 /* Send stream and file descriptor */
a7d9a3e7 305 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
306 if (ret < 0) {
307 goto error;
308 }
309
840cb59c 310 health_code_update();
ca03de58 311
00e2e675
DG
312error:
313 return ret;
314}
315
a4baae1b
JD
316/*
317 * Sending the notification that all streams were sent with STREAMS_SENT.
318 */
319int kernel_consumer_streams_sent(struct consumer_socket *sock,
320 struct ltt_kernel_session *session, uint64_t channel_key)
321{
322 int ret;
323 struct lttcomm_consumer_msg lkm;
324 struct consumer_output *consumer;
325
a0377dfe
FD
326 LTTNG_ASSERT(sock);
327 LTTNG_ASSERT(session);
a4baae1b
JD
328
329 DBG("Sending streams_sent");
330 /* Get consumer output pointer */
331 consumer = session->consumer;
332
333 /* Prep stream consumer message */
334 consumer_init_streams_sent_comm_msg(&lkm,
335 LTTNG_CONSUMER_STREAMS_SENT,
336 channel_key, consumer->net_seq_index);
337
338 health_code_update();
339
340 /* Send stream and file descriptor */
341 ret = consumer_send_msg(sock, &lkm);
342 if (ret < 0) {
343 goto error;
344 }
345
346error:
347 return ret;
348}
349
f1e16794
DG
350/*
351 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
352 *
353 * The consumer socket lock must be held by the caller.
f1e16794 354 */
1fc1b7c8 355int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 356 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 357 unsigned int monitor)
f1e16794 358{
e99f9447 359 int ret = LTTNG_OK;
f1e16794 360 struct ltt_kernel_stream *stream;
00e2e675
DG
361
362 /* Safety net */
a0377dfe
FD
363 LTTNG_ASSERT(channel);
364 LTTNG_ASSERT(ksession);
365 LTTNG_ASSERT(ksession->consumer);
366 LTTNG_ASSERT(sock);
00e2e675 367
e098433c
JG
368 rcu_read_lock();
369
00e2e675 370 /* Bail out if consumer is disabled */
e098433c 371 if (!ksession->consumer->enabled) {
f73fabfd 372 ret = LTTNG_OK;
00e2e675
DG
373 goto error;
374 }
f1e16794
DG
375
376 DBG("Sending streams of channel %s to kernel consumer",
377 channel->channel->name);
378
e99f9447 379 if (!channel->sent_to_consumer) {
e098433c 380 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
381 if (ret < 0) {
382 goto error;
383 }
384 channel->sent_to_consumer = true;
f1e16794
DG
385 }
386
387 /* Send streams */
388 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 389 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
390 continue;
391 }
00e2e675
DG
392
393 /* Add stream on the kernel consumer side. */
e098433c 394 ret = kernel_consumer_add_stream(sock, channel, stream,
f46376a1 395 ksession);
f1e16794 396 if (ret < 0) {
f1e16794
DG
397 goto error;
398 }
6986ab9b 399 stream->sent_to_consumer = true;
f1e16794
DG
400 }
401
f1e16794 402error:
e098433c 403 rcu_read_unlock();
f1e16794
DG
404 return ret;
405}
406
407/*
408 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
409 *
410 * The consumer socket lock must be held by the caller.
f1e16794 411 */
f50f23d9
DG
412int kernel_consumer_send_session(struct consumer_socket *sock,
413 struct ltt_kernel_session *session)
f1e16794 414{
2bba9e53 415 int ret, monitor = 0;
f1e16794 416 struct ltt_kernel_channel *chan;
f1e16794 417
00e2e675 418 /* Safety net */
a0377dfe
FD
419 LTTNG_ASSERT(session);
420 LTTNG_ASSERT(session->consumer);
421 LTTNG_ASSERT(sock);
f1e16794 422
00e2e675
DG
423 /* Bail out if consumer is disabled */
424 if (!session->consumer->enabled) {
f73fabfd 425 ret = LTTNG_OK;
00e2e675 426 goto error;
f1e16794
DG
427 }
428
2bba9e53
DG
429 /* Don't monitor the streams on the consumer if in flight recorder. */
430 if (session->output_traces) {
431 monitor = 1;
432 }
433
00e2e675
DG
434 DBG("Sending session stream to kernel consumer");
435
609af759 436 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 437 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 438 if (ret < 0) {
f1e16794
DG
439 goto error;
440 }
f1e16794
DG
441 }
442
00e2e675 443 /* Send channel and streams of it */
f1e16794 444 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 445 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 446 monitor);
f1e16794
DG
447 if (ret < 0) {
448 goto error;
449 }
601262d6
JD
450 if (monitor) {
451 /*
452 * Inform the relay that all the streams for the
453 * channel were sent.
454 */
e1f3997a 455 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
456 if (ret < 0) {
457 goto error;
458 }
459 }
f1e16794
DG
460 }
461
00e2e675 462 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 463
4ce9ff51 464 session->consumer_fds_sent = 1;
f1e16794
DG
465 return 0;
466
467error:
468 return ret;
469}
07b86b52
JD
470
471int kernel_consumer_destroy_channel(struct consumer_socket *socket,
472 struct ltt_kernel_channel *channel)
473{
474 int ret;
475 struct lttcomm_consumer_msg msg;
476
a0377dfe
FD
477 LTTNG_ASSERT(channel);
478 LTTNG_ASSERT(socket);
07b86b52 479
e1f3997a 480 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 481
53efb85a 482 memset(&msg, 0, sizeof(msg));
07b86b52 483 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 484 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
485
486 pthread_mutex_lock(socket->lock);
487 health_code_update();
488
489 ret = consumer_send_msg(socket, &msg);
490 if (ret < 0) {
491 goto error;
492 }
493
494error:
495 health_code_update();
496 pthread_mutex_unlock(socket->lock);
497 return ret;
498}
499
500int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
501 struct ltt_kernel_metadata *metadata)
502{
503 int ret;
504 struct lttcomm_consumer_msg msg;
505
a0377dfe
FD
506 LTTNG_ASSERT(metadata);
507 LTTNG_ASSERT(socket);
07b86b52 508
d40f0359 509 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 510
53efb85a 511 memset(&msg, 0, sizeof(msg));
07b86b52 512 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 513 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
514
515 pthread_mutex_lock(socket->lock);
516 health_code_update();
517
518 ret = consumer_send_msg(socket, &msg);
519 if (ret < 0) {
520 goto error;
521 }
522
523error:
524 health_code_update();
525 pthread_mutex_unlock(socket->lock);
526 return ret;
527}
This page took 0.098533 seconds and 4 git commands to generate.