X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=f47c498777531ef6702b41d34b153abba3320e1e;hp=0de73443e1bee5c51549e608d20b1548cb007257;hb=9d9353f936717527863fda5afcbb89aa459f0852;hpb=d8ef542d25837bdfb960e5df2a91c5d18f5ef401 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 0de73443e..f47c49877 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,7 +34,9 @@ #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -120,6 +122,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); @@ -127,12 +130,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } - DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, - msg.u.channel.relayd_id, msg.u.channel.output); + msg.u.channel.relayd_id, msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -151,20 +155,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel, ctx); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel, ctx); + ret = consumer_add_channel(new_channel, ctx); + } + + /* If we received an error in add_channel, we need to report it. */ + if (ret != 0) { + consumer_send_status_msg(sock, ret); + goto end_nosignal; } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { - int fd, stream_pipe; + int fd; + struct lttng_pipe *stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; @@ -243,6 +255,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->chan = channel; new_stream->wait_fd = fd; + /* Metadata chan refcount is increment in add_metadata_stream */ + if (new_stream->chan->type != CONSUMER_CHANNEL_TYPE_METADATA) { + /* Update channel refcount */ + uatomic_inc(&new_stream->chan->refcount); + } + /* * The buffer flush is done on the session daemon side for the kernel * so no need for the stream "hangup_flush_done" variable to be @@ -259,7 +277,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, new_stream->name, new_stream->chan->pathname, - &new_stream->relayd_stream_id); + &new_stream->relayd_stream_id, + new_stream->chan->tracefile_size, + new_stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { consumer_del_stream(new_stream, NULL); @@ -282,18 +302,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + stream_pipe = ctx->consumer_metadata_pipe; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + stream_pipe = ctx->consumer_data_pipe; } - do { - ret = write(stream_pipe, &new_stream, sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { - PERROR("Consumer write %s stream to pipe %d", + ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", - stream_pipe); + lttng_pipe_get_writefd(stream_pipe)); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -501,26 +519,19 @@ end: int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) { int ret; - char full_path[PATH_MAX]; assert(stream); - ret = snprintf(full_path, sizeof(full_path), "%s/%s", - stream->chan->pathname, stream->name); - if (ret < 0) { - PERROR("snprintf on_recv_stream"); - goto error; - } - - /* Opening the tracefile in write mode */ + /* Don't create anything if this is set for streaming. */ if (stream->net_seq_idx == (uint64_t) -1ULL) { - ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC, - S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid); + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid); if (ret < 0) { - PERROR("open kernel stream path %s", full_path); goto error; } stream->out_fd = ret; + stream->tracefile_size_current = 0; } if (stream->output == LTTNG_EVENT_MMAP) {