X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=ca2c4536cb78f99758ba1a8ec2e2b81fead12f4f;hb=9276e5c88e693249bd31197baecf58310df8167e;hp=a62cef272294d360953e5d685fca745d7509bcff;hpb=c8fea79c745d42ea8143b7020ae11b4fc2da0d8a;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index a62cef272..ca2c4536c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -73,12 +73,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - DBG("Unable to close stream on the relayd. Continuing"); - /* - * Continue here. There is nothing we can do for the relayd. - * Chances are that the relayd has closed the socket so we just - * continue cleaning up. - */ + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ @@ -163,12 +159,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->out_fd = -1; } - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret) { - PERROR("close stream index_fd"); - } - stream->index_fd = -1; + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } /* Check and cleanup relayd if needed. */ @@ -359,27 +352,38 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * Return 0 on success or else a negative value. */ int consumer_stream_write_index(struct lttng_consumer_stream *stream, - struct ctf_packet_index *index) + struct ctf_packet_index *element) { int ret; - struct consumer_relayd_sock_pair *relayd; assert(stream); - assert(index); + assert(element); rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, index, + if (stream->net_seq_idx != (uint64_t) -1ULL) { + struct consumer_relayd_sock_pair *relayd; + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* + * Communication error with lttng-relayd, + * perform cleanup now + */ + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + ret = -1; + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", + stream->key, stream->net_seq_idx); + ret = -1; + } } else { - ssize_t size_ret; - - size_ret = index_write(stream->index_fd, index, - sizeof(struct ctf_packet_index)); - if (size_ret < sizeof(struct ctf_packet_index)) { + if (lttng_index_file_write(stream->index_file, element)) { ret = -1; } else { ret = 0;