X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=a6b408a90740943138e43ad69160d34a5395baa6;hb=157df58664846e22bdeea84dfcf717cb43360b3f;hp=5782175e6e7f15b57753cc29dedebd54db7a51da;hpb=6b6b9a5a667d6713eb7bcf22daf4f9bbeef00fb1;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 5782175e6..a6b408a90 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -800,8 +800,6 @@ void deferred_free_stream(struct rcu_head *head) struct relay_stream *stream = caa_container_of(head, struct relay_stream, rcu_node); - ctf_trace_try_destroy(stream->ctf_trace); - free(stream->path_name); free(stream->channel_name); free(stream); @@ -849,7 +847,11 @@ static void destroy_stream(struct relay_stream *stream) * lookup failure on the live thread side of a stream indicates * that the viewer stream index received value should be used. */ + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); vstream->total_index_received = stream->total_index_received; + vstream->tracefile_count_last = stream->tracefile_count_current; + vstream->close_write_flag = 1; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); } /* Cleanup index of that stream. */ @@ -861,6 +863,11 @@ static void destroy_stream(struct relay_stream *stream) iter.iter.node = &stream->ctf_trace_node.node; delret = lttng_ht_del(stream->ctf_traces_ht, &iter); assert(!delret); + + if (stream->ctf_trace) { + ctf_trace_try_destroy(stream->ctf_trace); + } + call_rcu(&stream->rcu_node, deferred_free_stream); DBG("Closed tracefile %d from close stream", stream->fd); } @@ -2065,18 +2072,9 @@ int relay_process_data(struct relay_command *cmd) * currently using and let it handle the fault. */ if (vstream->tracefile_count_current == new_id) { + pthread_mutex_lock(&vstream->overwrite_lock); vstream->abort_flag = 1; - vstream->close_write_flag = 1; - - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close index"); - } - - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close tracefile"); - } + pthread_mutex_unlock(&vstream->overwrite_lock); DBG("Streaming side setting abort_flag on stream %s_%lu\n", stream->channel_name, new_id); } else if (vstream->tracefile_count_current == @@ -2094,6 +2092,7 @@ int relay_process_data(struct relay_command *cmd) stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); + stream->total_index_received = 0; pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file");