int ret;
struct lttng_ht_iter iter;
+ if (relayd == NULL) {
+ return;
+ }
+
DBG("Consumer destroy and close relayd socket pair");
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
- assert(!ret);
+ if (ret != 0) {
+ /* We assume the relayd was already destroyed */
+ return;
+ }
/* Close all sockets */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (relayd != NULL) {
uatomic_dec(&relayd->refcount);
assert(uatomic_read(&relayd->refcount) >= 0);
- if (uatomic_read(&relayd->refcount) == 0) {
- /* Refcount of the relayd struct is 0, destroy it */
+
+ ret = relayd_send_close_stream(&relayd->control_sock,
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
+ if (ret < 0) {
+ ERR("Unable to close stream on the relayd. Continuing");
+ /* Continue here. There is nothing we can do for the relayd.*/
+ }
+
+ /* Both conditions are met, we destroy the relayd. */
+ if (uatomic_read(&relayd->refcount) == 0 &&
+ uatomic_read(&relayd->destroy_flag)) {
consumer_destroy_relayd(relayd);
}
}
obj->net_seq_idx = net_seq_idx;
obj->refcount = 0;
+ obj->destroy_flag = 0;
lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,