+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
+ uint64_t stream_id)
+{
+ int ret;
+ struct ctf_packet_index index;
+
+ memset(&index, 0, sizeof(index));
+ index.stream_id = htobe64(stream_id);
+ index.timestamp_end = htobe64(ts);
+ ret = consumer_stream_write_index(stream, &index);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ */
+ pthread_mutex_lock(&stream->lock);
+ ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
+ if (ret < 0) {
+ ERR("Failed to get the current timestamp");
+ goto error_unlock;
+ }
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto error_unlock;
+ }
+ ret = kernctl_snapshot(stream->wait_fd);
+ if (ret < 0) {
+ if (errno != EAGAIN && errno != ENODATA) {
+ PERROR("live timer kernel snapshot");
+ ret = -1;
+ goto error_unlock;
+ }
+ ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
+ if (ret < 0) {
+ PERROR("kernctl_get_stream_id");
+ goto error_unlock;
+ }
+ DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+ ret = send_empty_index(stream, ts, stream_id);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
+ ret = 0;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
+
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ */
+ pthread_mutex_lock(&stream->lock);
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (ret) {
+ goto error_unlock;
+ }
+
+ ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
+ if (ret < 0) {
+ ERR("Failed to get the current timestamp");
+ goto error_unlock;
+ }
+ lttng_ustconsumer_flush_buffer(stream, 1);
+ ret = lttng_ustconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ if (ret != -EAGAIN) {
+ ERR("Taking UST snapshot");
+ ret = -1;
+ goto error_unlock;
+ }
+ ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
+ if (ret < 0) {
+ PERROR("ustctl_get_stream_id");
+ goto error_unlock;
+ }
+ DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+ ret = send_empty_index(stream, ts, stream_id);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
+ ret = 0;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
+
+/*
+ * Execute action on a live timer
+ */
+static void live_timer(struct lttng_consumer_local_data *ctx,
+ int sig, siginfo_t *si, void *uc)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht *ht;
+ struct lttng_ht_iter iter;
+
+ channel = si->si_value.sival_ptr;
+ assert(channel);
+
+ if (channel->switch_timer_error) {
+ goto error;
+ }
+ ht = consumer_data.stream_per_chan_id_ht;
+
+ DBG("Live timer for channel %" PRIu64, channel->key);
+
+ rcu_read_lock();
+ switch (ctx->type) {
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_ust_stream(stream);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
+ break;
+ case LTTNG_CONSUMER_KERNEL:
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_kernel_stream(stream);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
+ break;
+ case LTTNG_CONSUMER_UNKNOWN:
+ assert(0);
+ break;
+ }
+
+error_unlock:
+ rcu_read_unlock();
+
+error:
+ return;
+}
+
+static
+void consumer_timer_signal_thread_qs(unsigned int signr)
+{
+ sigset_t pending_set;
+ int ret;
+
+ /*
+ * We need to be the only thread interacting with the thread
+ * that manages signals for teardown synchronization.
+ */
+ pthread_mutex_lock(&timer_signal.lock);
+
+ /* Ensure we don't have any signal queued for this channel. */
+ for (;;) {
+ ret = sigemptyset(&pending_set);
+ if (ret == -1) {
+ PERROR("sigemptyset");
+ }
+ ret = sigpending(&pending_set);
+ if (ret == -1) {
+ PERROR("sigpending");
+ }
+ if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+ break;
+ }
+ caa_cpu_relax();
+ }
+
+ /*
+ * From this point, no new signal handler will be fired that would try to
+ * access "chan". However, we still need to wait for any currently
+ * executing handler to complete.
+ */
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 0);
+ cmm_smp_mb();
+
+ /*
+ * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
+ * up.
+ */
+ kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
+
+ while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
+ caa_cpu_relax();
+ }
+ cmm_smp_mb();
+
+ pthread_mutex_unlock(&timer_signal.lock);
+}
+