2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31 #include "notification-thread-commands.h"
33 #include "lttng-sessiond.h"
35 static char *create_channel_path(struct consumer_output
*consumer
,
39 char tmp_path
[PATH_MAX
];
40 char *pathname
= NULL
;
44 /* Get the right path name destination */
45 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
46 /* Set application path to the destination path */
47 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
48 consumer
->dst
.trace_path
, consumer
->subdir
);
50 PERROR("snprintf kernel channel path");
53 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
55 PERROR("lttng_strndup");
59 /* Create directory */
60 ret
= run_as_mkdir_recursive(pathname
, S_IRWXU
| S_IRWXG
, uid
, gid
);
62 if (errno
!= EEXIST
) {
63 ERR("Trace directory creation error");
67 DBG3("Kernel local consumer tracefile path: %s", pathname
);
69 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s", consumer
->subdir
);
71 PERROR("snprintf kernel metadata path");
74 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
76 PERROR("lttng_strndup");
79 DBG3("Kernel network consumer subdir path: %s", pathname
);
90 * Sending a single channel to the consumer with command ADD_CHANNEL.
92 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
93 struct ltt_kernel_channel
*channel
,
94 struct ltt_kernel_session
*ksession
,
99 struct lttcomm_consumer_msg lkm
;
100 struct consumer_output
*consumer
;
101 enum lttng_error_code status
;
102 struct ltt_session
*session
;
103 struct lttng_channel_extended
*channel_attr_extended
;
108 assert(ksession
->consumer
);
110 consumer
= ksession
->consumer
;
111 channel_attr_extended
= (struct lttng_channel_extended
*)
112 channel
->channel
->attr
.extended
.ptr
;
114 DBG("Kernel consumer adding channel %s to kernel consumer",
115 channel
->channel
->name
);
118 pathname
= create_channel_path(consumer
, ksession
->uid
,
122 pathname
= strdup("");
129 /* Prep channel message structure */
130 consumer_init_channel_comm_msg(&lkm
,
131 LTTNG_CONSUMER_ADD_CHANNEL
,
137 consumer
->net_seq_index
,
138 channel
->channel
->name
,
139 channel
->stream_count
,
140 channel
->channel
->attr
.output
,
141 CONSUMER_CHANNEL_TYPE_DATA
,
142 channel
->channel
->attr
.tracefile_size
,
143 channel
->channel
->attr
.tracefile_count
,
145 channel
->channel
->attr
.live_timer_interval
,
146 channel_attr_extended
->monitor_timer_interval
);
148 health_code_update();
150 ret
= consumer_send_channel(sock
, &lkm
);
155 health_code_update();
157 session
= session_find_by_id(ksession
->id
);
160 status
= notification_thread_command_add_channel(
161 notification_thread_handle
, session
->name
,
162 ksession
->uid
, ksession
->gid
,
163 channel
->channel
->name
, channel
->fd
,
165 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
167 if (status
!= LTTNG_OK
) {
177 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
179 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
180 struct ltt_kernel_session
*session
, unsigned int monitor
)
184 struct lttcomm_consumer_msg lkm
;
185 struct consumer_output
*consumer
;
189 assert(session
->consumer
);
192 DBG("Sending metadata %d to kernel consumer", session
->metadata_stream_fd
);
194 /* Get consumer output pointer */
195 consumer
= session
->consumer
;
198 pathname
= create_channel_path(consumer
, session
->uid
, session
->gid
);
201 pathname
= strdup("");
208 /* Prep channel message structure */
209 consumer_init_channel_comm_msg(&lkm
,
210 LTTNG_CONSUMER_ADD_CHANNEL
,
211 session
->metadata
->fd
,
216 consumer
->net_seq_index
,
217 DEFAULT_METADATA_NAME
,
219 DEFAULT_KERNEL_CHANNEL_OUTPUT
,
220 CONSUMER_CHANNEL_TYPE_METADATA
,
224 health_code_update();
226 ret
= consumer_send_channel(sock
, &lkm
);
231 health_code_update();
233 /* Prep stream message structure */
234 consumer_init_stream_comm_msg(&lkm
,
235 LTTNG_CONSUMER_ADD_STREAM
,
236 session
->metadata
->fd
,
237 session
->metadata_stream_fd
,
238 0); /* CPU: 0 for metadata. */
240 health_code_update();
242 /* Send stream and file descriptor */
243 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
244 &session
->metadata_stream_fd
, 1);
249 health_code_update();
257 * Sending a single stream to the consumer with command ADD_STREAM.
259 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
260 struct ltt_kernel_channel
*channel
, struct ltt_kernel_stream
*stream
,
261 struct ltt_kernel_session
*session
, unsigned int monitor
)
264 struct lttcomm_consumer_msg lkm
;
265 struct consumer_output
*consumer
;
270 assert(session
->consumer
);
273 DBG("Sending stream %d of channel %s to kernel consumer",
274 stream
->fd
, channel
->channel
->name
);
276 /* Get consumer output pointer */
277 consumer
= session
->consumer
;
279 /* Prep stream consumer message */
280 consumer_init_stream_comm_msg(&lkm
,
281 LTTNG_CONSUMER_ADD_STREAM
,
286 health_code_update();
288 /* Send stream and file descriptor */
289 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
294 health_code_update();
301 * Sending the notification that all streams were sent with STREAMS_SENT.
303 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
304 struct ltt_kernel_session
*session
, uint64_t channel_key
)
307 struct lttcomm_consumer_msg lkm
;
308 struct consumer_output
*consumer
;
313 DBG("Sending streams_sent");
314 /* Get consumer output pointer */
315 consumer
= session
->consumer
;
317 /* Prep stream consumer message */
318 consumer_init_streams_sent_comm_msg(&lkm
,
319 LTTNG_CONSUMER_STREAMS_SENT
,
320 channel_key
, consumer
->net_seq_index
);
322 health_code_update();
324 /* Send stream and file descriptor */
325 ret
= consumer_send_msg(sock
, &lkm
);
335 * Send all stream fds of kernel channel to the consumer.
337 int kernel_consumer_send_channel_stream(struct consumer_socket
*sock
,
338 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*session
,
339 unsigned int monitor
)
342 struct ltt_kernel_stream
*stream
;
347 assert(session
->consumer
);
350 /* Bail out if consumer is disabled */
351 if (!session
->consumer
->enabled
) {
356 DBG("Sending streams of channel %s to kernel consumer",
357 channel
->channel
->name
);
359 ret
= kernel_consumer_add_channel(sock
, channel
, session
, monitor
);
365 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
366 if (!stream
->fd
|| stream
->sent_to_consumer
) {
370 /* Add stream on the kernel consumer side. */
371 ret
= kernel_consumer_add_stream(sock
, channel
, stream
, session
,
376 stream
->sent_to_consumer
= true;
384 * Send all stream fds of the kernel session to the consumer.
386 int kernel_consumer_send_session(struct consumer_socket
*sock
,
387 struct ltt_kernel_session
*session
)
389 int ret
, monitor
= 0;
390 struct ltt_kernel_channel
*chan
;
394 assert(session
->consumer
);
397 /* Bail out if consumer is disabled */
398 if (!session
->consumer
->enabled
) {
403 /* Don't monitor the streams on the consumer if in flight recorder. */
404 if (session
->output_traces
) {
408 DBG("Sending session stream to kernel consumer");
410 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
411 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
417 /* Send channel and streams of it */
418 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
419 ret
= kernel_consumer_send_channel_stream(sock
, chan
, session
,
426 * Inform the relay that all the streams for the
429 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->fd
);
436 DBG("Kernel consumer FDs of metadata and channel streams sent");
438 session
->consumer_fds_sent
= 1;
445 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
446 struct ltt_kernel_channel
*channel
)
449 struct lttcomm_consumer_msg msg
;
454 DBG("Sending kernel consumer destroy channel key %d", channel
->fd
);
456 memset(&msg
, 0, sizeof(msg
));
457 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
458 msg
.u
.destroy_channel
.key
= channel
->fd
;
460 pthread_mutex_lock(socket
->lock
);
461 health_code_update();
463 ret
= consumer_send_msg(socket
, &msg
);
469 health_code_update();
470 pthread_mutex_unlock(socket
->lock
);
474 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
475 struct ltt_kernel_metadata
*metadata
)
478 struct lttcomm_consumer_msg msg
;
483 DBG("Sending kernel consumer destroy channel key %d", metadata
->fd
);
485 memset(&msg
, 0, sizeof(msg
));
486 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
487 msg
.u
.destroy_channel
.key
= metadata
->fd
;
489 pthread_mutex_lock(socket
->lock
);
490 health_code_update();
492 ret
= consumer_send_msg(socket
, &msg
);
498 health_code_update();
499 pthread_mutex_unlock(socket
->lock
);