}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
}
error_unlock:
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
* Returns 0 on success, < 0 on error
*/
static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
- struct lttng_consumer_local_data *ctx)
+ uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned use_relayd = 0;
goto error_unlock;
}
+ /*
+ * The original value is sent back if max stream size is larger than
+ * the possible size of the snapshot. Also, we asume that the session
+ * daemon should never send a maximum stream size that is lower than
+ * subbuffer size.
+ */
+ consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+ produced_pos, max_stream_size);
+
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
* and ultimately try to get rid of this global consumer data lock.
*/
pthread_mutex_lock(&consumer_data.lock);
-
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
* waiting for the metadata cache to be flushed.
*/
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
goto end_free;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {
ret = snapshot_channel(msg.u.snapshot_channel.key,
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.max_stream_size,
ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
DBG("UST consumer checking data pending");
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+ ret = 0;
+ goto end;
+ }
+
ret = ustctl_get_next_subbuf(stream->ustream);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */