X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fconsumer-stream.c;h=920948760264405f16941f0069617513e9d27d6f;hb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;hp=02887fcc457aaa3deb435d94bfc3ccb04fa2496b;hpb=ec6ea7d01adc8a9d1481ba645b282c92ec27208e;p=lttng-tools.git diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 02887fcc4..920948760 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -135,6 +136,14 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->out_fd = -1; } + if (stream->index_fd >= 0) { + ret = close(stream->index_fd); + if (ret) { + PERROR("close stream index_fd"); + } + stream->index_fd = -1; + } + /* Check and cleanup relayd if needed. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -281,7 +290,6 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, if (stream->globally_visible) { pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ consumer_stream_delete(stream, ht); @@ -295,7 +303,6 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, consumer_data.need_update = 1; pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); } else { @@ -316,3 +323,35 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* Free stream within a RCU call. */ consumer_stream_free(stream); } + +/* + * Write index of a specific stream either on the relayd or local disk. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_write_index(struct lttng_consumer_stream *stream, + struct lttng_packet_index *index) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(index); + + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + ret = relayd_send_index(&relayd->control_sock, index, + stream->relayd_stream_id, stream->next_net_seq_num - 1); + } else { + ret = index_write(stream->index_fd, index, + sizeof(struct lttng_packet_index)); + } + if (ret < 0) { + goto error; + } + +error: + rcu_read_unlock(); + return ret; +}