2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
34 #include "lttng-sessiond.h"
36 static char *create_channel_path(struct consumer_output
*consumer
)
39 char tmp_path
[PATH_MAX
];
40 char *pathname
= NULL
;
44 /* Get the right path name destination */
45 if (consumer
->type
== CONSUMER_DST_LOCAL
||
46 (consumer
->type
== CONSUMER_DST_NET
&&
47 consumer
->relay_major_version
== 2 &&
48 consumer
->relay_minor_version
>= 11)) {
49 pathname
= strdup(consumer
->domain_subdir
);
51 PERROR("Failed to copy domain subdirectory string %s",
52 consumer
->domain_subdir
);
55 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
59 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
60 consumer
->dst
.net
.base_dir
,
61 consumer
->domain_subdir
);
63 PERROR("snprintf kernel metadata path");
65 } else if (ret
>= sizeof(tmp_path
)) {
66 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
67 sizeof(tmp_path
), ret
,
68 consumer
->dst
.net
.base_dir
,
69 consumer
->domain_subdir
);
72 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
74 PERROR("lttng_strndup");
77 DBG3("Kernel network consumer subdir path: %s", pathname
);
88 * Sending a single channel to the consumer with command ADD_CHANNEL.
91 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
92 struct ltt_kernel_channel
*channel
,
93 struct ltt_kernel_session
*ksession
,
97 char *pathname
= NULL
;
98 struct lttcomm_consumer_msg lkm
;
99 struct consumer_output
*consumer
;
100 enum lttng_error_code status
;
101 struct ltt_session
*session
= NULL
;
102 struct lttng_channel_extended
*channel_attr_extended
;
108 assert(ksession
->consumer
);
110 consumer
= ksession
->consumer
;
111 channel_attr_extended
= (struct lttng_channel_extended
*)
112 channel
->channel
->attr
.extended
.ptr
;
114 DBG("Kernel consumer adding channel %s to kernel consumer",
115 channel
->channel
->name
);
116 is_local_trace
= consumer
->net_seq_index
== -1ULL;
118 pathname
= create_channel_path(consumer
);
124 if (is_local_trace
&& ksession
->current_trace_chunk
) {
125 enum lttng_trace_chunk_status chunk_status
;
126 char *pathname_index
;
128 ret
= asprintf(&pathname_index
, "%s/" DEFAULT_INDEX_DIR
,
131 ERR("Failed to format channel index directory");
137 * Create the index subdirectory which will take care
138 * of implicitly creating the channel's path.
140 chunk_status
= lttng_trace_chunk_create_subdirectory(
141 ksession
->current_trace_chunk
, pathname_index
);
142 free(pathname_index
);
143 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
149 /* Prep channel message structure */
150 consumer_init_add_channel_comm_msg(&lkm
,
156 consumer
->net_seq_index
,
157 channel
->channel
->name
,
158 channel
->stream_count
,
159 channel
->channel
->attr
.output
,
160 CONSUMER_CHANNEL_TYPE_DATA
,
161 channel
->channel
->attr
.tracefile_size
,
162 channel
->channel
->attr
.tracefile_count
,
164 channel
->channel
->attr
.live_timer_interval
,
165 ksession
->is_live_session
,
166 channel_attr_extended
->monitor_timer_interval
,
167 ksession
->current_trace_chunk
);
169 health_code_update();
171 ret
= consumer_send_channel(sock
, &lkm
);
176 health_code_update();
178 session
= session_find_by_id(ksession
->id
);
180 assert(pthread_mutex_trylock(&session
->lock
));
181 assert(session_trylock_list());
183 status
= notification_thread_command_add_channel(
184 notification_thread_handle
, session
->name
,
185 ksession
->uid
, ksession
->gid
,
186 channel
->channel
->name
, channel
->key
,
188 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
190 if (status
!= LTTNG_OK
) {
195 channel
->published_to_notification_thread
= true;
199 session_put(session
);
206 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
208 * The consumer socket lock must be held by the caller.
210 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
211 struct ltt_kernel_session
*ksession
, unsigned int monitor
)
214 struct lttcomm_consumer_msg lkm
;
215 struct consumer_output
*consumer
;
221 assert(ksession
->consumer
);
224 DBG("Sending metadata %d to kernel consumer",
225 ksession
->metadata_stream_fd
);
227 /* Get consumer output pointer */
228 consumer
= ksession
->consumer
;
230 /* Prep channel message structure */
231 consumer_init_add_channel_comm_msg(&lkm
,
232 ksession
->metadata
->key
,
234 DEFAULT_KERNEL_TRACE_DIR
,
237 consumer
->net_seq_index
,
238 ksession
->metadata
->conf
->name
,
240 ksession
->metadata
->conf
->attr
.output
,
241 CONSUMER_CHANNEL_TYPE_METADATA
,
242 ksession
->metadata
->conf
->attr
.tracefile_size
,
243 ksession
->metadata
->conf
->attr
.tracefile_count
,
245 ksession
->metadata
->conf
->attr
.live_timer_interval
,
246 ksession
->is_live_session
,
248 ksession
->current_trace_chunk
);
250 health_code_update();
252 ret
= consumer_send_channel(sock
, &lkm
);
257 health_code_update();
259 /* Prep stream message structure */
260 consumer_init_add_stream_comm_msg(&lkm
,
261 ksession
->metadata
->key
,
262 ksession
->metadata_stream_fd
,
263 0 /* CPU: 0 for metadata. */);
265 health_code_update();
267 /* Send stream and file descriptor */
268 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
269 &ksession
->metadata_stream_fd
, 1);
274 health_code_update();
282 * Sending a single stream to the consumer with command ADD_STREAM.
285 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
286 struct ltt_kernel_channel
*channel
,
287 struct ltt_kernel_stream
*stream
,
288 struct ltt_kernel_session
*session
, unsigned int monitor
)
291 struct lttcomm_consumer_msg lkm
;
292 struct consumer_output
*consumer
;
297 assert(session
->consumer
);
300 DBG("Sending stream %d of channel %s to kernel consumer",
301 stream
->fd
, channel
->channel
->name
);
303 /* Get consumer output pointer */
304 consumer
= session
->consumer
;
306 /* Prep stream consumer message */
307 consumer_init_add_stream_comm_msg(&lkm
,
312 health_code_update();
314 /* Send stream and file descriptor */
315 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
320 health_code_update();
327 * Sending the notification that all streams were sent with STREAMS_SENT.
329 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
330 struct ltt_kernel_session
*session
, uint64_t channel_key
)
333 struct lttcomm_consumer_msg lkm
;
334 struct consumer_output
*consumer
;
339 DBG("Sending streams_sent");
340 /* Get consumer output pointer */
341 consumer
= session
->consumer
;
343 /* Prep stream consumer message */
344 consumer_init_streams_sent_comm_msg(&lkm
,
345 LTTNG_CONSUMER_STREAMS_SENT
,
346 channel_key
, consumer
->net_seq_index
);
348 health_code_update();
350 /* Send stream and file descriptor */
351 ret
= consumer_send_msg(sock
, &lkm
);
361 * Send all stream fds of kernel channel to the consumer.
363 * The consumer socket lock must be held by the caller.
365 int kernel_consumer_send_channel_streams(struct consumer_socket
*sock
,
366 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*ksession
,
367 unsigned int monitor
)
370 struct ltt_kernel_stream
*stream
;
375 assert(ksession
->consumer
);
380 /* Bail out if consumer is disabled */
381 if (!ksession
->consumer
->enabled
) {
386 DBG("Sending streams of channel %s to kernel consumer",
387 channel
->channel
->name
);
389 if (!channel
->sent_to_consumer
) {
390 ret
= kernel_consumer_add_channel(sock
, channel
, ksession
, monitor
);
394 channel
->sent_to_consumer
= true;
398 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
399 if (!stream
->fd
|| stream
->sent_to_consumer
) {
403 /* Add stream on the kernel consumer side. */
404 ret
= kernel_consumer_add_stream(sock
, channel
, stream
,
409 stream
->sent_to_consumer
= true;
418 * Send all stream fds of the kernel session to the consumer.
420 * The consumer socket lock must be held by the caller.
422 int kernel_consumer_send_session(struct consumer_socket
*sock
,
423 struct ltt_kernel_session
*session
)
425 int ret
, monitor
= 0;
426 struct ltt_kernel_channel
*chan
;
430 assert(session
->consumer
);
433 /* Bail out if consumer is disabled */
434 if (!session
->consumer
->enabled
) {
439 /* Don't monitor the streams on the consumer if in flight recorder. */
440 if (session
->output_traces
) {
444 DBG("Sending session stream to kernel consumer");
446 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
447 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
453 /* Send channel and streams of it */
454 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
455 ret
= kernel_consumer_send_channel_streams(sock
, chan
, session
,
462 * Inform the relay that all the streams for the
465 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->key
);
472 DBG("Kernel consumer FDs of metadata and channel streams sent");
474 session
->consumer_fds_sent
= 1;
481 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
482 struct ltt_kernel_channel
*channel
)
485 struct lttcomm_consumer_msg msg
;
490 DBG("Sending kernel consumer destroy channel key %" PRIu64
, channel
->key
);
492 memset(&msg
, 0, sizeof(msg
));
493 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
494 msg
.u
.destroy_channel
.key
= channel
->key
;
496 pthread_mutex_lock(socket
->lock
);
497 health_code_update();
499 ret
= consumer_send_msg(socket
, &msg
);
505 health_code_update();
506 pthread_mutex_unlock(socket
->lock
);
510 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
511 struct ltt_kernel_metadata
*metadata
)
514 struct lttcomm_consumer_msg msg
;
519 DBG("Sending kernel consumer destroy channel key %" PRIu64
, metadata
->key
);
521 memset(&msg
, 0, sizeof(msg
));
522 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
523 msg
.u
.destroy_channel
.key
= metadata
->key
;
525 pthread_mutex_lock(socket
->lock
);
526 health_code_update();
528 ret
= consumer_send_msg(socket
, &msg
);
534 health_code_update();
535 pthread_mutex_unlock(socket
->lock
);