Standardize quit pipes behavior
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 57b55587d2fb281cc46a4f9c13e08c9dadff9c1d..4724a3adaab0bcbfe5d64dfc44245b57c0f202da 100644 (file)
 #include <urcu/uatomic.h>
 #include <string>
 
-#include <common/common.h>
-#include <common/compat/endian.h>
-#include <common/compat/poll.h>
-#include <common/compat/socket.h>
-#include <common/defaults.h>
-#include <common/fd-tracker/utils.h>
-#include <common/fs-handle.h>
-#include <common/futex.h>
-#include <common/index/index.h>
-#include <common/sessiond-comm/inet.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/uri.h>
-#include <common/utils.h>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/compat/socket.hpp>
+#include <common/defaults.hpp>
+#include <common/fd-tracker/utils.hpp>
+#include <common/fs-handle.hpp>
+#include <common/futex.hpp>
+#include <common/index/index.hpp>
+#include <common/sessiond-comm/inet.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/uri.hpp>
+#include <common/utils.hpp>
 #include <lttng/lttng.h>
 
-#include "cmd.h"
-#include "connection.h"
-#include "ctf-trace.h"
-#include "health-relayd.h"
-#include "live.h"
-#include "lttng-relayd.h"
-#include "session.h"
-#include "stream.h"
-#include "testpoint.h"
-#include "utils.h"
-#include "viewer-session.h"
-#include "viewer-stream.h"
+#include "cmd.hpp"
+#include "connection.hpp"
+#include "ctf-trace.hpp"
+#include "health-relayd.hpp"
+#include "live.hpp"
+#include "lttng-relayd.hpp"
+#include "session.hpp"
+#include "stream.hpp"
+#include "testpoint.hpp"
+#include "utils.hpp"
+#include "viewer-session.hpp"
+#include "viewer-stream.hpp"
 
 #define SESSION_BUF_DEFAULT_COUNT      16
 
@@ -160,6 +160,24 @@ const char *lttng_viewer_attach_return_code_str(
        }
 };
 
+static
+const char *lttng_viewer_get_packet_return_code_str(
+               enum lttng_viewer_get_packet_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_GET_PACKET_OK:
+               return "GET_PACKET_OK";
+       case LTTNG_VIEWER_GET_PACKET_RETRY:
+               return "GET_PACKET_RETRY";
+       case LTTNG_VIEWER_GET_PACKET_ERR:
+               return "GET_PACKET_ERR";
+       case LTTNG_VIEWER_GET_PACKET_EOF:
+               return "GET_PACKET_EOF";
+       default:
+               abort();
+       }
+};
+
 /*
  * Cleanup the daemon
  */
@@ -446,7 +464,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * chunk can be used safely.
                                 */
                                if ((relay_stream->ongoing_rotation.is_set ||
-                                                   relay_session->ongoing_rotation) &&
+                                               session_has_ongoing_rotation(relay_session)) &&
                                                relay_stream->trace_chunk) {
                                        viewer_stream_trace_chunk = lttng_trace_chunk_copy(
                                                        relay_stream->trace_chunk);
@@ -572,54 +590,6 @@ int relayd_live_stop(void)
        return 0;
 }
 
-/*
- * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
- */
-static
-int create_named_thread_poll_set(struct lttng_poll_event *events,
-               int size, const char *name)
-{
-       int ret;
-
-       if (events == NULL || size == 0) {
-               ret = -1;
-               goto error;
-       }
-
-       ret = fd_tracker_util_poll_create(the_fd_tracker,
-                       name, events, 1, LTTNG_CLOEXEC);
-       if (ret) {
-               PERROR("Failed to create \"%s\" poll file descriptor", name);
-               goto error;
-       }
-
-       /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               goto error;
-       }
-
-       return 0;
-
-error:
-       return ret;
-}
-
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
-
 static
 int create_sock(void *data, int *out_fd)
 {
@@ -637,7 +607,7 @@ end:
 }
 
 static
-int close_sock(void *data, int *in_fd)
+int close_sock(void *data, int *in_fd __attribute__((unused)))
 {
        struct lttcomm_sock *sock = (lttcomm_sock *) data;
 
@@ -748,10 +718,10 @@ error:
  * This thread manages the listening for new connections on the network
  */
 static
-void *thread_listener(void *data)
+void *thread_listener(void *data __attribute__((unused)))
 {
-       int i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int i, ret, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *live_control_sock;
 
@@ -808,15 +778,15 @@ restart:
 
                DBG("Relay new viewer connection received");
                for (i = 0; i < nb_fd; i++) {
-                       health_code_update();
-
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       health_code_update();
+
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -913,7 +883,7 @@ error_sock_control:
  * This thread manages the dispatching of the requests to worker threads
  */
 static
-void *thread_dispatcher(void *data)
+void *thread_dispatcher(void *data __attribute__((unused)))
 {
        int err = -1;
        ssize_t ret;
@@ -952,7 +922,7 @@ void *thread_dispatcher(void *data)
                                /* Continue thread execution */
                                break;
                        }
-                       conn = caa_container_of(node, struct relay_connection, qnode);
+                       conn = lttng::utils::container_of(node, &relay_connection::qnode);
                        DBG("Dispatching viewer request waiting on sock %d",
                                        conn->sock->fd);
 
@@ -1094,7 +1064,7 @@ int viewer_list_sessions(struct relay_connection *conn)
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
 
-       send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+       send_session_buf = calloc<lttng_viewer_session>(SESSION_BUF_DEFAULT_COUNT);
        if (!send_session_buf) {
                return -1;
        }
@@ -1238,7 +1208,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
         * stream, because the chunk can be in an intermediate state
         * due to directory renaming.
         */
-       if (session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(session)) {
                DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
                goto send_reply_unlock;
@@ -1248,7 +1218,12 @@ int viewer_get_new_streams(struct relay_connection *conn)
                        LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
-               goto error_unlock_session;
+               /*
+                * This is caused by an internal error; propagate the negative
+                * 'ret' to close the connection.
+                */
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+               goto send_reply_unlock;
        }
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
@@ -1303,10 +1278,6 @@ end_put_session:
        }
 error:
        return ret;
-error_unlock_session:
-       pthread_mutex_unlock(&session->lock);
-       session_put(session);
-       return ret;
 }
 
 /*
@@ -1398,7 +1369,7 @@ int viewer_attach_session(struct relay_connection *conn)
         * stream, because the chunk can be in an intermediate state
         * due to directory renaming.
         */
-       if (session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(session)) {
                DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
                send_streams = 0;
                goto send_reply;
@@ -1770,7 +1741,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       if (rstream->trace->session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(rstream->trace->session)) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
                DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s",
@@ -2069,6 +2040,7 @@ int viewer_get_packet(struct relay_connection *conn)
        uint32_t packet_data_len = 0;
        ssize_t read_len;
        uint64_t stream_id;
+       enum lttng_viewer_get_packet_return_code get_packet_status;
 
        health_code_update();
 
@@ -2085,19 +2057,21 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(stream_id);
        if (!vstream) {
-               DBG("Client requested packet of unknown stream id %" PRIu64,
-                               stream_id);
-               reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+               DBG("Client requested packet of unknown stream id %" PRIu64
+                       ", returning status=%s", stream_id,
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto send_reply_nolock;
        } else {
                packet_data_len = be32toh(get_packet_info.len);
                reply_size += packet_data_len;
        }
 
-       reply = (char *) zmalloc(reply_size);
+       reply = zmalloc<char>(reply_size);
        if (!reply) {
-               PERROR("packet reply zmalloc");
-               reply_size = sizeof(reply_header);
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+               PERROR("Falled to allocate reply, returning status=%s",
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
 
@@ -2105,29 +2079,31 @@ int viewer_get_packet(struct relay_connection *conn)
        lseek_ret = fs_handle_seek(vstream->stream_file.handle,
                        be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to seek file system handle of viewer stream %" PRIu64
-                      " to offset %" PRIu64,
-                               stream_id,
-                               (uint64_t) be64toh(get_packet_info.offset));
+                      " to offset %" PRIu64", returning status=%s", stream_id,
+                       (uint64_t) be64toh(get_packet_info.offset),
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
        read_len = fs_handle_read(vstream->stream_file.handle,
                        reply + sizeof(reply_header), packet_data_len);
        if (read_len < packet_data_len) {
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
-                      ", offset: %" PRIu64,
-                               stream_id,
-                               (uint64_t) be64toh(get_packet_info.offset));
+                      ", offset: %" PRIu64 ", returning status=%s", stream_id,
+                      (uint64_t) be64toh(get_packet_info.offset),
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
-       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+
+       get_packet_status = LTTNG_VIEWER_GET_PACKET_OK;
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
 error:
        /* No payload to send on error. */
        reply_size = sizeof(reply_header);
-       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        if (vstream) {
@@ -2137,6 +2113,7 @@ send_reply_nolock:
 
        health_code_update();
 
+       reply_header.status = htobe32(get_packet_status);
        if (reply) {
                memcpy(reply, &reply_header, sizeof(reply_header));
                ret = send_response(conn->sock, reply, reply_size);
@@ -2353,7 +2330,7 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        reply.len = htobe64(len);
-       data = (char *) zmalloc(len);
+       data = zmalloc<char>(len);
        if (!data) {
                PERROR("viewer metadata zmalloc");
                goto error;
@@ -2637,7 +2614,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
  * This thread does the actual work
  */
 static
-void *thread_worker(void *data)
+void *thread_worker(void *data __attribute__((unused)))
 {
        int ret, err = -1;
        uint32_t nb_fd;
@@ -2704,14 +2681,14 @@ restart:
                 */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
This page took 0.028684 seconds and 4 git commands to generate.