Fix: ust-consumer: flush empty packets on snapshot channel
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 4d1e7f155abd0af1474b32eeb9fdc49905127f4c..89109b919a85231615fdb7f86a3c98518a9d422a 100644 (file)
@@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
 
                health_code_update();
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+                       stream->quiescent = true;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+       rcu_read_lock();
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, clear quiescent state. */
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+               health_code_update();
+
+               pthread_mutex_lock(&stream->lock);
+               stream->quiescent = false;
+               pthread_mutex_unlock(&stream->lock);
        }
 error:
        rcu_read_unlock();
@@ -1064,7 +1111,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        }
                }
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               /*
+                * If tracing is active, we want to perform a "full" buffer flush.
+                * Else, if quiescent, it has already been done by the prior stop.
+                */
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+               }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
@@ -1582,6 +1635,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+       {
+               int ret;
+
+               ret = clear_quiescent_channel(
+                               msg.u.clear_quiescent_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -1945,14 +2010,19 @@ int lttng_ustconsumer_get_sequence_number(
 }
 
 /*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
  */
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        assert(stream);
        assert(stream->ustream);
 
-       ustctl_flush_buffer(stream->ustream, 0);
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->quiescent) {
+               ustctl_flush_buffer(stream->ustream, 0);
+               stream->quiescent = true;
+       }
+       pthread_mutex_unlock(&stream->lock);
        stream->hangup_flush_done = 1;
 }
 
This page took 0.02412 seconds and 4 git commands to generate.