X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=2f32c2537503f5ab4f04145bdefa662bf2b49500;hb=92c6ca54f70068b7cd440d0ccc58610fe9b9d010;hp=03aab2d3c42b2a9a7472d34c024b987f3ab1b47a;hpb=84a182ce022e128395c8d1e5762883f5825582d7;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 03aab2d3c..2f32c2537 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -585,14 +585,12 @@ end_no_session: * Returns 0 on success or a negative value on error. */ static -int init_viewer_stream(struct relay_stream *stream, - struct lttng_ht *viewer_streams_ht) +int init_viewer_stream(struct relay_stream *stream) { int ret; struct relay_viewer_stream *viewer_stream; assert(stream); - assert(viewer_streams_ht); viewer_stream = zmalloc(sizeof(*viewer_stream)); if (!viewer_stream) { @@ -643,8 +641,7 @@ error: */ static int viewer_attach_session(struct relay_command *cmd, - struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht *sessions_ht) { int ret, send_streams = 0, nb_streams = 0; struct lttng_viewer_attach_session_request request; @@ -659,7 +656,6 @@ int viewer_attach_session(struct relay_command *cmd, assert(cmd); assert(sessions_ht); - assert(viewer_streams_ht); DBG("Attach session received"); @@ -755,10 +751,9 @@ int viewer_attach_session(struct relay_command *cmd, continue; } - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { - ret = init_viewer_stream(stream, viewer_streams_ht); + ret = init_viewer_stream(stream); if (ret < 0) { goto end_unlock; } @@ -891,15 +886,12 @@ end: * * RCU read side lock MUST be acquired. */ -struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, - struct lttng_ht *viewer_streams_ht) +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_viewer_stream *stream = NULL; - assert(viewer_streams_ht); - lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node == NULL) { @@ -919,7 +911,7 @@ end: */ static int viewer_get_next_index(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht) + struct lttng_ht *sessions_ht) { int ret; struct lttng_viewer_get_next_index request_index; @@ -929,7 +921,6 @@ int viewer_get_next_index(struct relay_command *cmd, struct relay_stream *rstream; assert(cmd); - assert(viewer_streams_ht); assert(sessions_ht); DBG("Viewer get next index"); @@ -949,8 +940,7 @@ int viewer_get_next_index(struct relay_command *cmd, } rcu_read_lock(); - vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id), - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id)); if (!vstream) { ret = -1; goto end_unlock; @@ -1058,8 +1048,7 @@ end: * Return 0 on success or else a negative value. */ static -int viewer_get_packet(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_packet(struct relay_command *cmd) { int ret, send_data = 0; char *data = NULL; @@ -1070,7 +1059,6 @@ int viewer_get_packet(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG2("Relay get data packet"); @@ -1089,8 +1077,7 @@ int viewer_get_packet(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id)); if (!stream) { goto error; } @@ -1187,8 +1174,7 @@ end: * Return 0 on success else a negative value. */ static -int viewer_get_metadata(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_metadata(struct relay_command *cmd) { int ret = 0; ssize_t read_len; @@ -1199,7 +1185,6 @@ int viewer_get_metadata(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG("Relay get metadata"); @@ -1218,8 +1203,7 @@ int viewer_get_metadata(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(request.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(request.stream_id)); if (!stream || !stream->metadata_flag) { ERR("Invalid metadata stream"); goto error; @@ -1320,8 +1304,7 @@ void live_relay_unknown_command(struct relay_command *cmd) */ static int process_control(struct lttng_viewer_cmd *recv_hdr, - struct relay_command *cmd, struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret = 0; @@ -1333,17 +1316,16 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_list_sessions(cmd, sessions_ht); break; case VIEWER_ATTACH_SESSION: - ret = viewer_attach_session(cmd, sessions_ht, - viewer_streams_ht); + ret = viewer_attach_session(cmd, sessions_ht); break; case VIEWER_GET_NEXT_INDEX: - ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht); + ret = viewer_get_next_index(cmd, sessions_ht); break; case VIEWER_GET_PACKET: - ret = viewer_get_packet(cmd, viewer_streams_ht); + ret = viewer_get_packet(cmd); break; case VIEWER_GET_METADATA: - ret = viewer_get_metadata(cmd, viewer_streams_ht); + ret = viewer_get_metadata(cmd); break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); @@ -1451,15 +1433,13 @@ void deferred_free_viewer_stream(struct rcu_head *head) } static -void viewer_del_streams(struct lttng_ht *viewer_streams_ht, - struct relay_session *session) +void viewer_del_streams(struct relay_session *session) { int ret; struct relay_viewer_stream *stream; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - assert(viewer_streams_ht); assert(session); rcu_read_lock(); @@ -1503,21 +1483,19 @@ void viewer_del_streams(struct lttng_ht *viewer_streams_ht, */ static void del_connection(struct lttng_ht *relay_connections_ht, - struct lttng_ht_iter *iter, struct relay_command *relay_connection, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht_iter *iter, struct relay_command *relay_connection) { int ret; assert(relay_connections_ht); assert(iter); assert(relay_connection); - assert(viewer_streams_ht); ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); if (relay_connection->session) { - viewer_del_streams(viewer_streams_ht, relay_connection->session); + viewer_del_streams(relay_connection->session); } call_rcu(&relay_connection->rcu_node, deferred_free_connection); @@ -1539,7 +1517,6 @@ void *thread_worker(void *data) struct lttng_viewer_cmd recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; - struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht; DBG("[thread] Live viewer relay worker started"); @@ -1627,12 +1604,12 @@ restart: ERR("VIEWER POLL ERROR"); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { DBG("Viewer socket %d hung up", pollfd); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & LPOLLIN) { ret = relay_connection->sock->ops->recvmsg( relay_connection->sock, &recv_hdr, @@ -1642,7 +1619,7 @@ restart: if (ret <= 0) { cleanup_poll_connection(&events, pollfd); del_connection( relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer control connection closed with %d", pollfd); } else { @@ -1652,12 +1629,12 @@ restart: relay_connection->session->id); } ret = process_control(&recv_hdr, relay_connection, - sessions_ht, viewer_streams_ht); + sessions_ht); if (ret < 0) { /* Clear the session on error. */ cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer connection closed with %d", pollfd); } } @@ -1681,8 +1658,7 @@ error: relay_connection = caa_container_of(node, struct relay_command, sock_n); - del_connection(relay_connections_ht, &iter, relay_connection, - viewer_streams_ht); + del_connection(relay_connections_ht, &iter, relay_connection); } rcu_read_unlock(); error_poll_create: