Generate session name and default output on sessiond's end
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
2bba9e53
DG
36static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
00e2e675
DG
38{
39 int ret;
ffe60014 40 char tmp_path[PATH_MAX];
2bba9e53 41 char *pathname = NULL;
00e2e675 42
2bba9e53 43 assert(consumer);
00e2e675 44
ffe60014
DG
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
366a9222
JD
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
b178f53e 51 consumer->domain_subdir);
ffe60014 52 if (ret < 0) {
2bba9e53 53 PERROR("snprintf kernel channel path");
ffe60014 54 goto error;
dba13f1d
JG
55 } else if (ret >= sizeof(tmp_path)) {
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path), ret,
58 consumer->dst.session_root_path,
59 consumer->chunk_path,
b178f53e 60 consumer->domain_subdir);
dba13f1d 61 goto error;
ffe60014 62 }
f5436bfc 63 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 64 if (!pathname) {
f5436bfc 65 PERROR("lttng_strndup");
bb3c4e70
MD
66 goto error;
67 }
ffe60014
DG
68
69 /* Create directory */
2bba9e53 70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
ffe60014 71 if (ret < 0) {
df5b86c8 72 if (errno != EEXIST) {
ffe60014
DG
73 ERR("Trace directory creation error");
74 goto error;
75 }
76 }
77 DBG3("Kernel local consumer tracefile path: %s", pathname);
78 } else {
b178f53e 79 /* Network output. */
2b29c638
JD
80 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
81 consumer->dst.net.base_dir,
b178f53e 82 consumer->domain_subdir);
ffe60014 83 if (ret < 0) {
2bba9e53 84 PERROR("snprintf kernel metadata path");
ffe60014 85 goto error;
dba13f1d
JG
86 } else if (ret >= sizeof(tmp_path)) {
87 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
88 sizeof(tmp_path), ret,
89 consumer->dst.net.base_dir,
b178f53e 90 consumer->domain_subdir);
dba13f1d 91 goto error;
ffe60014 92 }
f5436bfc 93 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 94 if (!pathname) {
f5436bfc 95 PERROR("lttng_strndup");
bb3c4e70
MD
96 goto error;
97 }
ffe60014
DG
98 DBG3("Kernel network consumer subdir path: %s", pathname);
99 }
100
2bba9e53
DG
101 return pathname;
102
103error:
104 free(pathname);
105 return NULL;
106}
107
108/*
109 * Sending a single channel to the consumer with command ADD_CHANNEL.
110 */
ba27cd00 111static
2bba9e53 112int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
113 struct ltt_kernel_channel *channel,
114 struct ltt_kernel_session *ksession,
2bba9e53
DG
115 unsigned int monitor)
116{
117 int ret;
118 char *pathname;
119 struct lttcomm_consumer_msg lkm;
120 struct consumer_output *consumer;
e9404c27 121 enum lttng_error_code status;
e32d7f27 122 struct ltt_session *session = NULL;
e9404c27 123 struct lttng_channel_extended *channel_attr_extended;
2bba9e53
DG
124
125 /* Safety net */
126 assert(channel);
e9404c27
JG
127 assert(ksession);
128 assert(ksession->consumer);
2bba9e53 129
e9404c27
JG
130 consumer = ksession->consumer;
131 channel_attr_extended = (struct lttng_channel_extended *)
132 channel->channel->attr.extended.ptr;
2bba9e53
DG
133
134 DBG("Kernel consumer adding channel %s to kernel consumer",
135 channel->channel->name);
136
137 if (monitor) {
e9404c27
JG
138 pathname = create_channel_path(consumer, ksession->uid,
139 ksession->gid);
2bba9e53
DG
140 } else {
141 /* Empty path. */
53efb85a 142 pathname = strdup("");
2bba9e53 143 }
bb3c4e70
MD
144 if (!pathname) {
145 ret = -1;
146 goto error;
147 }
2bba9e53 148
00e2e675 149 /* Prep channel message structure */
638e7b4e 150 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 151 channel->key,
e9404c27 152 ksession->id,
ffe60014 153 pathname,
e9404c27
JG
154 ksession->uid,
155 ksession->gid,
ffe60014 156 consumer->net_seq_index,
c30aaa51 157 channel->channel->name,
ffe60014
DG
158 channel->stream_count,
159 channel->channel->attr.output,
1624d5b7
JD
160 CONSUMER_CHANNEL_TYPE_DATA,
161 channel->channel->attr.tracefile_size,
2bba9e53 162 channel->channel->attr.tracefile_count,
ecc48a90 163 monitor,
e9404c27
JG
164 channel->channel->attr.live_timer_interval,
165 channel_attr_extended->monitor_timer_interval);
00e2e675 166
840cb59c 167 health_code_update();
ca03de58 168
00e2e675
DG
169 ret = consumer_send_channel(sock, &lkm);
170 if (ret < 0) {
171 goto error;
172 }
173
840cb59c 174 health_code_update();
e9404c27
JG
175 rcu_read_lock();
176 session = session_find_by_id(ksession->id);
177 assert(session);
e098433c
JG
178 assert(pthread_mutex_trylock(&session->lock));
179 assert(session_trylock_list());
ca03de58 180
e9404c27
JG
181 status = notification_thread_command_add_channel(
182 notification_thread_handle, session->name,
183 ksession->uid, ksession->gid,
e1f3997a 184 channel->channel->name, channel->key,
e9404c27
JG
185 LTTNG_DOMAIN_KERNEL,
186 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
187 rcu_read_unlock();
188 if (status != LTTNG_OK) {
189 ret = -1;
190 goto error;
191 }
753873bf
JR
192
193 channel->published_to_notification_thread = true;
194
00e2e675 195error:
e32d7f27
JG
196 if (session) {
197 session_put(session);
198 }
53efb85a 199 free(pathname);
00e2e675
DG
200 return ret;
201}
202
203/*
204 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
205 *
206 * The consumer socket lock must be held by the caller.
00e2e675 207 */
f50f23d9 208int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 209 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
210{
211 int ret;
2bba9e53 212 char *pathname;
00e2e675 213 struct lttcomm_consumer_msg lkm;
a7d9a3e7 214 struct consumer_output *consumer;
e32d7f27 215 struct ltt_session *session = NULL;
e098433c
JG
216
217 rcu_read_lock();
00e2e675
DG
218
219 /* Safety net */
e098433c
JG
220 assert(ksession);
221 assert(ksession->consumer);
f50f23d9 222 assert(sock);
00e2e675 223
e098433c
JG
224 DBG("Sending metadata %d to kernel consumer",
225 ksession->metadata_stream_fd);
00e2e675
DG
226
227 /* Get consumer output pointer */
e098433c 228 consumer = ksession->consumer;
00e2e675 229
2bba9e53 230 if (monitor) {
e098433c
JG
231 pathname = create_channel_path(consumer,
232 ksession->uid, ksession->gid);
00e2e675 233 } else {
2bba9e53 234 /* Empty path. */
53efb85a 235 pathname = strdup("");
00e2e675 236 }
bb3c4e70
MD
237 if (!pathname) {
238 ret = -1;
239 goto error;
240 }
00e2e675 241
e098433c
JG
242 session = session_find_by_id(ksession->id);
243 assert(session);
244 assert(pthread_mutex_trylock(&session->lock));
245 assert(session_trylock_list());
246
00e2e675 247 /* Prep channel message structure */
638e7b4e 248 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
249 ksession->metadata->key,
250 ksession->id,
ffe60014 251 pathname,
e098433c
JG
252 ksession->uid,
253 ksession->gid,
ffe60014 254 consumer->net_seq_index,
30079b6b 255 DEFAULT_METADATA_NAME,
ffe60014
DG
256 1,
257 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 258 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 259 0, 0,
e9404c27 260 monitor, 0, 0);
00e2e675 261
840cb59c 262 health_code_update();
ca03de58 263
00e2e675
DG
264 ret = consumer_send_channel(sock, &lkm);
265 if (ret < 0) {
266 goto error;
267 }
268
840cb59c 269 health_code_update();
ca03de58 270
00e2e675 271 /* Prep stream message structure */
e098433c
JG
272 consumer_init_add_stream_comm_msg(&lkm,
273 ksession->metadata->key,
274 ksession->metadata_stream_fd,
275 0 /* CPU: 0 for metadata. */,
276 session->current_archive_id);
00e2e675 277
840cb59c 278 health_code_update();
ca03de58 279
00e2e675 280 /* Send stream and file descriptor */
a7d9a3e7 281 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 282 &ksession->metadata_stream_fd, 1);
00e2e675
DG
283 if (ret < 0) {
284 goto error;
285 }
286
840cb59c 287 health_code_update();
ca03de58 288
00e2e675 289error:
e098433c 290 rcu_read_unlock();
53efb85a 291 free(pathname);
e32d7f27
JG
292 if (session) {
293 session_put(session);
294 }
00e2e675
DG
295 return ret;
296}
297
298/*
299 * Sending a single stream to the consumer with command ADD_STREAM.
300 */
5b0e3ccb 301static
f50f23d9 302int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
303 struct ltt_kernel_channel *channel,
304 struct ltt_kernel_stream *stream,
305 struct ltt_kernel_session *session, unsigned int monitor,
306 uint64_t trace_archive_id)
00e2e675
DG
307{
308 int ret;
00e2e675 309 struct lttcomm_consumer_msg lkm;
a7d9a3e7 310 struct consumer_output *consumer;
00e2e675
DG
311
312 assert(channel);
313 assert(stream);
314 assert(session);
315 assert(session->consumer);
f50f23d9 316 assert(sock);
00e2e675
DG
317
318 DBG("Sending stream %d of channel %s to kernel consumer",
319 stream->fd, channel->channel->name);
320
321 /* Get consumer output pointer */
a7d9a3e7 322 consumer = session->consumer;
00e2e675 323
00e2e675 324 /* Prep stream consumer message */
e098433c 325 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 326 channel->key,
00e2e675 327 stream->fd,
e098433c
JG
328 stream->cpu,
329 trace_archive_id);
00e2e675 330
840cb59c 331 health_code_update();
ca03de58 332
00e2e675 333 /* Send stream and file descriptor */
a7d9a3e7 334 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
335 if (ret < 0) {
336 goto error;
337 }
338
840cb59c 339 health_code_update();
ca03de58 340
00e2e675
DG
341error:
342 return ret;
343}
344
a4baae1b
JD
345/*
346 * Sending the notification that all streams were sent with STREAMS_SENT.
347 */
348int kernel_consumer_streams_sent(struct consumer_socket *sock,
349 struct ltt_kernel_session *session, uint64_t channel_key)
350{
351 int ret;
352 struct lttcomm_consumer_msg lkm;
353 struct consumer_output *consumer;
354
355 assert(sock);
356 assert(session);
357
358 DBG("Sending streams_sent");
359 /* Get consumer output pointer */
360 consumer = session->consumer;
361
362 /* Prep stream consumer message */
363 consumer_init_streams_sent_comm_msg(&lkm,
364 LTTNG_CONSUMER_STREAMS_SENT,
365 channel_key, consumer->net_seq_index);
366
367 health_code_update();
368
369 /* Send stream and file descriptor */
370 ret = consumer_send_msg(sock, &lkm);
371 if (ret < 0) {
372 goto error;
373 }
374
375error:
376 return ret;
377}
378
f1e16794
DG
379/*
380 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
381 *
382 * The consumer socket lock must be held by the caller.
f1e16794 383 */
1fc1b7c8 384int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 385 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 386 unsigned int monitor)
f1e16794 387{
e99f9447 388 int ret = LTTNG_OK;
f1e16794 389 struct ltt_kernel_stream *stream;
e32d7f27 390 struct ltt_session *session = NULL;
00e2e675
DG
391
392 /* Safety net */
393 assert(channel);
e098433c
JG
394 assert(ksession);
395 assert(ksession->consumer);
f50f23d9 396 assert(sock);
00e2e675 397
e098433c
JG
398 rcu_read_lock();
399
400 session = session_find_by_id(ksession->id);
401 assert(session);
402 assert(pthread_mutex_trylock(&session->lock));
403 assert(session_trylock_list());
404
00e2e675 405 /* Bail out if consumer is disabled */
e098433c 406 if (!ksession->consumer->enabled) {
f73fabfd 407 ret = LTTNG_OK;
00e2e675
DG
408 goto error;
409 }
f1e16794
DG
410
411 DBG("Sending streams of channel %s to kernel consumer",
412 channel->channel->name);
413
e99f9447 414 if (!channel->sent_to_consumer) {
e098433c 415 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
416 if (ret < 0) {
417 goto error;
418 }
419 channel->sent_to_consumer = true;
f1e16794
DG
420 }
421
422 /* Send streams */
423 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 424 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
425 continue;
426 }
00e2e675
DG
427
428 /* Add stream on the kernel consumer side. */
e098433c
JG
429 ret = kernel_consumer_add_stream(sock, channel, stream,
430 ksession, monitor, session->current_archive_id);
f1e16794 431 if (ret < 0) {
f1e16794
DG
432 goto error;
433 }
6986ab9b 434 stream->sent_to_consumer = true;
f1e16794
DG
435 }
436
f1e16794 437error:
e098433c 438 rcu_read_unlock();
e32d7f27
JG
439 if (session) {
440 session_put(session);
441 }
f1e16794
DG
442 return ret;
443}
444
445/*
446 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
447 *
448 * The consumer socket lock must be held by the caller.
f1e16794 449 */
f50f23d9
DG
450int kernel_consumer_send_session(struct consumer_socket *sock,
451 struct ltt_kernel_session *session)
f1e16794 452{
2bba9e53 453 int ret, monitor = 0;
f1e16794 454 struct ltt_kernel_channel *chan;
f1e16794 455
00e2e675
DG
456 /* Safety net */
457 assert(session);
458 assert(session->consumer);
f50f23d9 459 assert(sock);
f1e16794 460
00e2e675
DG
461 /* Bail out if consumer is disabled */
462 if (!session->consumer->enabled) {
f73fabfd 463 ret = LTTNG_OK;
00e2e675 464 goto error;
f1e16794
DG
465 }
466
2bba9e53
DG
467 /* Don't monitor the streams on the consumer if in flight recorder. */
468 if (session->output_traces) {
469 monitor = 1;
470 }
471
00e2e675
DG
472 DBG("Sending session stream to kernel consumer");
473
609af759 474 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 475 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 476 if (ret < 0) {
f1e16794
DG
477 goto error;
478 }
f1e16794
DG
479 }
480
00e2e675 481 /* Send channel and streams of it */
f1e16794 482 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 483 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 484 monitor);
f1e16794
DG
485 if (ret < 0) {
486 goto error;
487 }
601262d6
JD
488 if (monitor) {
489 /*
490 * Inform the relay that all the streams for the
491 * channel were sent.
492 */
e1f3997a 493 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
494 if (ret < 0) {
495 goto error;
496 }
497 }
f1e16794
DG
498 }
499
00e2e675 500 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 501
4ce9ff51 502 session->consumer_fds_sent = 1;
f1e16794
DG
503 return 0;
504
505error:
506 return ret;
507}
07b86b52
JD
508
509int kernel_consumer_destroy_channel(struct consumer_socket *socket,
510 struct ltt_kernel_channel *channel)
511{
512 int ret;
513 struct lttcomm_consumer_msg msg;
514
515 assert(channel);
516 assert(socket);
07b86b52 517
e1f3997a 518 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 519
53efb85a 520 memset(&msg, 0, sizeof(msg));
07b86b52 521 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 522 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
523
524 pthread_mutex_lock(socket->lock);
525 health_code_update();
526
527 ret = consumer_send_msg(socket, &msg);
528 if (ret < 0) {
529 goto error;
530 }
531
532error:
533 health_code_update();
534 pthread_mutex_unlock(socket->lock);
535 return ret;
536}
537
538int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
539 struct ltt_kernel_metadata *metadata)
540{
541 int ret;
542 struct lttcomm_consumer_msg msg;
543
544 assert(metadata);
545 assert(socket);
07b86b52 546
d40f0359 547 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 548
53efb85a 549 memset(&msg, 0, sizeof(msg));
07b86b52 550 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 551 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
552
553 pthread_mutex_lock(socket->lock);
554 health_code_update();
555
556 ret = consumer_send_msg(socket, &msg);
557 if (ret < 0) {
558 goto error;
559 }
560
561error:
562 health_code_update();
563 pthread_mutex_unlock(socket->lock);
564 return ret;
565}
This page took 0.076041 seconds and 4 git commands to generate.