Add important DEBUG statement
[lttng-tools.git] / liblttng-ustconsumer / lttng-ustconsumer.c
index bf3ae08452f9371afc08393b6ba56c4f71e1c541..ed4a27c9da927d47ed52cadf129a6592d17fca48 100644 (file)
@@ -208,6 +208,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
                        fds[0], fds[1]);
+               assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
                new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
                                msg.u.stream.stream_key,
                                fds[0], fds[1],
@@ -272,14 +273,19 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
                return -ENOMEM;
        }
        /*
-        * The channel shm and wait fds are passed to ustctl, set them
-        * to -1 here.
+        * The channel fds are passed to ustctl, we only keep a copy.
         */
-       chan->shm_fd = -1;
-       chan->wait_fd = -1;
+       chan->shm_fd_is_copy = 1;
+       chan->wait_fd_is_copy = 1;
+
        return 0;
 }
 
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+       ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+}
+
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 {
        ustctl_unmap_channel(chan->handle);
@@ -305,11 +311,10 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
                return -EINVAL;
        }
        /*
-        * The stream shm and wait fds are passed to ustctl, set them to
-        * -1 here.
+        * The stream fds are passed to ustctl, we only keep a copy.
         */
-       stream->shm_fd = -1;
-       stream->wait_fd = -1;
+       stream->shm_fd_is_copy = 1;
+       stream->wait_fd_is_copy = 1;
 
        return 0;
 }
@@ -318,3 +323,98 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
 {
        ustctl_close_stream_read(stream->chan->handle, stream->buf);
 }
+
+
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       unsigned long len;
+       int err;
+       long ret = 0;
+       struct lttng_ust_shm_handle *handle;
+       struct lttng_ust_lib_ring_buffer *buf;
+       char dummy;
+       ssize_t readlen;
+
+       DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
+               stream->wait_fd, stream->key);
+
+       /* We can consume the 1 byte written into the wait_fd by UST */
+       do {
+               readlen = read(stream->wait_fd, &dummy, 1);
+       } while (readlen == -1 && errno == -EINTR);
+       if (readlen == -1) {
+               ret = readlen;
+               goto end;
+       }
+
+       buf = stream->buf;
+       handle = stream->chan->handle;
+       /* Get the next subbuffer */
+       err = ustctl_get_next_subbuf(handle, buf);
+       if (err != 0) {
+               ret = errno;
+               /*
+                * This is a debug message even for single-threaded consumer,
+                * because poll() have more relaxed criterions than get subbuf,
+                * so get_subbuf may fail for short race windows where poll()
+                * would issue wakeups.
+                */
+               DBG("Reserving sub buffer failed (everything is normal, "
+                               "it is due to concurrency)");
+               goto end;
+       }
+       assert(stream->output == LTTNG_EVENT_MMAP);
+       /* read the used subbuffer size */
+       err = ustctl_get_padded_subbuf_size(handle, buf, &len);
+       if (err != 0) {
+               ret = errno;
+               perror("Getting sub-buffer len failed.");
+               goto end;
+       }
+       /* write the subbuffer to the tracefile */
+       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+       if (ret < 0) {
+               /*
+                * display the error but continue processing to try
+                * to release the subbuffer
+                */
+               ERR("Error writing to tracefile");
+       }
+       err = ustctl_put_next_subbuf(handle, buf);
+       if (err != 0) {
+               ret = errno;
+               if (errno == EFAULT) {
+                       perror("Error in unreserving sub buffer\n");
+               } else if (errno == EIO) {
+                       /* Should never happen with newer LTTng versions */
+                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+               }
+               goto end;
+       }
+end:
+       return ret;
+}
+
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       /* Opening the tracefile in write mode */
+       if (stream->path_name != NULL) {
+               ret = open(stream->path_name,
+                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+               if (ret < 0) {
+                       ERR("Opening %s", stream->path_name);
+                       perror("open");
+                       goto error;
+               }
+               stream->out_fd = ret;
+       }
+
+       /* we return 0 to let the library handle the FD internally */
+       return 0;
+
+error:
+       return ret;
+}
This page took 0.023954 seconds and 4 git commands to generate.