Fix: ust-consumer: flush empty packets on snapshot channel
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index fe7445b7f133818353fb96b1df10ed0171db88bf..89109b919a85231615fdb7f86a3c98518a9d422a 100644 (file)
@@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
 
                health_code_update();
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+                       stream->quiescent = true;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+       rcu_read_lock();
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, clear quiescent state. */
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+               health_code_update();
+
+               pthread_mutex_lock(&stream->lock);
+               stream->quiescent = false;
+               pthread_mutex_unlock(&stream->lock);
        }
 error:
        rcu_read_unlock();
@@ -1064,7 +1111,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        }
                }
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               /*
+                * If tracing is active, we want to perform a "full" buffer flush.
+                * Else, if quiescent, it has already been done by the prior stop.
+                */
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+               }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
@@ -1582,6 +1635,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+       {
+               int ret;
+
+               ret = clear_quiescent_channel(
+                               msg.u.clear_quiescent_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -1692,7 +1757,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DISCARDED_EVENTS:
        {
-               uint64_t ret;
+               int ret = 0;
+               uint64_t discarded_events;
                struct lttng_ht_iter iter;
                struct lttng_ht *ht;
                struct lttng_consumer_stream *stream;
@@ -1713,13 +1779,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 * found (no events are dropped if the channel is not yet in
                 * use).
                 */
-               ret = 0;
+               discarded_events = 0;
                cds_lfht_for_each_entry_duplicate(ht->ht,
                                ht->hash_fct(&id, lttng_ht_seed),
                                ht->match_fct, &id,
                                &iter.iter, stream, node_session_id.node) {
                        if (stream->chan->key == key) {
-                               ret = stream->chan->discarded_events;
+                               discarded_events = stream->chan->discarded_events;
                                break;
                        }
                }
@@ -1732,7 +1798,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                /* Send back returned value to session daemon */
-               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
                if (ret < 0) {
                        PERROR("send discarded events");
                        goto error_fatal;
@@ -1944,14 +2010,19 @@ int lttng_ustconsumer_get_sequence_number(
 }
 
 /*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
  */
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        assert(stream);
        assert(stream->ustream);
 
-       ustctl_flush_buffer(stream->ustream, 0);
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->quiescent) {
+               ustctl_flush_buffer(stream->ustream, 0);
+               stream->quiescent = true;
+       }
+       pthread_mutex_unlock(&stream->lock);
        stream->hangup_flush_done = 1;
 }
 
@@ -1985,11 +2056,6 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
                        }
                }
        }
-       /* Try to rmdir all directories under shm_path root. */
-       if (chan->root_shm_path[0]) {
-               (void) run_as_recursive_rmdir(chan->root_shm_path,
-                               chan->uid, chan->gid);
-       }
 }
 
 void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
@@ -1999,6 +2065,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
 
        consumer_metadata_cache_destroy(chan);
        ustctl_destroy_channel(chan->uchan);
+       /* Try to rmdir all directories under shm_path root. */
+       if (chan->root_shm_path[0]) {
+               (void) run_as_recursive_rmdir(chan->root_shm_path,
+                               chan->uid, chan->gid);
+       }
        free(chan->stream_fds);
 }
 
This page took 0.024802 seconds and 4 git commands to generate.