X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-ustconsumer%2Flttng-ustconsumer.c;h=8305b061a882fff9021037e1716ff32fb3f36935;hp=806ebd3c03977f340245ed41e98047a7255461fc;hb=7fe5a50305da84bb64058c4721ff346c1b14f4ec;hpb=d056b47720cf547dd8c4ca59076ffcd215d58f2c diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index 806ebd3c0..8305b061a 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -33,6 +33,7 @@ #include #include #include +#include extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; @@ -175,7 +176,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - fprintf(stderr, "AAAAA\n"); lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end_nosignal; } @@ -210,15 +210,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, fds[0], fds[1]); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, msg.u.stream.mmap_len, msg.u.stream.output, - msg.u.stream.path_name); + msg.u.stream.path_name, + msg.u.stream.uid, + msg.u.stream.gid); if (new_stream == NULL) { - fprintf(stderr, "BBBBBB\n"); lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } @@ -236,6 +237,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { + return -ENOSYS; +#if 0 if (ctx->on_update_stream != NULL) { ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); if (ret == 0) { @@ -247,6 +250,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } +#endif break; } default: @@ -274,11 +278,8 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) if (!chan->handle) { return -ENOMEM; } - /* - * The channel fds are passed to ustctl, we only keep a copy. - */ - chan->shm_fd_is_copy = 1; chan->wait_fd_is_copy = 1; + chan->shm_fd = -1; return 0; } @@ -286,6 +287,7 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { ustctl_flush_buffer(stream->chan->handle, stream->buf, 0); + stream->hangup_flush_done = 1; } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -308,15 +310,14 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); if (!stream->buf) return -EBUSY; + /* ustctl_open_stream_read has closed the shm fd. */ + stream->wait_fd_is_copy = 1; + stream->shm_fd = -1; + stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { return -EINVAL; } - /* - * The stream fds are passed to ustctl, we only keep a copy. - */ - stream->shm_fd_is_copy = 1; - stream->wait_fd_is_copy = 1; return 0; } @@ -342,12 +343,14 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, stream->wait_fd, stream->key); /* We can consume the 1 byte written into the wait_fd by UST */ - do { - readlen = read(stream->wait_fd, &dummy, 1); - } while (readlen == -1 && errno == -EINTR); - if (readlen == -1) { - ret = readlen; - goto end; + if (!stream->hangup_flush_done) { + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } } buf = stream->buf; @@ -355,7 +358,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = errno; + ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -369,11 +372,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(stream->output == LTTNG_EVENT_MMAP); /* read the used subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } + assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); if (ret < 0) { @@ -384,16 +383,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile"); } err = ustctl_put_next_subbuf(handle, buf); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } + assert(err == 0); end: return ret; } @@ -404,8 +394,10 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) /* Opening the tracefile in write mode */ if (stream->path_name != NULL) { - ret = open(stream->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + ret = open_run_as(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, + S_IRWXU|S_IRWXG|S_IRWXO, + stream->uid, stream->gid); if (ret < 0) { ERR("Opening %s", stream->path_name); perror("open");