X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-ustconsumer%2Flttng-ustconsumer.c;h=8305b061a882fff9021037e1716ff32fb3f36935;hp=3c81c32e7d6ca4db5590ca72fd2d8fd65136f8aa;hb=7fe5a50305da84bb64058c4721ff346c1b14f4ec;hpb=9df8df5ea4a12be72f265c3c0d6911ac4e207bc0 diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index 3c81c32e7..8305b061a 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -33,6 +33,7 @@ #include #include #include +#include extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; @@ -208,13 +209,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, fds[0], fds[1]); - new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); + new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, msg.u.stream.mmap_len, msg.u.stream.output, - msg.u.stream.path_name); + msg.u.stream.path_name, + msg.u.stream.uid, + msg.u.stream.gid); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; @@ -233,6 +237,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { + return -ENOSYS; +#if 0 if (ctx->on_update_stream != NULL) { ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); if (ret == 0) { @@ -244,6 +250,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } +#endif break; } default: @@ -271,9 +278,18 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) if (!chan->handle) { return -ENOMEM; } + chan->wait_fd_is_copy = 1; + chan->shm_fd = -1; + return 0; } +void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) +{ + ustctl_flush_buffer(stream->chan->handle, stream->buf, 0); + stream->hangup_flush_done = 1; +} + void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) { ustctl_unmap_channel(chan->handle); @@ -294,10 +310,15 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); if (!stream->buf) return -EBUSY; + /* ustctl_open_stream_read has closed the shm fd. */ + stream->wait_fd_is_copy = 1; + stream->shm_fd = -1; + stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { return -EINVAL; } + return 0; } @@ -305,3 +326,89 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) { ustctl_close_stream_read(stream->chan->handle, stream->buf); } + + +int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + struct lttng_ust_shm_handle *handle; + struct lttng_ust_lib_ring_buffer *buf; + char dummy; + ssize_t readlen; + + DBG("In read_subbuffer (wait_fd: %d, stream key: %d)", + stream->wait_fd, stream->key); + + /* We can consume the 1 byte written into the wait_fd by UST */ + if (!stream->hangup_flush_done) { + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } + } + + buf = stream->buf; + handle = stream->chan->handle; + /* Get the next subbuffer */ + err = ustctl_get_next_subbuf(handle, buf); + if (err != 0) { + ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + assert(stream->output == LTTNG_EVENT_MMAP); + /* read the used subbuffer size */ + err = ustctl_get_padded_subbuf_size(handle, buf, &len); + assert(err == 0); + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + err = ustctl_put_next_subbuf(handle, buf); + assert(err == 0); +end: + return ret; +} + +int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (stream->path_name != NULL) { + ret = open_run_as(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, + S_IRWXU|S_IRWXG|S_IRWXO, + stream->uid, stream->gid); + if (ret < 0) { + ERR("Opening %s", stream->path_name); + perror("open"); + goto error; + } + stream->out_fd = ret; + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error: + return ret; +}