Add UST test binaries to gitignore
[lttng-tools.git] / liblttng-ustconsumer / lttng-ustconsumer.c
index 1e0bf55ec70727578bbff92fe921ba8af74b55ca..8305b061a882fff9021037e1716ff32fb3f36935 100644 (file)
 #include <sys/types.h>
 #include <unistd.h>
 
-#include <ust/lttng-ust-ctl.h>
 #include <lttng-sessiond-comm.h>
 #include <lttng/lttng-ustconsumer.h>
+#include <lttng/ust-ctl.h>
 #include <lttngerr.h>
+#include <runas.h>
 
 extern struct lttng_consumer_global_data consumer_data;
 extern int consumer_poll_timeout;
@@ -208,13 +209,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
                        fds[0], fds[1]);
-               new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
+               assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
+               new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
                                msg.u.stream.stream_key,
                                fds[0], fds[1],
                                msg.u.stream.state,
                                msg.u.stream.mmap_len,
                                msg.u.stream.output,
-                               msg.u.stream.path_name);
+                               msg.u.stream.path_name,
+                               msg.u.stream.uid,
+                               msg.u.stream.gid);
                if (new_stream == NULL) {
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
                        goto end;
@@ -233,6 +237,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
+               return -ENOSYS;
+#if 0
                if (ctx->on_update_stream != NULL) {
                        ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
                        if (ret == 0) {
@@ -244,6 +250,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_change_stream_state(msg.u.stream.stream_key,
                                msg.u.stream.state);
                }
+#endif
                break;
        }
        default:
@@ -261,7 +268,7 @@ end_nosignal:
 
 int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
 {
-       struct object_data obj;
+       struct lttng_ust_object_data obj;
 
        obj.handle = -1;
        obj.shm_fd = chan->shm_fd;
@@ -271,9 +278,18 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
        if (!chan->handle) {
                return -ENOMEM;
        }
+       chan->wait_fd_is_copy = 1;
+       chan->shm_fd = -1;
+
        return 0;
 }
 
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+       ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+       stream->hangup_flush_done = 1;
+}
+
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 {
        ustctl_unmap_channel(chan->handle);
@@ -281,7 +297,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 
 int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
 {
-       struct object_data obj;
+       struct lttng_ust_object_data obj;
        int ret;
 
        obj.handle = -1;
@@ -294,10 +310,15 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
        if (!stream->buf)
                return -EBUSY;
+       /* ustctl_open_stream_read has closed the shm fd. */
+       stream->wait_fd_is_copy = 1;
+       stream->shm_fd = -1;
+
        stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
        if (!stream->mmap_base) {
                return -EINVAL;
        }
+
        return 0;
 }
 
@@ -305,3 +326,89 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
 {
        ustctl_close_stream_read(stream->chan->handle, stream->buf);
 }
+
+
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       unsigned long len;
+       int err;
+       long ret = 0;
+       struct lttng_ust_shm_handle *handle;
+       struct lttng_ust_lib_ring_buffer *buf;
+       char dummy;
+       ssize_t readlen;
+
+       DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
+               stream->wait_fd, stream->key);
+
+       /* We can consume the 1 byte written into the wait_fd by UST */
+       if (!stream->hangup_flush_done) {
+               do {
+                       readlen = read(stream->wait_fd, &dummy, 1);
+               } while (readlen == -1 && errno == -EINTR);
+               if (readlen == -1) {
+                       ret = readlen;
+                       goto end;
+               }
+       }
+
+       buf = stream->buf;
+       handle = stream->chan->handle;
+       /* Get the next subbuffer */
+       err = ustctl_get_next_subbuf(handle, buf);
+       if (err != 0) {
+               ret = -ret;     /* ustctl_get_next_subbuf returns negative, caller expect positive. */
+               /*
+                * This is a debug message even for single-threaded consumer,
+                * because poll() have more relaxed criterions than get subbuf,
+                * so get_subbuf may fail for short race windows where poll()
+                * would issue wakeups.
+                */
+               DBG("Reserving sub buffer failed (everything is normal, "
+                               "it is due to concurrency)");
+               goto end;
+       }
+       assert(stream->output == LTTNG_EVENT_MMAP);
+       /* read the used subbuffer size */
+       err = ustctl_get_padded_subbuf_size(handle, buf, &len);
+       assert(err == 0);
+       /* write the subbuffer to the tracefile */
+       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+       if (ret < 0) {
+               /*
+                * display the error but continue processing to try
+                * to release the subbuffer
+                */
+               ERR("Error writing to tracefile");
+       }
+       err = ustctl_put_next_subbuf(handle, buf);
+       assert(err == 0);
+end:
+       return ret;
+}
+
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       /* Opening the tracefile in write mode */
+       if (stream->path_name != NULL) {
+               ret = open_run_as(stream->path_name,
+                               O_WRONLY|O_CREAT|O_TRUNC,
+                               S_IRWXU|S_IRWXG|S_IRWXO,
+                               stream->uid, stream->gid);
+               if (ret < 0) {
+                       ERR("Opening %s", stream->path_name);
+                       perror("open");
+                       goto error;
+               }
+               stream->out_fd = ret;
+       }
+
+       /* we return 0 to let the library handle the FD internally */
+       return 0;
+
+error:
+       return ret;
+}
This page took 0.026059 seconds and 4 git commands to generate.