X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=c794b93f8e9e9574036bd4cef79468be9a853f1a;hp=6a692b9ba5a8bffd7826bd164dfc3f1de9f8c9df;hb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34;hpb=efa116c6f19ecdc344a82709e7d919703ec96c45 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6a692b9ba..c794b93f8 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -40,6 +40,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -825,7 +826,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid); + metadata_stream->uid, metadata_stream->gid, NULL); if (ret < 0) { goto error_stream; } @@ -905,7 +906,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error_unlock; } @@ -975,7 +976,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len); + padded_len - len, NULL); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1600,14 +1601,72 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) ustctl_destroy_stream(stream->ustream); } +/* + * Populate index values of a UST stream. Values are set in big endian order. + * + * Return 0 on success or else a negative value. + */ +static int get_index_values(struct lttng_packet_index *index, + struct ustctl_consumer_stream *ustream) +{ + int ret; + + ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin); + if (ret < 0) { + PERROR("ustctl_get_timestamp_begin"); + goto error; + } + index->timestamp_begin = htobe64(index->timestamp_begin); + + ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end); + if (ret < 0) { + PERROR("ustctl_get_timestamp_end"); + goto error; + } + index->timestamp_end = htobe64(index->timestamp_end); + + ret = ustctl_get_events_discarded(ustream, &index->events_discarded); + if (ret < 0) { + PERROR("ustctl_get_events_discarded"); + goto error; + } + index->events_discarded = htobe64(index->events_discarded); + + ret = ustctl_get_content_size(ustream, &index->content_size); + if (ret < 0) { + PERROR("ustctl_get_content_size"); + goto error; + } + index->content_size = htobe64(index->content_size); + + ret = ustctl_get_packet_size(ustream, &index->packet_size); + if (ret < 0) { + PERROR("ustctl_get_packet_size"); + goto error; + } + index->packet_size = htobe64(index->packet_size); + + ret = ustctl_get_stream_id(ustream, &index->stream_id); + if (ret < 0) { + PERROR("ustctl_get_stream_id"); + goto error; + } + index->stream_id = htobe64(index->stream_id); + +error: + return ret; +} + + int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err; + int err, write_index = 0; long ret = 0; char dummy; struct ustctl_consumer_stream *ustream; + struct lttng_packet_index index; assert(stream); assert(stream->ustream); @@ -1619,6 +1678,11 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Ease our life for what's next. */ ustream = stream->ustream; + /* Indicate that for this stream we have to write the index. */ + if (stream->index_fd >= 0) { + write_index = 1; + } + /* We can consume the 1 byte written into the wait_fd by UST */ if (stream->monitor && !stream->hangup_flush_done) { ssize_t readlen; @@ -1676,6 +1740,15 @@ retry: goto end; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); + + if (!stream->metadata_flag && write_index) { + index.offset = htobe64(stream->out_fd_offset); + ret = get_index_values(&index, ustream); + if (ret < 0) { + goto end; + } + } + /* Get the full padded subbuffer size */ err = ustctl_get_padded_subbuf_size(ustream, &len); assert(err == 0); @@ -1689,7 +1762,7 @@ retry: padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding); + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* * The mmap operation should write subbuf_size amount of data when network * streaming or the full padding (len) size when we are _not_ streaming. @@ -1707,10 +1780,20 @@ retry: DBG("Error writing to tracefile " "(ret: %ld != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); + write_index = 0; } err = ustctl_put_next_subbuf(ustream); assert(err == 0); + /* Write index if needed. */ + if (write_index) { + err = index_write(stream->index_fd, &index, sizeof(index)); + if (err < 0) { + ret = -1; + goto end; + } + } + end: return ret; } @@ -1730,12 +1813,23 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid); + stream->uid, stream->gid, NULL); if (ret < 0) { goto error; } stream->out_fd = ret; stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + ret = index_create_file(stream->chan->pathname, + stream->name, stream->uid, stream->gid, + stream->chan->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto error; + } + stream->index_fd = ret; + } } ret = 0;