X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-ustconsumer%2Flttng-ustconsumer.c;h=8305b061a882fff9021037e1716ff32fb3f36935;hp=ed4a27c9da927d47ed52cadf129a6592d17fca48;hb=fc34caaa25f9780eb8509f243f910c3f2aaa5a69;hpb=276b26d1bad8b6ede22c70f786f52e467e658bf6 diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index ed4a27c9d..8305b061a 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -33,6 +33,7 @@ #include #include #include +#include extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; @@ -209,13 +210,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, fds[0], fds[1]); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, msg.u.stream.mmap_len, msg.u.stream.output, - msg.u.stream.path_name); + msg.u.stream.path_name, + msg.u.stream.uid, + msg.u.stream.gid); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; @@ -234,6 +237,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { + return -ENOSYS; +#if 0 if (ctx->on_update_stream != NULL) { ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); if (ret == 0) { @@ -245,6 +250,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } +#endif break; } default: @@ -272,11 +278,8 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) if (!chan->handle) { return -ENOMEM; } - /* - * The channel fds are passed to ustctl, we only keep a copy. - */ - chan->shm_fd_is_copy = 1; chan->wait_fd_is_copy = 1; + chan->shm_fd = -1; return 0; } @@ -284,6 +287,7 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { ustctl_flush_buffer(stream->chan->handle, stream->buf, 0); + stream->hangup_flush_done = 1; } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -306,15 +310,14 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); if (!stream->buf) return -EBUSY; + /* ustctl_open_stream_read has closed the shm fd. */ + stream->wait_fd_is_copy = 1; + stream->shm_fd = -1; + stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { return -EINVAL; } - /* - * The stream fds are passed to ustctl, we only keep a copy. - */ - stream->shm_fd_is_copy = 1; - stream->wait_fd_is_copy = 1; return 0; } @@ -340,12 +343,14 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, stream->wait_fd, stream->key); /* We can consume the 1 byte written into the wait_fd by UST */ - do { - readlen = read(stream->wait_fd, &dummy, 1); - } while (readlen == -1 && errno == -EINTR); - if (readlen == -1) { - ret = readlen; - goto end; + if (!stream->hangup_flush_done) { + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } } buf = stream->buf; @@ -353,7 +358,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = errno; + ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -367,11 +372,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(stream->output == LTTNG_EVENT_MMAP); /* read the used subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } + assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); if (ret < 0) { @@ -382,16 +383,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile"); } err = ustctl_put_next_subbuf(handle, buf); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } + assert(err == 0); end: return ret; } @@ -402,8 +394,10 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) /* Opening the tracefile in write mode */ if (stream->path_name != NULL) { - ret = open(stream->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + ret = open_run_as(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, + S_IRWXU|S_IRWXG|S_IRWXO, + stream->uid, stream->gid); if (ret < 0) { ERR("Opening %s", stream->path_name); perror("open");