X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=13cbe2149de6852ed08e6532f6025ff3e382a103;hp=11701aebfb9869b7e6dbac8deb26828bd3d8cebe;hb=e316aad5fbbe3782872083cb68dfdd58bccea811;hpb=fb3a43a9284f3300e9b66edc2f2c2d2767895423 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 11701aebf..13cbe2149 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -140,6 +141,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fd; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; + int alloc_ret = 0; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -165,9 +167,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.uid, msg.u.stream.gid, msg.u.stream.net_index, - msg.u.stream.metadata_flag); + msg.u.stream.metadata_flag, + &alloc_ret); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + case -ENOENT: + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + DBG3("Could not find channel"); + break; + } goto end_nosignal; } @@ -190,39 +206,42 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + consumer_del_stream(new_stream, NULL); goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); - free(new_stream); + consumer_del_stream(new_stream, NULL); goto end_nosignal; } - /* Send stream to the metadata thread */ - if (new_stream->metadata_flag) { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } + } + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { do { - ret = write(ctx->consumer_metadata_pipe[1], new_stream, - sizeof(struct lttng_consumer_stream)); + ret = write(ctx->consumer_metadata_pipe[1], &new_stream, + sizeof(new_stream)); } while (ret < 0 && errno == EINTR); if (ret < 0) { PERROR("write metadata pipe"); + consumer_del_stream(new_stream, NULL); } } else { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } + ret = consumer_add_stream(new_stream); + if (ret) { + ERR("Consumer add stream %d failed. Continuing", + new_stream->key); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - consumer_add_stream(new_stream); } DBG("Kernel consumer_add_stream (%d)", fd); @@ -295,7 +314,7 @@ end_nosignal: ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; ssize_t ret = 0; int infd = stream->wait_fd; @@ -304,6 +323,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { + ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -315,60 +335,92 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + /* Get the full subbuffer size including padding */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } + switch (stream->output) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } + case LTTNG_EVENT_SPLICE: - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, len); - } + /* + * XXX: The lttng-modules splice "actor" does not handle copying + * partial pages hence only using the subbuffer size without the + * padding makes the splice fail. + */ + subbuf_size = len; + padding = 0; - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; + /* splice the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, + padding); + /* + * XXX: Splice does not support network streaming so the return value + * is simply checked against subbuf_size and not like the mmap() op. + */ + if (ret != subbuf_size) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile (ret: %zd != len: %lu)", + ret, subbuf_size); + } + break; + case LTTNG_EVENT_MMAP: + /* Get subbuffer size without padding */ + err = kernctl_get_subbuf_size(infd, &subbuf_size); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } + + /* Make sure the tracer is not gone mad on us! */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; + + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, + padding); + /* + * The mmap operation should write subbuf_size amount of data when + * network streaming or the full padding (len) size when we are _not_ + * streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { + /* + * Display the error but continue processing to try to release the + * subbuffer + */ + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); + } + break; + default: + ERR("Unknown output method"); + ret = -1; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -ret; + errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } + + ret = -err; goto end; }