+
+
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ struct lttng_ust_shm_handle *handle;
+ struct lttng_ust_lib_ring_buffer *buf;
+ char dummy;
+ ssize_t readlen;
+
+ DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
+ stream->wait_fd, stream->key);
+
+ /* We can consume the 1 byte written into the wait_fd by UST */
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == -EINTR);
+ if (readlen == -1) {
+ ret = readlen;
+ goto end;
+ }
+ }
+
+ buf = stream->buf;
+ handle = stream->chan->handle;
+ /* Get the next subbuffer */
+ err = ustctl_get_next_subbuf(handle, buf);
+ if (err != 0) {
+ ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+ assert(stream->output == LTTNG_EVENT_MMAP);
+ /* read the used subbuffer size */
+ err = ustctl_get_padded_subbuf_size(handle, buf, &len);
+ assert(err == 0);
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ err = ustctl_put_next_subbuf(handle, buf);
+ assert(err == 0);
+end:
+ return ret;
+}
+
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (stream->path_name != NULL) {
+ ret = run_as_open(stream->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC,
+ S_IRWXU|S_IRWXG|S_IRWXO,
+ stream->uid, stream->gid);
+ if (ret < 0) {
+ ERR("Opening %s", stream->path_name);
+ perror("open");
+ goto error;
+ }
+ stream->out_fd = ret;
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error:
+ return ret;
+}