+ ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ stream_id = be64toh(data_hdr.stream_id);
+
+ rcu_read_lock();
+ stream = stream_find_by_id(relay_streams_ht, stream_id);
+ if (!stream) {
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ session = session_find_by_id(conn->sessions_ht, stream->session_id);
+ assert(session);
+
+ data_size = be32toh(data_hdr.data_size);
+ if (data_buffer_size < data_size) {
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
+ ERR("Allocating data buffer");
+ free(data_buffer);
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+ data_buffer = tmp_data_ptr;
+ data_buffer_size = data_size;
+ }
+ memset(data_buffer, 0, data_size);
+
+ net_seq_num = be64toh(data_hdr.net_seq_num);
+
+ DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ data_size, stream_id, net_seq_num);
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ }
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /* Check if a rotation is needed. */
+ if (stream->tracefile_size > 0 &&
+ (stream->tracefile_size_current + data_size) >
+ stream->tracefile_size) {
+ struct relay_viewer_stream *vstream;
+ uint64_t new_id;
+
+ new_id = (stream->tracefile_count_current + 1) %
+ stream->tracefile_count;
+ /*
+ * When we wrap-around back to 0, we start overwriting old
+ * trace data.
+ */
+ if (!stream->tracefile_overwrite && new_id == 0) {
+ stream->tracefile_overwrite = 1;
+ }
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
+ if (stream->tracefile_overwrite) {
+ stream->oldest_tracefile_id =
+ (stream->oldest_tracefile_id + 1) %
+ stream->tracefile_count;
+ }
+ vstream = viewer_stream_find_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * The viewer is reading a file about to be
+ * overwritten. Close the FDs it is
+ * currently using and let it handle the fault.
+ */
+ if (vstream->tracefile_count_current == new_id) {
+ pthread_mutex_lock(&vstream->overwrite_lock);
+ vstream->abort_flag = 1;
+ pthread_mutex_unlock(&vstream->overwrite_lock);
+ DBG("Streaming side setting abort_flag on stream %s_%lu\n",
+ stream->channel_name, new_id);
+ } else if (vstream->tracefile_count_current ==
+ stream->tracefile_count_current) {
+ /*
+ * The reader and writer were in the
+ * same trace file, inform the viewer
+ * that no new index will ever be added
+ * to this file.
+ */
+ vstream->close_write_flag = 1;
+ }
+ }
+ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count,
+ relayd_uid, relayd_gid, stream->fd,
+ &(stream->tracefile_count_current), &stream->fd);
+ stream->total_index_received = 0;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end_rcu_unlock;
+ }
+ /* Reset current size because we just perform a stream rotation. */
+ stream->tracefile_size_current = 0;
+ rotate_index = 1;
+ }
+
+ /*
+ * Index are handled in protocol version 2.4 and above. Also, snapshot and
+ * index are NOT supported.
+ */
+ if (session->minor >= 4 && !session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ }
+
+ /* Write data to stream output fd. */
+ size_ret = lttng_write(stream->fd, data_buffer, data_size);
+ if (size_ret < data_size) {
+ ERR("Relay error writing data to file");
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
+ ret, stream->stream_handle);
+
+ ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
+
+ stream->prev_seq = net_seq_num;
+
+ try_close_stream(session, stream);
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret;