X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=d09c2f13ea39af1f0971dcf5aed37273c6503e55;hb=424150266e9afc978cca8d00f1d70c31f534e656;hp=03aab2d3c42b2a9a7472d34c024b987f3ab1b47a;hpb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 03aab2d3c..d09c2f13e 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -96,8 +96,6 @@ void cleanup(void) { DBG("Cleaning up"); - /* Close thread quit pipes */ - utils_close_pipe(live_thread_quit_pipe); free(live_uri); } @@ -139,21 +137,6 @@ void stop_threads(void) futex_nto1_wake(&viewer_cmd_queue.futex); } -/* - * Init thread quit pipe. - * - * Return -1 on error or 0 if all pipes are created. - */ -static -int init_thread_quit_pipe(void) -{ - int ret; - - ret = utils_create_pipe_cloexec(live_thread_quit_pipe); - - return ret; -} - /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ @@ -585,14 +568,12 @@ end_no_session: * Returns 0 on success or a negative value on error. */ static -int init_viewer_stream(struct relay_stream *stream, - struct lttng_ht *viewer_streams_ht) +int init_viewer_stream(struct relay_stream *stream) { int ret; struct relay_viewer_stream *viewer_stream; assert(stream); - assert(viewer_streams_ht); viewer_stream = zmalloc(sizeof(*viewer_stream)); if (!viewer_stream) { @@ -643,8 +624,7 @@ error: */ static int viewer_attach_session(struct relay_command *cmd, - struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht *sessions_ht) { int ret, send_streams = 0, nb_streams = 0; struct lttng_viewer_attach_session_request request; @@ -659,7 +639,6 @@ int viewer_attach_session(struct relay_command *cmd, assert(cmd); assert(sessions_ht); - assert(viewer_streams_ht); DBG("Attach session received"); @@ -693,7 +672,7 @@ int viewer_attach_session(struct relay_command *cmd, } session = caa_container_of(node, struct relay_session, session_n); - if (cmd->session == session) { + if (cmd->session_id == session->id) { /* Same viewer already attached, just send the stream list. */ send_streams = 1; response.status = htobe32(VIEWER_ATTACH_OK); @@ -709,6 +688,7 @@ int viewer_attach_session(struct relay_command *cmd, session->viewer_attached++; send_streams = 1; response.status = htobe32(VIEWER_ATTACH_OK); + cmd->session_id = session->id; cmd->session = session; } @@ -755,10 +735,9 @@ int viewer_attach_session(struct relay_command *cmd, continue; } - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { - ret = init_viewer_stream(stream, viewer_streams_ht); + ret = init_viewer_stream(stream); if (ret < 0) { goto end_unlock; } @@ -891,15 +870,12 @@ end: * * RCU read side lock MUST be acquired. */ -struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, - struct lttng_ht *viewer_streams_ht) +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_viewer_stream *stream = NULL; - assert(viewer_streams_ht); - lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node == NULL) { @@ -919,7 +895,7 @@ end: */ static int viewer_get_next_index(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht) + struct lttng_ht *sessions_ht) { int ret; struct lttng_viewer_get_next_index request_index; @@ -929,7 +905,6 @@ int viewer_get_next_index(struct relay_command *cmd, struct relay_stream *rstream; assert(cmd); - assert(viewer_streams_ht); assert(sessions_ht); DBG("Viewer get next index"); @@ -949,8 +924,7 @@ int viewer_get_next_index(struct relay_command *cmd, } rcu_read_lock(); - vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id), - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id)); if (!vstream) { ret = -1; goto end_unlock; @@ -1058,8 +1032,7 @@ end: * Return 0 on success or else a negative value. */ static -int viewer_get_packet(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_packet(struct relay_command *cmd) { int ret, send_data = 0; char *data = NULL; @@ -1070,7 +1043,6 @@ int viewer_get_packet(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG2("Relay get data packet"); @@ -1089,8 +1061,7 @@ int viewer_get_packet(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id)); if (!stream) { goto error; } @@ -1187,8 +1158,7 @@ end: * Return 0 on success else a negative value. */ static -int viewer_get_metadata(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_metadata(struct relay_command *cmd) { int ret = 0; ssize_t read_len; @@ -1199,7 +1169,6 @@ int viewer_get_metadata(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG("Relay get metadata"); @@ -1218,8 +1187,7 @@ int viewer_get_metadata(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(request.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(request.stream_id)); if (!stream || !stream->metadata_flag) { ERR("Invalid metadata stream"); goto error; @@ -1320,8 +1288,7 @@ void live_relay_unknown_command(struct relay_command *cmd) */ static int process_control(struct lttng_viewer_cmd *recv_hdr, - struct relay_command *cmd, struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret = 0; @@ -1333,17 +1300,16 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_list_sessions(cmd, sessions_ht); break; case VIEWER_ATTACH_SESSION: - ret = viewer_attach_session(cmd, sessions_ht, - viewer_streams_ht); + ret = viewer_attach_session(cmd, sessions_ht); break; case VIEWER_GET_NEXT_INDEX: - ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht); + ret = viewer_get_next_index(cmd, sessions_ht); break; case VIEWER_GET_PACKET: - ret = viewer_get_packet(cmd, viewer_streams_ht); + ret = viewer_get_packet(cmd); break; case VIEWER_GET_METADATA: - ret = viewer_get_metadata(cmd, viewer_streams_ht); + ret = viewer_get_metadata(cmd); break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); @@ -1451,17 +1417,13 @@ void deferred_free_viewer_stream(struct rcu_head *head) } static -void viewer_del_streams(struct lttng_ht *viewer_streams_ht, - struct relay_session *session) +void viewer_del_streams(uint64_t session_id) { int ret; struct relay_viewer_stream *stream; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - assert(viewer_streams_ht); - assert(session); - rcu_read_lock(); cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) { node = lttng_ht_iter_get_node_u64(&iter); @@ -1470,7 +1432,7 @@ void viewer_del_streams(struct lttng_ht *viewer_streams_ht, } stream = caa_container_of(node, struct relay_viewer_stream, stream_n); - if (stream->session_id != session->id) { + if (stream->session_id != session_id) { continue; } @@ -1503,22 +1465,18 @@ void viewer_del_streams(struct lttng_ht *viewer_streams_ht, */ static void del_connection(struct lttng_ht *relay_connections_ht, - struct lttng_ht_iter *iter, struct relay_command *relay_connection, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht_iter *iter, struct relay_command *relay_connection) { int ret; assert(relay_connections_ht); assert(iter); assert(relay_connection); - assert(viewer_streams_ht); ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); - if (relay_connection->session) { - viewer_del_streams(viewer_streams_ht, relay_connection->session); - } + viewer_del_streams(relay_connection->session_id); call_rcu(&relay_connection->rcu_node, deferred_free_connection); } @@ -1539,7 +1497,6 @@ void *thread_worker(void *data) struct lttng_viewer_cmd recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; - struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht; DBG("[thread] Live viewer relay worker started"); @@ -1624,15 +1581,14 @@ restart: sock_n); if (revents & (LPOLLERR)) { - ERR("VIEWER POLL ERROR"); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { DBG("Viewer socket %d hung up", pollfd); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & LPOLLIN) { ret = relay_connection->sock->ops->recvmsg( relay_connection->sock, &recv_hdr, @@ -1642,7 +1598,7 @@ restart: if (ret <= 0) { cleanup_poll_connection(&events, pollfd); del_connection( relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer control connection closed with %d", pollfd); } else { @@ -1652,12 +1608,12 @@ restart: relay_connection->session->id); } ret = process_control(&recv_hdr, relay_connection, - sessions_ht, viewer_streams_ht); + sessions_ht); if (ret < 0) { /* Clear the session on error. */ cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer connection closed with %d", pollfd); } } @@ -1681,8 +1637,7 @@ error: relay_connection = caa_container_of(node, struct relay_command, sock_n); - del_connection(relay_connections_ht, &iter, relay_connection, - viewer_streams_ht); + del_connection(relay_connections_ht, &iter, relay_connection); } rcu_read_unlock(); error_poll_create: @@ -1747,7 +1702,7 @@ error: * main */ int live_start_threads(struct lttng_uri *uri, - struct relay_local_data *relay_ctx) + struct relay_local_data *relay_ctx, int quit_pipe[2]) { int ret = 0; void *status; @@ -1756,10 +1711,8 @@ int live_start_threads(struct lttng_uri *uri, assert(uri); live_uri = uri; - /* Create thread quit pipe */ - if ((ret = init_thread_quit_pipe()) < 0) { - goto error; - } + live_thread_quit_pipe[0] = quit_pipe[0]; + live_thread_quit_pipe[1] = quit_pipe[1]; /* Check if daemon is UID = 0 */ is_root = !getuid();