Clean-up: hide internal kernel_consumer_add_channel() symbol
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
2bba9e53
DG
36static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
00e2e675
DG
38{
39 int ret;
ffe60014 40 char tmp_path[PATH_MAX];
2bba9e53 41 char *pathname = NULL;
00e2e675 42
2bba9e53 43 assert(consumer);
00e2e675 44
ffe60014
DG
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
366a9222
JD
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
ffe60014 52 if (ret < 0) {
2bba9e53 53 PERROR("snprintf kernel channel path");
ffe60014 54 goto error;
dba13f1d
JG
55 } else if (ret >= sizeof(tmp_path)) {
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path), ret,
58 consumer->dst.session_root_path,
59 consumer->chunk_path,
60 consumer->subdir);
61 goto error;
ffe60014 62 }
f5436bfc 63 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 64 if (!pathname) {
f5436bfc 65 PERROR("lttng_strndup");
bb3c4e70
MD
66 goto error;
67 }
ffe60014
DG
68
69 /* Create directory */
2bba9e53 70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
ffe60014 71 if (ret < 0) {
df5b86c8 72 if (errno != EEXIST) {
ffe60014
DG
73 ERR("Trace directory creation error");
74 goto error;
75 }
76 }
77 DBG3("Kernel local consumer tracefile path: %s", pathname);
78 } else {
2b29c638
JD
79 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
80 consumer->dst.net.base_dir,
81 consumer->subdir);
ffe60014 82 if (ret < 0) {
2bba9e53 83 PERROR("snprintf kernel metadata path");
ffe60014 84 goto error;
dba13f1d
JG
85 } else if (ret >= sizeof(tmp_path)) {
86 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
87 sizeof(tmp_path), ret,
88 consumer->dst.net.base_dir,
89 consumer->subdir);
90 goto error;
ffe60014 91 }
f5436bfc 92 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 93 if (!pathname) {
f5436bfc 94 PERROR("lttng_strndup");
bb3c4e70
MD
95 goto error;
96 }
ffe60014
DG
97 DBG3("Kernel network consumer subdir path: %s", pathname);
98 }
99
2bba9e53
DG
100 return pathname;
101
102error:
103 free(pathname);
104 return NULL;
105}
106
107/*
108 * Sending a single channel to the consumer with command ADD_CHANNEL.
109 */
ba27cd00 110static
2bba9e53 111int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
112 struct ltt_kernel_channel *channel,
113 struct ltt_kernel_session *ksession,
2bba9e53
DG
114 unsigned int monitor)
115{
116 int ret;
117 char *pathname;
118 struct lttcomm_consumer_msg lkm;
119 struct consumer_output *consumer;
e9404c27 120 enum lttng_error_code status;
e32d7f27 121 struct ltt_session *session = NULL;
e9404c27 122 struct lttng_channel_extended *channel_attr_extended;
2bba9e53
DG
123
124 /* Safety net */
125 assert(channel);
e9404c27
JG
126 assert(ksession);
127 assert(ksession->consumer);
2bba9e53 128
e9404c27
JG
129 consumer = ksession->consumer;
130 channel_attr_extended = (struct lttng_channel_extended *)
131 channel->channel->attr.extended.ptr;
2bba9e53
DG
132
133 DBG("Kernel consumer adding channel %s to kernel consumer",
134 channel->channel->name);
135
136 if (monitor) {
e9404c27
JG
137 pathname = create_channel_path(consumer, ksession->uid,
138 ksession->gid);
2bba9e53
DG
139 } else {
140 /* Empty path. */
53efb85a 141 pathname = strdup("");
2bba9e53 142 }
bb3c4e70
MD
143 if (!pathname) {
144 ret = -1;
145 goto error;
146 }
2bba9e53 147
00e2e675 148 /* Prep channel message structure */
638e7b4e 149 consumer_init_add_channel_comm_msg(&lkm,
e1f3997a 150 channel->key,
e9404c27 151 ksession->id,
ffe60014 152 pathname,
e9404c27
JG
153 ksession->uid,
154 ksession->gid,
ffe60014 155 consumer->net_seq_index,
c30aaa51 156 channel->channel->name,
ffe60014
DG
157 channel->stream_count,
158 channel->channel->attr.output,
1624d5b7
JD
159 CONSUMER_CHANNEL_TYPE_DATA,
160 channel->channel->attr.tracefile_size,
2bba9e53 161 channel->channel->attr.tracefile_count,
ecc48a90 162 monitor,
e9404c27
JG
163 channel->channel->attr.live_timer_interval,
164 channel_attr_extended->monitor_timer_interval);
00e2e675 165
840cb59c 166 health_code_update();
ca03de58 167
00e2e675
DG
168 ret = consumer_send_channel(sock, &lkm);
169 if (ret < 0) {
170 goto error;
171 }
172
840cb59c 173 health_code_update();
e9404c27
JG
174 rcu_read_lock();
175 session = session_find_by_id(ksession->id);
176 assert(session);
e098433c
JG
177 assert(pthread_mutex_trylock(&session->lock));
178 assert(session_trylock_list());
ca03de58 179
e9404c27
JG
180 status = notification_thread_command_add_channel(
181 notification_thread_handle, session->name,
182 ksession->uid, ksession->gid,
e1f3997a 183 channel->channel->name, channel->key,
e9404c27
JG
184 LTTNG_DOMAIN_KERNEL,
185 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
186 rcu_read_unlock();
187 if (status != LTTNG_OK) {
188 ret = -1;
189 goto error;
190 }
753873bf
JR
191
192 channel->published_to_notification_thread = true;
193
00e2e675 194error:
e32d7f27
JG
195 if (session) {
196 session_put(session);
197 }
53efb85a 198 free(pathname);
00e2e675
DG
199 return ret;
200}
201
202/*
203 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
204 *
205 * The consumer socket lock must be held by the caller.
00e2e675 206 */
f50f23d9 207int kernel_consumer_add_metadata(struct consumer_socket *sock,
e098433c 208 struct ltt_kernel_session *ksession, unsigned int monitor)
00e2e675
DG
209{
210 int ret;
2bba9e53 211 char *pathname;
00e2e675 212 struct lttcomm_consumer_msg lkm;
a7d9a3e7 213 struct consumer_output *consumer;
e32d7f27 214 struct ltt_session *session = NULL;
e098433c
JG
215
216 rcu_read_lock();
00e2e675
DG
217
218 /* Safety net */
e098433c
JG
219 assert(ksession);
220 assert(ksession->consumer);
f50f23d9 221 assert(sock);
00e2e675 222
e098433c
JG
223 DBG("Sending metadata %d to kernel consumer",
224 ksession->metadata_stream_fd);
00e2e675
DG
225
226 /* Get consumer output pointer */
e098433c 227 consumer = ksession->consumer;
00e2e675 228
2bba9e53 229 if (monitor) {
e098433c
JG
230 pathname = create_channel_path(consumer,
231 ksession->uid, ksession->gid);
00e2e675 232 } else {
2bba9e53 233 /* Empty path. */
53efb85a 234 pathname = strdup("");
00e2e675 235 }
bb3c4e70
MD
236 if (!pathname) {
237 ret = -1;
238 goto error;
239 }
00e2e675 240
e098433c
JG
241 session = session_find_by_id(ksession->id);
242 assert(session);
243 assert(pthread_mutex_trylock(&session->lock));
244 assert(session_trylock_list());
245
00e2e675 246 /* Prep channel message structure */
638e7b4e 247 consumer_init_add_channel_comm_msg(&lkm,
e098433c
JG
248 ksession->metadata->key,
249 ksession->id,
ffe60014 250 pathname,
e098433c
JG
251 ksession->uid,
252 ksession->gid,
ffe60014 253 consumer->net_seq_index,
30079b6b 254 DEFAULT_METADATA_NAME,
ffe60014
DG
255 1,
256 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 257 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 258 0, 0,
e9404c27 259 monitor, 0, 0);
00e2e675 260
840cb59c 261 health_code_update();
ca03de58 262
00e2e675
DG
263 ret = consumer_send_channel(sock, &lkm);
264 if (ret < 0) {
265 goto error;
266 }
267
840cb59c 268 health_code_update();
ca03de58 269
00e2e675 270 /* Prep stream message structure */
e098433c
JG
271 consumer_init_add_stream_comm_msg(&lkm,
272 ksession->metadata->key,
273 ksession->metadata_stream_fd,
274 0 /* CPU: 0 for metadata. */,
275 session->current_archive_id);
00e2e675 276
840cb59c 277 health_code_update();
ca03de58 278
00e2e675 279 /* Send stream and file descriptor */
a7d9a3e7 280 ret = consumer_send_stream(sock, consumer, &lkm,
e098433c 281 &ksession->metadata_stream_fd, 1);
00e2e675
DG
282 if (ret < 0) {
283 goto error;
284 }
285
840cb59c 286 health_code_update();
ca03de58 287
00e2e675 288error:
e098433c 289 rcu_read_unlock();
53efb85a 290 free(pathname);
e32d7f27
JG
291 if (session) {
292 session_put(session);
293 }
00e2e675
DG
294 return ret;
295}
296
297/*
298 * Sending a single stream to the consumer with command ADD_STREAM.
299 */
5b0e3ccb 300static
f50f23d9 301int kernel_consumer_add_stream(struct consumer_socket *sock,
e098433c
JG
302 struct ltt_kernel_channel *channel,
303 struct ltt_kernel_stream *stream,
304 struct ltt_kernel_session *session, unsigned int monitor,
305 uint64_t trace_archive_id)
00e2e675
DG
306{
307 int ret;
00e2e675 308 struct lttcomm_consumer_msg lkm;
a7d9a3e7 309 struct consumer_output *consumer;
00e2e675
DG
310
311 assert(channel);
312 assert(stream);
313 assert(session);
314 assert(session->consumer);
f50f23d9 315 assert(sock);
00e2e675
DG
316
317 DBG("Sending stream %d of channel %s to kernel consumer",
318 stream->fd, channel->channel->name);
319
320 /* Get consumer output pointer */
a7d9a3e7 321 consumer = session->consumer;
00e2e675 322
00e2e675 323 /* Prep stream consumer message */
e098433c 324 consumer_init_add_stream_comm_msg(&lkm,
e1f3997a 325 channel->key,
00e2e675 326 stream->fd,
e098433c
JG
327 stream->cpu,
328 trace_archive_id);
00e2e675 329
840cb59c 330 health_code_update();
ca03de58 331
00e2e675 332 /* Send stream and file descriptor */
a7d9a3e7 333 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
334 if (ret < 0) {
335 goto error;
336 }
337
840cb59c 338 health_code_update();
ca03de58 339
00e2e675
DG
340error:
341 return ret;
342}
343
a4baae1b
JD
344/*
345 * Sending the notification that all streams were sent with STREAMS_SENT.
346 */
347int kernel_consumer_streams_sent(struct consumer_socket *sock,
348 struct ltt_kernel_session *session, uint64_t channel_key)
349{
350 int ret;
351 struct lttcomm_consumer_msg lkm;
352 struct consumer_output *consumer;
353
354 assert(sock);
355 assert(session);
356
357 DBG("Sending streams_sent");
358 /* Get consumer output pointer */
359 consumer = session->consumer;
360
361 /* Prep stream consumer message */
362 consumer_init_streams_sent_comm_msg(&lkm,
363 LTTNG_CONSUMER_STREAMS_SENT,
364 channel_key, consumer->net_seq_index);
365
366 health_code_update();
367
368 /* Send stream and file descriptor */
369 ret = consumer_send_msg(sock, &lkm);
370 if (ret < 0) {
371 goto error;
372 }
373
374error:
375 return ret;
376}
377
f1e16794
DG
378/*
379 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
380 *
381 * The consumer socket lock must be held by the caller.
f1e16794 382 */
1fc1b7c8 383int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
e098433c 384 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
2bba9e53 385 unsigned int monitor)
f1e16794 386{
e99f9447 387 int ret = LTTNG_OK;
f1e16794 388 struct ltt_kernel_stream *stream;
e32d7f27 389 struct ltt_session *session = NULL;
00e2e675
DG
390
391 /* Safety net */
392 assert(channel);
e098433c
JG
393 assert(ksession);
394 assert(ksession->consumer);
f50f23d9 395 assert(sock);
00e2e675 396
e098433c
JG
397 rcu_read_lock();
398
399 session = session_find_by_id(ksession->id);
400 assert(session);
401 assert(pthread_mutex_trylock(&session->lock));
402 assert(session_trylock_list());
403
00e2e675 404 /* Bail out if consumer is disabled */
e098433c 405 if (!ksession->consumer->enabled) {
f73fabfd 406 ret = LTTNG_OK;
00e2e675
DG
407 goto error;
408 }
f1e16794
DG
409
410 DBG("Sending streams of channel %s to kernel consumer",
411 channel->channel->name);
412
e99f9447 413 if (!channel->sent_to_consumer) {
e098433c 414 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
415 if (ret < 0) {
416 goto error;
417 }
418 channel->sent_to_consumer = true;
f1e16794
DG
419 }
420
421 /* Send streams */
422 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 423 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
424 continue;
425 }
00e2e675
DG
426
427 /* Add stream on the kernel consumer side. */
e098433c
JG
428 ret = kernel_consumer_add_stream(sock, channel, stream,
429 ksession, monitor, session->current_archive_id);
f1e16794 430 if (ret < 0) {
f1e16794
DG
431 goto error;
432 }
6986ab9b 433 stream->sent_to_consumer = true;
f1e16794
DG
434 }
435
f1e16794 436error:
e098433c 437 rcu_read_unlock();
e32d7f27
JG
438 if (session) {
439 session_put(session);
440 }
f1e16794
DG
441 return ret;
442}
443
444/*
445 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
446 *
447 * The consumer socket lock must be held by the caller.
f1e16794 448 */
f50f23d9
DG
449int kernel_consumer_send_session(struct consumer_socket *sock,
450 struct ltt_kernel_session *session)
f1e16794 451{
2bba9e53 452 int ret, monitor = 0;
f1e16794 453 struct ltt_kernel_channel *chan;
f1e16794 454
00e2e675
DG
455 /* Safety net */
456 assert(session);
457 assert(session->consumer);
f50f23d9 458 assert(sock);
f1e16794 459
00e2e675
DG
460 /* Bail out if consumer is disabled */
461 if (!session->consumer->enabled) {
f73fabfd 462 ret = LTTNG_OK;
00e2e675 463 goto error;
f1e16794
DG
464 }
465
2bba9e53
DG
466 /* Don't monitor the streams on the consumer if in flight recorder. */
467 if (session->output_traces) {
468 monitor = 1;
469 }
470
00e2e675
DG
471 DBG("Sending session stream to kernel consumer");
472
609af759 473 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 474 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 475 if (ret < 0) {
f1e16794
DG
476 goto error;
477 }
f1e16794
DG
478 }
479
00e2e675 480 /* Send channel and streams of it */
f1e16794 481 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
1fc1b7c8 482 ret = kernel_consumer_send_channel_streams(sock, chan, session,
2bba9e53 483 monitor);
f1e16794
DG
484 if (ret < 0) {
485 goto error;
486 }
601262d6
JD
487 if (monitor) {
488 /*
489 * Inform the relay that all the streams for the
490 * channel were sent.
491 */
e1f3997a 492 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
493 if (ret < 0) {
494 goto error;
495 }
496 }
f1e16794
DG
497 }
498
00e2e675 499 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 500
4ce9ff51 501 session->consumer_fds_sent = 1;
f1e16794
DG
502 return 0;
503
504error:
505 return ret;
506}
07b86b52
JD
507
508int kernel_consumer_destroy_channel(struct consumer_socket *socket,
509 struct ltt_kernel_channel *channel)
510{
511 int ret;
512 struct lttcomm_consumer_msg msg;
513
514 assert(channel);
515 assert(socket);
07b86b52 516
e1f3997a 517 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 518
53efb85a 519 memset(&msg, 0, sizeof(msg));
07b86b52 520 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 521 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
522
523 pthread_mutex_lock(socket->lock);
524 health_code_update();
525
526 ret = consumer_send_msg(socket, &msg);
527 if (ret < 0) {
528 goto error;
529 }
530
531error:
532 health_code_update();
533 pthread_mutex_unlock(socket->lock);
534 return ret;
535}
536
537int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
538 struct ltt_kernel_metadata *metadata)
539{
540 int ret;
541 struct lttcomm_consumer_msg msg;
542
543 assert(metadata);
544 assert(socket);
07b86b52 545
d40f0359 546 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 547
53efb85a 548 memset(&msg, 0, sizeof(msg));
07b86b52 549 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 550 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
551
552 pthread_mutex_lock(socket->lock);
553 health_code_update();
554
555 ret = consumer_send_msg(socket, &msg);
556 if (ret < 0) {
557 goto error;
558 }
559
560error:
561 health_code_update();
562 pthread_mutex_unlock(socket->lock);
563 return ret;
564}
This page took 0.075677 seconds and 4 git commands to generate.