X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=ef830d4ba0d4daa7a391f5c6d6e06753fad0bb78;hp=5aa899b282605a0c058f7f497f9b500ad7e0226b;hb=HEAD;hpb=0283afa961253376a5e18de140d29fffd9e89ae3 diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index 5aa899b28..699778d0e 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -8,6 +8,37 @@ */ #define _LGPL_SOURCE +#include "cmd.hpp" +#include "connection.hpp" +#include "ctf-trace.hpp" +#include "health-relayd.hpp" +#include "live.hpp" +#include "lttng-relayd.hpp" +#include "session.hpp" +#include "stream.hpp" +#include "testpoint.hpp" +#include "utils.hpp" +#include "viewer-session.hpp" +#include "viewer-stream.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include #include @@ -18,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -29,38 +61,8 @@ #include #include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "cmd.h" -#include "connection.h" -#include "ctf-trace.h" -#include "health-relayd.h" -#include "live.h" -#include "lttng-relayd.h" -#include "session.h" -#include "stream.h" -#include "testpoint.h" -#include "utils.h" -#include "viewer-session.h" -#include "viewer-stream.h" - -#define SESSION_BUF_DEFAULT_COUNT 16 +#define SESSION_BUF_DEFAULT_COUNT 16 static struct lttng_uri *live_uri; @@ -86,11 +88,9 @@ static pthread_t live_worker_thread; static struct relay_conn_queue viewer_conn_queue; static uint64_t last_relay_viewer_session_id; -static pthread_mutex_t last_relay_viewer_session_id_lock = - PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER; -static -const char *lttng_viewer_command_str(lttng_viewer_command cmd) +static const char *lttng_viewer_command_str(lttng_viewer_command cmd) { switch (cmd) { case LTTNG_VIEWER_CONNECT: @@ -116,9 +116,8 @@ const char *lttng_viewer_command_str(lttng_viewer_command cmd) } } -static -const char *lttng_viewer_next_index_return_code_str( - enum lttng_viewer_next_index_return_code code) +static const char * +lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code) { switch (code) { case LTTNG_VIEWER_INDEX_OK: @@ -138,9 +137,7 @@ const char *lttng_viewer_next_index_return_code_str( } } -static -const char *lttng_viewer_attach_return_code_str( - enum lttng_viewer_attach_return_code code) +static const char *lttng_viewer_attach_return_code_str(enum lttng_viewer_attach_return_code code) { switch (code) { case LTTNG_VIEWER_ATTACH_OK: @@ -160,9 +157,8 @@ const char *lttng_viewer_attach_return_code_str( } }; -static -const char *lttng_viewer_get_packet_return_code_str( - enum lttng_viewer_get_packet_return_code code) +static const char * +lttng_viewer_get_packet_return_code_str(enum lttng_viewer_get_packet_return_code code) { switch (code) { case LTTNG_VIEWER_GET_PACKET_OK: @@ -181,8 +177,7 @@ const char *lttng_viewer_get_packet_return_code_str( /* * Cleanup the daemon */ -static -void cleanup_relayd_live(void) +static void cleanup_relayd_live() { DBG("Cleaning up"); @@ -196,8 +191,7 @@ void cleanup_relayd_live(void) * Return the size of the received message or else a negative value on error * with errno being set by recvmsg() syscall. */ -static -ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) +static ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; @@ -222,8 +216,7 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) * Return the size of the sent message or else a negative value on error with * errno being set by sendmsg() syscall. */ -static -ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) +static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; @@ -242,32 +235,36 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) * Returns 1 if new streams got added, 0 if nothing changed, a negative value * on error. */ -static -int check_new_streams(struct relay_connection *conn) +static int check_new_streams(struct relay_connection *conn) { struct relay_session *session; - unsigned long current_val; int ret = 0; if (!conn->viewer_session) { goto end; } - rcu_read_lock(); - cds_list_for_each_entry_rcu(session, - &conn->viewer_session->session_list, - viewer_session_node) { - if (!session_get(session)) { - continue; - } - current_val = uatomic_cmpxchg(&session->new_streams, 1, 0); - ret = current_val; - session_put(session); - if (ret == 1) { - goto end; + + { + lttng::urcu::read_lock_guard read_lock; + cds_list_for_each_entry_rcu( + session, &conn->viewer_session->session_list, viewer_session_node) + { + if (!session_get(session)) { + continue; + } + + ret = uatomic_read(&session->new_streams); + session_put(session); + if (ret == 1) { + goto end; + } } } + end: - rcu_read_unlock(); + DBG("Viewer connection has%s new streams: socket_fd = %d", + ret == 0 ? " no" : "", + conn->sock->fd); return ret; } @@ -277,73 +274,72 @@ end: * * Return 0 on success or else a negative value. */ -static -ssize_t send_viewer_streams(struct lttcomm_sock *sock, - uint64_t session_id, unsigned int ignore_sent_flag) +static ssize_t +send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag) { ssize_t ret; struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, - stream_n.node) { - struct ctf_trace *ctf_trace; - struct lttng_viewer_stream send_stream = {}; + cds_lfht_for_each_entry ( + viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { + struct ctf_trace *ctf_trace; + struct lttng_viewer_stream send_stream = {}; - health_code_update(); + health_code_update(); - if (!viewer_stream_get(vstream)) { - continue; - } + if (!viewer_stream_get(vstream)) { + continue; + } - pthread_mutex_lock(&vstream->stream->lock); - /* Ignore if not the same session. */ - if (vstream->stream->trace->session->id != session_id || - (!ignore_sent_flag && vstream->sent_flag)) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - continue; - } + pthread_mutex_lock(&vstream->stream->lock); + /* Ignore if not the same session. */ + if (vstream->stream->trace->session->id != session_id || + (!ignore_sent_flag && vstream->sent_flag)) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + continue; + } - ctf_trace = vstream->stream->trace; - send_stream.id = htobe64(vstream->stream->stream_handle); - send_stream.ctf_trace_id = htobe64(ctf_trace->id); - send_stream.metadata_flag = htobe32( - vstream->stream->is_metadata); - if (lttng_strncpy(send_stream.path_name, vstream->path_name, - sizeof(send_stream.path_name))) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - ret = -1; /* Error. */ - goto end_unlock; - } - if (lttng_strncpy(send_stream.channel_name, - vstream->channel_name, - sizeof(send_stream.channel_name))) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - ret = -1; /* Error. */ - goto end_unlock; - } + ctf_trace = vstream->stream->trace; + send_stream.id = htobe64(vstream->stream->stream_handle); + send_stream.ctf_trace_id = htobe64(ctf_trace->id); + send_stream.metadata_flag = htobe32(vstream->stream->is_metadata); + if (lttng_strncpy(send_stream.path_name, + vstream->path_name, + sizeof(send_stream.path_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end; + } + if (lttng_strncpy(send_stream.channel_name, + vstream->channel_name, + sizeof(send_stream.channel_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end; + } - DBG("Sending stream %" PRIu64 " to viewer", - vstream->stream->stream_handle); - vstream->sent_flag = 1; - pthread_mutex_unlock(&vstream->stream->lock); + DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); + vstream->sent_flag = true; + pthread_mutex_unlock(&vstream->stream->lock); - ret = send_response(sock, &send_stream, sizeof(send_stream)); - viewer_stream_put(vstream); - if (ret < 0) { - goto end_unlock; + ret = send_response(sock, &send_stream, sizeof(send_stream)); + viewer_stream_put(vstream); + if (ret < 0) { + goto end; + } } } ret = 0; -end_unlock: - rcu_read_unlock(); +end: return ret; } @@ -359,17 +355,17 @@ end_unlock: * Return 0 on success or else a negative value. */ static int make_viewer_streams(struct relay_session *relay_session, - struct relay_viewer_session *viewer_session, - enum lttng_viewer_seek seek_t, - uint32_t *nb_total, - uint32_t *nb_unsent, - uint32_t *nb_created, - bool *closed) + struct relay_viewer_session *viewer_session, + enum lttng_viewer_seek seek_t, + uint32_t *nb_total, + uint32_t *nb_unsent, + uint32_t *nb_created, + bool *closed) { int ret; struct lttng_ht_iter iter; struct ctf_trace *ctf_trace; - struct relay_stream *relay_stream = NULL; + struct relay_stream *relay_stream = nullptr; LTTNG_ASSERT(relay_session); ASSERT_LOCKED(relay_session->lock); @@ -382,197 +378,201 @@ static int make_viewer_streams(struct relay_session *relay_session, * Create viewer streams for relay streams that are ready to be * used for a the given session id only. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter, - ctf_trace, node.node) { - bool trace_has_metadata_stream = false; + { + lttng::urcu::read_lock_guard read_lock; - health_code_update(); - - if (!ctf_trace_get(ctf_trace)) { - continue; - } + cds_lfht_for_each_entry ( + relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { + bool trace_has_metadata_stream = false; - /* - * Iterate over all the streams of the trace to see if we have a - * metadata stream. - */ - cds_list_for_each_entry_rcu(relay_stream, - &ctf_trace->stream_list, stream_node) - { - bool is_metadata_stream; - - pthread_mutex_lock(&relay_stream->lock); - is_metadata_stream = relay_stream->is_metadata; - pthread_mutex_unlock(&relay_stream->lock); + health_code_update(); - if (is_metadata_stream) { - trace_has_metadata_stream = true; - break; + if (!ctf_trace_get(ctf_trace)) { + continue; } - } - - relay_stream = NULL; - - /* - * If there is no metadata stream in this trace at the moment - * and we never sent one to the viewer, skip the trace. We - * accept that the viewer will not see this trace at all. - */ - if (!trace_has_metadata_stream && - !ctf_trace->metadata_stream_sent_to_viewer) { - ctf_trace_put(ctf_trace); - continue; - } - cds_list_for_each_entry_rcu(relay_stream, - &ctf_trace->stream_list, stream_node) - { - struct relay_viewer_stream *viewer_stream; - - if (!stream_get(relay_stream)) { - continue; + /* + * Iterate over all the streams of the trace to see if we have a + * metadata stream. + */ + cds_list_for_each_entry_rcu( + relay_stream, &ctf_trace->stream_list, stream_node) + { + bool is_metadata_stream; + + pthread_mutex_lock(&relay_stream->lock); + is_metadata_stream = relay_stream->is_metadata; + pthread_mutex_unlock(&relay_stream->lock); + + if (is_metadata_stream) { + trace_has_metadata_stream = true; + break; + } } - pthread_mutex_lock(&relay_stream->lock); + relay_stream = nullptr; + /* - * stream published is protected by the session lock. + * If there is no metadata stream in this trace at the moment + * and we never sent one to the viewer, skip the trace. We + * accept that the viewer will not see this trace at all. */ - if (!relay_stream->published) { - goto next; + if (!trace_has_metadata_stream && + !ctf_trace->metadata_stream_sent_to_viewer) { + ctf_trace_put(ctf_trace); + continue; } - viewer_stream = viewer_stream_get_by_id( - relay_stream->stream_handle); - if (!viewer_stream) { - struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL; - /* - * Save that we sent the metadata stream to the - * viewer. So that we know what trace the viewer - * is aware of. - */ - if (relay_stream->is_metadata) { - ctf_trace->metadata_stream_sent_to_viewer = true; + cds_list_for_each_entry_rcu( + relay_stream, &ctf_trace->stream_list, stream_node) + { + struct relay_viewer_stream *viewer_stream; + + if (!stream_get(relay_stream)) { + continue; } + pthread_mutex_lock(&relay_stream->lock); /* - * If a rotation is ongoing, use a copy of the - * relay stream's chunk to ensure the stream - * files exist. - * - * Otherwise, the viewer session's current trace - * chunk can be used safely. + * stream published is protected by the session lock. */ - if ((relay_stream->ongoing_rotation.is_set || - session_has_ongoing_rotation(relay_session)) && - relay_stream->trace_chunk) { - viewer_stream_trace_chunk = lttng_trace_chunk_copy( - relay_stream->trace_chunk); - if (!viewer_stream_trace_chunk) { - ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; - } - } else { + if (!relay_stream->published) { + goto next; + } + viewer_stream = + viewer_stream_get_by_id(relay_stream->stream_handle); + if (!viewer_stream) { + struct lttng_trace_chunk *viewer_stream_trace_chunk = + nullptr; + /* - * Transition the viewer session into the newest trace chunk available. + * Save that we sent the metadata stream to the + * viewer. So that we know what trace the viewer + * is aware of. */ - if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk, - relay_stream->trace_chunk)) { + if (relay_stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = true; + } - ret = viewer_session_set_trace_chunk_copy( - viewer_session, - relay_stream->trace_chunk); - if (ret) { + /* + * If a rotation is ongoing, use a copy of the + * relay stream's chunk to ensure the stream + * files exist. + * + * Otherwise, the viewer session's current trace + * chunk can be used safely. + */ + if ((relay_stream->ongoing_rotation.is_set || + session_has_ongoing_rotation(relay_session)) && + relay_stream->trace_chunk) { + viewer_stream_trace_chunk = lttng_trace_chunk_copy( + relay_stream->trace_chunk); + if (!viewer_stream_trace_chunk) { ret = -1; ctf_trace_put(ctf_trace); goto error_unlock; } - } - - if (relay_stream->trace_chunk) { + } else { /* - * If the corresponding relay - * stream's trace chunk is set, - * the viewer stream will be - * created under it. - * - * Note that a relay stream can - * have a NULL output trace - * chunk (for instance, after a - * clear against a stopped - * session). + * Transition the viewer session into the newest + * trace chunk available. */ - const bool reference_acquired = lttng_trace_chunk_get( - viewer_session->current_trace_chunk); + if (!lttng_trace_chunk_ids_equal( + viewer_session->current_trace_chunk, + relay_stream->trace_chunk)) { + ret = viewer_session_set_trace_chunk_copy( + viewer_session, + relay_stream->trace_chunk); + if (ret) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } - LTTNG_ASSERT(reference_acquired); - viewer_stream_trace_chunk = + if (relay_stream->trace_chunk) { + /* + * If the corresponding relay + * stream's trace chunk is set, + * the viewer stream will be + * created under it. + * + * Note that a relay stream can + * have a NULL output trace + * chunk (for instance, after a + * clear against a stopped + * session). + */ + const bool reference_acquired = + lttng_trace_chunk_get( + viewer_session + ->current_trace_chunk); + + LTTNG_ASSERT(reference_acquired); + viewer_stream_trace_chunk = viewer_session->current_trace_chunk; + } } - } - viewer_stream = viewer_stream_create( - relay_stream, - viewer_stream_trace_chunk, - seek_t); - lttng_trace_chunk_put(viewer_stream_trace_chunk); - viewer_stream_trace_chunk = NULL; - if (!viewer_stream) { - ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; - } + viewer_stream = viewer_stream_create( + relay_stream, viewer_stream_trace_chunk, seek_t); + lttng_trace_chunk_put(viewer_stream_trace_chunk); + viewer_stream_trace_chunk = nullptr; + if (!viewer_stream) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } - if (nb_created) { - /* Update number of created stream counter. */ - (*nb_created)++; - } - /* - * Ensure a self-reference is preserved even - * after we have put our local reference. - */ - if (!viewer_stream_get(viewer_stream)) { - ERR("Unable to get self-reference on viewer stream, logic error."); - abort(); - } - } else { - if (!viewer_stream->sent_flag && nb_unsent) { - /* Update number of unsent stream counter. */ - (*nb_unsent)++; - } - } - /* Update number of total stream counter. */ - if (nb_total) { - if (relay_stream->is_metadata) { - if (!relay_stream->closed || - relay_stream->metadata_received > - viewer_stream->metadata_sent) { - (*nb_total)++; + if (nb_created) { + /* Update number of created stream counter. */ + (*nb_created)++; + } + /* + * Ensure a self-reference is preserved even + * after we have put our local reference. + */ + if (!viewer_stream_get(viewer_stream)) { + ERR("Unable to get self-reference on viewer stream, logic error."); + abort(); } } else { - if (!relay_stream->closed || - !(((int64_t)(relay_stream->prev_data_seq - - relay_stream->last_net_seq_num)) >= - 0)) { - (*nb_total)++; + if (!viewer_stream->sent_flag && nb_unsent) { + /* Update number of unsent stream counter. */ + (*nb_unsent)++; } } + /* Update number of total stream counter. */ + if (nb_total) { + if (relay_stream->is_metadata) { + if (!relay_stream->closed || + relay_stream->metadata_received > + viewer_stream->metadata_sent) { + (*nb_total)++; + } + } else { + if (!relay_stream->closed || + !(((int64_t) (relay_stream->prev_data_seq - + relay_stream->last_net_seq_num)) >= + 0)) { + (*nb_total)++; + } + } + } + /* Put local reference. */ + viewer_stream_put(viewer_stream); + next: + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); } - /* Put local reference. */ - viewer_stream_put(viewer_stream); - next: - pthread_mutex_unlock(&relay_stream->lock); - stream_put(relay_stream); + relay_stream = nullptr; + ctf_trace_put(ctf_trace); } - relay_stream = NULL; - ctf_trace_put(ctf_trace); } ret = 0; error_unlock: - rcu_read_unlock(); if (relay_stream) { pthread_mutex_unlock(&relay_stream->lock); @@ -582,7 +582,7 @@ error_unlock: return ret; } -int relayd_live_stop(void) +int relayd_live_stop() { /* Stop dispatch thread */ CMM_STORE_SHARED(live_dispatch_thread_exit, 1); @@ -590,56 +590,7 @@ int relayd_live_stop(void) return 0; } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static -int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static -int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - -static -int create_sock(void *data, int *out_fd) +static int create_sock(void *data, int *out_fd) { int ret; struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -654,8 +605,7 @@ end: return ret; } -static -int close_sock(void *data, int *in_fd __attribute__((unused))) +static int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -679,16 +629,14 @@ end: return ret; } -static -struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, - const char *name) +static struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, const char *name) { int out_fd, ret; - struct lttcomm_sock *socks[2] = { listening_sock, NULL }; - struct lttcomm_sock *new_sock = NULL; + struct lttcomm_sock *socks[2] = { listening_sock, nullptr }; + struct lttcomm_sock *new_sock = nullptr; - ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd, - (const char **) &name, 1, accept_sock, &socks); + ret = fd_tracker_open_unsuspendable_fd( + the_fd_tracker, &out_fd, (const char **) &name, 1, accept_sock, &socks); if (ret) { goto end; } @@ -701,16 +649,15 @@ end: /* * Create and init socket from uri. */ -static -struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) +static struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) { int ret, sock_fd; - struct lttcomm_sock *sock = NULL; + struct lttcomm_sock *sock = nullptr; char uri_str[LTTNG_PATH_MAX]; - char *formated_name = NULL; + char *formated_name = nullptr; sock = lttcomm_alloc_sock_from_uri(uri); - if (sock == NULL) { + if (sock == nullptr) { ERR("Allocating socket"); goto error; } @@ -722,19 +669,21 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = uri_to_str_url(uri, uri_str, sizeof(uri_str)); uri_str[sizeof(uri_str) - 1] = '\0'; if (ret >= 0) { - ret = asprintf(&formated_name, "%s socket @ %s", name, - uri_str); + ret = asprintf(&formated_name, "%s socket @ %s", name, uri_str); if (ret < 0) { - formated_name = NULL; + formated_name = nullptr; } } - ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, - (const char **) (formated_name ? &formated_name : NULL), - 1, create_sock, sock); + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, + &sock_fd, + (const char **) (formated_name ? &formated_name : + nullptr), + 1, + create_sock, + sock); if (ret) { - PERROR("Failed to create \"%s\" socket", - formated_name ?: "Unknown"); + PERROR("Failed to create \"%s\" socket", formated_name ?: "Unknown"); goto error; } DBG("Listening on %s socket %d", name, sock->fd); @@ -748,7 +697,6 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = sock->ops->listen(sock, -1); if (ret < 0) { goto error; - } free(formated_name); @@ -759,17 +707,16 @@ error: lttcomm_destroy_sock(sock); } free(formated_name); - return NULL; + return nullptr; } /* * This thread manages the listening for new connections on the network */ -static -void *thread_listener(void *data __attribute__((unused))) +static void *thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *live_control_sock; @@ -786,8 +733,7 @@ void *thread_listener(void *data __attribute__((unused))) } /* Pass 2 as size here for the thread quit pipe and control sockets. */ - ret = create_named_thread_poll_set(&events, 2, - "Live listener thread epoll"); + ret = create_named_thread_poll_set(&events, 2, "Live listener thread epoll"); if (ret < 0) { goto error_create_poll; } @@ -804,12 +750,12 @@ void *thread_listener(void *data __attribute__((unused))) goto error_testpoint; } - while (1) { + while (true) { health_code_update(); DBG("Listener accepting live viewers connections"); -restart: + restart: health_poll_entry(); ret = lttng_poll_wait(&events, -1); health_poll_exit(); @@ -826,15 +772,15 @@ restart: DBG("Relay new viewer connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -851,15 +797,15 @@ restart: struct lttcomm_sock *newsock; newsock = accept_live_sock(live_control_sock, - "Live socket to client"); + "Live socket to client"); if (!newsock) { PERROR("accepting control sock"); goto error; } DBG("Relay viewer connection accepted socket %d", newsock->fd); - ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, - sizeof(val)); + ret = setsockopt( + newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (ret < 0) { PERROR("setsockopt inet"); lttcomm_destroy_sock(newsock); @@ -871,13 +817,12 @@ restart: goto error; } /* Ownership assumed by the connection. */ - newsock = NULL; + newsock = nullptr; /* Enqueue request for the dispatcher thread. */ cds_wfcq_head_ptr_t head; head.h = &viewer_conn_queue.head; - cds_wfcq_enqueue(head, &viewer_conn_queue.tail, - &new_conn->qnode); + cds_wfcq_enqueue(head, &viewer_conn_queue.tail, &new_conn->qnode); /* * Wake the dispatch queue futex. @@ -904,9 +849,8 @@ error_create_poll: if (live_control_sock->fd >= 0) { int sock_fd = live_control_sock->fd; - ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, - &sock_fd, 1, close_sock, - live_control_sock); + ret = fd_tracker_close_unsuspendable_fd( + the_fd_tracker, &sock_fd, 1, close_sock, live_control_sock); if (ret) { PERROR("close"); } @@ -924,19 +868,18 @@ error_sock_control: if (lttng_relay_stop_threads()) { ERR("Error stopping threads"); } - return NULL; + return nullptr; } /* * This thread manages the dispatching of the requests to worker threads */ -static -void *thread_dispatcher(void *data __attribute__((unused))) +static void *thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; struct cds_wfcq_node *node; - struct relay_connection *conn = NULL; + struct relay_connection *conn = nullptr; DBG("[thread] Live viewer relay dispatcher started"); @@ -964,15 +907,14 @@ void *thread_dispatcher(void *data __attribute__((unused))) /* Dequeue commands */ node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head, &viewer_conn_queue.tail); - if (node == NULL) { + if (node == nullptr) { DBG("Woken up but nothing in the live-viewer " - "relay command queue"); + "relay command queue"); /* Continue thread execution */ break; } - conn = caa_container_of(node, struct relay_connection, qnode); - DBG("Dispatching viewer request waiting on sock %d", - conn->sock->fd); + conn = lttng::utils::container_of(node, &relay_connection::qnode); + DBG("Dispatching viewer request waiting on sock %d", conn->sock->fd); /* * Inform worker thread of the new request. This @@ -980,13 +922,15 @@ void *thread_dispatcher(void *data __attribute__((unused))) * the data will be read at some point in time * or wait to the end of the world :) */ - ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn)); + ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn)); /* NOLINT sizeof + used on a + pointer. */ if (ret < 0) { PERROR("write conn pipe"); connection_put(conn); goto error; } - } while (node != NULL); + } while (node != nullptr); /* Futex wait on queue. Blocking call on futex() */ health_poll_entry(); @@ -1008,7 +952,7 @@ error_testpoint: if (lttng_relay_stop_threads()) { ERR("Error stopping threads"); } - return NULL; + return nullptr; } /* @@ -1016,13 +960,12 @@ error_testpoint: * * Return 0 on success or else negative value. */ -static -int viewer_connect(struct relay_connection *conn) +static int viewer_connect(struct relay_connection *conn) { int ret; struct lttng_viewer_connect reply, msg; - conn->version_check_done = 1; + conn->version_check_done = true; health_code_update(); @@ -1040,7 +983,8 @@ int viewer_connect(struct relay_connection *conn) /* Major versions must be the same */ if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions ([relayd] %u vs [client] %u)", - reply.major, be32toh(msg.major)); + reply.major, + be32toh(msg.major)); ret = -1; goto end; } @@ -1101,77 +1045,80 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_list_sessions(struct relay_connection *conn) +static int viewer_list_sessions(struct relay_connection *conn) { int ret = 0; struct lttng_viewer_list_sessions session_list; struct lttng_ht_iter iter; struct relay_session *session; - struct lttng_viewer_session *send_session_buf = NULL; + struct lttng_viewer_session *send_session_buf = nullptr; uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT; uint32_t count = 0; - send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf)); + send_session_buf = calloc(SESSION_BUF_DEFAULT_COUNT); if (!send_session_buf) { return -1; } - rcu_read_lock(); - cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session, - session_n.node) { - struct lttng_viewer_session *send_session; + { + lttng::urcu::read_lock_guard read_lock; - health_code_update(); + cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) { + struct lttng_viewer_session *send_session; - pthread_mutex_lock(&session->lock); - if (session->connection_closed) { - /* Skip closed session */ - goto next_session; - } + health_code_update(); + + pthread_mutex_lock(&session->lock); + if (session->connection_closed) { + /* Skip closed session */ + goto next_session; + } - if (count >= buf_count) { - struct lttng_viewer_session *newbuf; - uint32_t new_buf_count = buf_count << 1; + if (count >= buf_count) { + struct lttng_viewer_session *newbuf; + uint32_t new_buf_count = buf_count << 1; - newbuf = (lttng_viewer_session *) realloc(send_session_buf, - new_buf_count * sizeof(*send_session_buf)); - if (!newbuf) { + newbuf = (lttng_viewer_session *) realloc( + send_session_buf, + new_buf_count * sizeof(*send_session_buf)); + if (!newbuf) { + ret = -1; + goto break_loop; + } + send_session_buf = newbuf; + buf_count = new_buf_count; + } + send_session = &send_session_buf[count]; + if (lttng_strncpy(send_session->session_name, + session->session_name, + sizeof(send_session->session_name))) { ret = -1; goto break_loop; } - send_session_buf = newbuf; - buf_count = new_buf_count; - } - send_session = &send_session_buf[count]; - if (lttng_strncpy(send_session->session_name, - session->session_name, - sizeof(send_session->session_name))) { - ret = -1; - goto break_loop; - } - if (lttng_strncpy(send_session->hostname, session->hostname, - sizeof(send_session->hostname))) { - ret = -1; - goto break_loop; - } - send_session->id = htobe64(session->id); - send_session->live_timer = htobe32(session->live_timer); - if (session->viewer_attached) { - send_session->clients = htobe32(1); - } else { - send_session->clients = htobe32(0); + if (lttng_strncpy(send_session->hostname, + session->hostname, + sizeof(send_session->hostname))) { + ret = -1; + goto break_loop; + } + send_session->id = htobe64(session->id); + send_session->live_timer = htobe32(session->live_timer); + if (session->viewer_attached) { + send_session->clients = htobe32(1); + } else { + send_session->clients = htobe32(0); + } + send_session->streams = htobe32(session->stream_count); + count++; + next_session: + pthread_mutex_unlock(&session->lock); + continue; + break_loop: + pthread_mutex_unlock(&session->lock); + break; } - send_session->streams = htobe32(session->stream_count); - count++; - next_session: - pthread_mutex_unlock(&session->lock); - continue; - break_loop: - pthread_mutex_unlock(&session->lock); - break; } - rcu_read_unlock(); + if (ret < 0) { goto end_free; } @@ -1187,8 +1134,7 @@ int viewer_list_sessions(struct relay_connection *conn) health_code_update(); - ret = send_response(conn->sock, send_session_buf, - count * sizeof(*send_session_buf)); + ret = send_response(conn->sock, send_session_buf, count * sizeof(*send_session_buf)); if (ret < 0) { goto end_free; } @@ -1203,14 +1149,13 @@ end_free: /* * Send the viewer the list of current streams. */ -static -int viewer_get_new_streams(struct relay_connection *conn) +static int viewer_get_new_streams(struct relay_connection *conn) { int ret, send_streams = 0; uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0; struct lttng_viewer_new_streams_request request; struct lttng_viewer_new_streams_response response; - struct relay_session *session = NULL; + struct relay_session *session = nullptr; uint64_t session_id; bool closed = false; @@ -1262,9 +1207,12 @@ int viewer_get_new_streams(struct relay_connection *conn) goto send_reply_unlock; } ret = make_viewer_streams(session, - conn->viewer_session, - LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, - &nb_created, &closed); + conn->viewer_session, + LTTNG_VIEWER_SEEK_BEGINNING, + &nb_total, + &nb_unsent, + &nb_created, + &closed); if (ret < 0) { /* * This is caused by an internal error; propagate the negative @@ -1273,6 +1221,8 @@ int viewer_get_new_streams(struct relay_connection *conn) response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply_unlock; } + + uatomic_set(&session->new_streams, 0); send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1331,8 +1281,7 @@ error: /* * Send the viewer the list of current sessions. */ -static -int viewer_attach_session(struct relay_connection *conn) +static int viewer_attach_session(struct relay_connection *conn) { int send_streams = 0; ssize_t ret; @@ -1340,7 +1289,7 @@ int viewer_attach_session(struct relay_connection *conn) enum lttng_viewer_seek seek_type; struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; - struct relay_session *session = NULL; + struct relay_session *session = nullptr; enum lttng_viewer_attach_return_code viewer_attach_status; bool closed = false; uint64_t session_id; @@ -1364,7 +1313,7 @@ int viewer_attach_session(struct relay_connection *conn) if (!conn->viewer_session) { viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION; DBG("Client trying to attach before creating a live viewer session, returning status=%s", - lttng_viewer_attach_return_code_str(viewer_attach_status)); + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1372,8 +1321,8 @@ int viewer_attach_session(struct relay_connection *conn) if (!session) { viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; DBG("Relay session %" PRIu64 " not found, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } DBG("Attach relay session ID %" PRIu64 " received", session_id); @@ -1382,18 +1331,17 @@ int viewer_attach_session(struct relay_connection *conn) if (session->live_timer == 0) { viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE; DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } send_streams = 1; - viewer_attach_status = viewer_session_attach(conn->viewer_session, - session); + viewer_attach_status = viewer_session_attach(conn->viewer_session, session); if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { DBG("Error attaching to relay session %" PRIu64 ", returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1404,9 +1352,9 @@ int viewer_attach_session(struct relay_connection *conn) seek_type = (lttng_viewer_seek) be32toh(request.seek); break; default: - ERR("Wrong seek parameter for relay session %" PRIu64 - ", returning status=%s", session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + ERR("Wrong seek parameter for relay session %" PRIu64 ", returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR; send_streams = 0; goto send_reply; @@ -1423,15 +1371,14 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - ret = make_viewer_streams(session, - conn->viewer_session, seek_type, - &nb_streams, NULL, NULL, &closed); + ret = make_viewer_streams( + session, conn->viewer_session, seek_type, &nb_streams, nullptr, nullptr, &closed); if (ret < 0) { goto end_put_session; } pthread_mutex_unlock(&session->lock); session_put(session); - session = NULL; + session = nullptr; response.streams_count = htobe32(nb_streams); /* @@ -1445,8 +1392,8 @@ int viewer_attach_session(struct relay_connection *conn) response.streams_count = 0; viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; ERR("Session %" PRIu64 " is closed, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1495,8 +1442,7 @@ error: * * Called with rstream lock held. */ -static int try_open_index(struct relay_viewer_stream *vstream, - struct relay_stream *rstream) +static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream) { int ret = 0; const uint32_t connection_major = rstream->trace->session->major; @@ -1510,19 +1456,21 @@ static int try_open_index(struct relay_viewer_stream *vstream, /* * First time, we open the index file and at least one index is ready. */ - if (rstream->index_received_seqcount == 0 || - !vstream->stream_file.trace_chunk) { + if (rstream->index_received_seqcount == 0 || !vstream->stream_file.trace_chunk) { ret = -ENOENT; goto end; } chunk_status = lttng_index_file_create_from_trace_chunk_read_only( - vstream->stream_file.trace_chunk, rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, - lttng_to_index_major(connection_major, connection_minor), - lttng_to_index_minor(connection_major, connection_minor), - true, &vstream->index_file); + vstream->stream_file.trace_chunk, + rstream->path_name, + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor), + true, + &vstream->index_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { ret = -ENOENT; @@ -1547,37 +1495,37 @@ end: * Called with rstream lock held. */ static int check_index_status(struct relay_viewer_stream *vstream, - struct relay_stream *rstream, struct ctf_trace *trace, - struct lttng_viewer_index *index) + struct relay_stream *rstream, + struct ctf_trace *trace, + struct lttng_viewer_index *index) { int ret; DBG("Check index status: index_received_seqcount %" PRIu64 " " - "index_sent_seqcount %" PRIu64 " " - "for stream %" PRIu64, - rstream->index_received_seqcount, - vstream->index_sent_seqcount, - vstream->stream->stream_handle); - if ((trace->session->connection_closed || rstream->closed) - && rstream->index_received_seqcount - == vstream->index_sent_seqcount) { + "index_sent_seqcount %" PRIu64 " " + "for stream %" PRIu64, + rstream->index_received_seqcount, + vstream->index_sent_seqcount, + vstream->stream->stream_handle); + if ((trace->session->connection_closed || rstream->closed) && + rstream->index_received_seqcount == vstream->index_sent_seqcount) { /* * Last index sent and session connection or relay * stream are closed. */ index->status = LTTNG_VIEWER_INDEX_HUP; DBG("Check index status: Connection or stream are closed, stream %" PRIu64 - ",connection-closed=%d, relay-stream-closed=%d, returning status=%s", - vstream->stream->stream_handle, - trace->session->connection_closed, rstream->closed, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + ",connection-closed=%d, relay-stream-closed=%d, returning status=%s", + vstream->stream->stream_handle, + trace->session->connection_closed, + rstream->closed, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } else if (rstream->beacon_ts_end != -1ULL && - (rstream->index_received_seqcount == 0 || - (vstream->index_sent_seqcount != 0 && - rstream->index_received_seqcount - <= vstream->index_sent_seqcount))) { + (rstream->index_received_seqcount == 0 || + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount <= vstream->index_sent_seqcount))) { /* * We've received a synchronization beacon and the last index * available has been sent, the index for now is inactive. @@ -1597,15 +1545,14 @@ static int check_index_status(struct relay_viewer_stream *vstream, index->timestamp_end = htobe64(rstream->beacon_ts_end); index->stream_id = htobe64(rstream->ctf_stream_id); DBG("Check index status: inactive with beacon, for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (rstream->index_received_seqcount == 0 || - (vstream->index_sent_seqcount != 0 && - rstream->index_received_seqcount - <= vstream->index_sent_seqcount)) { + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount <= vstream->index_sent_seqcount)) { /* * This checks whether received <= sent seqcount. In * this case, we have not received a beacon. Therefore, @@ -1619,32 +1566,29 @@ static int check_index_status(struct relay_viewer_stream *vstream, */ index->status = LTTNG_VIEWER_INDEX_RETRY; DBG("Check index status:" - "did not received beacon for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "did not received beacon for stream %" PRIu64 ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (!tracefile_array_seq_in_file(rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)) { + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { /* * The next index we want to send cannot be read either * because we need to perform a rotation, or due to * the producer having overwritten its trace file. */ - DBG("Viewer stream %" PRIu64 " rotation", - vstream->stream->stream_handle); + DBG("Viewer stream %" PRIu64 " rotation", vstream->stream->stream_handle); ret = viewer_stream_rotate(vstream); if (ret == 1) { /* EOF across entire stream. */ index->status = LTTNG_VIEWER_INDEX_HUP; DBG("Check index status:" - "reached end of file for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "reached end of file for stream %" PRIu64 ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } /* @@ -1662,24 +1606,21 @@ static int check_index_status(struct relay_viewer_stream *vstream, * still unavailable. */ if (rstream->tracefile_count == 1 && - !tracefile_array_seq_in_file( - rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)) { + !tracefile_array_seq_in_file(rstream->tfa, + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { index->status = LTTNG_VIEWER_INDEX_RETRY; DBG("Check index status:" - "tracefile array sequence number %" PRIu64 - " not in file for stream %" PRIu64 - ", returning status=%s", - vstream->index_sent_seqcount, - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "tracefile array sequence number %" PRIu64 + " not in file for stream %" PRIu64 ", returning status=%s", + vstream->index_sent_seqcount, + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } - LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)); + LTTNG_ASSERT(tracefile_array_seq_in_file( + rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount)); } /* ret == 0 means successful so we continue. */ ret = 0; @@ -1691,15 +1632,13 @@ index_ready: return 1; } -static -void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, - struct lttng_trace_chunk *new_trace_chunk) +static void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, + struct lttng_trace_chunk *new_trace_chunk) { lttng_trace_chunk_put(vstream->stream_file.trace_chunk); if (new_trace_chunk) { - const bool acquired_reference = lttng_trace_chunk_get( - new_trace_chunk); + const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk); LTTNG_ASSERT(acquired_reference); } @@ -1714,20 +1653,20 @@ void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, * * Return 0 on success or else a negative value. */ -static -int viewer_get_next_index(struct relay_connection *conn) +static int viewer_get_next_index(struct relay_connection *conn) { int ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; - struct relay_viewer_stream *vstream = NULL; - struct relay_stream *rstream = NULL; - struct ctf_trace *ctf_trace = NULL; - struct relay_viewer_stream *metadata_viewer_stream = NULL; + struct relay_viewer_stream *vstream = nullptr; + struct relay_stream *rstream = nullptr; + struct ctf_trace *ctf_trace = nullptr; + struct relay_viewer_stream *metadata_viewer_stream = nullptr; bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; enum lttng_trace_chunk_status status; + bool attached_sessions_have_new_streams = false; LTTNG_ASSERT(conn); @@ -1743,10 +1682,10 @@ int viewer_get_next_index(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index of unknown stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1755,8 +1694,7 @@ int viewer_get_next_index(struct relay_connection *conn) ctf_trace = rstream->trace; /* metadata_viewer_stream may be NULL. */ - metadata_viewer_stream = - ctf_trace_get_viewer_metadata_stream(ctf_trace); + metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); /* * Hold the session lock to protect against concurrent changes @@ -1772,51 +1710,63 @@ int viewer_get_next_index(struct relay_connection *conn) */ if (rstream->is_metadata) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; - DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index of a metadata stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); + goto send_reply; + } + + ret = check_new_streams(conn); + if (ret < 0) { + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error checking for new streams in the attached sessions, returning status=%s", + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; + } else if (ret == 1) { + attached_sessions_have_new_streams = true; } if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index for stream id %" PRIu64 + " while a stream rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } if (session_has_ongoing_rotation(rstream->trace->session)) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index for stream id %" PRIu64 + " while a session rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } /* * Transition the viewer session into the newest trace chunk available. */ - if (!lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - rstream->trace_chunk)) { + if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + rstream->trace_chunk)) { DBG("Relay stream and viewer chunk ids differ"); - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - rstream->trace_chunk); + ret = viewer_session_set_trace_chunk_copy(conn->viewer_session, + rstream->trace_chunk); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; ERR("Error copying trace chunk for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } } @@ -1833,53 +1783,49 @@ int viewer_get_next_index(struct relay_connection *conn) * after a session's destruction. */ if (vstream->stream_file.trace_chunk) { - status = lttng_trace_chunk_get_id( - vstream->stream_file.trace_chunk, - &stream_file_chunk_id); + status = lttng_trace_chunk_get_id(vstream->stream_file.trace_chunk, + &stream_file_chunk_id); LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); } if (conn->viewer_session->current_trace_chunk) { - status = lttng_trace_chunk_get_id( - conn->viewer_session->current_trace_chunk, - &viewer_session_chunk_id); + status = lttng_trace_chunk_get_id(conn->viewer_session->current_trace_chunk, + &viewer_session_chunk_id); LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); } viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); viewer_stream_one_rotation_behind = rstream->completed_rotation_count == - vstream->last_seen_rotation_count + 1; + vstream->last_seen_rotation_count + 1; if (viewer_stream_and_session_in_same_chunk) { DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) { DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); } else { DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); viewer_stream_rotate_to_trace_chunk(vstream, - conn->viewer_session->current_trace_chunk); - vstream->last_seen_rotation_count = - rstream->completed_rotation_count; + conn->viewer_session->current_trace_chunk); + vstream->last_seen_rotation_count = rstream->completed_rotation_count; } ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); @@ -1892,37 +1838,36 @@ int viewer_get_next_index(struct relay_connection *conn) */ goto send_reply; } + /* At this point, ret is 0 thus we will be able to read the index. */ LTTNG_ASSERT(!ret); /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); if (ret == -ENOENT) { - if (rstream->closed) { + if (rstream->closed) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; DBG("Cannot open index for stream id %" PRIu64 - "stream is closed, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + "stream is closed, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; - } else { + } else { viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Cannot open index for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Cannot open index for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; - } + } } if (ret < 0) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Error opening index for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ERR("Error opening index for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1938,9 +1883,12 @@ int viewer_get_next_index(struct relay_connection *conn) struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + nullptr, + file_path, + sizeof(file_path)); if (ret < 0) { goto error_put; } @@ -1951,17 +1899,16 @@ int viewer_get_next_index(struct relay_connection *conn) * per-pid buffers) and a clear command has been performed. */ status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fs_handle, true); + vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { - if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && - rstream->closed) { + if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && rstream->closed) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) + viewer_index.status)); goto send_reply; } PERROR("Failed to open trace file for viewer stream"); @@ -1970,35 +1917,20 @@ int viewer_get_next_index(struct relay_connection *conn) vstream->stream_file.handle = fs_handle; } - ret = check_new_streams(conn); - if (ret < 0) { - viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Error checking for new streams before sending new index to stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); - goto send_reply; - } else if (ret == 1) { - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; - } - ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Relay error reading index file for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ERR("Relay error reading index file for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else { viewer_index.status = LTTNG_VIEWER_INDEX_OK; - DBG("Read index file for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Read index file for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); vstream->index_sent_seqcount++; } @@ -2006,8 +1938,8 @@ int viewer_get_next_index(struct relay_connection *conn) * Indexes are stored in big endian, no need to switch before sending. */ DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64, - rstream->stream_handle, - (uint64_t) be64toh(packet_index.offset)); + rstream->stream_handle, + (uint64_t) be64toh(packet_index.offset)); viewer_index.offset = packet_index.offset; viewer_index.packet_size = packet_index.packet_size; viewer_index.content_size = packet_index.content_size; @@ -2024,18 +1956,21 @@ send_reply: if (metadata_viewer_stream) { pthread_mutex_lock(&metadata_viewer_stream->stream->lock); - DBG("get next index metadata check: recv %" PRIu64 - " sent %" PRIu64, - metadata_viewer_stream->stream->metadata_received, - metadata_viewer_stream->metadata_sent); + DBG("get next index metadata check: recv %" PRIu64 " sent %" PRIu64, + metadata_viewer_stream->stream->metadata_received, + metadata_viewer_stream->metadata_sent); if (!metadata_viewer_stream->stream->metadata_received || - metadata_viewer_stream->stream->metadata_received > - metadata_viewer_stream->metadata_sent) { + metadata_viewer_stream->stream->metadata_received > + metadata_viewer_stream->metadata_sent) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; } pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); } + if (attached_sessions_have_new_streams) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; + } + viewer_index.flags = htobe32(viewer_index.flags); viewer_index.status = htobe32(viewer_index.status); health_code_update(); @@ -2048,8 +1983,8 @@ send_reply: if (vstream) { DBG("Index %" PRIu64 " for stream %" PRIu64 " sent", - vstream->index_sent_seqcount, - vstream->stream->stream_handle); + vstream->index_sent_seqcount, + vstream->stream->stream_handle); } end: if (metadata_viewer_stream) { @@ -2075,15 +2010,14 @@ error_put: * * Return 0 on success or else a negative value. */ -static -int viewer_get_packet(struct relay_connection *conn) +static int viewer_get_packet(struct relay_connection *conn) { int ret; off_t lseek_ret; - char *reply = NULL; + char *reply = nullptr; struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply_header; - struct relay_viewer_stream *vstream = NULL; + struct relay_viewer_stream *vstream = nullptr; uint32_t reply_size = sizeof(reply_header); uint32_t packet_data_len = 0; ssize_t read_len; @@ -2092,8 +2026,7 @@ int viewer_get_packet(struct relay_connection *conn) health_code_update(); - ret = recv_request(conn->sock, &get_packet_info, - sizeof(get_packet_info)); + ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info)); if (ret < 0) { goto end; } @@ -2106,42 +2039,44 @@ int viewer_get_packet(struct relay_connection *conn) vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; - DBG("Client requested packet of unknown stream id %" PRIu64 - ", returning status=%s", stream_id, - lttng_viewer_get_packet_return_code_str(get_packet_status)); + DBG("Client requested packet of unknown stream id %" PRIu64 ", returning status=%s", + stream_id, + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto send_reply_nolock; } else { packet_data_len = be32toh(get_packet_info.len); reply_size += packet_data_len; } - reply = (char *) zmalloc(reply_size); + reply = zmalloc(reply_size); if (!reply) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Falled to allocate reply, returning status=%s", - lttng_viewer_get_packet_return_code_str(get_packet_status)); + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } pthread_mutex_lock(&vstream->stream->lock); - lseek_ret = fs_handle_seek(vstream->stream_file.handle, - be64toh(get_packet_info.offset), SEEK_SET); + lseek_ret = fs_handle_seek( + vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to seek file system handle of viewer stream %" PRIu64 - " to offset %" PRIu64", returning status=%s", stream_id, - (uint64_t) be64toh(get_packet_info.offset), - lttng_viewer_get_packet_return_code_str(get_packet_status)); + " to offset %" PRIu64 ", returning status=%s", + stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } - read_len = fs_handle_read(vstream->stream_file.handle, - reply + sizeof(reply_header), packet_data_len); + read_len = fs_handle_read( + vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 - ", offset: %" PRIu64 ", returning status=%s", stream_id, + ", offset: %" PRIu64 ", returning status=%s", + stream_id, (uint64_t) be64toh(get_packet_info.offset), - lttng_viewer_get_packet_return_code_str(get_packet_status)); + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } @@ -2167,8 +2102,7 @@ send_reply_nolock: ret = send_response(conn->sock, reply, reply_size); } else { /* No reply to send. */ - ret = send_response(conn->sock, &reply_header, - reply_size); + ret = send_response(conn->sock, &reply_header, reply_size); } health_code_update(); @@ -2193,17 +2127,17 @@ end: * * Return 0 on success else a negative value. */ -static -int viewer_get_metadata(struct relay_connection *conn) +static int viewer_get_metadata(struct relay_connection *conn) { int ret = 0; int fd = -1; ssize_t read_len; uint64_t len = 0; - char *data = NULL; + char *data = nullptr; struct lttng_viewer_get_metadata request; struct lttng_viewer_metadata_packet reply; - struct relay_viewer_stream *vstream = NULL; + struct relay_viewer_stream *vstream = nullptr; + bool dispose_of_stream = false; LTTNG_ASSERT(conn); @@ -2228,10 +2162,13 @@ int viewer_get_metadata(struct relay_connection *conn) * find it. */ DBG("Client requested metadata of unknown stream id %" PRIu64, - (uint64_t) be64toh(request.stream_id)); + (uint64_t) be64toh(request.stream_id)); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } + + pthread_mutex_lock(&vstream->stream->trace->session->lock); + pthread_mutex_lock(&vstream->stream->trace->lock); pthread_mutex_lock(&vstream->stream->lock); if (!vstream->stream->is_metadata) { ERR("Invalid metadata stream"); @@ -2240,11 +2177,7 @@ int viewer_get_metadata(struct relay_connection *conn) if (vstream->metadata_sent >= vstream->stream->metadata_received) { /* - * The live viewers expect to receive a NO_NEW_METADATA - * status before a stream disappears, otherwise they abort the - * entire live connection when receiving an error status. - * - * Clear feature resets the metadata_sent to 0 until the + * Clear feature resets the metadata_received to 0 until the * same metadata is received again. */ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -2252,33 +2185,18 @@ int viewer_get_metadata(struct relay_connection *conn) * The live viewer considers a closed 0 byte metadata stream as * an error. */ - if (vstream->metadata_sent > 0) { - if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { - /* - * Release ownership for the viewer metadata - * stream. Note that this reference is the - * viewer's reference. The vstream still exists - * until the end of the function as - * viewer_stream_get_by_id() took a reference. - */ - viewer_stream_put(vstream); - } - - vstream->stream->no_new_metadata_notified = true; - } + dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed; goto send_reply; } if (vstream->stream->trace_chunk && - !lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - vstream->stream->trace_chunk)) { + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream->trace_chunk)) { /* A rotation has occurred on the relay stream. */ DBG("Metadata relay stream and viewer chunk ids differ"); - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - vstream->stream->trace_chunk); + ret = viewer_session_set_trace_chunk_copy(conn->viewer_session, + vstream->stream->trace_chunk); if (ret) { reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; @@ -2286,30 +2204,42 @@ int viewer_get_metadata(struct relay_connection *conn) } if (conn->viewer_session->current_trace_chunk && - !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk)) { + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk)) { bool acquired_reference; DBG("Viewer session and viewer stream chunk differ: " - "vsession chunk %p vstream chunk %p", - conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); lttng_trace_chunk_put(vstream->stream_file.trace_chunk); - acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + acquired_reference = + lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); LTTNG_ASSERT(acquired_reference); - vstream->stream_file.trace_chunk = - conn->viewer_session->current_trace_chunk; + vstream->stream_file.trace_chunk = conn->viewer_session->current_trace_chunk; viewer_stream_close_files(vstream); } len = vstream->stream->metadata_received - vstream->metadata_sent; if (!vstream->stream_file.trace_chunk) { + if (vstream->stream->trace->session->connection_closed) { + /* + * If the connection is closed, there is no way for the metadata stream + * to ever transition back to an active chunk. As such, signal to the viewer + * that there is no new metadata available. + * + * The stream can be disposed-of. On the next execution of this command, + * the relay daemon will reply with an error status since the stream can't + * be found. + */ + dispose_of_stream = true; + } + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); len = 0; goto send_reply; - } else if (vstream->stream_file.trace_chunk && - !vstream->stream_file.handle && len > 0) { + } else if (vstream->stream_file.trace_chunk && !vstream->stream_file.handle && len > 0) { /* * Either this is the first time the metadata file is read, or a * rotation of the corresponding relay stream has occurred. @@ -2320,9 +2250,12 @@ int viewer_get_metadata(struct relay_connection *conn) struct relay_stream *rstream = vstream->stream; ret = utils_stream_file_path(rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + nullptr, + file_path, + sizeof(file_path)); if (ret < 0) { goto error; } @@ -2333,8 +2266,7 @@ int viewer_get_metadata(struct relay_connection *conn) * per-pid buffers) and a clear command has been performed. */ status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fs_handle, true); + vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -2365,12 +2297,12 @@ int viewer_get_metadata(struct relay_connection *conn) * safe to assume that * `metadata_received` > `metadata_sent`. */ - const off_t seek_ret = fs_handle_seek(fs_handle, - vstream->metadata_sent, SEEK_SET); + const off_t seek_ret = + fs_handle_seek(fs_handle, vstream->metadata_sent, SEEK_SET); if (seek_ret < 0) { PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64, - vstream->metadata_sent); + vstream->metadata_sent); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } @@ -2378,7 +2310,7 @@ int viewer_get_metadata(struct relay_connection *conn) } reply.len = htobe64(len); - data = (char *) zmalloc(len); + data = zmalloc(len); if (!data) { PERROR("viewer metadata zmalloc"); goto error; @@ -2407,12 +2339,12 @@ int viewer_get_metadata(struct relay_connection *conn) * attempt to parse an incomplete (incoherent) metadata * stream, which would result in an error. */ - const off_t seek_ret = fs_handle_seek( - vstream->stream_file.handle, -read_len, - SEEK_CUR); + const off_t seek_ret = + fs_handle_seek(vstream->stream_file.handle, -read_len, SEEK_CUR); DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd", - len, read_len); + len, + read_len); read_len = 0; len = 0; if (seek_ret < 0) { @@ -2434,6 +2366,8 @@ send_reply: health_code_update(); if (vstream) { pthread_mutex_unlock(&vstream->stream->lock); + pthread_mutex_unlock(&vstream->stream->trace->lock); + pthread_mutex_unlock(&vstream->stream->trace->session->lock); } ret = send_response(conn->sock, &reply, sizeof(reply)); if (ret < 0) { @@ -2448,8 +2382,9 @@ send_reply: } } - DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len, - (uint64_t) be64toh(request.stream_id)); + DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, + len, + (uint64_t) be64toh(request.stream_id)); DBG("Metadata sent"); @@ -2458,7 +2393,22 @@ end_free: end: if (vstream) { viewer_stream_put(vstream); + if (dispose_of_stream) { + /* + * Trigger the destruction of the viewer stream + * by releasing its global reference. + * + * The live viewers expect to receive a NO_NEW_METADATA + * status before a stream disappears, otherwise they abort the + * entire live connection when receiving an error status. + * + * On the next query for this stream, an error will be reported to the + * client. + */ + viewer_stream_put(vstream); + } } + return ret; } @@ -2467,8 +2417,7 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_create_session(struct relay_connection *conn) +static int viewer_create_session(struct relay_connection *conn) { int ret; struct lttng_viewer_create_session_response resp; @@ -2500,13 +2449,12 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_detach_session(struct relay_connection *conn) +static int viewer_detach_session(struct relay_connection *conn) { int ret; struct lttng_viewer_detach_session_response response; struct lttng_viewer_detach_session_request request; - struct relay_session *session = NULL; + struct relay_session *session = nullptr; uint64_t viewer_session_to_close; LTTNG_ASSERT(conn); @@ -2533,8 +2481,7 @@ int viewer_detach_session(struct relay_connection *conn) session = session_get_by_id(be64toh(request.session_id)); if (!session) { - DBG("Relay session %" PRIu64 " not found", - (uint64_t) be64toh(request.session_id)); + DBG("Relay session %" PRIu64 " not found", (uint64_t) be64toh(request.session_id)); response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK); goto send_reply; } @@ -2569,8 +2516,7 @@ end: /* * live_relay_unknown_command: send -1 if received unknown command */ -static -void live_relay_unknown_command(struct relay_connection *conn) +static void live_relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; @@ -2582,13 +2528,10 @@ void live_relay_unknown_command(struct relay_connection *conn) /* * Process the commands received on the control socket */ -static -int process_control(struct lttng_viewer_cmd *recv_hdr, - struct relay_connection *conn) +static int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn) { int ret = 0; - lttng_viewer_command cmd = - (lttng_viewer_command) be32toh(recv_hdr->cmd); + lttng_viewer_command cmd = (lttng_viewer_command) be32toh(recv_hdr->cmd); /* * Make sure we've done the version check before any command other then @@ -2596,13 +2539,15 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, */ if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { ERR("Viewer on connection %d requested %s command before version check", - conn->sock->fd, lttng_viewer_command_str(cmd)); + conn->sock->fd, + lttng_viewer_command_str(cmd)); ret = -1; goto end; } DBG("Processing %s viewer command from connection %d", - lttng_viewer_command_str(cmd), conn->sock->fd); + lttng_viewer_command_str(cmd), + conn->sock->fd); switch (cmd) { case LTTNG_VIEWER_CONNECT: @@ -2633,8 +2578,7 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_detach_session(conn); break; default: - ERR("Received unknown viewer command (%u)", - be32toh(recv_hdr->cmd)); + ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(conn); ret = -1; goto end; @@ -2644,15 +2588,14 @@ end: return ret; } -static -void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) +static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) { int ret; (void) lttng_poll_del(events, pollfd); - ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1, - fd_tracker_util_close_fd, NULL); + ret = fd_tracker_close_unsuspendable_fd( + the_fd_tracker, &pollfd, 1, fd_tracker_util_close_fd, nullptr); if (ret < 0) { ERR("Closing pollfd %d", pollfd); } @@ -2661,8 +2604,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) /* * This thread does the actual work */ -static -void *thread_worker(void *data __attribute__((unused))) +static void *thread_worker(void *data __attribute__((unused))) { int ret, err = -1; uint32_t nb_fd; @@ -2688,8 +2630,7 @@ void *thread_worker(void *data __attribute__((unused))) goto viewer_connections_ht_error; } - ret = create_named_thread_poll_set(&events, 2, - "Live viewer worker thread epoll"); + ret = create_named_thread_poll_set(&events, 2, "Live viewer worker thread epoll"); if (ret < 0) { goto error_poll_create; } @@ -2700,7 +2641,7 @@ void *thread_worker(void *data __attribute__((unused))) } restart: - while (1) { + while (true) { int i; health_code_update(); @@ -2729,14 +2670,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -2747,13 +2688,14 @@ restart: struct relay_connection *conn; ret = lttng_read(live_conn_pipe[0], - &conn, sizeof(conn)); + &conn, + sizeof(conn)); /* NOLINT sizeof used on a + pointer. */ if (ret < 0) { goto error; } - ret = lttng_poll_add(&events, - conn->sock->fd, - LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add( + &events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); if (ret) { ERR("Failed to add new live connection file descriptor to poll set"); goto error; @@ -2764,7 +2706,9 @@ restart: ERR("Relay live pipe error"); goto error; } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for sock %d", + revents, + pollfd); goto error; } } else { @@ -2777,8 +2721,8 @@ restart: } if (revents & LPOLLIN) { - ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, - sizeof(recv_hdr), 0); + ret = conn->sock->ops->recvmsg( + conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { /* Connection closed. */ cleanup_connection_pollfd(&events, pollfd); @@ -2792,7 +2736,8 @@ restart: cleanup_connection_pollfd(&events, pollfd); /* Put "create" ownership reference. */ connection_put(conn); - DBG("Viewer connection closed with %d", pollfd); + DBG("Viewer connection closed with %d", + pollfd); } } } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -2800,7 +2745,9 @@ restart: /* Put "create" ownership reference. */ connection_put(conn); } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for sock %d", + revents, + pollfd); connection_put(conn); goto error; } @@ -2815,14 +2762,15 @@ error: (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); /* Cleanup remaining connection object. */ - rcu_read_lock(); - cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter, - destroy_conn, - sock_n.node) { - health_code_update(); - connection_put(destroy_conn); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry ( + viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { + health_code_update(); + connection_put(destroy_conn); + } } - rcu_read_unlock(); error_poll_create: lttng_ht_destroy(viewer_connections_ht); viewer_connections_ht_error: @@ -2842,20 +2790,20 @@ error_testpoint: ERR("Error stopping threads"); } rcu_unregister_thread(); - return NULL; + return nullptr; } /* * Create the relay command pipe to wake thread_manage_apps. * Closed in cleanup(). */ -static int create_conn_pipe(void) +static int create_conn_pipe() { - return fd_tracker_util_pipe_open_cloexec(the_fd_tracker, - "Live connection pipe", live_conn_pipe); + return fd_tracker_util_pipe_open_cloexec( + the_fd_tracker, "Live connection pipe", live_conn_pipe); } -int relayd_live_join(void) +int relayd_live_join() { int ret, retval = 0; void *status; @@ -2928,8 +2876,10 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the dispatcher thread */ - ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(), - thread_dispatcher, (void *) NULL); + ret = pthread_create(&live_dispatcher_thread, + default_pthread_attr(), + thread_dispatcher, + (void *) nullptr); if (ret) { errno = ret; PERROR("pthread_create viewer dispatcher"); @@ -2938,8 +2888,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the worker thread */ - ret = pthread_create(&live_worker_thread, default_pthread_attr(), - thread_worker, NULL); + ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, nullptr); if (ret) { errno = ret; PERROR("pthread_create viewer worker"); @@ -2948,8 +2897,8 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the listener thread */ - ret = pthread_create(&live_listener_thread, default_pthread_attr(), - thread_listener, (void *) NULL); + ret = pthread_create( + &live_listener_thread, default_pthread_attr(), thread_listener, (void *) nullptr); if (ret) { errno = ret; PERROR("pthread_create viewer listener");