Tests: Fix nprocesses applications shutdown
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 5d9dc96c7925fd61d1efa44f43637e31df79b9e1..06b59c58442e4a833b3b0947aa3d34244b28a305 100644 (file)
@@ -88,14 +88,14 @@ static int add_channel(struct lttng_consumer_channel *channel,
        if (ctx->on_recv_channel != NULL) {
                ret = ctx->on_recv_channel(channel);
                if (ret == 0) {
-                       ret = consumer_add_channel(channel);
+                       ret = consumer_add_channel(channel, ctx);
                } else if (ret < 0) {
                        /* Most likely an ENOMEM. */
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto error;
                }
        } else {
-               ret = consumer_add_channel(channel);
+               ret = consumer_add_channel(channel, ctx);
        }
 
        DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
@@ -368,11 +368,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
                goto error;
        }
 
-       ret = ustctl_stream_close_wakeup_fd(stream->ustream);
-       if (ret < 0) {
-               goto error;
-       }
-
 error:
        return ret;
 }
@@ -401,6 +396,11 @@ static int send_sessiond_channel(int sock,
                goto error;
        }
 
+       ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+       if (ret < 0) {
+               goto error;
+       }
+
        /* The channel was sent successfully to the sessiond at this point. */
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
@@ -476,6 +476,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
                goto error;
        }
 
+       channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+       if (ret < 0) {
+               goto error;
+       }
+
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
@@ -551,6 +557,43 @@ error:
        return ret;
 }
 
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer flush channel key %lu", chan_key);
+
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer flush channel %lu not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, flush it. */
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+                       ustctl_flush_buffer(stream->ustream, 1);
+       }
+       rcu_read_unlock();
+
+error:
+       return ret;
+}
+
 /*
  * Close metadata stream wakeup_fd using the given key to retrieve the channel.
  *
@@ -761,6 +804,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.overwrite = msg.u.ask_channel.overwrite;
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+               attr.chan_id = msg.u.ask_channel.chan_id;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
                /* Translate the event output type to UST. */
@@ -898,6 +942,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_FLUSH_CHANNEL:
+       {
+               int ret;
+
+               ret = flush_channel(msg.u.flush_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -913,6 +968,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (!channel) {
                        ERR("UST consumer push metadata %lu not found", key);
                        ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
                }
 
                metadata_str = zmalloc(len * sizeof(char));
@@ -1277,3 +1333,13 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
        }
        rcu_read_unlock();
 }
+
+void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+       if (ret < 0) {
+               ERR("Unable to close wakeup fd");
+       }
+}
This page took 0.025201 seconds and 4 git commands to generate.