X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fconsumer.c;h=c1dadddb3dd08854bffd8eef8cca3e9a76e82bed;hb=3f8e211fbe73cbcf69d52af5e839b14d1a951ed7;hp=761ce93753667f1b93d6f151c93a68d014855e7f;hpb=6197aea7399cfe3bb67f8602ba4c3122867ecf52;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 761ce9375..c1dadddb3 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -274,12 +274,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); + /* Closing streams requires to lock the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_close_stream(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Unable to close stream on the relayd. Continuing"); - /* Continue here. There is nothing we can do for the relayd.*/ + DBG("Unable to close stream on the relayd. Continuing"); + /* + * Continue here. There is nothing we can do for the relayd. + * Chances are that the relayd has closed the socket so we just + * continue cleaning up. + */ } /* Both conditions are met, we destroy the relayd. */ @@ -434,8 +441,10 @@ end: } /* - * Add relayd socket to global consumer data hashtable. + * Add relayd socket to global consumer data hashtable. RCU read side lock MUST + * be acquired before calling this. */ + int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; @@ -447,20 +456,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) goto end; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) relayd->net_seq_idx), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { - rcu_read_unlock(); /* Relayd already exist. Ignore the insertion */ goto end; } lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); - rcu_read_unlock(); - end: return ret; } @@ -1095,7 +1099,11 @@ end: } /* - * Mmap the ring buffer, read it and write the data to the tracefile. + * Mmap the ring buffer, read it and write the data to the tracefile. This is a + * core function for writing trace buffers to either the local filesystem or + * the network. + * + * Careful review MUST be put if any changes occur! * * Returns the number of bytes written */ @@ -1168,12 +1176,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( written = ret; goto end; } - - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= sizeof(stream->relayd_stream_id); } } /* Else, use the default set before which is the filesystem. */