X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=d0b1ddef2259fc57342cac15ef5003f19fc03e6f;hp=a62cef272294d360953e5d685fca745d7509bcff;hb=3a84e2f3c3df5b461d9621b64f14abc3b8c3c29c;hpb=c8fea79c745d42ea8143b7020ae11b4fc2da0d8a diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index a62cef272..d0b1ddef2 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -163,12 +163,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->out_fd = -1; } - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret) { - PERROR("close stream index_fd"); - } - stream->index_fd = -1; + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } /* Check and cleanup relayd if needed. */ @@ -359,27 +356,29 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * Return 0 on success or else a negative value. */ int consumer_stream_write_index(struct lttng_consumer_stream *stream, - struct ctf_packet_index *index) + struct ctf_packet_index *element) { int ret; - struct consumer_relayd_sock_pair *relayd; assert(stream); - assert(index); + assert(element); rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, index, + if (stream->net_seq_idx != (uint64_t) -1ULL) { + struct consumer_relayd_sock_pair *relayd; + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", + stream->key, stream->net_seq_idx); + ret = -1; + } } else { - ssize_t size_ret; - - size_ret = index_write(stream->index_fd, index, - sizeof(struct ctf_packet_index)); - if (size_ret < sizeof(struct ctf_packet_index)) { + if (lttng_index_file_write(stream->index_file, element)) { ret = -1; } else { ret = 0;