#include <common/compat/fcntl.h>
#include <common/consumer-metadata-cache.h>
#include <common/consumer-timer.h>
+#include <common/utils.h>
#include "ust-consumer.h"
if (stream->metadata_flag) {
stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- stream_pipe = ctx->consumer_data_pipe[1];
+ stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
}
do {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
/* Add stream on the relayd */
ret = relayd_add_stream(&relayd->control_sock, stream->name,
- stream->chan->pathname, &stream->relayd_stream_id);
+ stream->chan->pathname, &stream->relayd_stream_id,
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
goto error;
*/
stream->wait_fd = wait_fd;
+ /*
+ * Increment channel refcount since the channel reference has now been
+ * assigned in the allocation process above.
+ */
+ uatomic_inc(&stream->chan->refcount);
+
/*
* Order is important this is why a list is used. On error, the caller
* should clean this list.
ret = ustctl_write_metadata_to_channel(metadata->uchan,
metadata_str + target_offset, len);
if (ret < 0) {
- ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
+ ERR("ustctl write metadata fail with ret %d, len %" PRIu64, ret, len);
goto error;
}
struct lttng_ht *ht;
struct lttng_ht_iter iter;
- DBG("UST consumer flush channel key %lu", chan_key);
+ DBG("UST consumer flush channel key %" PRIu64, chan_key);
+ rcu_read_lock();
channel = consumer_find_channel(chan_key);
if (!channel) {
- ERR("UST consumer flush channel %lu not found", chan_key);
+ ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
goto error;
}
ht = consumer_data.stream_per_chan_id_ht;
/* For each stream of the channel id, flush it. */
- rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
&channel->key, &iter.iter, stream, node_channel_id.node) {
ustctl_flush_buffer(stream->ustream, 1);
}
- rcu_read_unlock();
-
error:
+ rcu_read_unlock();
return ret;
}
/*
* Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ * RCU read side lock MUST be acquired before calling this function.
*
* Return 0 on success else an LTTng error code.
*/
static int close_metadata(uint64_t chan_key)
{
- int ret;
+ int ret = 0;
struct lttng_consumer_channel *channel;
- DBG("UST consumer close metadata key %lu", chan_key);
+ DBG("UST consumer close metadata key %" PRIu64, chan_key);
channel = consumer_find_channel(chan_key);
if (!channel) {
- ERR("UST consumer close metadata %lu not found", chan_key);
+ ERR("UST consumer close metadata %" PRIu64 " not found", chan_key);
ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
goto error;
}
- ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
- if (ret < 0) {
- ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
- ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
- goto error;
- }
- if (channel->switch_timer_enabled == 1) {
- DBG("Deleting timer on metadata channel");
- consumer_timer_switch_stop(channel);
+ pthread_mutex_lock(&consumer_data.lock);
+ if (!cds_lfht_is_node_deleted(&channel->node.node)) {
+ if (channel->switch_timer_enabled == 1) {
+ DBG("Deleting timer on metadata channel");
+ consumer_timer_switch_stop(channel);
+ }
+ ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error_unlock;
+ }
}
- consumer_metadata_cache_destroy(channel);
+error_unlock:
+ pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
}
int ret;
struct lttng_consumer_channel *metadata;
- DBG("UST consumer setup metadata key %lu", key);
+ DBG("UST consumer setup metadata key %" PRIu64, key);
metadata = consumer_find_channel(key);
if (!metadata) {
int ret, ret_code = LTTNG_OK;
char *metadata_str;
- DBG("UST consumer push metadata key %lu of len %lu", key, len);
+ DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
metadata_str = zmalloc(len * sizeof(char));
if (!metadata_str) {
case LTTNG_UST_CHAN_PER_CPU:
channel->type = CONSUMER_CHANNEL_TYPE_DATA;
attr.type = LTTNG_UST_CHAN_PER_CPU;
+ /*
+ * Set refcount to 1 for owner. Below, we will
+ * pass ownership to the
+ * consumer_thread_channel_poll() thread.
+ */
+ channel->refcount = 1;
break;
case LTTNG_UST_CHAN_METADATA:
channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
goto end_channel_error;
}
+ if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+ ret = consumer_metadata_cache_allocate(channel);
+ if (ret < 0) {
+ ERR("Allocating metadata cache");
+ goto end_channel_error;
+ }
+ consumer_timer_switch_start(channel, attr.switch_timer_interval);
+ attr.switch_timer_interval = 0;
+ }
+
/*
* Add the channel to the internal state AFTER all streams were created
* and successfully sent to session daemon. This way, all streams must
* be ready before this channel is visible to the threads.
+ * If add_channel succeeds, ownership of the channel is
+ * passed to consumer_thread_channel_poll().
*/
ret = add_channel(channel, ctx);
if (ret < 0) {
+ if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+ if (channel->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(channel);
+ }
+ consumer_metadata_cache_destroy(channel);
+ }
goto end_channel_error;
}
-
/*
* Channel and streams are now created. Inform the session daemon that
* everything went well and should wait to receive the channel and
goto end_nosignal;
}
- if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
- ret = consumer_metadata_cache_allocate(channel);
- if (ret < 0) {
- ERR("Allocating metadata cache");
- goto end_channel_error;
- }
- consumer_timer_switch_start(channel, attr.switch_timer_interval);
- attr.switch_timer_interval = 0;
- }
-
break;
}
case LTTNG_CONSUMER_GET_CHANNEL:
channel = consumer_find_channel(key);
if (!channel) {
- ERR("UST consumer get channel key %lu not found", key);
+ ERR("UST consumer get channel key %" PRIu64 " not found", key);
ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
uint64_t key = msg.u.destroy_channel.key;
- struct lttng_consumer_channel *channel;
-
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("UST consumer get channel key %lu not found", key);
- ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
- goto end_msg_sessiond;
- }
-
- destroy_channel(channel);
+ /*
+ * Only called if streams have not been sent to stream
+ * manager thread. However, channel has been sent to
+ * channel manager thread.
+ */
+ notify_thread_del_channel(ctx, key);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_CLOSE_METADATA:
uint64_t offset = msg.u.push_metadata.target_offset;
struct lttng_consumer_channel *channel;
- DBG("UST consumer push metadata key %lu of len %lu", key, len);
+ DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
+ len);
channel = consumer_find_channel(key);
if (!channel) {
- ERR("UST consumer push metadata %lu not found", key);
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
}
/* Tell session daemon we are ready to receive the metadata. */
assert(chan);
assert(chan->uchan);
+ if (chan->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(chan);
+ }
+ consumer_metadata_cache_destroy(chan);
ustctl_destroy_channel(chan->uchan);
}
assert(stream);
assert(stream->ustream);
+ if (stream->chan->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(stream->chan);
+ }
ustctl_destroy_stream(stream->ustream);
}
* happen and it is OK with the code flow.
*/
DBG("Error writing to tracefile "
- "(ret: %zd != len: %lu != subbuf_size: %lu)",
+ "(ret: %ld != len: %lu != subbuf_size: %lu)",
ret, len, subbuf_size);
}
err = ustctl_put_next_subbuf(ustream);
/*
* Called when a stream is created.
+ *
+ * Return 0 on success or else a negative value.
*/
int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
- return lttng_create_output_file(stream);
+ int ret;
+
+ /* Don't create anything if this is set for streaming. */
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
+ ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+ stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->uid, stream->gid);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->out_fd = ret;
+ stream->tracefile_size_current = 0;
+ }
+ ret = 0;
+
+error:
+ return ret;
}
/*
ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
sizeof(msg));
if (ret != sizeof(msg)) {
- DBG("Consumer received unexpected message size %d (expects %lu)",
+ DBG("Consumer received unexpected message size %d (expects %zu)",
ret, sizeof(msg));
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
/*