if (ctx->on_recv_channel != NULL) {
ret = ctx->on_recv_channel(channel);
if (ret == 0) {
- ret = consumer_add_channel(channel);
+ ret = consumer_add_channel(channel, ctx);
} else if (ret < 0) {
/* Most likely an ENOMEM. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto error;
}
} else {
- ret = consumer_add_channel(channel);
+ ret = consumer_add_channel(channel, ctx);
}
DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
goto error;
}
- ret = ustctl_stream_close_wakeup_fd(stream->ustream);
- if (ret < 0) {
- goto error;
- }
-
error:
return ret;
}
goto error;
}
+ ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+ if (ret < 0) {
+ goto error;
+ }
+
/* The channel was sent successfully to the sessiond at this point. */
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
/* Try to send the stream to the relayd if one is available. */
goto error;
}
+ channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+ if (ret < 0) {
+ goto error;
+ }
+
/* Open all streams for this channel. */
ret = create_ust_streams(channel, ctx);
if (ret < 0) {
return ret;
}
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht *ht;
+ struct lttng_ht_iter iter;
+
+ DBG("UST consumer flush channel key %lu", chan_key);
+
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer flush channel %lu not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ht = consumer_data.stream_per_chan_id_ht;
+
+ /* For each stream of the channel id, flush it. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+ &channel->key, &iter.iter, stream, node_channel_id.node) {
+ ustctl_flush_buffer(stream->ustream, 1);
+ }
+ rcu_read_unlock();
+
+error:
+ return ret;
+}
+
/*
* Close metadata stream wakeup_fd using the given key to retrieve the channel.
*
attr.overwrite = msg.u.ask_channel.overwrite;
attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+ attr.chan_id = msg.u.ask_channel.chan_id;
memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
/* Translate the event output type to UST. */
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_FLUSH_CHANNEL:
+ {
+ int ret;
+
+ ret = flush_channel(msg.u.flush_channel.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
case LTTNG_CONSUMER_PUSH_METADATA:
{
int ret;
if (!channel) {
ERR("UST consumer push metadata %lu not found", key);
ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
}
metadata_str = zmalloc(len * sizeof(char));
}
rcu_read_unlock();
}
+
+void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ret = ustctl_stream_close_wakeup_fd(stream->ustream);
+ if (ret < 0) {
+ ERR("Unable to close wakeup fd");
+ }
+}