Fix: keep active session state on redundant start command
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
5da88b0f
MD
36static char *create_channel_path(struct consumer_output *consumer,
37 size_t *consumer_path_offset)
00e2e675
DG
38{
39 int ret;
ffe60014 40 char tmp_path[PATH_MAX];
2bba9e53 41 char *pathname = NULL;
00e2e675 42
2bba9e53 43 assert(consumer);
00e2e675 44
ffe60014 45 /* Get the right path name destination */
a5ba6fdd
JG
46 if (consumer->type == CONSUMER_DST_LOCAL ||
47 (consumer->type == CONSUMER_DST_NET &&
48 consumer->relay_major_version == 2 &&
49 consumer->relay_minor_version >= 11)) {
d2956687 50 pathname = strdup(consumer->domain_subdir);
bb3c4e70 51 if (!pathname) {
d2956687
JG
52 PERROR("Failed to copy domain subdirectory string %s",
53 consumer->domain_subdir);
bb3c4e70
MD
54 goto error;
55 }
5da88b0f 56 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687
JG
57 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
58 pathname);
ffe60014 59 } else {
5da88b0f 60 /* Network output, relayd < 2.11. */
2b29c638
JD
61 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
62 consumer->dst.net.base_dir,
b178f53e 63 consumer->domain_subdir);
ffe60014 64 if (ret < 0) {
2bba9e53 65 PERROR("snprintf kernel metadata path");
ffe60014 66 goto error;
dba13f1d
JG
67 } else if (ret >= sizeof(tmp_path)) {
68 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
69 sizeof(tmp_path), ret,
70 consumer->dst.net.base_dir,
b178f53e 71 consumer->domain_subdir);
dba13f1d 72 goto error;
ffe60014 73 }
f5436bfc 74 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 75 if (!pathname) {
f5436bfc 76 PERROR("lttng_strndup");
bb3c4e70
MD
77 goto error;
78 }
5da88b0f 79 *consumer_path_offset = 0;
ffe60014
DG
80 DBG3("Kernel network consumer subdir path: %s", pathname);
81 }
82
2bba9e53
DG
83 return pathname;
84
85error:
86 free(pathname);
87 return NULL;
88}
89
90/*
91 * Sending a single channel to the consumer with command ADD_CHANNEL.
92 */
ba27cd00 93static
2bba9e53 94int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
95 struct ltt_kernel_channel *channel,
96 struct ltt_kernel_session *ksession,
2bba9e53
DG
97 unsigned int monitor)
98{
99 int ret;
d2956687 100 char *pathname = NULL;
2bba9e53
DG
101 struct lttcomm_consumer_msg lkm;
102 struct consumer_output *consumer;
e9404c27 103 enum lttng_error_code status;
e32d7f27 104 struct ltt_session *session = NULL;
e9404c27 105 struct lttng_channel_extended *channel_attr_extended;
d2956687 106 bool is_local_trace;
5da88b0f 107 size_t consumer_path_offset = 0;
2bba9e53
DG
108
109 /* Safety net */
110 assert(channel);
e9404c27
JG
111 assert(ksession);
112 assert(ksession->consumer);
2bba9e53 113
e9404c27
JG
114 consumer = ksession->consumer;
115 channel_attr_extended = (struct lttng_channel_extended *)
116 channel->channel->attr.extended.ptr;
2bba9e53
DG
117
118 DBG("Kernel consumer adding channel %s to kernel consumer",
119 channel->channel->name);
d2956687 120 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 121
5da88b0f 122 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
123 if (!pathname) {
124 ret = -1;
125 goto error;
126 }
2bba9e53 127
d2956687
JG
128 if (is_local_trace && ksession->current_trace_chunk) {
129 enum lttng_trace_chunk_status chunk_status;
130 char *pathname_index;
131
132 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
133 pathname);
134 if (ret < 0) {
135 ERR("Failed to format channel index directory");
136 ret = -1;
137 goto error;
138 }
139
140 /*
141 * Create the index subdirectory which will take care
142 * of implicitly creating the channel's path.
143 */
144 chunk_status = lttng_trace_chunk_create_subdirectory(
145 ksession->current_trace_chunk, pathname_index);
146 free(pathname_index);
147 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
148 ret = -1;
149 goto error;
150 }
151 }
152
00e2e675 153 /* Prep channel message structure */
638e7b4e 154 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 155 channel->key,
e9404c27 156 ksession->id,
5da88b0f 157 &pathname[consumer_path_offset],
e9404c27
JG
158 ksession->uid,
159 ksession->gid,
ffe60014 160 consumer->net_seq_index,
c30aaa51 161 channel->channel->name,
ffe60014
DG
162 channel->stream_count,
163 channel->channel->attr.output,
1624d5b7
JD
164 CONSUMER_CHANNEL_TYPE_DATA,
165 channel->channel->attr.tracefile_size,
2bba9e53 166 channel->channel->attr.tracefile_count,
ecc48a90 167 monitor,
e9404c27 168 channel->channel->attr.live_timer_interval,
d2956687
JG
169 channel_attr_extended->monitor_timer_interval,
170 ksession->current_trace_chunk);
00e2e675 171
840cb59c 172 health_code_update();
ca03de58 173
00e2e675
DG
174 ret = consumer_send_channel(sock, &lkm);
175 if (ret < 0) {
176 goto error;
177 }
178
840cb59c 179 health_code_update();
e9404c27
JG
180 rcu_read_lock();
181 session = session_find_by_id(ksession->id);
182 assert(session);
e098433c
JG
183 assert(pthread_mutex_trylock(&session->lock));
184 assert(session_trylock_list());
ca03de58 185
e9404c27
JG
186 status = notification_thread_command_add_channel(
187 notification_thread_handle, session->name,
188 ksession->uid, ksession->gid,
e1f3997a 189 channel->channel->name, channel->key,
e9404c27
JG
190 LTTNG_DOMAIN_KERNEL,
191 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
192 rcu_read_unlock();
193 if (status != LTTNG_OK) {
194 ret = -1;
195 goto error;
196 }
753873bf
JR
197
198 channel->published_to_notification_thread = true;
199
00e2e675 200error:
e32d7f27
JG
201 if (session) {
202 session_put(session);
203 }
53efb85a 204 free(pathname);
00e2e675
DG
205 return ret;
206}
207
208/*
209 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
210 *
211 * The consumer socket lock must be held by the caller.
00e2e675 212 */
f50f23d9 213int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 214 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
215{
216 int ret;
00e2e675 217 struct lttcomm_consumer_msg lkm;
a7d9a3e7 218 struct consumer_output *consumer;
e098433c
JG
219
220 rcu_read_lock();
00e2e675
DG
221
222 /* Safety net */
e098433c
JG
223 assert(ksession);
224 assert(ksession->consumer);
f50f23d9 225 assert(sock);
00e2e675 226
e098433c
JG
227 DBG("Sending metadata %d to kernel consumer",
228 ksession->metadata_stream_fd);
00e2e675
DG
229
230 /* Get consumer output pointer */
e098433c 231 consumer = ksession->consumer;
00e2e675 232
00e2e675 233 /* Prep channel message structure */
638e7b4e 234 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
235 ksession->metadata->key,
236 ksession->id,
5da88b0f 237 "",
e098433c
JG
238 ksession->uid,
239 ksession->gid,
ffe60014 240 consumer->net_seq_index,
30079b6b 241 DEFAULT_METADATA_NAME,
ffe60014
DG
242 1,
243 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 244 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 245 0, 0,
d2956687 246 monitor, 0, 0, ksession->current_trace_chunk);
00e2e675 247
840cb59c 248 health_code_update();
ca03de58 249
00e2e675
DG
250 ret = consumer_send_channel(sock, &lkm);
251 if (ret < 0) {
252 goto error;
253 }
254
840cb59c 255 health_code_update();
ca03de58 256
00e2e675 257 /* Prep stream message structure */
e098433c
JG
258 consumer_init_add_stream_comm_msg(&lkm,
259 ksession->metadata->key,
260 ksession->metadata_stream_fd,
d2956687 261 0 /* CPU: 0 for metadata. */);
00e2e675 262
840cb59c 263 health_code_update();
ca03de58 264
00e2e675 265 /* Send stream and file descriptor */
a7d9a3e7 266 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 267 &ksession->metadata_stream_fd, 1);
00e2e675
DG
268 if (ret < 0) {
269 goto error;
270 }
271
840cb59c 272 health_code_update();
ca03de58 273
00e2e675 274error:
e098433c 275 rcu_read_unlock();
00e2e675
DG
276 return ret;
277}
278
279/*
280 * Sending a single stream to the consumer with command ADD_STREAM.
281 */
5b0e3ccb 282static
f50f23d9 283int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
284 struct ltt_kernel_channel *channel,
285 struct ltt_kernel_stream *stream,
d2956687 286 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
287{
288 int ret;
00e2e675 289 struct lttcomm_consumer_msg lkm;
a7d9a3e7 290 struct consumer_output *consumer;
00e2e675
DG
291
292 assert(channel);
293 assert(stream);
294 assert(session);
295 assert(session->consumer);
f50f23d9 296 assert(sock);
00e2e675
DG
297
298 DBG("Sending stream %d of channel %s to kernel consumer",
299 stream->fd, channel->channel->name);
300
301 /* Get consumer output pointer */
a7d9a3e7 302 consumer = session->consumer;
00e2e675 303
00e2e675 304 /* Prep stream consumer message */
e098433c 305 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 306 channel->key,
00e2e675 307 stream->fd,
d2956687 308 stream->cpu);
00e2e675 309
840cb59c 310 health_code_update();
ca03de58 311
00e2e675 312 /* Send stream and file descriptor */
a7d9a3e7 313 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
314 if (ret < 0) {
315 goto error;
316 }
317
840cb59c 318 health_code_update();
ca03de58 319
00e2e675
DG
320error:
321 return ret;
322}
323
a4baae1b
JD
324/*
325 * Sending the notification that all streams were sent with STREAMS_SENT.
326 */
327int kernel_consumer_streams_sent(struct consumer_socket *sock,
328 struct ltt_kernel_session *session, uint64_t channel_key)
329{
330 int ret;
331 struct lttcomm_consumer_msg lkm;
332 struct consumer_output *consumer;
333
334 assert(sock);
335 assert(session);
336
337 DBG("Sending streams_sent");
338 /* Get consumer output pointer */
339 consumer = session->consumer;
340
341 /* Prep stream consumer message */
342 consumer_init_streams_sent_comm_msg(&lkm,
343 LTTNG_CONSUMER_STREAMS_SENT,
344 channel_key, consumer->net_seq_index);
345
346 health_code_update();
347
348 /* Send stream and file descriptor */
349 ret = consumer_send_msg(sock, &lkm);
350 if (ret < 0) {
351 goto error;
352 }
353
354error:
355 return ret;
356}
357
f1e16794
DG
358/*
359 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
360 *
361 * The consumer socket lock must be held by the caller.
f1e16794 362 */
1fc1b7c8 363int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 364 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 365 unsigned int monitor)
f1e16794 366{
e99f9447 367 int ret = LTTNG_OK;
f1e16794 368 struct ltt_kernel_stream *stream;
00e2e675
DG
369
370 /* Safety net */
371 assert(channel);
e098433c
JG
372 assert(ksession);
373 assert(ksession->consumer);
f50f23d9 374 assert(sock);
00e2e675 375
e098433c
JG
376 rcu_read_lock();
377
00e2e675 378 /* Bail out if consumer is disabled */
e098433c 379 if (!ksession->consumer->enabled) {
f73fabfd 380 ret = LTTNG_OK;
00e2e675
DG
381 goto error;
382 }
f1e16794
DG
383
384 DBG("Sending streams of channel %s to kernel consumer",
385 channel->channel->name);
386
e99f9447 387 if (!channel->sent_to_consumer) {
e098433c 388 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
389 if (ret < 0) {
390 goto error;
391 }
392 channel->sent_to_consumer = true;
f1e16794
DG
393 }
394
395 /* Send streams */
396 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 397 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
398 continue;
399 }
00e2e675
DG
400
401 /* Add stream on the kernel consumer side. */
e098433c 402 ret = kernel_consumer_add_stream(sock, channel, stream,
d2956687 403 ksession, monitor);
f1e16794 404 if (ret < 0) {
f1e16794
DG
405 goto error;
406 }
6986ab9b 407 stream->sent_to_consumer = true;
f1e16794
DG
408 }
409
f1e16794 410error:
e098433c 411 rcu_read_unlock();
f1e16794
DG
412 return ret;
413}
414
415/*
416 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
417 *
418 * The consumer socket lock must be held by the caller.
f1e16794 419 */
f50f23d9
DG
420int kernel_consumer_send_session(struct consumer_socket *sock,
421 struct ltt_kernel_session *session)
f1e16794 422{
2bba9e53 423 int ret, monitor = 0;
f1e16794 424 struct ltt_kernel_channel *chan;
f1e16794 425
00e2e675
DG
426 /* Safety net */
427 assert(session);
428 assert(session->consumer);
f50f23d9 429 assert(sock);
f1e16794 430
00e2e675
DG
431 /* Bail out if consumer is disabled */
432 if (!session->consumer->enabled) {
f73fabfd 433 ret = LTTNG_OK;
00e2e675 434 goto error;
f1e16794
DG
435 }
436
2bba9e53
DG
437 /* Don't monitor the streams on the consumer if in flight recorder. */
438 if (session->output_traces) {
439 monitor = 1;
440 }
441
00e2e675
DG
442 DBG("Sending session stream to kernel consumer");
443
609af759 444 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 445 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 446 if (ret < 0) {
f1e16794
DG
447 goto error;
448 }
f1e16794
DG
449 }
450
00e2e675 451 /* Send channel and streams of it */
f1e16794 452 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 453 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 454 monitor);
f1e16794
DG
455 if (ret < 0) {
456 goto error;
457 }
601262d6
JD
458 if (monitor) {
459 /*
460 * Inform the relay that all the streams for the
461 * channel were sent.
462 */
e1f3997a 463 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
464 if (ret < 0) {
465 goto error;
466 }
467 }
f1e16794
DG
468 }
469
00e2e675 470 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 471
4ce9ff51 472 session->consumer_fds_sent = 1;
f1e16794
DG
473 return 0;
474
475error:
476 return ret;
477}
07b86b52
JD
478
479int kernel_consumer_destroy_channel(struct consumer_socket *socket,
480 struct ltt_kernel_channel *channel)
481{
482 int ret;
483 struct lttcomm_consumer_msg msg;
484
485 assert(channel);
486 assert(socket);
07b86b52 487
e1f3997a 488 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 489
53efb85a 490 memset(&msg, 0, sizeof(msg));
07b86b52 491 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 492 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
493
494 pthread_mutex_lock(socket->lock);
495 health_code_update();
496
497 ret = consumer_send_msg(socket, &msg);
498 if (ret < 0) {
499 goto error;
500 }
501
502error:
503 health_code_update();
504 pthread_mutex_unlock(socket->lock);
505 return ret;
506}
507
508int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
509 struct ltt_kernel_metadata *metadata)
510{
511 int ret;
512 struct lttcomm_consumer_msg msg;
513
514 assert(metadata);
515 assert(socket);
07b86b52 516
d40f0359 517 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 518
53efb85a 519 memset(&msg, 0, sizeof(msg));
07b86b52 520 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 521 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
522
523 pthread_mutex_lock(socket->lock);
524 health_code_update();
525
526 ret = consumer_send_msg(socket, &msg);
527 if (ret < 0) {
528 goto error;
529 }
530
531error:
532 health_code_update();
533 pthread_mutex_unlock(socket->lock);
534 return ret;
535}
This page took 0.080354 seconds and 4 git commands to generate.