Session consumed size notification
[lttng-tools.git] / src / common / consumer / consumer-timer.c
index 60ed94083e543a4d082e6f4e700f1ca57662120a..721f146e20fdff2a5d7cecd0e488008bd618269f 100644 (file)
@@ -72,6 +72,10 @@ static void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset monitor");
        }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+       if (ret) {
+               PERROR("sigaddset exit");
+       }
 }
 
 static int channel_monitor_pipe = -1;
@@ -394,7 +398,7 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
                if (ret == -1) {
                        PERROR("sigpending");
                }
-               if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+               if (!sigismember(&pending_set, signr)) {
                        break;
                }
                caa_cpu_relax();
@@ -629,7 +633,7 @@ int consumer_signal_init(void)
 
 static
 int sample_channel_positions(struct lttng_consumer_channel *channel,
-               uint64_t *_highest_use, uint64_t *_lowest_use,
+               uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
                sample_positions_cb sample, get_consumed_cb get_consumed,
                get_produced_cb get_produced)
 {
@@ -640,6 +644,8 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
        uint64_t high = 0, low = UINT64_MAX;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
+       *_total_consumed = 0;
+
        rcu_read_lock();
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
@@ -677,6 +683,15 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
                usage = produced - consumed;
                high = (usage > high) ? usage : high;
                low = (usage < low) ? usage : low;
+
+               /*
+                * We don't use consumed here for 2 reasons:
+                *  - output_written takes into account the padding written in the
+                *    tracefiles when we stop the session;
+                *  - the consumed position is not the accurate representation of what
+                *    was extracted from a buffer in overwrite mode.
+                */
+               *_total_consumed += stream->output_written;
        next:
                pthread_mutex_unlock(&stream->lock);
        }
@@ -731,7 +746,7 @@ void monitor_timer(struct lttng_consumer_local_data *ctx,
        }
 
        ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
-                       sample, get_consumed, get_produced);
+                       &msg.total_consumed, sample, get_consumed, get_produced);
        if (ret) {
                return;
        }
@@ -782,7 +797,7 @@ end:
 /*
  * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
  * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
- * LTTNG_CONSUMER_SIG_MONITOR.
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
  */
 void *consumer_timer_thread(void *data)
 {
@@ -836,6 +851,9 @@ void *consumer_timer_thread(void *data)
 
                        channel = info.si_value.sival_ptr;
                        monitor_timer(ctx, channel);
+               } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+                       assert(CMM_LOAD_SHARED(consumer_quit));
+                       goto end;
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
@@ -844,10 +862,8 @@ void *consumer_timer_thread(void *data)
 error_testpoint:
        /* Only reached in testpoint error */
        health_error();
+end:
        health_unregister(health_consumerd);
-
        rcu_unregister_thread();
-
-       /* Never return */
        return NULL;
 }
This page took 0.024115 seconds and 4 git commands to generate.