#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
#include <common/compat/fcntl.h>
+#include <common/compat/endian.h>
#include <common/consumer-metadata-cache.h>
#include <common/consumer-stream.h>
#include <common/consumer-timer.h>
net_seq_idx = stream->net_seq_idx;
}
}
- ret = consumer_send_relayd_streams_sent(net_seq_idx);
- if (ret < 0) {
- /*
- * Flag that the relayd was the problem here probably due to a
- * communicaton error on the socket.
- */
- if (relayd_error) {
- *relayd_error = 1;
- }
- ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
- }
}
/* Inform sessiond that we are about to send channel and streams. */
rcu_read_unlock();
return ret;
}
-/*
- * Close metadata stream wakeup_fd using the given key to retrieve the channel.
- * RCU read side lock MUST be acquired before calling this function.
- *
- * NOTE: This function does NOT take any channel nor stream lock.
- *
- * Return 0 on success else LTTng error code.
- */
-static int _close_metadata(struct lttng_consumer_channel *channel)
-{
- int ret = LTTCOMM_CONSUMERD_SUCCESS;
-
- assert(channel);
- assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA);
-
- if (channel->switch_timer_enabled == 1) {
- DBG("Deleting timer on metadata channel");
- consumer_timer_switch_stop(channel);
- }
-
- if (channel->metadata_stream) {
- ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
- if (ret < 0) {
- ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
- ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
- }
-
- if (channel->monitor) {
- /* Close the read-side in consumer_del_metadata_stream */
- ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
- if (ret < 0) {
- PERROR("Close UST metadata write-side poll pipe");
- ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
- }
- }
- }
-
- return ret;
-}
/*
* Close metadata stream wakeup_fd using the given key to retrieve the channel.
goto error_unlock;
}
- ret = _close_metadata(channel);
+ lttng_ustconsumer_close_metadata(channel);
error_unlock:
pthread_mutex_unlock(&channel->lock);
ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
goto error;
}
+ ret = consumer_send_relayd_streams_sent(
+ metadata->metadata_stream->net_seq_idx);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto error;
+ }
}
ret = send_streams_to_thread(metadata, ctx);
* the stream is still in the local stream list of the channel. This call
* will make sure to clean that list.
*/
- cds_list_del(&metadata->metadata_stream->send_node);
consumer_stream_destroy(metadata->metadata_stream, NULL);
+ cds_list_del(&metadata->metadata_stream->send_node);
+ metadata->metadata_stream = NULL;
error_no_stream:
end:
return ret;
* Clean up the stream completly because the next snapshot will use a new
* metadata stream.
*/
- cds_list_del(&metadata_stream->send_node);
consumer_stream_destroy(metadata_stream, NULL);
+ cds_list_del(&metadata_stream->send_node);
metadata_channel->metadata_stream = NULL;
error:
health_code_update();
- if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
- /*
- * Notify the session daemon that the command is completed.
- *
- * On transport layer error, the function call will print an error
- * message so handling the returned code is a bit useless since we
- * return an error code anyway.
- */
- (void) consumer_send_status_msg(sock, ret_code);
- return -ENOENT;
- }
+ /* deprecated */
+ assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
health_code_update();
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
DBG("Unable to find relayd %" PRIu64, index);
- ret_code = LTTNG_ERR_NO_CONSUMER;
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
/*
channel = consumer_find_channel(key);
if (!channel) {
ERR("UST consumer get channel key %" PRIu64 " not found", key);
- ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
goto end_msg_sessiond;
}
health_poll_entry();
ret = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret < 0) {
+ if (ret) {
goto error_fatal;
}
ctx);
if (ret < 0) {
ERR("Snapshot metadata failed");
- ret_code = LTTNG_ERR_UST_META_FAIL;
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
}
} else {
ret = snapshot_channel(msg.u.snapshot_channel.key,
ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
- ret_code = LTTNG_ERR_UST_CHAN_FAIL;
+ ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
}
}
ustctl_destroy_stream(stream->ustream);
}
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_get_wakeup_fd(stream->ustream);
+}
+
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_close_wakeup_fd(stream->ustream);
+}
+
/*
* Populate index values of a UST stream. Values are set in big endian order.
*
return ret;
}
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct ustctl_consumer_stream *ustream;
+
+ assert(stream);
+ assert(ctx);
+
+ ustream = stream->ustream;
+
+ /*
+ * First, we are going to check if there is a new subbuffer available
+ * before reading the stream wait_fd.
+ */
+ /* Get the next subbuffer */
+ ret = ustctl_get_next_subbuf(ustream);
+ if (ret) {
+ /* No more data found, flag the stream. */
+ stream->has_data = 0;
+ ret = 0;
+ goto end;
+ }
+
+ ret = ustctl_put_next_subbuf(ustream);
+ assert(!ret);
+
+ /* This stream still has data. Flag it and wake up the data thread. */
+ stream->has_data = 1;
+
+ if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+ ssize_t writelen;
+
+ writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+ if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ ret = writelen;
+ goto end;
+ }
+
+ /* The wake up pipe has been notified. */
+ ctx->has_wakeup = 1;
+ }
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Read subbuffer from the given stream.
*
unsigned long len, subbuf_size, padding;
int err, write_index = 1;
long ret = 0;
- char dummy;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
ustream = stream->ustream;
/*
- * We can consume the 1 byte written into the wait_fd by UST.
- * Don't trigger error if we cannot read this one byte (read
- * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+ * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+ * error if we cannot read this one byte (read returns 0), or if the error
+ * is EAGAIN or EWOULDBLOCK.
+ *
+ * This is only done when the stream is monitored by a thread, before the
+ * flush is done after a hangup and if the stream is not flagged with data
+ * since there might be nothing to consume in the wait fd but still have
+ * data available flagged by the consumer wake up pipe.
*/
- if (stream->monitor && !stream->hangup_flush_done) {
+ if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+ char dummy;
ssize_t readlen;
readlen = lttng_read(stream->wait_fd, &dummy, 1);
err = ustctl_put_next_subbuf(ustream);
assert(err == 0);
+ /*
+ * This will consumer the byte on the wait_fd if and only if there is not
+ * next subbuffer to be acquired.
+ */
+ if (!stream->metadata_flag) {
+ ret = notify_if_more_data(stream, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
/* Write index if needed. */
if (!write_index) {
goto end;
*/
DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
contiguous, pushed);
- assert(((int64_t) contiguous - pushed) >= 0);
+ assert(((int64_t) (contiguous - pushed)) >= 0);
if ((contiguous != pushed) ||
(((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
ret = 1; /* Data is pending */
return ret;
}
+/*
+ * Stop a given metadata channel timer if enabled and close the wait fd which
+ * is the poll pipe of the metadata stream.
+ *
+ * This MUST be called with the metadata channel acquired.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
+{
+ int ret;
+
+ assert(metadata);
+ assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
+
+ DBG("Closing metadata channel key %" PRIu64, metadata->key);
+
+ if (metadata->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(metadata);
+ }
+
+ if (!metadata->metadata_stream) {
+ goto end;
+ }
+
+ /*
+ * Closing write side so the thread monitoring the stream wakes up if any
+ * and clean the metadata stream.
+ */
+ if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
+ ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
+ if (ret < 0) {
+ PERROR("closing metadata pipe write side");
+ }
+ metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
+ }
+
+end:
+ return;
+}
+
/*
* Close every metadata stream wait fd of the metadata hash table. This
* function MUST be used very carefully so not to run into a race between the
* producer so calling this is safe because we are assured that no state change
* can occur in the metadata thread for the streams in the hash table.
*/
-void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
- /*
- * Whatever returned value, we must continue to try to close everything
- * so ignore it.
- */
- (void) _close_metadata(stream->chan);
- DBG("Metadata wait fd %d and poll pipe fd %d closed", stream->wait_fd,
- stream->ust_metadata_poll_pipe[1]);
+ lttng_ustconsumer_close_metadata(stream->chan);
pthread_mutex_unlock(&stream->chan->lock);
}
assert(channel);
assert(channel->metadata_cache);
+ memset(&request, 0, sizeof(request));
+
/* send the metadata request to sessiond */
switch (consumer_data.type) {
case LTTNG_CONSUMER64_UST:
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}
+
+/*
+ * Return the ustctl call for the get stream id.
+ */
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
+ uint64_t *stream_id)
+{
+ assert(stream);
+ assert(stream_id);
+
+ return ustctl_get_stream_id(stream->ustream, stream_id);
+}