X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=bfec4d2f0db343c067c288f62f42d426ed88afb6;hb=56591bac20c0f3b728c95d92702d243de838bdc4;hp=0ac3942d20470b6112351cb11bd908d9b6a44bcf;hpb=2f225ce2a30ad1172d65f77409aadf102825f50b;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 0ac3942d2..bfec4d2f0 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -57,8 +57,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) ret = kernctl_snapshot(infd); if (ret != 0) { - errno = -ret; perror("Getting sub-buffer snapshot."); + ret = -errno; } return ret; @@ -77,8 +77,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_produced(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_produced"); + ret = -errno; } return ret; @@ -97,8 +97,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_consumed(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_consumed"); + ret = -errno; } return ret; @@ -110,20 +110,21 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, struct lttng_consumer_local_data *ctx) + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) { int ret; unsigned long consumed_pos, produced_pos; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; - DBG("Kernel consumer snapshot channel %lu", key); + DBG("Kernel consumer snapshot channel %" PRIu64, key); rcu_read_lock(); channel = consumer_find_channel(key); if (!channel) { - ERR("No channel found for key %lu", key); + ERR("No channel found for key %" PRIu64, key); ret = -1; goto end; } @@ -166,13 +167,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, stream->out_fd = ret; stream->tracefile_size_current = 0; - DBG("Kernel consumer snapshot stream %s/%s (%lu)", path, - stream->name, stream->key); + DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", + path, stream->name, stream->key); } ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); + ret = -errno; goto end_unlock; } @@ -199,10 +201,20 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, &stream->max_sb_size); if (ret < 0) { ERR("Getting kernel max_sb_size"); + ret = -errno; goto end_unlock; } } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -213,6 +225,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, if (ret < 0) { if (errno != EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); + ret = -errno; goto end_unlock; } DBG("Kernel consumer get subbuf failed. Skipping it."); @@ -223,12 +236,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); + ret = -errno; goto error_put_subbuf; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); + ret = -errno; goto error_put_subbuf; } @@ -254,18 +269,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); + ret = -errno; goto end_unlock; } consumed_pos += stream->max_sb_size; } if (relayd_id == (uint64_t) -1ULL) { - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end_unlock; + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; } - stream->out_fd = -1; } else { close_relayd_stream(stream); stream->net_seq_idx = (uint64_t) -1ULL; @@ -280,6 +298,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, error_put_subbuf: ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { + ret = -errno; ERR("Snapshot kernctl_put_subbuf error path"); } end_unlock: @@ -343,8 +362,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, do { ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { - if (ret_read != -EPERM) { - ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)", + if (ret_read != -EAGAIN) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read); goto error; } @@ -357,19 +376,24 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, close_relayd_stream(metadata_stream); metadata_stream->net_seq_idx = (uint64_t) -1ULL; } else { - ret = close(metadata_stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot metadata close out_fd"); - /* - * Don't go on error here since the snapshot was successful at this - * point but somehow the close failed. - */ + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; } - metadata_stream->out_fd = -1; } ret = 0; + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; error: rcu_read_unlock(); return ret; @@ -389,8 +413,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret = -1; } return ret; @@ -556,7 +580,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -623,25 +648,50 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } - ret = consumer_send_relayd_stream(new_stream, NULL); - if (ret < 0) { - consumer_stream_free(new_stream); - goto end_nosignal; + /* Send stream to relayd if the stream has an ID. */ + if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(new_stream, + new_stream->chan->pathname); + if (ret < 0) { + consumer_stream_free(new_stream); + goto end_nosignal; + } } /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_data_pipe; } + /* Vitible to other threads */ + new_stream->globally_visible = 1; + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_stream_free(new_stream); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -725,7 +775,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, ctx); + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); if (ret < 0) { ERR("Snapshot channel failed"); ret_code = LTTNG_ERR_KERN_CHAN_FAIL; @@ -806,7 +858,6 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { - ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -815,15 +866,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); + ret = -errno; goto end; } /* Get the full subbuffer size including padding */ err = kernctl_get_padded_subbuf_size(infd, &len); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } @@ -857,9 +908,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } @@ -889,20 +939,18 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, break; default: ERR("Unknown output method"); - ret = -1; + ret = -EPERM; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } - - ret = -err; + ret = -errno; goto end; } @@ -937,8 +985,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { - errno = -ret; PERROR("kernctl_get_mmap_len"); + ret = -errno; goto error_close_fd; } stream->mmap_len = (size_t) mmap_len; @@ -981,6 +1029,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */