uint64_t key; /* del */
};
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
goto error_metadata_pipe;
}
+ ctx->channel_monitor_pipe = -1;
+
return ctx;
error_metadata_pipe:
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
if (stream->chan->metadata_cache) {
/* Only applicable to userspace consumers. */
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
}
- pthread_mutex_lock(&stream->lock);
/* Remove any reference to that stream. */
consumer_stream_delete(stream, ht);
*/
stream->chan->metadata_stream = NULL;
- pthread_mutex_unlock(&stream->lock);
if (stream->chan->metadata_cache) {
pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
}
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 2);
+ if (testpoint(consumerd_thread_data_poll)) {
+ goto end;
+ }
health_poll_entry();
num_rdy = poll(pollfd, nb_fd + 2, -1);
health_poll_exit();
goto end;
}
+ if (caa_unlikely(data_consumption_paused)) {
+ DBG("Data consumption paused, sleeping...");
+ sleep(1);
+ goto restart;
+ }
+
/*
* If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize