X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=ca2c4536cb78f99758ba1a8ec2e2b81fead12f4f;hb=3654ed19aa453f4be063784e215cab81441e8962;hp=522b3cd5cd8ac0c45876c12dd7e344c8c7c27118;hpb=f8f3885cc52af9d3c951da78989d6f4a25270411;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 522b3cd5c..ca2c4536c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -73,12 +73,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - DBG("Unable to close stream on the relayd. Continuing"); - /* - * Continue here. There is nothing we can do for the relayd. - * Chances are that the relayd has closed the socket so we just - * continue cleaning up. - */ + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ @@ -359,18 +355,33 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, struct ctf_packet_index *element) { int ret; - struct consumer_relayd_sock_pair *relayd; assert(stream); assert(element); rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, element, + if (stream->net_seq_idx != (uint64_t) -1ULL) { + struct consumer_relayd_sock_pair *relayd; + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* + * Communication error with lttng-relayd, + * perform cleanup now + */ + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + ret = -1; + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", + stream->key, stream->net_seq_idx); + ret = -1; + } } else { if (lttng_index_file_write(stream->index_file, element)) { ret = -1;