Cleanup: use CMM accessors for consumer_quit variable
[lttng-tools.git] / src / common / consumer / consumer.c
index 234944dad0abf6d6c6594fa68ea64721f1488d87..9834322cf589d0c00c0ffef796e0ebb461291964 100644 (file)
@@ -67,13 +67,16 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
  * Also updated by the signal handler (consumer_should_exit()). Read by the
  * polling threads.
  */
-volatile int consumer_quit;
+int consumer_quit;
 
 /*
  * Global hash table containing respectively metadata and data streams. The
@@ -368,6 +371,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
        }
+       if (channel->monitor_timer_enabled == 1) {
+               consumer_timer_monitor_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1221,7 +1227,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
 
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
        ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
        if (ret < 1) {
                PERROR("write consumer quit");
@@ -1348,6 +1354,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
+       ctx->channel_monitor_pipe = -1;
+
        return ctx;
 
 error_metadata_pipe:
@@ -2051,9 +2059,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Consumer delete metadata stream %d", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
-       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
+       if (stream->chan->metadata_cache) {
+               /* Only applicable to userspace consumers. */
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       }
 
        /* Remove any reference to that stream. */
        consumer_stream_delete(stream, ht);
@@ -2077,9 +2088,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
         */
        stream->chan->metadata_stream = NULL;
 
+       if (stream->chan->metadata_cache) {
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
-       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -2513,13 +2526,16 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_unlock(&consumer_data.lock);
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
+               if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
                        err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 2);
+               if (testpoint(consumerd_thread_data_poll)) {
+                       goto end;
+               }
                health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
@@ -2539,6 +2555,12 @@ void *consumer_thread_data_poll(void *data)
                        goto end;
                }
 
+               if (caa_unlikely(data_consumption_paused)) {
+                       DBG("Data consumption paused, sleeping...");
+                       sleep(1);
+                       goto restart;
+               }
+
                /*
                 * If the consumer_data_pipe triggered poll go directly to the
                 * beginning of the loop to update the array. We want to prioritize
@@ -3181,7 +3203,7 @@ void *consumer_thread_sessiond_poll(void *data)
                        err = 0;
                        goto end;
                }
-               if (consumer_quit) {
+               if (CMM_LOAD_SHARED(consumer_quit)) {
                        DBG("consumer_thread_receive_fds received quit from signal");
                        err = 0;        /* All is OK */
                        goto end;
@@ -3206,7 +3228,7 @@ end:
         * when all fds have hung up, the polling thread
         * can exit cleanly
         */
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
 
        /*
         * Notify the data poll thread to poll back again and test the
This page took 0.02515 seconds and 4 git commands to generate.