*
* This function MUST be called with the consumer_data lock acquired.
*/
-void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret;
struct lttng_ht_iter iter;
/* Destroy the relayd if refcount is 0 */
if (uatomic_read(&relayd->refcount) == 0) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
- if (!--stream->chan->refcount) {
+ uatomic_dec(&stream->chan->refcount);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
-
call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan)
+ if (free_chan) {
consumer_del_channel(free_chan);
+ }
}
struct lttng_consumer_stream *consumer_allocate_stream(
assert(0);
goto end;
}
- DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
- stream->path_name, stream->key,
- stream->shm_fd,
- stream->wait_fd,
- (unsigned long long) stream->mmap_len,
- stream->out_fd,
+
+ /*
+ * When nb_init_streams reaches 0, we don't need to trigger any action in
+ * terms of destroying the associated channel, because the action that
+ * causes the count to become 0 also causes a stream to be added. The
+ * channel deletion will thus be triggered by the following removal of this
+ * stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+ uatomic_dec(&stream->chan->nb_init_streams);
+ }
+
+ DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+ " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+ stream->shm_fd, stream->wait_fd,
+ (unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx);
+
end:
return stream;
}
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
*/
-
-int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
struct lttng_ht_node_ulong *node;
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size)
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams)
{
struct lttng_consumer_channel *channel;
int ret;
channel->mmap_len = mmap_len;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
+ channel->nb_init_streams = nb_init_streams;
lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
return;
}
+ rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
free(stream);
}
+ rcu_read_unlock();
lttng_ht_destroy(ht);
}
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
{
int ret;
- struct lttng_consumer_channel *free_chan = NULL;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)) {
- free_chan = stream->chan;
- }
-
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
+ /* Go for channel deletion! */
+ consumer_del_channel(stream->chan);
}
free(stream);
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
+ rcu_read_lock();
/* The node should be init at this point */
lttng_ht_add_unique_ulong(metadata_ht,
&stream->waitfd_node);
+ rcu_read_unlock();
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
/* From here, the event is a metadata wait fd */
+ rcu_read_lock();
lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
/* FD not found, continue loop */
+ rcu_read_unlock();
continue;
}
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
} else if (len > 0) {
stream->data_read = 1;
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
}
}
/* Removing it from hash table, poll set and free memory */
lttng_ht_del(metadata_ht, &iter);
+
lttng_poll_del(&events, stream->wait_fd);
consumer_del_metadata_stream(stream);
}
+ rcu_read_unlock();
}
}
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
- consumer_add_relayd(relayd);
+ add_relayd(relayd);
/* All good! */
ret = 0;