struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx, int *relayd_error)
{
- int ret, ret_code = LTTNG_OK;
+ int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_consumer_stream *stream;
+ uint64_t net_seq_idx = -1ULL;
assert(channel);
assert(ctx);
}
ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
}
+ if (net_seq_idx == -1ULL) {
+ net_seq_idx = stream->net_seq_idx;
+ }
}
}
/* Inform sessiond that we are about to send channel and streams. */
ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTNG_OK) {
+ if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
/*
* Either the session daemon is not responding or the relayd died so we
* stop now.
return 0;
error:
- if (ret_code != LTTNG_OK) {
+ if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
ret = -1;
}
return ret;
rcu_read_unlock();
return ret;
}
-/*
- * Close metadata stream wakeup_fd using the given key to retrieve the channel.
- * RCU read side lock MUST be acquired before calling this function.
- *
- * NOTE: This function does NOT take any channel nor stream lock.
- *
- * Return 0 on success else LTTng error code.
- */
-static int _close_metadata(struct lttng_consumer_channel *channel)
-{
- int ret = LTTNG_OK;
-
- assert(channel);
- assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA);
-
- if (channel->switch_timer_enabled == 1) {
- DBG("Deleting timer on metadata channel");
- consumer_timer_switch_stop(channel);
- }
-
- if (channel->metadata_stream) {
- ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
- if (ret < 0) {
- ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
- ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
- }
-
- if (channel->monitor) {
- /* Close the read-side in consumer_del_metadata_stream */
- ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
- if (ret < 0) {
- PERROR("Close UST metadata write-side poll pipe");
- ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
- }
- }
- }
-
- return ret;
-}
/*
* Close metadata stream wakeup_fd using the given key to retrieve the channel.
goto error_unlock;
}
- ret = _close_metadata(channel);
+ lttng_ustconsumer_close_metadata(channel);
error_unlock:
pthread_mutex_unlock(&channel->lock);
ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
goto error;
}
+ ret = consumer_send_relayd_streams_sent(
+ metadata->metadata_stream->net_seq_idx);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto error;
+ }
}
ret = send_streams_to_thread(metadata, ctx);
DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
stream->name, stream->key);
}
+ if (relayd_id != -1ULL) {
+ ret = consumer_send_relayd_streams_sent(relayd_id);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
ustctl_flush_buffer(stream->ustream, 1);
uint64_t len, struct lttng_consumer_channel *channel,
int timer, int wait)
{
- int ret, ret_code = LTTNG_OK;
+ int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttcomm_consumer_msg msg;
struct lttng_consumer_channel *channel = NULL;
attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
attr.chan_id = msg.u.ask_channel.chan_id;
- attr.output = msg.u.ask_channel.output;
memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
+ /* Match channel buffer type to the UST abi. */
+ switch (msg.u.ask_channel.output) {
+ case LTTNG_EVENT_MMAP:
+ default:
+ attr.output = LTTNG_UST_MMAP;
+ break;
+ }
+
/* Translate and save channel type. */
switch (msg.u.ask_channel.type) {
case LTTNG_UST_CHAN_PER_CPU:
channel = consumer_find_channel(key);
if (!channel) {
- ERR("UST consumer push metadata %" PRIu64 " not found", key);
- ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ /*
+ * This is possible if the metadata creation on the consumer side
+ * is in flight vis-a-vis a concurrent push metadata from the
+ * session daemon. Simply return that the channel failed and the
+ * session daemon will handle that message correctly considering
+ * that this race is acceptable thus the DBG() statement here.
+ */
+ DBG("UST consumer push metadata %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
goto end_msg_sessiond;
}
health_code_update();
/* Tell session daemon we are ready to receive the metadata. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
+ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
ustctl_destroy_stream(stream->ustream);
}
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_get_wakeup_fd(stream->ustream);
+}
+
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_close_wakeup_fd(stream->ustream);
+}
+
/*
* Populate index values of a UST stream. Values are set in big endian order.
*
* Return 0 on success or else a negative value.
*/
-static int get_index_values(struct lttng_packet_index *index,
+static int get_index_values(struct ctf_packet_index *index,
struct ustctl_consumer_stream *ustream)
{
int ret;
long ret = 0;
char dummy;
struct ustctl_consumer_stream *ustream;
- struct lttng_packet_index index;
+ struct ctf_packet_index index;
assert(stream);
assert(stream->ustream);
return ret;
}
+/*
+ * Stop a given metadata channel timer if enabled and close the wait fd which
+ * is the poll pipe of the metadata stream.
+ *
+ * This MUST be called with the metadata channel acquired.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
+{
+ int ret;
+
+ assert(metadata);
+ assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
+
+ DBG("Closing metadata channel key %" PRIu64, metadata->key);
+
+ if (metadata->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(metadata);
+ }
+
+ if (!metadata->metadata_stream) {
+ goto end;
+ }
+
+ /*
+ * Closing write side so the thread monitoring the stream wakes up if any
+ * and clean the metadata stream.
+ */
+ if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
+ ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
+ if (ret < 0) {
+ PERROR("closing metadata pipe write side");
+ }
+ metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
+ }
+
+end:
+ return;
+}
+
/*
* Close every metadata stream wait fd of the metadata hash table. This
* function MUST be used very carefully so not to run into a race between the
* producer so calling this is safe because we are assured that no state change
* can occur in the metadata thread for the streams in the hash table.
*/
-void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
- /*
- * Whatever returned value, we must continue to try to close everything
- * so ignore it.
- */
- (void) _close_metadata(stream->chan);
- DBG("Metadata wait fd %d and poll pipe fd %d closed", stream->wait_fd,
- stream->ust_metadata_poll_pipe[1]);
+ lttng_ustconsumer_close_metadata(stream->chan);
pthread_mutex_unlock(&stream->chan->lock);
}
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
uint64_t len, key, offset;
int ret;
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
- LTTNG_OK);
+ LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0 || len == 0) {
/*
* Somehow, the session daemon is not responding anymore or there is
health_code_update();
- ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+ ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
key, offset, len, channel, timer, wait);
- if (ret_code >= 0) {
+ if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
* ret code.
*/
- (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+ (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
}
ret = 0;