Add session rotation ongoing/completed notification commands
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
2bba9e53
DG
36static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
00e2e675
DG
38{
39 int ret;
ffe60014 40 char tmp_path[PATH_MAX];
2bba9e53 41 char *pathname = NULL;
00e2e675 42
2bba9e53 43 assert(consumer);
00e2e675 44
ffe60014
DG
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
366a9222
JD
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
ffe60014 52 if (ret < 0) {
2bba9e53 53 PERROR("snprintf kernel channel path");
ffe60014 54 goto error;
dba13f1d
JG
55 } else if (ret >= sizeof(tmp_path)) {
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path), ret,
58 consumer->dst.session_root_path,
59 consumer->chunk_path,
60 consumer->subdir);
61 goto error;
ffe60014 62 }
f5436bfc 63 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 64 if (!pathname) {
f5436bfc 65 PERROR("lttng_strndup");
bb3c4e70
MD
66 goto error;
67 }
ffe60014
DG
68
69 /* Create directory */
2bba9e53 70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
ffe60014 71 if (ret < 0) {
df5b86c8 72 if (errno != EEXIST) {
ffe60014
DG
73 ERR("Trace directory creation error");
74 goto error;
75 }
76 }
77 DBG3("Kernel local consumer tracefile path: %s", pathname);
78 } else {
2b29c638
JD
79 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
80 consumer->dst.net.base_dir,
81 consumer->subdir);
ffe60014 82 if (ret < 0) {
2bba9e53 83 PERROR("snprintf kernel metadata path");
ffe60014 84 goto error;
dba13f1d
JG
85 } else if (ret >= sizeof(tmp_path)) {
86 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
87 sizeof(tmp_path), ret,
88 consumer->dst.net.base_dir,
89 consumer->subdir);
90 goto error;
ffe60014 91 }
f5436bfc 92 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 93 if (!pathname) {
f5436bfc 94 PERROR("lttng_strndup");
bb3c4e70
MD
95 goto error;
96 }
ffe60014
DG
97 DBG3("Kernel network consumer subdir path: %s", pathname);
98 }
99
2bba9e53
DG
100 return pathname;
101
102error:
103 free(pathname);
104 return NULL;
105}
106
107/*
108 * Sending a single channel to the consumer with command ADD_CHANNEL.
109 */
110int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
111 struct ltt_kernel_channel *channel,
112 struct ltt_kernel_session *ksession,
2bba9e53
DG
113 unsigned int monitor)
114{
115 int ret;
116 char *pathname;
117 struct lttcomm_consumer_msg lkm;
118 struct consumer_output *consumer;
e9404c27
JG
119 enum lttng_error_code status;
120 struct ltt_session *session;
121 struct lttng_channel_extended *channel_attr_extended;
2bba9e53
DG
122
123 /* Safety net */
124 assert(channel);
e9404c27
JG
125 assert(ksession);
126 assert(ksession->consumer);
2bba9e53 127
e9404c27
JG
128 consumer = ksession->consumer;
129 channel_attr_extended = (struct lttng_channel_extended *)
130 channel->channel->attr.extended.ptr;
2bba9e53
DG
131
132 DBG("Kernel consumer adding channel %s to kernel consumer",
133 channel->channel->name);
134
135 if (monitor) {
e9404c27
JG
136 pathname = create_channel_path(consumer, ksession->uid,
137 ksession->gid);
2bba9e53
DG
138 } else {
139 /* Empty path. */
53efb85a 140 pathname = strdup("");
2bba9e53 141 }
bb3c4e70
MD
142 if (!pathname) {
143 ret = -1;
144 goto error;
145 }
2bba9e53 146
00e2e675 147 /* Prep channel message structure */
638e7b4e 148 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 149 channel->key,
e9404c27 150 ksession->id,
ffe60014 151 pathname,
e9404c27
JG
152 ksession->uid,
153 ksession->gid,
ffe60014 154 consumer->net_seq_index,
c30aaa51 155 channel->channel->name,
ffe60014
DG
156 channel->stream_count,
157 channel->channel->attr.output,
1624d5b7
JD
158 CONSUMER_CHANNEL_TYPE_DATA,
159 channel->channel->attr.tracefile_size,
2bba9e53 160 channel->channel->attr.tracefile_count,
ecc48a90 161 monitor,
e9404c27
JG
162 channel->channel->attr.live_timer_interval,
163 channel_attr_extended->monitor_timer_interval);
00e2e675 164
840cb59c 165 health_code_update();
ca03de58 166
00e2e675
DG
167 ret = consumer_send_channel(sock, &lkm);
168 if (ret < 0) {
169 goto error;
170 }
171
840cb59c 172 health_code_update();
e9404c27
JG
173 rcu_read_lock();
174 session = session_find_by_id(ksession->id);
175 assert(session);
e098433c
JG
176 assert(pthread_mutex_trylock(&session->lock));
177 assert(session_trylock_list());
ca03de58 178
e9404c27
JG
179 status = notification_thread_command_add_channel(
180 notification_thread_handle, session->name,
181 ksession->uid, ksession->gid,
e1f3997a 182 channel->channel->name, channel->key,
e9404c27
JG
183 LTTNG_DOMAIN_KERNEL,
184 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
185 rcu_read_unlock();
186 if (status != LTTNG_OK) {
187 ret = -1;
188 goto error;
189 }
753873bf
JR
190
191 channel->published_to_notification_thread = true;
192
00e2e675 193error:
53efb85a 194 free(pathname);
00e2e675
DG
195 return ret;
196}
197
198/*
199 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
200 *
201 * The consumer socket lock must be held by the caller.
00e2e675 202 */
f50f23d9 203int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 204 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
205{
206 int ret;
2bba9e53 207 char *pathname;
00e2e675 208 struct lttcomm_consumer_msg lkm;
a7d9a3e7 209 struct consumer_output *consumer;
e098433c
JG
210 struct ltt_session *session;
211
212 rcu_read_lock();
00e2e675
DG
213
214 /* Safety net */
e098433c
JG
215 assert(ksession);
216 assert(ksession->consumer);
f50f23d9 217 assert(sock);
00e2e675 218
e098433c
JG
219 DBG("Sending metadata %d to kernel consumer",
220 ksession->metadata_stream_fd);
00e2e675
DG
221
222 /* Get consumer output pointer */
e098433c 223 consumer = ksession->consumer;
00e2e675 224
2bba9e53 225 if (monitor) {
e098433c
JG
226 pathname = create_channel_path(consumer,
227 ksession->uid, ksession->gid);
00e2e675 228 } else {
2bba9e53 229 /* Empty path. */
53efb85a 230 pathname = strdup("");
00e2e675 231 }
bb3c4e70
MD
232 if (!pathname) {
233 ret = -1;
234 goto error;
235 }
00e2e675 236
e098433c
JG
237 session = session_find_by_id(ksession->id);
238 assert(session);
239 assert(pthread_mutex_trylock(&session->lock));
240 assert(session_trylock_list());
241
00e2e675 242 /* Prep channel message structure */
638e7b4e 243 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
244 ksession->metadata->key,
245 ksession->id,
ffe60014 246 pathname,
e098433c
JG
247 ksession->uid,
248 ksession->gid,
ffe60014 249 consumer->net_seq_index,
30079b6b 250 DEFAULT_METADATA_NAME,
ffe60014
DG
251 1,
252 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 253 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 254 0, 0,
e9404c27 255 monitor, 0, 0);
00e2e675 256
840cb59c 257 health_code_update();
ca03de58 258
00e2e675
DG
259 ret = consumer_send_channel(sock, &lkm);
260 if (ret < 0) {
261 goto error;
262 }
263
840cb59c 264 health_code_update();
ca03de58 265
00e2e675 266 /* Prep stream message structure */
e098433c
JG
267 consumer_init_add_stream_comm_msg(&lkm,
268 ksession->metadata->key,
269 ksession->metadata_stream_fd,
270 0 /* CPU: 0 for metadata. */,
271 session->current_archive_id);
00e2e675 272
840cb59c 273 health_code_update();
ca03de58 274
00e2e675 275 /* Send stream and file descriptor */
a7d9a3e7 276 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 277 &ksession->metadata_stream_fd, 1);
00e2e675
DG
278 if (ret < 0) {
279 goto error;
280 }
281
840cb59c 282 health_code_update();
ca03de58 283
00e2e675 284error:
e098433c 285 rcu_read_unlock();
53efb85a 286 free(pathname);
00e2e675
DG
287 return ret;
288}
289
290/*
291 * Sending a single stream to the consumer with command ADD_STREAM.
292 */
5b0e3ccb 293static
f50f23d9 294int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
295 struct ltt_kernel_channel *channel,
296 struct ltt_kernel_stream *stream,
297 struct ltt_kernel_session *session, unsigned int monitor,
298 uint64_t trace_archive_id)
00e2e675
DG
299{
300 int ret;
00e2e675 301 struct lttcomm_consumer_msg lkm;
a7d9a3e7 302 struct consumer_output *consumer;
00e2e675
DG
303
304 assert(channel);
305 assert(stream);
306 assert(session);
307 assert(session->consumer);
f50f23d9 308 assert(sock);
00e2e675
DG
309
310 DBG("Sending stream %d of channel %s to kernel consumer",
311 stream->fd, channel->channel->name);
312
313 /* Get consumer output pointer */
a7d9a3e7 314 consumer = session->consumer;
00e2e675 315
00e2e675 316 /* Prep stream consumer message */
e098433c 317 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 318 channel->key,
00e2e675 319 stream->fd,
e098433c
JG
320 stream->cpu,
321 trace_archive_id);
00e2e675 322
840cb59c 323 health_code_update();
ca03de58 324
00e2e675 325 /* Send stream and file descriptor */
a7d9a3e7 326 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
327 if (ret < 0) {
328 goto error;
329 }
330
840cb59c 331 health_code_update();
ca03de58 332
00e2e675
DG
333error:
334 return ret;
335}
336
a4baae1b
JD
337/*
338 * Sending the notification that all streams were sent with STREAMS_SENT.
339 */
340int kernel_consumer_streams_sent(struct consumer_socket *sock,
341 struct ltt_kernel_session *session, uint64_t channel_key)
342{
343 int ret;
344 struct lttcomm_consumer_msg lkm;
345 struct consumer_output *consumer;
346
347 assert(sock);
348 assert(session);
349
350 DBG("Sending streams_sent");
351 /* Get consumer output pointer */
352 consumer = session->consumer;
353
354 /* Prep stream consumer message */
355 consumer_init_streams_sent_comm_msg(&lkm,
356 LTTNG_CONSUMER_STREAMS_SENT,
357 channel_key, consumer->net_seq_index);
358
359 health_code_update();
360
361 /* Send stream and file descriptor */
362 ret = consumer_send_msg(sock, &lkm);
363 if (ret < 0) {
364 goto error;
365 }
366
367error:
368 return ret;
369}
370
f1e16794
DG
371/*
372 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
373 *
374 * The consumer socket lock must be held by the caller.
f1e16794 375 */
1fc1b7c8 376int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 377 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 378 unsigned int monitor)
f1e16794 379{
e99f9447 380 int ret = LTTNG_OK;
f1e16794 381 struct ltt_kernel_stream *stream;
e098433c 382 struct ltt_session *session;
00e2e675
DG
383
384 /* Safety net */
385 assert(channel);
e098433c
JG
386 assert(ksession);
387 assert(ksession->consumer);
f50f23d9 388 assert(sock);
00e2e675 389
e098433c
JG
390 rcu_read_lock();
391
392 session = session_find_by_id(ksession->id);
393 assert(session);
394 assert(pthread_mutex_trylock(&session->lock));
395 assert(session_trylock_list());
396
00e2e675 397 /* Bail out if consumer is disabled */
e098433c 398 if (!ksession->consumer->enabled) {
f73fabfd 399 ret = LTTNG_OK;
00e2e675
DG
400 goto error;
401 }
f1e16794
DG
402
403 DBG("Sending streams of channel %s to kernel consumer",
404 channel->channel->name);
405
e99f9447 406 if (!channel->sent_to_consumer) {
e098433c 407 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
408 if (ret < 0) {
409 goto error;
410 }
411 channel->sent_to_consumer = true;
f1e16794
DG
412 }
413
414 /* Send streams */
415 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 416 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
417 continue;
418 }
00e2e675
DG
419
420 /* Add stream on the kernel consumer side. */
e098433c
JG
421 ret = kernel_consumer_add_stream(sock, channel, stream,
422 ksession, monitor, session->current_archive_id);
f1e16794 423 if (ret < 0) {
f1e16794
DG
424 goto error;
425 }
6986ab9b 426 stream->sent_to_consumer = true;
f1e16794
DG
427 }
428
f1e16794 429error:
e098433c 430 rcu_read_unlock();
f1e16794
DG
431 return ret;
432}
433
434/*
435 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
436 *
437 * The consumer socket lock must be held by the caller.
f1e16794 438 */
f50f23d9
DG
439int kernel_consumer_send_session(struct consumer_socket *sock,
440 struct ltt_kernel_session *session)
f1e16794 441{
2bba9e53 442 int ret, monitor = 0;
f1e16794 443 struct ltt_kernel_channel *chan;
f1e16794 444
00e2e675
DG
445 /* Safety net */
446 assert(session);
447 assert(session->consumer);
f50f23d9 448 assert(sock);
f1e16794 449
00e2e675
DG
450 /* Bail out if consumer is disabled */
451 if (!session->consumer->enabled) {
f73fabfd 452 ret = LTTNG_OK;
00e2e675 453 goto error;
f1e16794
DG
454 }
455
2bba9e53
DG
456 /* Don't monitor the streams on the consumer if in flight recorder. */
457 if (session->output_traces) {
458 monitor = 1;
459 }
460
00e2e675
DG
461 DBG("Sending session stream to kernel consumer");
462
609af759 463 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 464 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 465 if (ret < 0) {
f1e16794
DG
466 goto error;
467 }
f1e16794
DG
468 }
469
00e2e675 470 /* Send channel and streams of it */
f1e16794 471 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 472 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 473 monitor);
f1e16794
DG
474 if (ret < 0) {
475 goto error;
476 }
601262d6
JD
477 if (monitor) {
478 /*
479 * Inform the relay that all the streams for the
480 * channel were sent.
481 */
e1f3997a 482 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
483 if (ret < 0) {
484 goto error;
485 }
486 }
f1e16794
DG
487 }
488
00e2e675 489 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 490
4ce9ff51 491 session->consumer_fds_sent = 1;
f1e16794
DG
492 return 0;
493
494error:
495 return ret;
496}
07b86b52
JD
497
498int kernel_consumer_destroy_channel(struct consumer_socket *socket,
499 struct ltt_kernel_channel *channel)
500{
501 int ret;
502 struct lttcomm_consumer_msg msg;
503
504 assert(channel);
505 assert(socket);
07b86b52 506
e1f3997a 507 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 508
53efb85a 509 memset(&msg, 0, sizeof(msg));
07b86b52 510 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 511 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
512
513 pthread_mutex_lock(socket->lock);
514 health_code_update();
515
516 ret = consumer_send_msg(socket, &msg);
517 if (ret < 0) {
518 goto error;
519 }
520
521error:
522 health_code_update();
523 pthread_mutex_unlock(socket->lock);
524 return ret;
525}
526
527int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
528 struct ltt_kernel_metadata *metadata)
529{
530 int ret;
531 struct lttcomm_consumer_msg msg;
532
533 assert(metadata);
534 assert(socket);
07b86b52 535
d40f0359 536 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 537
53efb85a 538 memset(&msg, 0, sizeof(msg));
07b86b52 539 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 540 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
541
542 pthread_mutex_lock(socket->lock);
543 health_code_update();
544
545 ret = consumer_send_msg(socket, &msg);
546 if (ret < 0) {
547 goto error;
548 }
549
550error:
551 health_code_update();
552 pthread_mutex_unlock(socket->lock);
553 return ret;
554}
This page took 0.073666 seconds and 4 git commands to generate.