Fix: ret is uninitialized on standard path
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index e8283f037e43a3a9429b57ec0e606a44097192f2..45b7de8c657f9b43a426057fc07f7a299689dc75 100644 (file)
@@ -2789,6 +2789,112 @@ end_no_reply:
        return ret;
 }
 
+/*
+ * Check if all the streams in the session have completed the last rotation.
+ * The chunk_id value is used to distinguish the cases where a stream was
+ * closed on the consumerd before the rotation started but it still active on
+ * the relayd, and the case where a stream appeared on the consumerd/relayd
+ * after the last rotation started (in that case, it is already writing in the
+ * new chunk folder).
+ */
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct lttng_ht_iter iter;
+       struct relay_stream *stream;
+       int ret = 0;
+       ssize_t network_ret;
+       uint64_t chunk_id;
+        bool rotate_pending = false;
+
+       DBG("Rotate pending command received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+       if (network_ret < (ssize_t) sizeof(msg)) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_pending struct size : %zi",
+                                       network_ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       chunk_id = be64toh(msg.chunk_id);
+       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+
+       /*
+        * Iterate over all the streams in the session and check if they are
+        * still waiting for data to perform their rotation.
+        */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+                       node.node) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
+               if (stream->trace->session != session) {
+                       stream_put(stream);
+                       continue;
+               }
+               pthread_mutex_lock(&stream->lock);
+               if (stream->rotate_at_seq_num != -1ULL) {
+                       /* We have not yet performed the rotation. */
+                       rotate_pending = true;
+                       DBG("Stream %" PRIu64 " is still rotating",
+                                       stream->stream_handle);
+               } else if (stream->chunk_id < chunk_id) {
+                       /*
+                        * Stream closed on the consumer but still active on the
+                        * relay.
+                        */
+                       rotate_pending = true;
+                       DBG("Stream %" PRIu64 " did not exist on the consumer "
+                                       "when the last rotation started, but is"
+                                       "still waiting for data before getting"
+                                       "closed",
+                                       stream->stream_handle);
+               }
+               pthread_mutex_unlock(&stream->lock);
+               stream_put(stream);
+               if (rotate_pending) {
+                       goto send_reply;
+               }
+       }
+
+send_reply:
+       rcu_read_unlock();
+       memset(&reply, 0, sizeof(reply));
+       reply.ret_code = htobe32(rotate_pending ? 1 : 0);
+       network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(reply), 0);
+       if (network_ret < (ssize_t) sizeof(reply)) {
+               ERR("Relay rotate pending ret code failed");
+               ret = -1;
+       }
+
+end_no_reply:
+       return ret;
+}
+
 /*
  * Process the commands received on the control socket
  */
@@ -2843,6 +2949,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_ROTATE_RENAME:
                ret = relay_rotate_rename(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_PENDING:
+               ret = relay_rotate_pending(recv_hdr, conn);
+               break;
        case RELAYD_MKDIR:
                ret = relay_mkdir(recv_hdr, conn);
                break;
This page took 0.024379 seconds and 4 git commands to generate.