projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Update version to 2.0-pre17
[lttng-tools.git]
/
liblttng-ustconsumer
/
lttng-ustconsumer.c
diff --git
a/liblttng-ustconsumer/lttng-ustconsumer.c
b/liblttng-ustconsumer/lttng-ustconsumer.c
index 29f735249417709191f344974decbb1f3676fdb3..10213c1c97dcb8b4d93a80895a0e56d7ed9297db 100644
(file)
--- a/
liblttng-ustconsumer/lttng-ustconsumer.c
+++ b/
liblttng-ustconsumer/lttng-ustconsumer.c
@@
-34,6
+34,8
@@
#include <lttng/ust-ctl.h>
#include <lttngerr.h>
#include <lttng/ust-ctl.h>
#include <lttngerr.h>
+#include "common/runas.h"
+
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
extern volatile int consumer_quit;
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
extern volatile int consumer_quit;
@@
-209,13
+211,15
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
- new_stream = consumer_allocate_stream(msg.u.
stream
.channel_key,
+ new_stream = consumer_allocate_stream(msg.u.
channel
.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
msg.u.stream.mmap_len,
msg.u.stream.output,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
msg.u.stream.mmap_len,
msg.u.stream.output,
- msg.u.stream.path_name);
+ msg.u.stream.path_name,
+ msg.u.stream.uid,
+ msg.u.stream.gid);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
goto end;
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
goto end;
@@
-234,6
+238,8
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
+ return -ENOSYS;
+#if 0
if (ctx->on_update_stream != NULL) {
ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
if (ret == 0) {
if (ctx->on_update_stream != NULL) {
ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
if (ret == 0) {
@@
-245,6
+251,7
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
+#endif
break;
}
default:
break;
}
default:
@@
-272,15
+279,18
@@
int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
if (!chan->handle) {
return -ENOMEM;
}
if (!chan->handle) {
return -ENOMEM;
}
- /*
- * The channel fds are passed to ustctl, we only keep a copy.
- */
- chan->shm_fd_is_copy = 1;
chan->wait_fd_is_copy = 1;
chan->wait_fd_is_copy = 1;
+ chan->shm_fd = -1;
return 0;
}
return 0;
}
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+ ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+ stream->hangup_flush_done = 1;
+}
+
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
{
ustctl_unmap_channel(chan->handle);
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
{
ustctl_unmap_channel(chan->handle);
@@
-301,15
+311,14
@@
int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
if (!stream->buf)
return -EBUSY;
stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
if (!stream->buf)
return -EBUSY;
+ /* ustctl_open_stream_read has closed the shm fd. */
+ stream->wait_fd_is_copy = 1;
+ stream->shm_fd = -1;
+
stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
if (!stream->mmap_base) {
return -EINVAL;
}
stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
if (!stream->mmap_base) {
return -EINVAL;
}
- /*
- * The stream fds are passed to ustctl, we only keep a copy.
- */
- stream->shm_fd_is_copy = 1;
- stream->wait_fd_is_copy = 1;
return 0;
}
return 0;
}
@@
-335,12
+344,14
@@
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
stream->wait_fd, stream->key);
/* We can consume the 1 byte written into the wait_fd by UST */
stream->wait_fd, stream->key);
/* We can consume the 1 byte written into the wait_fd by UST */
- do {
- readlen = read(stream->wait_fd, &dummy, 1);
- } while (readlen == -1 && errno == -EINTR);
- if (readlen == -1) {
- ret = readlen;
- goto end;
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == -EINTR);
+ if (readlen == -1) {
+ ret = readlen;
+ goto end;
+ }
}
buf = stream->buf;
}
buf = stream->buf;
@@
-348,7
+359,7
@@
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
/* Get the next subbuffer */
err = ustctl_get_next_subbuf(handle, buf);
if (err != 0) {
/* Get the next subbuffer */
err = ustctl_get_next_subbuf(handle, buf);
if (err != 0) {
- ret =
errno;
+ ret =
-ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
@@
-362,11
+373,7
@@
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
assert(stream->output == LTTNG_EVENT_MMAP);
/* read the used subbuffer size */
err = ustctl_get_padded_subbuf_size(handle, buf, &len);
assert(stream->output == LTTNG_EVENT_MMAP);
/* read the used subbuffer size */
err = ustctl_get_padded_subbuf_size(handle, buf, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
+ assert(err == 0);
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
if (ret < 0) {
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
if (ret < 0) {
@@
-377,16
+384,7
@@
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
ERR("Error writing to tracefile");
}
err = ustctl_put_next_subbuf(handle, buf);
ERR("Error writing to tracefile");
}
err = ustctl_put_next_subbuf(handle, buf);
- if (err != 0) {
- ret = errno;
- if (errno == EFAULT) {
- perror("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
- /* Should never happen with newer LTTng versions */
- perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
- }
- goto end;
- }
+ assert(err == 0);
end:
return ret;
}
end:
return ret;
}
@@
-397,8
+395,10
@@
int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
/* Opening the tracefile in write mode */
if (stream->path_name != NULL) {
/* Opening the tracefile in write mode */
if (stream->path_name != NULL) {
- ret = open(stream->path_name,
- O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ ret = run_as_open(stream->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC,
+ S_IRWXU|S_IRWXG|S_IRWXO,
+ stream->uid, stream->gid);
if (ret < 0) {
ERR("Opening %s", stream->path_name);
perror("open");
if (ret < 0) {
ERR("Opening %s", stream->path_name);
perror("open");
This page took
0.03707 seconds
and
4
git commands to generate.