X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=09ccda329526b69cf1e5aa147ed32f4dc8a67e81;hp=3086abefa2b88cb2abfc5632fafd54cd58946e21;hb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34;hpb=81ea21bfd975bdf60dd30229a4caece7c98a2912 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 3086abefa..09ccda329 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -38,6 +38,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -57,8 +58,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) ret = kernctl_snapshot(infd); if (ret != 0) { - errno = -ret; perror("Getting sub-buffer snapshot."); + ret = -errno; } return ret; @@ -77,8 +78,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_produced(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_produced"); + ret = -errno; } return ret; @@ -97,8 +98,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, ret = kernctl_snapshot_get_consumed(infd, pos); if (ret != 0) { - errno = -ret; perror("kernctl_snapshot_get_consumed"); + ret = -errno; } return ret; @@ -158,7 +159,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { ERR("utils_create_stream_file"); goto end_unlock; @@ -174,6 +175,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); + ret = -errno; goto end_unlock; } @@ -200,6 +202,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, &stream->max_sb_size); if (ret < 0) { ERR("Getting kernel max_sb_size"); + ret = -errno; goto end_unlock; } } @@ -223,6 +226,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, if (ret < 0) { if (errno != EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); + ret = -errno; goto end_unlock; } DBG("Kernel consumer get subbuf failed. Skipping it."); @@ -233,17 +237,19 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); + ret = -errno; goto error_put_subbuf; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); + ret = -errno; goto error_put_subbuf; } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len); + padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -264,6 +270,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); + ret = -errno; goto end_unlock; } consumed_pos += stream->max_sb_size; @@ -292,6 +299,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, error_put_subbuf: ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { + ret = -errno; ERR("Snapshot kernctl_put_subbuf error path"); } end_unlock: @@ -345,7 +353,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid); + metadata_stream->uid, metadata_stream->gid, NULL); if (ret < 0) { goto error; } @@ -355,7 +363,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, do { ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { - if (ret_read != -EPERM) { + if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read); goto error; @@ -836,6 +844,61 @@ error_fatal: return -1; } +/* + * Populate index values of a kernel stream. Values are set in big endian order. + * + * Return 0 on success or else a negative value. + */ +static int get_index_values(struct lttng_packet_index *index, int infd) +{ + int ret; + + ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); + if (ret < 0) { + PERROR("kernctl_get_timestamp_begin"); + goto error; + } + index->timestamp_begin = htobe64(index->timestamp_begin); + + ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); + if (ret < 0) { + PERROR("kernctl_get_timestamp_end"); + goto error; + } + index->timestamp_end = htobe64(index->timestamp_end); + + ret = kernctl_get_events_discarded(infd, &index->events_discarded); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto error; + } + index->events_discarded = htobe64(index->events_discarded); + + ret = kernctl_get_content_size(infd, &index->content_size); + if (ret < 0) { + PERROR("kernctl_get_content_size"); + goto error; + } + index->content_size = htobe64(index->content_size); + + ret = kernctl_get_packet_size(infd, &index->packet_size); + if (ret < 0) { + PERROR("kernctl_get_packet_size"); + goto error; + } + index->packet_size = htobe64(index->packet_size); + + ret = kernctl_get_stream_id(infd, &index->stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto error; + } + index->stream_id = htobe64(index->stream_id); + +error: + return ret; +} + /* * Consume data on a file descriptor and write it on a trace file. */ @@ -843,15 +906,21 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err; + int err, write_index = 0; ssize_t ret = 0; int infd = stream->wait_fd; + struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); + + /* Indicate that for this stream we have to write the index. */ + if (stream->index_fd >= 0) { + write_index = 1; + } + /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { - ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -860,18 +929,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); + ret = -errno; goto end; } /* Get the full subbuffer size including padding */ err = kernctl_get_padded_subbuf_size(infd, &len); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } + if (!stream->metadata_flag && write_index) { + ret = get_index_values(&index, infd); + if (ret < 0) { + goto end; + } + } + switch (stream->chan->output) { case CONSUMER_CHANNEL_SPLICE: /* @@ -884,7 +960,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* splice the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding); + padding, &index); /* * XXX: Splice does not support network streaming so the return value * is simply checked against subbuf_size and not like the mmap() op. @@ -896,15 +972,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ ERR("Error splicing to tracefile (ret: %zd != len: %lu)", ret, subbuf_size); + write_index = 0; } break; case CONSUMER_CHANNEL_MMAP: /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { - errno = -err; perror("Getting sub-buffer len failed."); - ret = err; + ret = -errno; goto end; } @@ -915,7 +991,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, - padding); + padding, &index); /* * The mmap operation should write subbuf_size amount of data when * network streaming or the full padding (len) size when we are _not_ @@ -930,27 +1006,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile " "(ret: %zd != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); + write_index = 0; } break; default: ERR("Unknown output method"); - ret = -1; + ret = -EPERM; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } - - ret = -err; + ret = -errno; goto end; } + /* Write index if needed. */ + if (write_index) { + err = index_write(stream->index_fd, &index, sizeof(index)); + if (err < 0) { + ret = -1; + goto end; + } + } + end: return ret; } @@ -968,12 +1052,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error; } stream->out_fd = ret; stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto error; + } + stream->index_fd = ret; + } } if (stream->output == LTTNG_EVENT_MMAP) { @@ -982,8 +1077,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { - errno = -ret; PERROR("kernctl_get_mmap_len"); + ret = -errno; goto error_close_fd; } stream->mmap_len = (size_t) mmap_len;