X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=ba97ab3d78a05bd2cec21c564b8ce4ca6bf35262;hp=43e6f318aef9eec57daecb9a6106f4b314c9296b;hb=94d4914075c61cd1ee2ec00d8b61eacff105fc47;hpb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 43e6f318a..ba97ab3d7 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -761,6 +761,50 @@ void deferred_free_session(struct rcu_head *head) free(session); } +static void close_stream(struct relay_stream *stream, + struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht) +{ + int delret; + struct relay_viewer_stream *vstream; + struct lttng_ht_iter iter; + + assert(stream); + assert(viewer_streams_ht); + + delret = close(stream->fd); + if (delret < 0) { + PERROR("close stream"); + } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle, + viewer_streams_ht); + if (vstream) { + /* + * Set the last good value into the viewer stream. This is done + * right before the stream gets deleted from the hash table. The + * lookup failure on the live thread side of a stream indicates + * that the viewer stream index received value should be used. + */ + vstream->total_index_received = stream->total_index_received; + } + + iter.iter.node = &stream->stream_n.node; + delret = lttng_ht_del(relay_streams_ht, &iter); + assert(!delret); + iter.iter.node = &stream->ctf_trace_node.node; + delret = lttng_ht_del(ctf_traces_ht, &iter); + assert(!delret); + call_rcu(&stream->rcu_node, deferred_free_stream); + DBG("Closed tracefile %d from close stream", stream->fd); +} + /* * relay_delete_session: Free all memory associated with a session and * close all the FDs @@ -1031,12 +1075,11 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *viewer_streams_ht) { + int ret, send_ret; struct relay_session *session = cmd->session; struct lttcomm_relayd_close_stream stream_info; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - int ret, send_ret; - struct lttng_ht_iter iter; DBG("Close stream received"); @@ -1070,42 +1113,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->close_flag = 1; if (close_stream_check(stream)) { - int delret; - struct relay_viewer_stream *vstream; - - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); - } - - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); - } - } - - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - vstream->total_index_received = stream->total_index_received; - } - - iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(relay_streams_ht, &iter); - assert(!delret); - iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(cmd->ctf_traces_ht, &iter); - assert(!delret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d from close stream", stream->fd); + close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); } end_unlock: @@ -1829,7 +1837,7 @@ end: */ static int relay_process_data(struct relay_command *cmd, - struct lttng_ht *indexes_ht) + struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht) { int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; @@ -2001,24 +2009,7 @@ int relay_process_data(struct relay_command *cmd, /* Check if we need to close the FD */ if (close_stream_check(stream)) { - int cret; - struct lttng_ht_iter iter; - - cret = close(stream->fd); - if (cret < 0) { - PERROR("close stream process data"); - } - - cret = close(stream->index_fd); - if (cret < 0) { - PERROR("close stream index_fd"); - } - iter.iter.node = &stream->stream_n.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d after recv data", stream->fd); + close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); } end_rcu_unlock: @@ -2320,7 +2311,8 @@ restart: continue; } - ret = relay_process_data(relay_connection, indexes_ht); + ret = relay_process_data(relay_connection, indexes_ht, + relay_ctx->viewer_streams_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2360,9 +2352,15 @@ error: &iter, relay_connection, sessions_ht); } } - rcu_read_unlock(); error_poll_create: - lttng_ht_destroy(indexes_ht); + { + struct relay_index *index; + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { + relay_index_delete(index, indexes_ht); + } + lttng_ht_destroy(indexes_ht); + } + rcu_read_unlock(); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: