assert(stream);
assert(sock >= 0);
- DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
+ DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
/* Send stream to session daemon. */
ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
}
error_unlock:
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
* and ultimately try to get rid of this global consumer data lock.
*/
pthread_mutex_lock(&consumer_data.lock);
-
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
* waiting for the metadata cache to be flushed.
*/
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
goto end_free;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {
assert(stream->ustream);
assert(ctx);
- DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
+ DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
stream->name);
/* Ease our life for what's next. */
DBG("UST consumer checking data pending");
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+ ret = 0;
+ goto end;
+ }
+
ret = ustctl_get_next_subbuf(stream->ustream);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */