X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=ea32af463417a81d3eb243ce9c91b0066915d0a0;hb=77c7c900d190f7fb4f99a456c767f069da7e72b8;hp=0e33bea72e96b4c6a7c35fd82ec9e1cfb9bb82da;hpb=09e26845dd435a5975299a380847dad06e0a6836;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 0e33bea72..ea32af463 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -274,12 +275,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); + /* Closing streams requires to lock the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_close_stream(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Unable to close stream on the relayd. Continuing"); - /* Continue here. There is nothing we can do for the relayd.*/ + DBG("Unable to close stream on the relayd. Continuing"); + /* + * Continue here. There is nothing we can do for the relayd. + * Chances are that the relayd has closed the socket so we just + * continue cleaning up. + */ } /* Both conditions are met, we destroy the relayd. */ @@ -434,8 +442,10 @@ end: } /* - * Add relayd socket to global consumer data hashtable. + * Add relayd socket to global consumer data hashtable. RCU read side lock MUST + * be acquired before calling this. */ + int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; @@ -447,20 +457,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) goto end; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) relayd->net_seq_idx), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { - rcu_read_unlock(); /* Relayd already exist. Ignore the insertion */ goto end; } lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); - rcu_read_unlock(); - end: return ret; } @@ -1087,7 +1092,7 @@ static int write_relayd_metadata_id(int fd, PERROR("write metadata stream id"); goto end; } - DBG("Metadata stream id %zu written before data", + DBG("Metadata stream id %" PRIu64 " written before data", stream->relayd_stream_id); end: @@ -1188,14 +1193,14 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } goto end; } else if (ret > len) { - PERROR("Error in file write (ret %ld > len %lu)", ret, len); + PERROR("Error in file write (ret %zd > len %lu)", ret, len); written += ret; goto end; } else { len -= ret; mmap_offset += ret; } - DBG("Consumer mmap write() ret %ld (len %lu)", ret, len); + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); /* This call is useless on a socket so better save a syscall. */ if (!relayd) {