X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=bc3ddc930cf1cb8a10d0cd03b9150a4385c15bed;hp=40d0f71960fb6204019b508bd0ca1caffb4f838c;hb=fe4477ee14abb348ce9e167f8b4c09312d67de36;hpb=f50f23d9f80ed9fae7fe5c49aee65e813e0031c8 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 40d0f7196..bc3ddc930 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -35,6 +35,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -47,8 +48,7 @@ extern volatile int consumer_quit; * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) { int ret = 0; int infd = stream->wait_fd; @@ -67,9 +67,7 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_get_produced_snapshot( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, +int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos) { int ret; @@ -117,7 +115,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -131,25 +129,39 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - DBG("consumer_add_channel %d", msg.u.channel.channel_key); + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, - -1, -1, - msg.u.channel.mmap_len, - msg.u.channel.max_sb_size, - msg.u.channel.nb_init_streams); + msg.u.channel.session_id, msg.u.channel.pathname, + msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, + msg.u.channel.relayd_id, msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } + new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + + /* Translate and save channel type. */ + switch (msg.u.channel.type) { + case CONSUMER_CHANNEL_TYPE_DATA: + case CONSUMER_CHANNEL_TYPE_METADATA: + new_channel->type = msg.u.channel.type; + break; + default: + assert(0); + goto end_nosignal; + }; + if (ctx->on_recv_channel != NULL) { ret = ctx->on_recv_channel(new_channel); if (ret == 0) { - consumer_add_channel(new_channel); + consumer_add_channel(new_channel, ctx); } else if (ret < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel); + consumer_add_channel(new_channel, ctx); } goto end_nosignal; } @@ -158,12 +170,30 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fd, stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; + struct lttng_consumer_channel *channel; int alloc_ret = 0; + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.stream.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ + if (ret < 0 || ret_code != LTTNG_OK) { + /* + * Somehow, the session daemon is not responding anymore or the + * channel was not found. + */ goto end_nosignal; } @@ -192,19 +222,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - new_stream = consumer_allocate_stream(msg.u.stream.channel_key, - msg.u.stream.stream_key, - fd, fd, - msg.u.stream.state, - msg.u.stream.mmap_len, - msg.u.stream.output, - msg.u.stream.path_name, - msg.u.stream.uid, - msg.u.stream.gid, - msg.u.stream.net_index, - msg.u.stream.metadata_flag, - msg.u.stream.session_id, - &alloc_ret); + new_stream = consumer_allocate_stream(channel->key, + fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + channel->name, + channel->uid, + channel->gid, + channel->relayd_id, + channel->session_id, + msg.u.stream.cpu, + &alloc_ret, + channel->type); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -212,16 +240,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, default: lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); break; - case -ENOENT: - /* - * We could not find the channel. Can happen if cpu hotplug - * happens while tearing down. - */ - DBG3("Could not find channel"); - break; } goto end_nosignal; } + new_stream->chan = channel; + new_stream->wait_fd = fd; /* * The buffer flush is done on the session daemon side for the kernel @@ -233,21 +256,21 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->hangup_flush_done = 0; /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.stream.net_index); + relayd = consumer_find_relayd(new_stream->net_seq_idx); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, - msg.u.stream.name, msg.u.stream.path_name, + new_stream->name, new_stream->chan->pathname, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { consumer_del_stream(new_stream, NULL); goto end_nosignal; } - } else if (msg.u.stream.net_index != -1) { - ERR("Network sequence index %d unknown. Not adding stream.", - msg.u.stream.net_index); + } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", + new_stream->net_seq_idx); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -279,7 +302,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, - msg.u.stream.path_name, fd, new_stream->relayd_stream_id); + new_stream->name, fd, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -297,7 +320,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); if (relayd == NULL) { - ERR("Unable to find relayd %" PRIu64, index); + DBG("Unable to find relayd %" PRIu64, index); ret_code = LTTNG_ERR_NO_CONSUMER; } @@ -394,7 +417,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - switch (stream->output) { + switch (stream->chan->output) { case LTTNG_EVENT_SPLICE: /* @@ -444,8 +467,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * network streaming or the full padding (len) size when we are _not_ * streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != -1) || - (ret != len && stream->net_seq_idx == -1)) { + if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || + (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer @@ -482,18 +505,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) { int ret; - /* Opening the tracefile in write mode */ - if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { - ret = run_as_open(stream->path_name, - O_WRONLY|O_CREAT|O_TRUNC, - S_IRWXU|S_IRWXG|S_IRWXO, + assert(stream); + + /* Don't create anything if this is set for streaming. */ + if (stream->net_seq_idx == (uint64_t) -1ULL) { + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid); if (ret < 0) { - ERR("Opening %s", stream->path_name); - perror("open"); goto error; } stream->out_fd = ret; + stream->tracefile_size_current = 0; } if (stream->output == LTTNG_EVENT_MMAP) { @@ -503,15 +526,15 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { errno = -ret; - perror("kernctl_get_mmap_len"); + PERROR("kernctl_get_mmap_len"); goto error_close_fd; } stream->mmap_len = (size_t) mmap_len; - stream->mmap_base = mmap(NULL, stream->mmap_len, - PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); + stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ, + MAP_PRIVATE, stream->wait_fd, 0); if (stream->mmap_base == MAP_FAILED) { - perror("Error mmaping"); + PERROR("Error mmaping"); ret = -1; goto error_close_fd; }