X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-ustconsumer%2Flttng-ustconsumer.c;h=ed4a27c9da927d47ed52cadf129a6592d17fca48;hp=951ef78812e3661980bc95b7a3b31674969146e5;hb=d5cd1f3415b6d3cafa347b8e0068de2245014f25;hpb=1316184615e422526ef4fae68f980443414969c1 diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index 951ef7881..ed4a27c9d 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -29,9 +29,9 @@ #include #include -#include #include #include +#include #include extern struct lttng_consumer_global_data consumer_data; @@ -208,6 +208,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, fds[0], fds[1]); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], @@ -271,9 +272,20 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) if (!chan->handle) { return -ENOMEM; } + /* + * The channel fds are passed to ustctl, we only keep a copy. + */ + chan->shm_fd_is_copy = 1; + chan->wait_fd_is_copy = 1; + return 0; } +void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) +{ + ustctl_flush_buffer(stream->chan->handle, stream->buf, 0); +} + void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) { ustctl_unmap_channel(chan->handle); @@ -298,6 +310,12 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) if (!stream->mmap_base) { return -EINVAL; } + /* + * The stream fds are passed to ustctl, we only keep a copy. + */ + stream->shm_fd_is_copy = 1; + stream->wait_fd_is_copy = 1; + return 0; } @@ -305,3 +323,98 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) { ustctl_close_stream_read(stream->chan->handle, stream->buf); } + + +int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + struct lttng_ust_shm_handle *handle; + struct lttng_ust_lib_ring_buffer *buf; + char dummy; + ssize_t readlen; + + DBG("In read_subbuffer (wait_fd: %d, stream key: %d)", + stream->wait_fd, stream->key); + + /* We can consume the 1 byte written into the wait_fd by UST */ + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } + + buf = stream->buf; + handle = stream->chan->handle; + /* Get the next subbuffer */ + err = ustctl_get_next_subbuf(handle, buf); + if (err != 0) { + ret = errno; + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + assert(stream->output == LTTNG_EVENT_MMAP); + /* read the used subbuffer size */ + err = ustctl_get_padded_subbuf_size(handle, buf, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + err = ustctl_put_next_subbuf(handle, buf); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } +end: + return ret; +} + +int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (stream->path_name != NULL) { + ret = open(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", stream->path_name); + perror("open"); + goto error; + } + stream->out_fd = ret; + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error: + return ret; +}