+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ unsigned long netlen = len;
+
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ if (stream->metadata_flag) {
+ /* Metadata requires the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ netlen += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, netlen, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag) {
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ if (ret < 0) {
+ written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+ }
+ } else {
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ /* Else, use the default set before which is the filesystem. */
+ }
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
+
+ /*
+ * Check if we need to change the tracefile before writing the packet.
+ */
+ if (stream->chan->tracefile_size > 0 &&
+ (stream->tracefile_size_current + len) >
+ stream->chan->tracefile_size) {
+ ret = utils_rotate_stream_file(stream->chan->pathname,
+ stream->name, stream->chan->tracefile_size,
+ stream->chan->tracefile_count, stream->uid, stream->gid,
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
+ if (ret < 0) {
+ ERR("Rotating output file");
+ goto end;
+ }
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
+ }
+ stream->tracefile_size_current += len;
+ if (index) {
+ index->offset = htobe64(stream->out_fd_offset);
+ }
+ }
+
+ while (len > 0) {
+ do {
+ ret = write(outfd, mmap_base + mmap_offset, len);
+ } while (ret < 0 && errno == EINTR);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+ if (ret < 0) {
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
+ if (written == 0) {
+ written = -errno;
+ }
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EPIPE || errno == EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ } else if (ret > len) {
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
+ written += ret;
+ goto end;
+ } else {
+ len -= ret;
+ mmap_offset += ret;
+ }
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret;
+ }
+ stream->output_written += ret;
+ written += ret;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ }
+
+end:
+ /* Unlock only if ctrl socket used */
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
+}
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * It must be called with the stream lock held.
+ *
+ * Returns the number of bytes spliced.
+ */
+ssize_t lttng_consumer_on_read_subbuffer_splice(
+ struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding,
+ struct lttng_packet_index *index)
+{
+ ssize_t ret = 0, written = 0, ret_splice = 0;
+ loff_t offset = 0;
+ off_t orig_offset = stream->out_fd_offset;
+ int fd = stream->wait_fd;
+ /* Default is on the disk */
+ int outfd = stream->out_fd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ int *splice_pipe;
+ unsigned int relayd_hang_up = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Not supported for user space tracing */
+ return -ENOSYS;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
+ /* Flag that the current stream if set for network streaming. */
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ ret = -EPIPE;
+ goto end;
+ }
+ }
+
+ /*
+ * Choose right pipe for splice. Metadata and trace data are handled by
+ * different threads hence the use of two pipes in order not to race or
+ * corrupt the written data.
+ */
+ if (stream->metadata_flag) {
+ splice_pipe = ctx->consumer_splice_metadata_pipe;
+ } else {
+ splice_pipe = ctx->consumer_thread_pipe;
+ }
+
+ /* Write metadata stream id before payload */
+ if (relayd) {
+ int total_len = len;
+
+ if (stream->metadata_flag) {
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ padding);
+ if (ret < 0) {
+ written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+
+ total_len += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, total_len, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+ } else {
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
+
+ /*
+ * Check if we need to change the tracefile before writing the packet.
+ */
+ if (stream->chan->tracefile_size > 0 &&
+ (stream->tracefile_size_current + len) >
+ stream->chan->tracefile_size) {
+ ret = utils_rotate_stream_file(stream->chan->pathname,
+ stream->name, stream->chan->tracefile_size,
+ stream->chan->tracefile_count, stream->uid, stream->gid,
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
+ if (ret < 0) {
+ ERR("Rotating output file");
+ goto end;
+ }
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
+ }
+ stream->tracefile_size_current += len;
+ index->offset = htobe64(stream->out_fd_offset);
+ }
+
+ while (len > 0) {
+ DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
+ (unsigned long)offset, len, fd, splice_pipe[1]);
+ ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
+ SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("splice chan to pipe, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in relay splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ ret = errno;
+ goto splice_error;
+ }
+
+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ if (stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
+
+ /* Update counter to fit the spliced data */
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
+ /*
+ * We do this so the return value can match the len passed as
+ * argument to this function.
+ */
+ written -= metadata_payload_size;
+ }
+ }
+
+ /* Splice data out */
+ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
+ ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in file splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EBADF || errno == EPIPE) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ ret = errno;
+ goto splice_error;
+ } else if (ret_splice > len) {
+ errno = EINVAL;
+ PERROR("Wrote more data than requested %zd (len: %lu)",
+ ret_splice, len);
+ written += ret_splice;
+ ret = errno;
+ goto splice_error;
+ }
+ len -= ret_splice;
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret_splice;
+ }
+ stream->output_written += ret_splice;
+ written += ret_splice;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+ ret = ret_splice;
+
+ goto end;
+
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ /* Skip splice error so the consumer does not fail */
+ goto end;
+ }
+
+splice_error:
+ /* send the appropriate error description to sessiond */
+ switch (ret) {
+ case EINVAL:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
+ break;
+ case ENOMEM:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
+ break;
+ case ESPIPE:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
+ break;
+ }
+
+end:
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
+}
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_take_snapshot(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_take_snapshot(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_produced_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_produced_snapshot(stream, pos);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+ int sock, struct pollfd *consumer_sockpoll)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * WARNING: *MUST* be used with data stream only.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * XXX: Should not be only for metadata stream or else use an other name.
+ */
+static void destroy_stream_ht(struct lttng_ht *ht)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
+void lttng_consumer_close_metadata(void)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * The Kernel consumer has a different metadata scheme so we don't
+ * close anything because the stream will be closed by the session
+ * daemon.
+ */
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Close all metadata streams. The metadata hash table is passed and
+ * this call iterates over it by closing all wakeup fd. This is safe
+ * because at this point we are sure that the metadata producer is
+ * either dead or blocked.
+ */
+ lttng_ustconsumer_close_metadata(metadata_ht);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+}
+
+/*
+ * Clean up a metadata stream and free its memory.
+ */
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
+ struct lttng_ht *ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *free_chan = NULL;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ /*
+ * This call should NEVER receive regular stream. It must always be
+ * metadata stream and this is crucial for data structure synchronization.
+ */
+ assert(stream->metadata_flag);
+
+ DBG3("Consumer delete metadata stream %d", stream->wait_fd);
+
+ if (ht == NULL) {
+ /* Means the stream was allocated but not successfully added */
+ goto free_stream_rcu;
+ }
+
+ pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ if (stream->mmap_base != NULL) {
+ ret = munmap(stream->mmap_base, stream->mmap_len);
+ if (ret != 0) {
+ PERROR("munmap metadata stream");
+ }
+ }
+ if (stream->wait_fd >= 0) {
+ ret = close(stream->wait_fd);
+ if (ret < 0) {
+ PERROR("close kernel metadata wait_fd");
+ }
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ if (stream->monitor) {
+ /* close the write-side in close_metadata */
+ ret = close(stream->ust_metadata_poll_pipe[0]);
+ if (ret < 0) {
+ PERROR("Close UST metadata read-side poll pipe");
+ }
+ }
+ lttng_ustconsumer_del_stream(stream);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ goto end;
+ }
+
+ rcu_read_lock();
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ iter.iter.node = &stream->node_channel_id.node;
+ ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+ assert(!ret);
+
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("close");
+ }
+ }
+
+ /* Check and cleanup relayd */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ uatomic_dec(&relayd->refcount);
+ assert(uatomic_read(&relayd->refcount) >= 0);
+
+ /* Closing streams requires to lock the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_send_close_stream(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ DBG("Unable to close stream on the relayd. Continuing");
+ /*
+ * Continue here. There is nothing we can do for the relayd.
+ * Chances are that the relayd has closed the socket so we just
+ * continue cleaning up.
+ */
+ }
+
+ /* Both conditions are met, we destroy the relayd. */
+ if (uatomic_read(&relayd->refcount) == 0 &&
+ uatomic_read(&relayd->destroy_flag)) {
+ consumer_destroy_relayd(relayd);
+ }
+ }
+ rcu_read_unlock();
+
+ /* Atomically decrement channel refcount since other threads can use it. */
+ if (!uatomic_sub_return(&stream->chan->refcount, 1)
+ && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+ /* Go for channel deletion! */
+ free_chan = stream->chan;
+ }
+
+end:
+ /*
+ * Nullify the stream reference so it is not used after deletion. The
+ * channel lock MUST be acquired before being able to check for
+ * a NULL pointer value.
+ */
+ stream->chan->metadata_stream = NULL;
+
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+
+ if (free_chan) {
+ consumer_del_channel(free_chan);
+ }
+
+free_stream_rcu:
+ call_rcu(&stream->node.head, free_stream_rcu);