From 92c6ca54f70068b7cd440d0ccc58610fe9b9d010 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Fri, 27 Sep 2013 14:35:27 -0400 Subject: [PATCH] Make viewer streams HT global This hash table is meant to be global by design thus making it globally accessible in the relayd so we don't pass it around in every function call. Signed-off-by: David Goulet --- src/bin/lttng-relayd/live.c | 74 ++++++++++------------------- src/bin/lttng-relayd/live.h | 3 +- src/bin/lttng-relayd/lttng-relayd.h | 2 +- src/bin/lttng-relayd/main.c | 28 +++++------ 4 files changed, 41 insertions(+), 66 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 03aab2d3c..2f32c2537 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -585,14 +585,12 @@ end_no_session: * Returns 0 on success or a negative value on error. */ static -int init_viewer_stream(struct relay_stream *stream, - struct lttng_ht *viewer_streams_ht) +int init_viewer_stream(struct relay_stream *stream) { int ret; struct relay_viewer_stream *viewer_stream; assert(stream); - assert(viewer_streams_ht); viewer_stream = zmalloc(sizeof(*viewer_stream)); if (!viewer_stream) { @@ -643,8 +641,7 @@ error: */ static int viewer_attach_session(struct relay_command *cmd, - struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht *sessions_ht) { int ret, send_streams = 0, nb_streams = 0; struct lttng_viewer_attach_session_request request; @@ -659,7 +656,6 @@ int viewer_attach_session(struct relay_command *cmd, assert(cmd); assert(sessions_ht); - assert(viewer_streams_ht); DBG("Attach session received"); @@ -755,10 +751,9 @@ int viewer_attach_session(struct relay_command *cmd, continue; } - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { - ret = init_viewer_stream(stream, viewer_streams_ht); + ret = init_viewer_stream(stream); if (ret < 0) { goto end_unlock; } @@ -891,15 +886,12 @@ end: * * RCU read side lock MUST be acquired. */ -struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, - struct lttng_ht *viewer_streams_ht) +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_viewer_stream *stream = NULL; - assert(viewer_streams_ht); - lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node == NULL) { @@ -919,7 +911,7 @@ end: */ static int viewer_get_next_index(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht) + struct lttng_ht *sessions_ht) { int ret; struct lttng_viewer_get_next_index request_index; @@ -929,7 +921,6 @@ int viewer_get_next_index(struct relay_command *cmd, struct relay_stream *rstream; assert(cmd); - assert(viewer_streams_ht); assert(sessions_ht); DBG("Viewer get next index"); @@ -949,8 +940,7 @@ int viewer_get_next_index(struct relay_command *cmd, } rcu_read_lock(); - vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id), - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id)); if (!vstream) { ret = -1; goto end_unlock; @@ -1058,8 +1048,7 @@ end: * Return 0 on success or else a negative value. */ static -int viewer_get_packet(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_packet(struct relay_command *cmd) { int ret, send_data = 0; char *data = NULL; @@ -1070,7 +1059,6 @@ int viewer_get_packet(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG2("Relay get data packet"); @@ -1089,8 +1077,7 @@ int viewer_get_packet(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id)); if (!stream) { goto error; } @@ -1187,8 +1174,7 @@ end: * Return 0 on success else a negative value. */ static -int viewer_get_metadata(struct relay_command *cmd, - struct lttng_ht *viewer_streams_ht) +int viewer_get_metadata(struct relay_command *cmd) { int ret = 0; ssize_t read_len; @@ -1199,7 +1185,6 @@ int viewer_get_metadata(struct relay_command *cmd, struct relay_viewer_stream *stream; assert(cmd); - assert(viewer_streams_ht); DBG("Relay get metadata"); @@ -1218,8 +1203,7 @@ int viewer_get_metadata(struct relay_command *cmd, } rcu_read_lock(); - stream = live_find_viewer_stream_by_id(be64toh(request.stream_id), - viewer_streams_ht); + stream = live_find_viewer_stream_by_id(be64toh(request.stream_id)); if (!stream || !stream->metadata_flag) { ERR("Invalid metadata stream"); goto error; @@ -1320,8 +1304,7 @@ void live_relay_unknown_command(struct relay_command *cmd) */ static int process_control(struct lttng_viewer_cmd *recv_hdr, - struct relay_command *cmd, struct lttng_ht *sessions_ht, - struct lttng_ht *viewer_streams_ht) + struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret = 0; @@ -1333,17 +1316,16 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_list_sessions(cmd, sessions_ht); break; case VIEWER_ATTACH_SESSION: - ret = viewer_attach_session(cmd, sessions_ht, - viewer_streams_ht); + ret = viewer_attach_session(cmd, sessions_ht); break; case VIEWER_GET_NEXT_INDEX: - ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht); + ret = viewer_get_next_index(cmd, sessions_ht); break; case VIEWER_GET_PACKET: - ret = viewer_get_packet(cmd, viewer_streams_ht); + ret = viewer_get_packet(cmd); break; case VIEWER_GET_METADATA: - ret = viewer_get_metadata(cmd, viewer_streams_ht); + ret = viewer_get_metadata(cmd); break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); @@ -1451,15 +1433,13 @@ void deferred_free_viewer_stream(struct rcu_head *head) } static -void viewer_del_streams(struct lttng_ht *viewer_streams_ht, - struct relay_session *session) +void viewer_del_streams(struct relay_session *session) { int ret; struct relay_viewer_stream *stream; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - assert(viewer_streams_ht); assert(session); rcu_read_lock(); @@ -1503,21 +1483,19 @@ void viewer_del_streams(struct lttng_ht *viewer_streams_ht, */ static void del_connection(struct lttng_ht *relay_connections_ht, - struct lttng_ht_iter *iter, struct relay_command *relay_connection, - struct lttng_ht *viewer_streams_ht) + struct lttng_ht_iter *iter, struct relay_command *relay_connection) { int ret; assert(relay_connections_ht); assert(iter); assert(relay_connection); - assert(viewer_streams_ht); ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); if (relay_connection->session) { - viewer_del_streams(viewer_streams_ht, relay_connection->session); + viewer_del_streams(relay_connection->session); } call_rcu(&relay_connection->rcu_node, deferred_free_connection); @@ -1539,7 +1517,6 @@ void *thread_worker(void *data) struct lttng_viewer_cmd recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; - struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht; DBG("[thread] Live viewer relay worker started"); @@ -1627,12 +1604,12 @@ restart: ERR("VIEWER POLL ERROR"); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { DBG("Viewer socket %d hung up", pollfd); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); } else if (revents & LPOLLIN) { ret = relay_connection->sock->ops->recvmsg( relay_connection->sock, &recv_hdr, @@ -1642,7 +1619,7 @@ restart: if (ret <= 0) { cleanup_poll_connection(&events, pollfd); del_connection( relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer control connection closed with %d", pollfd); } else { @@ -1652,12 +1629,12 @@ restart: relay_connection->session->id); } ret = process_control(&recv_hdr, relay_connection, - sessions_ht, viewer_streams_ht); + sessions_ht); if (ret < 0) { /* Clear the session on error. */ cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection, viewer_streams_ht); + relay_connection); DBG("Viewer connection closed with %d", pollfd); } } @@ -1681,8 +1658,7 @@ error: relay_connection = caa_container_of(node, struct relay_command, sock_n); - del_connection(relay_connections_ht, &iter, relay_connection, - viewer_streams_ht); + del_connection(relay_connections_ht, &iter, relay_connection); } rcu_read_unlock(); error_poll_create: diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h index dd6bb6091..52608a4f7 100644 --- a/src/bin/lttng-relayd/live.h +++ b/src/bin/lttng-relayd/live.h @@ -27,7 +27,6 @@ int live_start_threads(struct lttng_uri *live_uri, struct relay_local_data *relay_ctx); void live_stop_threads(void); -struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, - struct lttng_ht *viewer_streams_ht); +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id); #endif /* LTTNG_RELAYD_LIVE_H */ diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 638ecbbea..fbd7d7846 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -160,12 +160,12 @@ struct relay_command { struct relay_local_data { struct lttng_ht *sessions_ht; - struct lttng_ht *viewer_streams_ht; }; extern char *opt_output_path; extern struct lttng_ht *relay_streams_ht; +extern struct lttng_ht *viewer_streams_ht; struct relay_stream *relay_stream_find_by_id(uint64_t stream_id); diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index ba97ab3d7..2208144dc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -111,6 +111,9 @@ static gid_t relayd_gid; /* Global relay stream hash table. */ struct lttng_ht *relay_streams_ht; +/* Global relay viewer stream hash table. */ +struct lttng_ht *viewer_streams_ht; + /* * usage function on stderr */ @@ -762,14 +765,13 @@ void deferred_free_session(struct rcu_head *head) } static void close_stream(struct relay_stream *stream, - struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht) + struct lttng_ht *ctf_traces_ht) { int delret; struct relay_viewer_stream *vstream; struct lttng_ht_iter iter; assert(stream); - assert(viewer_streams_ht); delret = close(stream->fd); if (delret < 0) { @@ -783,8 +785,7 @@ static void close_stream(struct relay_stream *stream, } } - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); + vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (vstream) { /* * Set the last good value into the viewer stream. This is done @@ -1073,7 +1074,7 @@ err_free_stream: */ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *viewer_streams_ht) + struct relay_command *cmd) { int ret, send_ret; struct relay_session *session = cmd->session; @@ -1113,7 +1114,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->close_flag = 1; if (close_stream_check(stream)) { - close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); + close_stream(stream, cmd->ctf_traces_ht); } end_unlock: @@ -1803,7 +1804,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht); break; case RELAYD_CLOSE_STREAM: - ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht); + ret = relay_close_stream(recv_hdr, cmd); break; case RELAYD_DATA_PENDING: ret = relay_data_pending(recv_hdr, cmd); @@ -1837,7 +1838,7 @@ end: */ static int relay_process_data(struct relay_command *cmd, - struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht) + struct lttng_ht *indexes_ht) { int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; @@ -2009,7 +2010,7 @@ int relay_process_data(struct relay_command *cmd, /* Check if we need to close the FD */ if (close_stream_check(stream)) { - close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); + close_stream(stream, cmd->ctf_traces_ht); } end_rcu_unlock: @@ -2311,8 +2312,7 @@ restart: continue; } - ret = relay_process_data(relay_connection, indexes_ht, - relay_ctx->viewer_streams_ht); + ret = relay_process_data(relay_connection, indexes_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2482,8 +2482,8 @@ int main(int argc, char **argv) } /* tables of streams indexed by stream ID */ - relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!relay_ctx->viewer_streams_ht) { + viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!viewer_streams_ht) { goto exit_relay_ctx_viewer_streams; } @@ -2536,7 +2536,7 @@ exit_dispatcher: PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } - lttng_ht_destroy(relay_ctx->viewer_streams_ht); + lttng_ht_destroy(viewer_streams_ht); exit_relay_ctx_viewer_streams: lttng_ht_destroy(relay_streams_ht); -- 2.34.1