Fix: make sure no index is in flight before using inactivity beacons
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 5b41679cba9c7310f7513e89d1f7483f9060f850..75f2d91931749dd78e72121a7130f0f78a324478 100644 (file)
@@ -251,8 +251,9 @@ int parse_args(int argc, char **argv)
 
        /* assign default values */
        if (control_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_CONTROL_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_CONTROL_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
                        goto exit;
@@ -266,8 +267,9 @@ int parse_args(int argc, char **argv)
                }
        }
        if (data_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_DATA_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_DATA_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
                        goto exit;
@@ -281,8 +283,9 @@ int parse_args(int argc, char **argv)
                }
        }
        if (live_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_VIEWER_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_VIEWER_PORT);
                if (ret < 0) {
                        PERROR("asprintf default viewer control address");
                        goto exit;
@@ -593,7 +596,7 @@ static void try_close_stream(struct relay_session *session,
        pthread_mutex_unlock(&session->viewer_ready_lock);
 
        ret = stream_close(session, stream);
-       if (!ret) {
+       if (ret || session->snapshot) {
                /* Already close thus the ctf trace is being or has been destroyed. */
                goto end;
        }
@@ -1062,6 +1065,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->session_id = session->id;
        stream->index_fd = -1;
        stream->read_index_fd = -1;
+       stream->ctf_stream_id = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
@@ -1125,6 +1129,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        stream->stream_handle);
 
 end:
+       memset(&reply, 0, sizeof(reply));
        reply.handle = htobe64(stream->stream_handle);
        /* send the session id to the client or a negative return code on error */
        if (ret < 0) {
@@ -1206,6 +1211,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
 end_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
        } else {
@@ -1231,6 +1237,7 @@ void relay_unknown_command(struct relay_connection *conn)
        struct lttcomm_relayd_generic_reply reply;
        int ret;
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_ERR_UNK);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
@@ -1256,6 +1263,7 @@ int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
                ret = htobe32(LTTNG_ERR_UNK);
        }
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = ret;
        ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
@@ -1417,6 +1425,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       memset(&reply, 0, sizeof(reply));
        reply.major = RELAYD_VERSION_COMM_MAJOR;
        reply.minor = RELAYD_VERSION_COMM_MINOR;
 
@@ -1516,6 +1525,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 end_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(ret);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (ret < 0) {
@@ -1579,6 +1589,7 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_OK);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (ret < 0) {
@@ -1649,6 +1660,7 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
        reply.ret_code = htobe32(LTTNG_OK);
 
@@ -1713,7 +1725,7 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
                        node.node) {
                if (stream->session_id == session_id &&
-                               !stream->data_pending_check_done) {
+                               !stream->data_pending_check_done && !stream->terminated_flag) {
                        is_data_inflight = 1;
                        DBG("Data is still in flight for stream %" PRIu64,
                                        stream->stream_handle);
@@ -1722,6 +1734,7 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
        reply.ret_code = htobe32(is_data_inflight);
 
@@ -1789,9 +1802,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1808,9 +1822,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = be64toh(index_info.stream_id);
+       }
 
        if (index_created) {
                /*
@@ -1830,27 +1848,19 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 
        /* Do we have a writable ready index to write on disk. */
        if (wr_index) {
-               /* Starting at 2.4, create the index file if none available. */
-               if (conn->minor >= 4 && stream->index_fd < 0) {
-                       ret = index_create_file(stream->path_name, stream->channel_name,
-                                       relayd_uid, relayd_gid, stream->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
-                               goto end_rcu_unlock;
-                       }
-                       stream->index_fd = ret;
-               }
-
                ret = relay_index_write(wr_index->fd, wr_index);
                if (ret < 0) {
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
        } else {
@@ -1897,8 +1907,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
        /*
         * Inform the viewer that there are new streams in the session.
         */
-       uatomic_set(&conn->session->new_streams, 1);
+       if (conn->session->viewer_refcount) {
+               uatomic_set(&conn->session->new_streams, 1);
+       }
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
@@ -2005,6 +2018,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2047,6 +2061,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2248,7 +2264,7 @@ static void destroy_connection(struct lttng_ht *relay_connections_ht,
        connection_delete(relay_connections_ht, conn);
 
        /* For the control socket, we try to destroy the session. */
-       if (conn->type == RELAY_CONTROL) {
+       if (conn->type == RELAY_CONTROL && conn->session) {
                destroy_session(conn->session, conn->sessions_ht);
        }
 
This page took 0.028361 seconds and 4 git commands to generate.