uint64_t key; /* del */
};
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
* Also updated by the signal handler (consumer_should_exit()). Read by the
* polling threads.
*/
-volatile int consumer_quit;
+int consumer_quit;
/*
* Global hash table containing respectively metadata and data streams. The
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
{
ssize_t ret;
- consumer_quit = 1;
+ CMM_STORE_SHARED(consumer_quit, 1);
ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
if (ret < 1) {
PERROR("write consumer quit");
goto error_metadata_pipe;
}
+ ctx->channel_monitor_pipe = -1;
+
return ctx;
error_metadata_pipe:
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
if (stream->chan->metadata_cache) {
/* Only applicable to userspace consumers. */
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
}
- pthread_mutex_lock(&stream->lock);
/* Remove any reference to that stream. */
consumer_stream_delete(stream, ht);
*/
stream->chan->metadata_stream = NULL;
- pthread_mutex_unlock(&stream->lock);
if (stream->chan->metadata_cache) {
pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
}
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 2);
+ if (testpoint(consumerd_thread_data_poll)) {
+ goto end;
+ }
health_poll_entry();
num_rdy = poll(pollfd, nb_fd + 2, -1);
health_poll_exit();
goto end;
}
+ if (caa_unlikely(data_consumption_paused)) {
+ DBG("Data consumption paused, sleeping...");
+ sleep(1);
+ goto restart;
+ }
+
/*
* If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
err = 0;
goto end;
}
- if (consumer_quit) {
+ if (CMM_LOAD_SHARED(consumer_quit)) {
DBG("consumer_thread_receive_fds received quit from signal");
err = 0; /* All is OK */
goto end;
* when all fds have hung up, the polling thread
* can exit cleanly
*/
- consumer_quit = 1;
+ CMM_STORE_SHARED(consumer_quit, 1);
/*
* Notify the data poll thread to poll back again and test the