pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
/*
* In monitor mode, the streams associated with the channel will be put in
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
ctx->consumer_error_socket = -1;
ctx->consumer_metadata_socket = -1;
+ pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
/* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
ctx->on_recv_channel = recv_channel;
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ if (stream->monitor) {
+ /* close the write-side in close_metadata */
+ ret = close(stream->ust_metadata_poll_pipe[0]);
+ if (ret < 0) {
+ PERROR("Close UST metadata read-side poll pipe");
+ }
+ }
lttng_ustconsumer_del_stream(stream);
break;
default:
end:
/*
* Nullify the stream reference so it is not used after deletion. The
- * consumer data lock MUST be acquired before being able to check for a
- * NULL pointer value.
+ * channel lock MUST be acquired before being able to check for
+ * a NULL pointer value.
*/
stream->chan->metadata_stream = NULL;
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/*
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
DBG("Metadata available on fd %d", pollfd);
assert(stream->wait_fd == pollfd);
- len = ctx->on_buffer_ready(stream, ctx);
+ do {
+ len = ctx->on_buffer_ready(stream, ctx);
+ /*
+ * We don't check the return value here since if we get
+ * a negative len, it means an error occured thus we
+ * simply remove it from the poll set and free the
+ * stream.
+ */
+ } while (len > 0);
+
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean up stream from consumer and free it. */
lttng_poll_del(&events, stream->wait_fd);
consumer_del_metadata_stream(stream, metadata_ht);
- } else if (len > 0) {
- stream->data_read = 1;
}
}