Fix: split index and data file rotation logic
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 44ea6cdeee8cd06baf8e0127032de7fc0b4aed03..1e2e9050fef2d6d77ca78e9767a2fea759ba99be 100644 (file)
@@ -83,6 +83,14 @@ NULL
 #endif
 ;
 
+enum relay_connection_status {
+       RELAY_CONNECTION_STATUS_OK,
+       /* An error occured while processing an event on the connection. */
+       RELAY_CONNECTION_STATUS_ERROR,
+       /* Connection closed/shutdown cleanly. */
+       RELAY_CONNECTION_STATUS_CLOSED,
+};
+
 /* command line options */
 char *opt_output_path;
 static int opt_daemon, opt_background;
@@ -1075,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index,
        return relay_index_set_data(index, &index_data);
 }
 
+static bool session_streams_have_index(const struct relay_session *session)
+{
+       return session->minor >= 4 && !session->snapshot;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -1098,16 +1111,19 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
 
        memset(&reply, 0, sizeof(reply));
 
-       switch (conn->minor) {
-       case 1:
-       case 2:
-       case 3:
-               break;
-       case 4: /* LTTng sessiond 2.4 */
-       default:
+       if (conn->minor < 4) {
+               /* From 2.1 to 2.3 */
+               ret = 0;
+       } else if (conn->minor >= 4 && conn->minor < 11) {
+               /* From 2.4 to 2.10 */
                ret = cmd_create_session_2_4(payload, session_name,
                        hostname, &live_timer, &snapshot);
+       } else {
+               /* From 2.11 to ... */
+               ret = cmd_create_session_2_11(payload, session_name,
+                       hostname, &live_timer, &snapshot);
        }
+
        if (ret < 0) {
                goto send_reply;
        }
@@ -1187,6 +1203,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
+       struct relay_stream_chunk_id stream_chunk_id = { 0 };
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1194,17 +1211,22 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end_no_session;
        }
 
-       switch (session->minor) {
-       case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+       if (session->minor == 1) {
+               /* For 2.1 */
                ret = cmd_recv_stream_2_1(payload, &path_name,
                        &channel_name);
-               break;
-       case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
-       default:
+       } else if (session->minor > 1 && session->minor < 11) {
+               /* From 2.2 to 2.10 */
                ret = cmd_recv_stream_2_2(payload, &path_name,
                        &channel_name, &tracefile_size, &tracefile_count);
-               break;
+       } else {
+               /* From 2.11 to ... */
+               ret = cmd_recv_stream_2_11(payload, &path_name,
+                       &channel_name, &tracefile_size, &tracefile_count,
+                       &stream_chunk_id.value);
+               stream_chunk_id.is_set = true;
        }
+
        if (ret < 0) {
                goto send_reply;
        }
@@ -1221,7 +1243,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-                       channel_name, tracefile_size, tracefile_count);
+               channel_name, tracefile_size, tracefile_count,
+               &stream_chunk_id);
        path_name = NULL;
        channel_name = NULL;
 
@@ -1516,7 +1539,8 @@ end:
  * Return 0 on success, -1 on error.
  */
 static
-int create_rotate_index_file(struct relay_stream *stream)
+int create_rotate_index_file(struct relay_stream *stream,
+               const char *stream_path)
 {
        int ret;
        uint32_t major, minor;
@@ -1528,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream)
        }
        major = stream->trace->session->major;
        minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
+       stream->index_file = lttng_index_file_create(stream_path,
                        stream->channel_name,
                        -1, -1, stream->tracefile_size,
                        tracefile_array_get_file_index_head(stream->tfa),
@@ -1546,10 +1570,12 @@ end:
 }
 
 static
-int do_rotate_stream(struct relay_stream *stream)
+int do_rotate_stream_data(struct relay_stream *stream)
 {
        int ret;
 
+       DBG("Rotating stream %" PRIu64 " data file",
+                       stream->stream_handle);
        /* Perform the stream rotation. */
        ret = utils_rotate_stream_file(stream->path_name,
                        stream->channel_name, stream->tracefile_size,
@@ -1561,19 +1587,17 @@ int do_rotate_stream(struct relay_stream *stream)
                goto end;
        }
        stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = create_rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end;
-               }
-       }
-
-       stream->rotate_at_seq_num = -1ULL;
        stream->pos_after_last_complete_data_index = 0;
+       stream->data_rotated = true;
 
+       if (stream->data_rotated && stream->index_rotated) {
+               /* Rotation completed; reset its state. */
+               DBG("Rotation completed for stream %" PRIu64,
+                               stream->stream_handle);
+               stream->rotate_at_seq_num = -1ULL;
+               stream->data_rotated = false;
+               stream->index_rotated = false;
+       }
 end:
        return ret;
 }
@@ -1585,9 +1609,7 @@ end:
  * connections are separate, the indexes as well as the commands arrive from
  * the control connection and we have no control over the order so we could be
  * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives. We don't need
- * to update the index because its order is guaranteed with the rotation
- * command message.
+ * before the rotation command on the control connection arrives.
  */
 static
 int rotate_truncate_stream(struct relay_stream *stream)
@@ -1686,12 +1708,6 @@ int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
-       ret = create_rotate_index_file(stream);
-       if (ret < 0) {
-               ERR("Rotate stream index file");
-               goto end;
-       }
-
        /*
         * Update the offset and FD of all the eventual indexes created by the
         * data connection before the rotation command arrived.
@@ -1714,27 +1730,100 @@ end:
 }
 
 /*
- * Check if a stream should perform a rotation (for session rotation).
+ * Check if a stream's index file should be rotated (for session rotation).
  * Must be called with the stream lock held.
  *
  * Return 0 on success, a negative value on error.
  */
 static
-int try_rotate_stream(struct relay_stream *stream)
+int try_rotate_stream_index(struct relay_stream *stream)
 {
        int ret = 0;
 
-       /* No rotation expected. */
        if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->index_rotated) {
+               /* Rotation of the index has already occurred. */
                goto end;
        }
 
-       if (stream->prev_seq < stream->rotate_at_seq_num ||
-                       stream->prev_seq == -1ULL) {
-               DBG("Stream %" PRIu64 " no yet ready for rotation",
+       if (stream->prev_index_seq == -1ULL ||
+                       stream->prev_index_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_index_seq);
+               goto end;
+       } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
+               /*
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq,
+                               stream->prev_index_seq);
+               ret = -1;
+               goto end;
+       } else {
+               DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
+               ret = create_rotate_index_file(stream, stream->path_name);
+               stream->index_rotated = true;
+
+               if (stream->data_rotated && stream->index_rotated) {
+                       /* Rotation completed; reset its state. */
+                       DBG("Rotation completed for stream %" PRIu64,
+                                       stream->stream_handle);
+                       stream->rotate_at_seq_num = -1ULL;
+                       stream->data_rotated = false;
+                       stream->index_rotated = false;
+               }
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int try_rotate_stream_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->data_rotated) {
+               /* Rotation of the data file has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_data_seq == -1ULL ||
+                       stream->prev_data_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq);
                goto end;
-       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+       } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
+               /*
+                * prev_data_seq is checked here since indexes and rotation
+                * commands are serialized with respect to each other.
+                */
                DBG("Rotation after too much data has been written in tracefile "
                                "for stream %" PRIu64 ", need to truncate before "
                                "rotating", stream->stream_handle);
@@ -1743,11 +1832,20 @@ int try_rotate_stream(struct relay_stream *stream)
                        ERR("Failed to truncate stream");
                        goto end;
                }
+       } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
+               /*
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq);
+               ret = -1;
+               goto end;
        } else {
-               /* stream->prev_seq == stream->rotate_at_seq_num */
-               DBG("Stream %" PRIu64 " ready for rotation",
-                               stream->stream_handle);
-               ret = do_rotate_stream(stream);
+               ret = do_rotate_stream_data(stream);
        }
 
 end:
@@ -1808,7 +1906,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 
        size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
                        metadata_payload_header.padding_size);
-       if (size_ret < 0) {
+       if (size_ret < (int64_t) metadata_payload_header.padding_size) {
                ret = -1;
                goto end_put;
        }
@@ -1818,7 +1916,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
-       ret = try_rotate_stream(metadata_stream);
+       ret = try_rotate_stream_data(metadata_stream);
        if (ret < 0) {
                goto end_put;
        }
@@ -1913,6 +2011,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_stream *stream;
        ssize_t send_ret;
        int ret;
+       uint64_t stream_seq;
 
        DBG("Data pending command received");
 
@@ -1940,12 +2039,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&stream->lock);
 
-       DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
-                       " and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, msg.last_net_seq_num);
+       if (session_streams_have_index(session)) {
+               /*
+                * Ensure that both the index and stream data have been
+                * flushed up to the requested point.
+                */
+               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+       } else {
+               stream_seq = stream->prev_data_seq;
+       }
+       DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
+                       ", prev_index_seq %" PRIu64
+                       ", and last_seq %" PRIu64, msg.stream_id,
+                       stream->prev_data_seq, stream->prev_index_seq,
+                       msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
-       if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+       if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
                /* Data has in fact been written and is NOT pending */
                ret = 0;
        } else {
@@ -2165,7 +2275,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                }
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
-                       if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+                       uint64_t stream_seq;
+
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_data_seq;
+                       }
+                       if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
                                DBG("Data is still in flight for stream %" PRIu64,
                                                stream->stream_handle);
@@ -2242,8 +2363,12 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
        index_info.timestamp_end = be64toh(index_info.timestamp_end);
        index_info.events_discarded = be64toh(index_info.events_discarded);
        index_info.stream_id = be64toh(index_info.stream_id);
-       index_info.stream_instance_id = be64toh(index_info.stream_instance_id);
-       index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
+
+       if (conn->minor >= 8) {
+               index_info.stream_instance_id =
+                               be64toh(index_info.stream_instance_id);
+               index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
+       }
 
        stream = stream_get_by_id(index_info.relay_stream_id);
        if (!stream) {
@@ -2292,12 +2417,23 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
+               stream->prev_index_seq = index_info.net_seq_num;
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
        } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
                ERR("relay_index_try_flush error %d", ret);
-               relay_index_put(index);
                ret = -1;
        }
 
@@ -2448,7 +2584,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
         * Update the trace path (just the folder, the stream name does not
         * change).
         */
-       free(stream->path_name);
+       free(stream->prev_path_name);
+       stream->prev_path_name = stream->path_name;
        stream->path_name = create_output_path(new_path_view.data);
        if (!stream->path_name) {
                ERR("Failed to create a new output path");
@@ -2463,21 +2600,32 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       stream->chunk_id = stream_info.new_chunk_id;
+       assert(stream->current_chunk_id.is_set);
+       stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
+               /*
+                * Metadata streams have no index; consider its rotation
+                * complete.
+                */
+               stream->index_rotated = true;
                /*
                 * The metadata stream is sent only over the control connection
                 * so we know we have all the data to perform the stream
                 * rotation.
                 */
-               ret = do_rotate_stream(stream);
+               ret = do_rotate_stream_data(stream);
        } else {
                stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
-               ret = try_rotate_stream(stream);
-       }
-       if (ret < 0) {
-               goto end_stream_unlock;
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
 end_stream_unlock:
@@ -2571,6 +2719,7 @@ static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path);
        ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
        if (ret < 0) {
                ERR("relay creating output directory");
@@ -2694,6 +2843,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"",
+                       old_path_view.data, new_path_view.data);
        complete_old_path = create_output_path(old_path_view.data);
        if (!complete_old_path) {
                ERR("Failed to build old output path in rotate_rename command");
@@ -2707,6 +2858,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+       DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"",
+                       complete_old_path, complete_new_path);
 
        ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
                        -1, -1);
@@ -2797,7 +2950,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        chunk_id = be64toh(msg.chunk_id);
 
-       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+       DBG("Evaluating rotate pending for session \"%s\" and  chunk id %" PRIu64,
+                       session->session_name, chunk_id);
 
        /*
         * Iterate over all the streams in the session and check if they are
@@ -2819,7 +2973,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        rotate_pending = true;
                        DBG("Stream %" PRIu64 " is still rotating",
                                        stream->stream_handle);
-               } else if (stream->chunk_id < chunk_id) {
+               } else if (stream->current_chunk_id.value < chunk_id) {
                        /*
                         * Stream closed on the consumer but still active on the
                         * relay.
@@ -2945,9 +3099,11 @@ end:
        return ret;
 }
 
-static int relay_process_control_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_payload(
+               struct relay_connection *conn)
 {
        int ret = 0;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct lttng_dynamic_buffer *reception_buffer =
                        &conn->protocol.ctrl.reception_buffer;
        struct ctrl_connection_state_receive_payload *state =
@@ -2963,11 +3119,15 @@ static int relay_process_control_receive_payload(struct relay_connection *conn)
                        reception_buffer->data + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive command payload on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive command payload on sock %d",
+                                       conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -2985,7 +3145,6 @@ static int relay_process_control_receive_payload(struct relay_connection *conn)
                DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3004,17 +3163,23 @@ reception_complete:
        ret = relay_process_control_command(conn,
                        &state->header, &payload_view);
        if (ret < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
        ret = connection_reset_protocol_state(conn);
+       if (ret) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
+       }
 end:
-       return ret;
+       return status;
 }
 
-static int relay_process_control_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_header(
+               struct relay_connection *conn)
 {
        int ret = 0;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct lttcomm_relayd_hdr header;
        struct lttng_dynamic_buffer *reception_buffer =
                        &conn->protocol.ctrl.reception_buffer;
@@ -3027,11 +3192,15 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                        reception_buffer->data + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive control command header on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive control command header on sock %d",
+                                       conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -3049,7 +3218,6 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3068,11 +3236,10 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                        conn->sock->fd, header.cmd, header.cmd_version,
                        header.data_size);
 
-       /* FIXME temporary arbitrary limit on data size. */
-       if (header.data_size > (128 * 1024 * 1024)) {
+       if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
                ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
                                header.data_size);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3082,6 +3249,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
        ret = lttng_dynamic_buffer_set_size(reception_buffer,
                        header.data_size);
        if (ret) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3090,32 +3258,33 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                 * Manually invoke the next state as the poll loop
                 * will not wake-up to allow us to proceed further.
                 */
-               ret = relay_process_control_receive_payload(conn);
+               status = relay_process_control_receive_payload(conn);
        }
 end:
-       return ret;
+       return status;
 }
 
 /*
  * Process the commands received on the control socket
  */
-static int relay_process_control(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control(
+               struct relay_connection *conn)
 {
-       int ret = 0;
+       enum relay_connection_status status;
 
        switch (conn->protocol.ctrl.state_id) {
        case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
-               ret = relay_process_control_receive_header(conn);
+               status = relay_process_control_receive_header(conn);
                break;
        case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
-               ret = relay_process_control_receive_payload(conn);
+               status = relay_process_control_receive_payload(conn);
                break;
        default:
                ERR("Unknown control connection protocol state encountered.");
                abort();
        }
 
-       return ret;
+       return status;
 }
 
 /*
@@ -3150,7 +3319,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        }
 
        if (rotate_index || !stream->index_file) {
-               ret = create_rotate_index_file(stream);
+               const char *stream_path;
+
+               /*
+                * The data connection creates the stream's first index file.
+                *
+                * This can happen _after_ a ROTATE_STREAM command. In
+                * other words, the data of the first packet of this stream
+                * can be received after a ROTATE_STREAM command.
+                *
+                * The ROTATE_STREAM command changes the stream's path_name
+                * to point to the "next" chunk. If a rotation is pending for
+                * this stream, as indicated by "rotate_at_seq_num != -1ULL",
+                * it means that we are still receiving data that belongs in the
+                * stream's former path.
+                *
+                * In this very specific case, we must ensure that the index
+                * file is created in the streams's former path,
+                * "prev_path_name".
+                *
+                * All other rotations beyond the first one are not affected
+                * by this problem since the actual rotation operation creates
+                * the new chunk's index file.
+                */
+               stream_path = stream->rotate_at_seq_num == -1ULL ?
+                               stream->path_name:
+                               stream->prev_path_name;
+
+               ret = create_rotate_index_file(stream, stream_path);
                if (ret < 0) {
                        ERR("Failed to rotate index");
                        /* Put self-ref for this index due to error. */
@@ -3178,18 +3374,24 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                /* No flush. */
                ret = 0;
        } else {
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
 end:
        return ret;
 }
 
-static int relay_process_data_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_header(
+               struct relay_connection *conn)
 {
        int ret;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct data_connection_state_receive_header *state =
                        &conn->protocol.data.state.receive_header;
        struct lttcomm_relayd_data_hdr header;
@@ -3201,12 +3403,15 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                        state->header_reception_buffer + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive data header on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive data header on sock %d", conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                /* Orderly shutdown. Not necessary to print an error. */
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -3224,7 +3429,6 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3253,7 +3457,8 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
        if (!stream) {
                DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
                                header.stream_id);
-               ret = 0;
+               /* Protocol error. */
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3278,6 +3483,7 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                                &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Failed to rotate stream output file");
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
 
@@ -3289,17 +3495,18 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                conn->protocol.data.state.receive_payload.rotate_index = true;
        }
 
-       ret = 0;
 end_stream_unlock:
        pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 end:
-       return ret;
+       return status;
 }
 
-static int relay_process_data_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_payload(
+               struct relay_connection *conn)
 {
        int ret;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct relay_stream *stream;
        struct data_connection_state_receive_payload *state =
                        &conn->protocol.data.state.receive_payload;
@@ -3310,20 +3517,28 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
        uint64_t left_to_receive = state->left_to_receive;
        struct relay_session *session;
 
+       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+                       state->header.stream_id, state->header.net_seq_num,
+                       state->received, left_to_receive);
+
        stream = stream_get_by_id(state->header.stream_id);
        if (!stream) {
-               DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+               /* Protocol error. */
+               ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
                                state->header.stream_id);
-               ret = 0;
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
        pthread_mutex_lock(&stream->lock);
        session = stream->trace->session;
-
-       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
-                       state->header.stream_id, state->header.net_seq_num,
-                       state->received, left_to_receive);
+       if (!conn->session) {
+               ret = connection_set_session(conn, session);
+               if (ret) {
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+                       goto end_stream_unlock;
+               }
+       }
 
        /*
         * The size of the "chunk" received on any iteration is bounded by:
@@ -3338,13 +3553,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
                                recv_size, MSG_DONTWAIT);
                if (ret < 0) {
-                       ERR("Socket %d error %d", conn->sock->fd, ret);
-                       ret = -1;
+                       if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                               PERROR("Socket %d error", conn->sock->fd);
+                               status = RELAY_CONNECTION_STATUS_ERROR;
+                       }
                        goto end_stream_unlock;
                } else if (ret == 0) {
                        /* No more data ready to be consumed on socket. */
                        DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
                                        state->header.stream_id);
+                       status = RELAY_CONNECTION_STATUS_CLOSED;
                        break;
                } else if (ret < (int) recv_size) {
                        /*
@@ -3361,7 +3579,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                                recv_size);
                if (write_ret < (ssize_t) recv_size) {
                        ERR("Relay error writing data to file");
-                       ret = -1;
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
 
@@ -3381,27 +3599,28 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
                                state->header.stream_id, state->received,
                                state->left_to_receive);
-               ret = 0;
                goto end_stream_unlock;
        }
 
        ret = write_padding_to_file(stream->stream_fd->fd,
                        state->header.padding_size);
-       if (ret < 0) {
+       if ((int64_t) ret < (int64_t) state->header.padding_size) {
                ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                stream->stream_handle,
                                state->header.net_seq_num, ret);
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
 
-       if (session->minor >= 4 && !session->snapshot) {
+       if (session_streams_have_index(session)) {
                ret = handle_index_data(stream, state->header.net_seq_num,
                                state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
                if (ret < 0) {
                        ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
        }
@@ -3409,15 +3628,20 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
        stream->tracefile_size_current += state->header.data_size +
                        state->header.padding_size;
 
-       if (stream->prev_seq == -1ULL) {
+       if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
+               stream->prev_index_seq = state->header.net_seq_num;
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
-       stream->prev_seq = state->header.net_seq_num;
+       stream->prev_data_seq = state->header.net_seq_num;
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
@@ -3427,8 +3651,9 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
        connection_reset_protocol_state(conn);
        state = NULL;
 
-       ret = try_rotate_stream(stream);
+       ret = try_rotate_stream_data(stream);
        if (ret < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
@@ -3447,29 +3672,30 @@ end_stream_unlock:
 
        stream_put(stream);
 end:
-       return ret;
+       return status;
 }
 
 /*
  * relay_process_data: Process the data received on the data socket
  */
-static int relay_process_data(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data(
+               struct relay_connection *conn)
 {
-       int ret;
+       enum relay_connection_status status;
 
        switch (conn->protocol.data.state_id) {
        case DATA_CONNECTION_STATE_RECEIVE_HEADER:
-               ret = relay_process_data_receive_header(conn);
+               status = relay_process_data_receive_header(conn);
                break;
        case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
-               ret = relay_process_data_receive_payload(conn);
+               status = relay_process_data_receive_payload(conn);
                break;
        default:
                ERR("Unexpected data connection communication state.");
                abort();
        }
 
-       return ret;
+       return status;
 }
 
 static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
@@ -3641,9 +3867,31 @@ restart:
                                assert(ctrl_conn->type == RELAY_CONTROL);
 
                                if (revents & LPOLLIN) {
-                                       ret = relay_process_control(ctrl_conn);
-                                       if (ret < 0) {
-                                               /* Clear the connection on error. */
+                                       enum relay_connection_status status;
+
+                                       status = relay_process_control(ctrl_conn);
+                                       if (status != RELAY_CONNECTION_STATUS_OK) {
+                                               /*
+                                                * On socket error flag the session as aborted to force
+                                                * the cleanup of its stream otherwise it can leak
+                                                * during the lifetime of the relayd.
+                                                *
+                                                * This prevents situations in which streams can be
+                                                * left opened because an index was received, the
+                                                * control connection is closed, and the data
+                                                * connection is closed (uncleanly) before the packet's
+                                                * data provided.
+                                                *
+                                                * Since the control connection encountered an error,
+                                                * it is okay to be conservative and close the
+                                                * session right now as we can't rely on the protocol
+                                                * being respected anymore.
+                                                */
+                                               if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                                       session_abort(ctrl_conn->session);
+                                               }
+
+                                               /* Clear the connection on error or close. */
                                                relay_thread_close_connection(&events,
                                                                pollfd,
                                                                ctrl_conn);
@@ -3717,9 +3965,30 @@ restart:
                        assert(data_conn->type == RELAY_DATA);
 
                        if (revents & LPOLLIN) {
-                               ret = relay_process_data(data_conn);
-                               /* Connection closed */
-                               if (ret < 0) {
+                               enum relay_connection_status status;
+
+                               status = relay_process_data(data_conn);
+                               /* Connection closed or error. */
+                               if (status != RELAY_CONNECTION_STATUS_OK) {
+                                       /*
+                                        * On socket error flag the session as aborted to force
+                                        * the cleanup of its stream otherwise it can leak
+                                        * during the lifetime of the relayd.
+                                        *
+                                        * This prevents situations in which streams can be
+                                        * left opened because an index was received, the
+                                        * control connection is closed, and the data
+                                        * connection is closed (uncleanly) before the packet's
+                                        * data provided.
+                                        *
+                                        * Since the data connection encountered an error,
+                                        * it is okay to be conservative and close the
+                                        * session right now as we can't rely on the protocol
+                                        * being respected anymore.
+                                        */
+                                       if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                               session_abort(data_conn->session);
+                                       }
                                        relay_thread_close_connection(&events, pollfd,
                                                        data_conn);
                                        /*
@@ -3751,16 +4020,14 @@ restart:
 
 exit:
 error:
-       /* Cleanup reamaining connection object. */
+       /* Cleanup remaining connection object. */
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
                        destroy_conn,
                        sock_n.node) {
                health_code_update();
 
-               if (session_abort(destroy_conn->session)) {
-                       assert(0);
-               }
+               session_abort(destroy_conn->session);
 
                /*
                 * No need to grab another ref, because we own
This page took 0.036763 seconds and 4 git commands to generate.