Refactor consumerd main/cleanup
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index d29804cd624f03795e78b706e11af58f7b3b8ebc..93d458b1f0f24e813f68b7f1ac312fa6ffdc335c 100644 (file)
@@ -17,6 +17,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
 #include <limits.h>
@@ -43,6 +44,7 @@
 #include <common/common.h>
 #include <common/compat/poll.h>
 #include <common/compat/socket.h>
+#include <common/compat/endian.h>
 #include <common/defaults.h>
 #include <common/futex.h>
 #include <common/index/index.h>
@@ -557,11 +559,12 @@ restart:
                                new_conn->sock = newsock;
 
                                /* Enqueue request for the dispatcher thread. */
-                               cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
+                               cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
+                                                &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex. Implicit memory barrier with
-                                * the exchange in cds_wfq_enqueue.
+                                * the exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&viewer_conn_queue.futex);
                        }
@@ -600,7 +603,7 @@ void *thread_dispatcher(void *data)
 {
        int err = -1;
        ssize_t ret;
-       struct cds_wfq_node *node;
+       struct cds_wfcq_node *node;
        struct relay_connection *conn = NULL;
 
        DBG("[thread] Live viewer relay dispatcher started");
@@ -623,7 +626,8 @@ void *thread_dispatcher(void *data)
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
+                       node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
+                                                        &viewer_conn_queue.tail);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the live-viewer "
                                                "relay command queue");
@@ -800,14 +804,9 @@ int viewer_list_sessions(struct relay_connection *conn)
        }
        health_code_update();
 
-       rcu_read_unlock();
        ret = 0;
-       goto end;
-
 end_unlock:
        rcu_read_unlock();
-
-end:
        return ret;
 }
 
@@ -931,6 +930,8 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
        health_code_update();
 
+       memset(&response, 0, sizeof(response));
+
        rcu_read_lock();
        session = session_find_by_id(conn->sessions_ht, session_id);
        if (!session) {
@@ -1032,6 +1033,8 @@ int viewer_attach_session(struct relay_connection *conn)
 
        health_code_update();
 
+       memset(&response, 0, sizeof(response));
+
        if (!conn->viewer_session) {
                DBG("Client trying to attach before creating a live viewer session");
                response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
@@ -1201,6 +1204,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                 */
                                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                                index->timestamp_end = htobe64(rstream->beacon_ts_end);
+                               index->stream_id = htobe64(rstream->ctf_stream_id);
                                goto index_ready;
                        } else if (rstream->total_index_received <= vstream->last_sent_index
                                        && !vstream->close_write_flag) {
@@ -1314,7 +1318,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
        pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        if (ret < 0) {
-               goto end;
+               goto end_unlock;
        } else if (ret == 1) {
                /*
                 * This means the viewer index data structure has been populated by the
@@ -1337,11 +1341,13 @@ int viewer_get_next_index(struct relay_connection *conn)
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
        }
 
+       pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
        pthread_mutex_lock(&vstream->overwrite_lock);
        if (vstream->abort_flag) {
                /* The file is being overwritten by the writer, we cannot use it. */
                pthread_mutex_unlock(&vstream->overwrite_lock);
                ret = viewer_stream_rotate(vstream, rstream);
+               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                if (ret < 0) {
                        goto end_unlock;
                } else if (ret == 1) {
@@ -1357,6 +1363,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        read_ret = lttng_read(vstream->index_read_fd, &packet_index,
                        sizeof(packet_index));
        pthread_mutex_unlock(&vstream->overwrite_lock);
+       pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        if (read_ret < 0) {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                viewer_stream_delete(vstream);
@@ -1615,6 +1622,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
        health_code_update();
 
+       memset(&reply, 0, sizeof(reply));
+
        rcu_read_lock();
        stream = viewer_stream_find_by_id(be64toh(request.stream_id));
        if (!stream || !stream->metadata_flag) {
@@ -1715,6 +1724,7 @@ int viewer_create_session(struct relay_connection *conn)
 
        DBG("Viewer create session received");
 
+       memset(&resp, 0, sizeof(resp));
        resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
        conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
        if (!conn->viewer_session) {
@@ -1746,6 +1756,7 @@ void live_relay_unknown_command(struct relay_connection *conn)
 {
        struct lttcomm_relayd_generic_reply reply;
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_ERR_UNK);
        (void) send_response(conn->sock, &reply, sizeof(reply));
 }
@@ -2100,7 +2111,7 @@ int live_start_threads(struct lttng_uri *uri,
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&viewer_conn_queue.queue);
+       cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
This page took 0.024655 seconds and 4 git commands to generate.