/*
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel)
{
int ret;
struct lttcomm_consumer_msg lkm;
channel->fd,
channel->channel->attr.subbuf_size,
0, /* Kernel */
- channel->channel->name);
+ channel->channel->name,
+ channel->stream_count);
+
+ health_code_update(&health_thread_kernel);
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
+ char tmp_path[PATH_MAX];
const char *pathname;
struct lttcomm_consumer_msg lkm;
- struct consumer_output *output;
+ struct consumer_output *consumer;
/* Safety net */
assert(session);
assert(session->consumer);
+ assert(sock);
DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
/* Get consumer output pointer */
- output = session->consumer;
+ consumer = session->consumer;
+
+ /* Get the right path name destination */
+ if (consumer->type == CONSUMER_DST_LOCAL) {
+ /* Set application path to the destination path */
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+ consumer->dst.trace_path, consumer->subdir);
+ if (ret < 0) {
+ PERROR("snprintf metadata path");
+ goto error;
+ }
+ pathname = tmp_path;
- /* Get correct path name destination */
- if (output->type == CONSUMER_DST_LOCAL) {
- pathname = output->dst.trace_path;
+ /* Create directory */
+ ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+ session->uid, session->gid);
+ if (ret < 0) {
+ if (ret != -EEXIST) {
+ ERR("Trace directory creation error");
+ goto error;
+ }
+ }
+ DBG3("Kernel local consumer tracefile path: %s", pathname);
} else {
- pathname = output->subdir;
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+ if (ret < 0) {
+ PERROR("snprintf metadata path");
+ goto error;
+ }
+ pathname = tmp_path;
+ DBG3("Kernel network consumer subdir path: %s", pathname);
}
/* Prep channel message structure */
session->metadata->fd,
session->metadata->conf->attr.subbuf_size,
0, /* for kernel */
- "metadata");
+ "metadata",
+ 1);
+
+ health_code_update(&health_thread_kernel);
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
/* Prep stream message structure */
consumer_init_stream_comm_msg(&lkm,
LTTNG_CONSUMER_ADD_STREAM,
0, /* Kernel */
session->uid,
session->gid,
- output->net_seq_index,
+ consumer->net_seq_index,
1, /* Metadata flag set */
"metadata",
- pathname);
+ pathname,
+ session->id);
+
+ health_code_update(&health_thread_kernel);
/* Send stream and file descriptor */
- ret = consumer_send_stream(sock, output, &lkm,
+ ret = consumer_send_stream(sock, consumer, &lkm,
&session->metadata_stream_fd, 1);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
- struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+ struct ltt_kernel_session *session)
{
int ret;
+ char tmp_path[PATH_MAX];
const char *pathname;
struct lttcomm_consumer_msg lkm;
- struct consumer_output *output;
+ struct consumer_output *consumer;
assert(channel);
assert(stream);
assert(session);
assert(session->consumer);
+ assert(sock);
DBG("Sending stream %d of channel %s to kernel consumer",
stream->fd, channel->channel->name);
/* Get consumer output pointer */
- output = session->consumer;
+ consumer = session->consumer;
- /* Get correct path name destination */
- if (output->type == CONSUMER_DST_LOCAL) {
- pathname = output->dst.trace_path;
- DBG3("Consumer is local to %s", pathname);
+ /* Get the right path name destination */
+ if (consumer->type == CONSUMER_DST_LOCAL) {
+ /* Set application path to the destination path */
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+ consumer->dst.trace_path, consumer->subdir);
+ if (ret < 0) {
+ PERROR("snprintf stream path");
+ goto error;
+ }
+ pathname = tmp_path;
+ DBG3("Kernel local consumer tracefile path: %s", pathname);
} else {
- pathname = output->subdir;
- DBG3("Consumer is network to subdir %s", pathname);
+ ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+ if (ret < 0) {
+ PERROR("snprintf stream path");
+ goto error;
+ }
+ pathname = tmp_path;
+ DBG3("Kernel network consumer subdir path: %s", pathname);
}
/* Prep stream consumer message */
0, /* Kernel */
session->uid,
session->gid,
- output->net_seq_index,
+ consumer->net_seq_index,
0, /* Metadata flag unset */
stream->name,
- pathname);
+ pathname,
+ session->id);
+
+ health_code_update(&health_thread_kernel);
/* Send stream and file descriptor */
- ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1);
+ ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Send all stream fds of kernel channel to the consumer.
*/
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
{
int ret;
assert(channel);
assert(session);
assert(session->consumer);
+ assert(sock);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
- ret = LTTCOMM_OK;
+ ret = LTTNG_OK;
goto error;
}
/*
* Send all stream fds of the kernel session to the consumer.
*/
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
struct ltt_kernel_channel *chan;
/* Safety net */
assert(session);
assert(session->consumer);
+ assert(sock);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
- ret = LTTCOMM_OK;
+ ret = LTTNG_OK;
goto error;
}