pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
- pthread_mutex_lock(&channel->timer_lock);
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
}
error_unlock:
- pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
error:
* Receive the metadata updates from the sessiond.
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, struct lttng_consumer_channel *channel)
+ uint64_t len, struct lttng_consumer_channel *channel,
+ int timer)
{
int ret, ret_code = LTTNG_OK;
char *metadata_str;
goto end_free;
}
- /*
- * XXX: The consumer data lock is acquired before calling metadata cache
- * write which calls push metadata that MUST be protected by the consumer
- * lock in order to be able to check the validity of the metadata stream of
- * the channel.
- *
- * Note that this will be subject to change to better fine grained locking
- * and ultimately try to get rid of this global consumer data lock.
- */
- pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&channel->lock);
+ if (!timer) {
+ pthread_mutex_lock(&channel->lock);
+ }
pthread_mutex_lock(&channel->timer_lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
*/
pthread_mutex_unlock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&channel->timer_lock);
- pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ if (!timer) {
+ pthread_mutex_unlock(&channel->lock);
+ }
goto end_free;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&channel->timer_lock);
- pthread_mutex_unlock(&channel->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ if (!timer) {
+ pthread_mutex_unlock(&channel->lock);
+ }
- while (consumer_metadata_cache_flushed(channel, offset + len)) {
+ while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
}
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel);
+ len, channel, 0);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
* introduces deadlocks.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+ struct lttng_consumer_channel *channel, int timer)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
}
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel);
+ key, offset, len, channel, timer);
if (ret_code >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive