#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/defaults.h>
/* Get the right path name destination */
if (consumer->type == CONSUMER_DST_LOCAL) {
/* Set application path to the destination path */
- ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
- consumer->dst.trace_path, consumer->subdir);
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
+ consumer->dst.session_root_path,
+ consumer->chunk_path,
+ consumer->subdir);
if (ret < 0) {
PERROR("snprintf kernel channel path");
goto error;
+ } else if (ret >= sizeof(tmp_path)) {
+ ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
+ sizeof(tmp_path), ret,
+ consumer->dst.session_root_path,
+ consumer->chunk_path,
+ consumer->subdir);
+ goto error;
}
pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
if (!pathname) {
}
DBG3("Kernel local consumer tracefile path: %s", pathname);
} else {
- ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
+ consumer->dst.net.base_dir,
+ consumer->subdir);
if (ret < 0) {
PERROR("snprintf kernel metadata path");
goto error;
+ } else if (ret >= sizeof(tmp_path)) {
+ ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
+ sizeof(tmp_path), ret,
+ consumer->dst.net.base_dir,
+ consumer->subdir);
+ goto error;
}
pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
if (!pathname) {
}
/* Prep channel message structure */
- consumer_init_channel_comm_msg(&lkm,
- LTTNG_CONSUMER_ADD_CHANNEL,
- channel->fd,
+ consumer_init_add_channel_comm_msg(&lkm,
+ channel->key,
ksession->id,
pathname,
ksession->uid,
status = notification_thread_command_add_channel(
notification_thread_handle, session->name,
ksession->uid, ksession->gid,
- channel->channel->name, channel->fd,
+ channel->channel->name, channel->key,
LTTNG_DOMAIN_KERNEL,
channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
rcu_read_unlock();
ret = -1;
goto error;
}
+
+ channel->published_to_notification_thread = true;
+
error:
free(pathname);
return ret;
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
+ *
+ * The consumer socket lock must be held by the caller.
*/
int kernel_consumer_add_metadata(struct consumer_socket *sock,
struct ltt_kernel_session *session, unsigned int monitor)
}
/* Prep channel message structure */
- consumer_init_channel_comm_msg(&lkm,
- LTTNG_CONSUMER_ADD_CHANNEL,
- session->metadata->fd,
+ consumer_init_add_channel_comm_msg(&lkm,
+ session->metadata->key,
session->id,
pathname,
session->uid,
/* Prep stream message structure */
consumer_init_stream_comm_msg(&lkm,
LTTNG_CONSUMER_ADD_STREAM,
- session->metadata->fd,
+ session->metadata->key,
session->metadata_stream_fd,
0); /* CPU: 0 for metadata. */
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
+static
int kernel_consumer_add_stream(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
struct ltt_kernel_session *session, unsigned int monitor)
/* Prep stream consumer message */
consumer_init_stream_comm_msg(&lkm,
LTTNG_CONSUMER_ADD_STREAM,
- channel->fd,
+ channel->key,
stream->fd,
stream->cpu);
/*
* Send all stream fds of kernel channel to the consumer.
+ *
+ * The consumer socket lock must be held by the caller.
*/
-int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
+int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
unsigned int monitor)
{
- int ret;
+ int ret = LTTNG_OK;
struct ltt_kernel_stream *stream;
/* Safety net */
DBG("Sending streams of channel %s to kernel consumer",
channel->channel->name);
- ret = kernel_consumer_add_channel(sock, channel, session, monitor);
- if (ret < 0) {
- goto error;
+ if (!channel->sent_to_consumer) {
+ ret = kernel_consumer_add_channel(sock, channel, session, monitor);
+ if (ret < 0) {
+ goto error;
+ }
+ channel->sent_to_consumer = true;
}
/* Send streams */
/*
* Send all stream fds of the kernel session to the consumer.
+ *
+ * The consumer socket lock must be held by the caller.
*/
int kernel_consumer_send_session(struct consumer_socket *sock,
struct ltt_kernel_session *session)
/* Send channel and streams of it */
cds_list_for_each_entry(chan, &session->channel_list.head, list) {
- ret = kernel_consumer_send_channel_stream(sock, chan, session,
+ ret = kernel_consumer_send_channel_streams(sock, chan, session,
monitor);
if (ret < 0) {
goto error;
* Inform the relay that all the streams for the
* channel were sent.
*/
- ret = kernel_consumer_streams_sent(sock, session, chan->fd);
+ ret = kernel_consumer_streams_sent(sock, session, chan->key);
if (ret < 0) {
goto error;
}
assert(channel);
assert(socket);
- DBG("Sending kernel consumer destroy channel key %d", channel->fd);
+ DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
- msg.u.destroy_channel.key = channel->fd;
+ msg.u.destroy_channel.key = channel->key;
pthread_mutex_lock(socket->lock);
health_code_update();
assert(metadata);
assert(socket);
- DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
+ DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
- msg.u.destroy_channel.key = metadata->fd;
+ msg.u.destroy_channel.key = metadata->key;
pthread_mutex_lock(socket->lock);
health_code_update();