Create stream files relative to a stream's current trace chunk
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
d2956687 36static char *create_channel_path(struct consumer_output *consumer)
00e2e675
DG
37{
38 int ret;
ffe60014 39 char tmp_path[PATH_MAX];
2bba9e53 40 char *pathname = NULL;
00e2e675 41
2bba9e53 42 assert(consumer);
00e2e675 43
ffe60014
DG
44 /* Get the right path name destination */
45 if (consumer->type == CONSUMER_DST_LOCAL) {
d2956687 46 pathname = strdup(consumer->domain_subdir);
bb3c4e70 47 if (!pathname) {
d2956687
JG
48 PERROR("Failed to copy domain subdirectory string %s",
49 consumer->domain_subdir);
bb3c4e70
MD
50 goto error;
51 }
d2956687
JG
52 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
53 pathname);
ffe60014 54 } else {
b178f53e 55 /* Network output. */
2b29c638
JD
56 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
57 consumer->dst.net.base_dir,
b178f53e 58 consumer->domain_subdir);
ffe60014 59 if (ret < 0) {
2bba9e53 60 PERROR("snprintf kernel metadata path");
ffe60014 61 goto error;
dba13f1d
JG
62 } else if (ret >= sizeof(tmp_path)) {
63 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
64 sizeof(tmp_path), ret,
65 consumer->dst.net.base_dir,
b178f53e 66 consumer->domain_subdir);
dba13f1d 67 goto error;
ffe60014 68 }
f5436bfc 69 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 70 if (!pathname) {
f5436bfc 71 PERROR("lttng_strndup");
bb3c4e70
MD
72 goto error;
73 }
ffe60014
DG
74 DBG3("Kernel network consumer subdir path: %s", pathname);
75 }
76
2bba9e53
DG
77 return pathname;
78
79error:
80 free(pathname);
81 return NULL;
82}
83
84/*
85 * Sending a single channel to the consumer with command ADD_CHANNEL.
86 */
ba27cd00 87static
2bba9e53 88int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
89 struct ltt_kernel_channel *channel,
90 struct ltt_kernel_session *ksession,
2bba9e53
DG
91 unsigned int monitor)
92{
93 int ret;
d2956687 94 char *pathname = NULL;
2bba9e53
DG
95 struct lttcomm_consumer_msg lkm;
96 struct consumer_output *consumer;
e9404c27 97 enum lttng_error_code status;
e32d7f27 98 struct ltt_session *session = NULL;
e9404c27 99 struct lttng_channel_extended *channel_attr_extended;
d2956687 100 bool is_local_trace;
2bba9e53
DG
101
102 /* Safety net */
103 assert(channel);
e9404c27
JG
104 assert(ksession);
105 assert(ksession->consumer);
2bba9e53 106
e9404c27
JG
107 consumer = ksession->consumer;
108 channel_attr_extended = (struct lttng_channel_extended *)
109 channel->channel->attr.extended.ptr;
2bba9e53
DG
110
111 DBG("Kernel consumer adding channel %s to kernel consumer",
112 channel->channel->name);
d2956687 113 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 114
d2956687 115 pathname = create_channel_path(consumer);
bb3c4e70
MD
116 if (!pathname) {
117 ret = -1;
118 goto error;
119 }
2bba9e53 120
d2956687
JG
121 if (is_local_trace && ksession->current_trace_chunk) {
122 enum lttng_trace_chunk_status chunk_status;
123 char *pathname_index;
124
125 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
126 pathname);
127 if (ret < 0) {
128 ERR("Failed to format channel index directory");
129 ret = -1;
130 goto error;
131 }
132
133 /*
134 * Create the index subdirectory which will take care
135 * of implicitly creating the channel's path.
136 */
137 chunk_status = lttng_trace_chunk_create_subdirectory(
138 ksession->current_trace_chunk, pathname_index);
139 free(pathname_index);
140 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
141 ret = -1;
142 goto error;
143 }
144 }
145
00e2e675 146 /* Prep channel message structure */
638e7b4e 147 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 148 channel->key,
e9404c27 149 ksession->id,
ffe60014 150 pathname,
e9404c27
JG
151 ksession->uid,
152 ksession->gid,
ffe60014 153 consumer->net_seq_index,
c30aaa51 154 channel->channel->name,
ffe60014
DG
155 channel->stream_count,
156 channel->channel->attr.output,
1624d5b7
JD
157 CONSUMER_CHANNEL_TYPE_DATA,
158 channel->channel->attr.tracefile_size,
2bba9e53 159 channel->channel->attr.tracefile_count,
ecc48a90 160 monitor,
e9404c27 161 channel->channel->attr.live_timer_interval,
d2956687
JG
162 channel_attr_extended->monitor_timer_interval,
163 ksession->current_trace_chunk);
00e2e675 164
840cb59c 165 health_code_update();
ca03de58 166
00e2e675
DG
167 ret = consumer_send_channel(sock, &lkm);
168 if (ret < 0) {
169 goto error;
170 }
171
840cb59c 172 health_code_update();
e9404c27
JG
173 rcu_read_lock();
174 session = session_find_by_id(ksession->id);
175 assert(session);
e098433c
JG
176 assert(pthread_mutex_trylock(&session->lock));
177 assert(session_trylock_list());
ca03de58 178
e9404c27
JG
179 status = notification_thread_command_add_channel(
180 notification_thread_handle, session->name,
181 ksession->uid, ksession->gid,
e1f3997a 182 channel->channel->name, channel->key,
e9404c27
JG
183 LTTNG_DOMAIN_KERNEL,
184 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
185 rcu_read_unlock();
186 if (status != LTTNG_OK) {
187 ret = -1;
188 goto error;
189 }
753873bf
JR
190
191 channel->published_to_notification_thread = true;
192
00e2e675 193error:
e32d7f27
JG
194 if (session) {
195 session_put(session);
196 }
53efb85a 197 free(pathname);
00e2e675
DG
198 return ret;
199}
200
201/*
202 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
203 *
204 * The consumer socket lock must be held by the caller.
00e2e675 205 */
f50f23d9 206int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 207 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
208{
209 int ret;
00e2e675 210 struct lttcomm_consumer_msg lkm;
a7d9a3e7 211 struct consumer_output *consumer;
e098433c
JG
212
213 rcu_read_lock();
00e2e675
DG
214
215 /* Safety net */
e098433c
JG
216 assert(ksession);
217 assert(ksession->consumer);
f50f23d9 218 assert(sock);
00e2e675 219
e098433c
JG
220 DBG("Sending metadata %d to kernel consumer",
221 ksession->metadata_stream_fd);
00e2e675
DG
222
223 /* Get consumer output pointer */
e098433c 224 consumer = ksession->consumer;
00e2e675 225
00e2e675 226 /* Prep channel message structure */
638e7b4e 227 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
228 ksession->metadata->key,
229 ksession->id,
d2956687 230 DEFAULT_KERNEL_TRACE_DIR,
e098433c
JG
231 ksession->uid,
232 ksession->gid,
ffe60014 233 consumer->net_seq_index,
30079b6b 234 DEFAULT_METADATA_NAME,
ffe60014
DG
235 1,
236 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 237 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 238 0, 0,
d2956687 239 monitor, 0, 0, ksession->current_trace_chunk);
00e2e675 240
840cb59c 241 health_code_update();
ca03de58 242
00e2e675
DG
243 ret = consumer_send_channel(sock, &lkm);
244 if (ret < 0) {
245 goto error;
246 }
247
840cb59c 248 health_code_update();
ca03de58 249
00e2e675 250 /* Prep stream message structure */
e098433c
JG
251 consumer_init_add_stream_comm_msg(&lkm,
252 ksession->metadata->key,
253 ksession->metadata_stream_fd,
d2956687 254 0 /* CPU: 0 for metadata. */);
00e2e675 255
840cb59c 256 health_code_update();
ca03de58 257
00e2e675 258 /* Send stream and file descriptor */
a7d9a3e7 259 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 260 &ksession->metadata_stream_fd, 1);
00e2e675
DG
261 if (ret < 0) {
262 goto error;
263 }
264
840cb59c 265 health_code_update();
ca03de58 266
00e2e675 267error:
e098433c 268 rcu_read_unlock();
00e2e675
DG
269 return ret;
270}
271
272/*
273 * Sending a single stream to the consumer with command ADD_STREAM.
274 */
5b0e3ccb 275static
f50f23d9 276int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
277 struct ltt_kernel_channel *channel,
278 struct ltt_kernel_stream *stream,
d2956687 279 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
280{
281 int ret;
00e2e675 282 struct lttcomm_consumer_msg lkm;
a7d9a3e7 283 struct consumer_output *consumer;
00e2e675
DG
284
285 assert(channel);
286 assert(stream);
287 assert(session);
288 assert(session->consumer);
f50f23d9 289 assert(sock);
00e2e675
DG
290
291 DBG("Sending stream %d of channel %s to kernel consumer",
292 stream->fd, channel->channel->name);
293
294 /* Get consumer output pointer */
a7d9a3e7 295 consumer = session->consumer;
00e2e675 296
00e2e675 297 /* Prep stream consumer message */
e098433c 298 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 299 channel->key,
00e2e675 300 stream->fd,
d2956687 301 stream->cpu);
00e2e675 302
840cb59c 303 health_code_update();
ca03de58 304
00e2e675 305 /* Send stream and file descriptor */
a7d9a3e7 306 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
307 if (ret < 0) {
308 goto error;
309 }
310
840cb59c 311 health_code_update();
ca03de58 312
00e2e675
DG
313error:
314 return ret;
315}
316
a4baae1b
JD
317/*
318 * Sending the notification that all streams were sent with STREAMS_SENT.
319 */
320int kernel_consumer_streams_sent(struct consumer_socket *sock,
321 struct ltt_kernel_session *session, uint64_t channel_key)
322{
323 int ret;
324 struct lttcomm_consumer_msg lkm;
325 struct consumer_output *consumer;
326
327 assert(sock);
328 assert(session);
329
330 DBG("Sending streams_sent");
331 /* Get consumer output pointer */
332 consumer = session->consumer;
333
334 /* Prep stream consumer message */
335 consumer_init_streams_sent_comm_msg(&lkm,
336 LTTNG_CONSUMER_STREAMS_SENT,
337 channel_key, consumer->net_seq_index);
338
339 health_code_update();
340
341 /* Send stream and file descriptor */
342 ret = consumer_send_msg(sock, &lkm);
343 if (ret < 0) {
344 goto error;
345 }
346
347error:
348 return ret;
349}
350
f1e16794
DG
351/*
352 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
353 *
354 * The consumer socket lock must be held by the caller.
f1e16794 355 */
1fc1b7c8 356int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 357 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 358 unsigned int monitor)
f1e16794 359{
e99f9447 360 int ret = LTTNG_OK;
f1e16794 361 struct ltt_kernel_stream *stream;
00e2e675
DG
362
363 /* Safety net */
364 assert(channel);
e098433c
JG
365 assert(ksession);
366 assert(ksession->consumer);
f50f23d9 367 assert(sock);
00e2e675 368
e098433c
JG
369 rcu_read_lock();
370
00e2e675 371 /* Bail out if consumer is disabled */
e098433c 372 if (!ksession->consumer->enabled) {
f73fabfd 373 ret = LTTNG_OK;
00e2e675
DG
374 goto error;
375 }
f1e16794
DG
376
377 DBG("Sending streams of channel %s to kernel consumer",
378 channel->channel->name);
379
e99f9447 380 if (!channel->sent_to_consumer) {
e098433c 381 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
382 if (ret < 0) {
383 goto error;
384 }
385 channel->sent_to_consumer = true;
f1e16794
DG
386 }
387
388 /* Send streams */
389 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 390 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
391 continue;
392 }
00e2e675
DG
393
394 /* Add stream on the kernel consumer side. */
e098433c 395 ret = kernel_consumer_add_stream(sock, channel, stream,
d2956687 396 ksession, monitor);
f1e16794 397 if (ret < 0) {
f1e16794
DG
398 goto error;
399 }
6986ab9b 400 stream->sent_to_consumer = true;
f1e16794
DG
401 }
402
f1e16794 403error:
e098433c 404 rcu_read_unlock();
f1e16794
DG
405 return ret;
406}
407
408/*
409 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
410 *
411 * The consumer socket lock must be held by the caller.
f1e16794 412 */
f50f23d9
DG
413int kernel_consumer_send_session(struct consumer_socket *sock,
414 struct ltt_kernel_session *session)
f1e16794 415{
2bba9e53 416 int ret, monitor = 0;
f1e16794 417 struct ltt_kernel_channel *chan;
f1e16794 418
00e2e675
DG
419 /* Safety net */
420 assert(session);
421 assert(session->consumer);
f50f23d9 422 assert(sock);
f1e16794 423
00e2e675
DG
424 /* Bail out if consumer is disabled */
425 if (!session->consumer->enabled) {
f73fabfd 426 ret = LTTNG_OK;
00e2e675 427 goto error;
f1e16794
DG
428 }
429
2bba9e53
DG
430 /* Don't monitor the streams on the consumer if in flight recorder. */
431 if (session->output_traces) {
432 monitor = 1;
433 }
434
00e2e675
DG
435 DBG("Sending session stream to kernel consumer");
436
609af759 437 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 438 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 439 if (ret < 0) {
f1e16794
DG
440 goto error;
441 }
f1e16794
DG
442 }
443
00e2e675 444 /* Send channel and streams of it */
f1e16794 445 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 446 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 447 monitor);
f1e16794
DG
448 if (ret < 0) {
449 goto error;
450 }
601262d6
JD
451 if (monitor) {
452 /*
453 * Inform the relay that all the streams for the
454 * channel were sent.
455 */
e1f3997a 456 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
457 if (ret < 0) {
458 goto error;
459 }
460 }
f1e16794
DG
461 }
462
00e2e675 463 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 464
4ce9ff51 465 session->consumer_fds_sent = 1;
f1e16794
DG
466 return 0;
467
468error:
469 return ret;
470}
07b86b52
JD
471
472int kernel_consumer_destroy_channel(struct consumer_socket *socket,
473 struct ltt_kernel_channel *channel)
474{
475 int ret;
476 struct lttcomm_consumer_msg msg;
477
478 assert(channel);
479 assert(socket);
07b86b52 480
e1f3997a 481 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 482
53efb85a 483 memset(&msg, 0, sizeof(msg));
07b86b52 484 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 485 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
486
487 pthread_mutex_lock(socket->lock);
488 health_code_update();
489
490 ret = consumer_send_msg(socket, &msg);
491 if (ret < 0) {
492 goto error;
493 }
494
495error:
496 health_code_update();
497 pthread_mutex_unlock(socket->lock);
498 return ret;
499}
500
501int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
502 struct ltt_kernel_metadata *metadata)
503{
504 int ret;
505 struct lttcomm_consumer_msg msg;
506
507 assert(metadata);
508 assert(socket);
07b86b52 509
d40f0359 510 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 511
53efb85a 512 memset(&msg, 0, sizeof(msg));
07b86b52 513 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 514 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
515
516 pthread_mutex_lock(socket->lock);
517 health_code_update();
518
519 ret = consumer_send_msg(socket, &msg);
520 if (ret < 0) {
521 goto error;
522 }
523
524error:
525 health_code_update();
526 pthread_mutex_unlock(socket->lock);
527 return ret;
528}
This page took 0.07392 seconds and 4 git commands to generate.