#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
+#include <common/time.h>
#include <common/compat/poll.h>
#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
#include <common/consumer/consumer-metadata-cache.h>
+#include <common/trace-chunk.h>
+#include <common/trace-chunk-registry.h>
+#include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
(void) relayd_close(&relayd->control_sock);
(void) relayd_close(&relayd->data_sock);
+ pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
free(relayd);
}
*/
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
- int ret;
struct lttng_ht_iter iter;
DBG("Consumer delete channel key %" PRIu64, channel->key);
goto end;
}
- rcu_read_lock();
- iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = NULL;
+
+ if (channel->is_published) {
+ int ret;
- call_rcu(&channel->node.head, free_channel_rcu);
+ rcu_read_lock();
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+
+ iter.iter.node = &channel->channels_by_session_id_ht_node.node;
+ ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
+ &iter);
+ assert(!ret);
+ rcu_read_unlock();
+ }
+
+ call_rcu(&channel->node.head, free_channel_rcu);
end:
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
* If a local data context is available, notify the threads that the streams'
* state have changed.
*/
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
- struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
uint64_t netidx;
assert(relayd);
- DBG("Cleaning up relayd sockets");
+ DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
* memory barrier ordering the updates of the end point status from the
* read of this status which happens AFTER receiving this notify.
*/
- if (ctx) {
- notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
- }
+ notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
/*
consumer_stream_destroy(stream, metadata_ht);
}
+/*
+ * Snapshot the channel attributes that the stream must be able to read
+ * without holding the channel lock. Currently only tracefile_size is
+ * copied into the stream's read-only attribute cache.
+ */
+void consumer_stream_update_channel_attributes(
+ struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
+{
+ stream->channel_read_only_attributes.tracefile_size =
+ channel->tracefile_size;
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
- enum lttng_consumer_stream_state state,
const char *channel_name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
goto end;
}
- rcu_read_lock();
+ if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
+ ERR("Failed to acquire trace chunk reference during the creation of a stream");
+ ret = -1;
+ goto error;
+ }
+ rcu_read_lock();
stream->key = stream_key;
+ stream->trace_chunk = trace_chunk;
stream->out_fd = -1;
stream->out_fd_offset = 0;
stream->output_written = 0;
- stream->state = state;
- stream->uid = uid;
- stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
stream->monitor = monitor;
error:
rcu_read_unlock();
+ lttng_trace_chunk_put(stream->trace_chunk);
free(stream);
end:
if (alloc_ret) {
/*
* Add a stream to the global list protected by a mutex.
*/
-int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
struct lttng_ht *ht = data_ht;
- int ret = 0;
assert(stream);
assert(ht);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
-
- return ret;
}
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count);
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count,
+ stream->trace_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
ret = relayd_streams_sent(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
} else {
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
+
/*
* Note that net_seq_num below is assigned with the *current* value of
* next_net_seq_num and only after that the next_net_seq_num will be
return outfd;
}
+/*
+ * Trigger a dump of the metadata content. Following/during the successful
+ * completion of this call, the metadata poll thread will start receiving
+ * metadata packets to consume.
+ *
+ * The caller must hold the channel and stream locks.
+ *
+ * Returns 0 on success, a negative value on error.
+ */
+static
+int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->chan->lock);
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->metadata_flag);
+ assert(stream->chan->trace_chunk);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * Reset the position of what has been read from the
+ * metadata cache to 0 so we can dump it again.
+ */
+ ret = kernctl_metadata_cache_dump(stream->wait_fd);
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Reset the position pushed from the metadata cache so it
+ * will write from the beginning on the next push.
+ */
+ stream->ust_metadata_pushed = 0;
+ ret = consumer_metadata_wakeup_pipe(stream->chan);
+ break;
+ default:
+ /* Unreachable: consumer_data.type is set once at start-up. */
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+ if (ret < 0) {
+ ERR("Failed to dump the metadata cache");
+ }
+ return ret;
+}
+
+/*
+ * Set (or clear, when new_trace_chunk is NULL) the trace chunk of a channel
+ * and, for local traces transitioning from "no chunk" to a new chunk,
+ * propagate the chunk to all of the channel's streams, creating their
+ * output files and dumping metadata stream contents as needed.
+ *
+ * Takes the channel lock; acquires its own references on the chunk for the
+ * channel and each stream (the caller keeps its own reference).
+ *
+ * Returns 0 on success, a negative value on error.
+ */
+static
+int lttng_consumer_channel_set_trace_chunk(
+ struct lttng_consumer_channel *channel,
+ struct lttng_trace_chunk *new_trace_chunk)
+{
+ int ret = 0;
+ /* -1ULL relayd_id is the sentinel for "no relayd", i.e. local trace. */
+ const bool is_local_trace = channel->relayd_id == -1ULL;
+ bool update_stream_trace_chunk;
+ struct cds_lfht_iter iter;
+ struct lttng_consumer_stream *stream;
+ unsigned long channel_hash;
+
+ pthread_mutex_lock(&channel->lock);
+ /*
+ * A stream can transition to a state where it and its channel
+ * no longer belong to a trace chunk. For instance, this happens when
+ * a session is rotated while it is inactive. After the rotation
+ * of an inactive session completes, the channel and its streams no
+ * longer belong to a trace chunk.
+ *
+ * However, if a session is stopped, rotated, and started again,
+ * the session daemon will create a new chunk and send it to its peers.
+ * In that case, the streams' transition to a new chunk can be performed
+ * immediately.
+ *
+ * This trace chunk transition could also be performed lazily when
+ * a buffer is consumed. However, creating the files here allows the
+ * consumer daemon to report any creation error to the session daemon
+ * and cause the start of the tracing session to fail.
+ */
+ update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk;
+
+ /*
+ * The acquisition of the reference cannot fail (barring
+ * a severe internal error) since a reference to the published
+ * chunk is already held by the caller.
+ */
+ if (new_trace_chunk) {
+ const bool acquired_reference = lttng_trace_chunk_get(
+ new_trace_chunk);
+
+ assert(acquired_reference);
+ }
+
+ /* Release the channel's reference to the previous chunk, if any. */
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = new_trace_chunk;
+ if (!is_local_trace || !new_trace_chunk) {
+ /* Not an error. */
+ goto end;
+ }
+
+ if (!update_stream_trace_chunk) {
+ goto end;
+ }
+
+ channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct(
+ &channel->key, lttng_ht_seed);
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht,
+ channel_hash,
+ consumer_data.stream_per_chan_id_ht->match_fct,
+ &channel->key, &iter, stream, node_channel_id.node) {
+ bool acquired_reference, should_regenerate_metadata = false;
+
+ /*
+ * Each stream takes its own reference on the channel's chunk.
+ * NOTE(review): streams reached here had no previous chunk
+ * (update_stream_trace_chunk guarantees the channel had none),
+ * so the unconditional overwrite below does not leak a
+ * reference — confirm if that invariant ever changes.
+ */
+ acquired_reference = lttng_trace_chunk_get(channel->trace_chunk);
+ assert(acquired_reference);
+
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * On a transition from "no-chunk" to a new chunk, a metadata
+ * stream's content must be entirely dumped. This must occur
+ * _after_ the creation of the metadata stream's output files
+ * as the consumption thread (not necessarily the one executing
+ * this) may start to consume during the call to
+ * consumer_metadata_stream_dump().
+ */
+ should_regenerate_metadata =
+ stream->metadata_flag &&
+ !stream->trace_chunk && channel->trace_chunk;
+ stream->trace_chunk = channel->trace_chunk;
+ ret = consumer_stream_create_output_files(stream, true);
+ if (ret) {
+ pthread_mutex_unlock(&stream->lock);
+ goto end_rcu_unlock;
+ }
+ if (should_regenerate_metadata) {
+ ret = consumer_metadata_stream_dump(stream);
+ }
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ goto end_rcu_unlock;
+ }
+ }
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ pthread_mutex_unlock(&channel->lock);
+ return ret;
+}
+
/*
* Allocate and return a new lttng_consumer_channel object using the given key
* to initialize the hash table node.
*/
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
+ const uint64_t *chunk_id,
const char *pathname,
const char *name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
const char *root_shm_path,
const char *shm_path)
{
- struct lttng_consumer_channel *channel;
+ struct lttng_consumer_channel *channel = NULL;
+ struct lttng_trace_chunk *trace_chunk = NULL;
+
+ if (chunk_id) {
+ trace_chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id,
+ *chunk_id);
+ if (!trace_chunk) {
+ ERR("Failed to find trace chunk reference during creation of channel");
+ goto end;
+ }
+ }
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
channel->refcount = 0;
channel->session_id = session_id;
channel->session_id_per_pid = session_id_per_pid;
- channel->uid = uid;
- channel->gid = gid;
channel->relayd_id = relayd_id;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
}
lttng_ht_node_init_u64(&channel->node, channel->key);
+ lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
+ channel->session_id);
channel->wait_fd = -1;
-
CDS_INIT_LIST_HEAD(&channel->streams.head);
+ if (trace_chunk) {
+ int ret = lttng_consumer_channel_set_trace_chunk(channel,
+ trace_chunk);
+ if (ret) {
+ goto error;
+ }
+ }
+
DBG("Allocated channel (key %" PRIu64 ")", channel->key);
end:
+ lttng_trace_chunk_put(trace_chunk);
return channel;
+error:
+ consumer_del_channel(channel);
+ channel = NULL;
+ goto end;
}
/*
rcu_read_lock();
lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
+ &channel->channels_by_session_id_ht_node);
rcu_read_unlock();
+ channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
*/
static int update_poll_array(struct lttng_consumer_local_data *ctx,
struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
- struct lttng_ht *ht)
+ struct lttng_ht *ht, int *nb_inactive_fd)
{
int i = 0;
struct lttng_ht_iter iter;
assert(local_stream);
DBG("Updating poll fd array");
+ *nb_inactive_fd = 0;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
/*
* just after the check. However, this is OK since the stream(s) will
* be deleted once the thread is notified that the end point state has
* changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
*/
- if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
continue;
}
/*
* This clobbers way too much the debug output. Uncomment that if you
* need it for debugging purposes.
- *
- * DBG("Active FD %d", stream->wait_fd);
*/
(*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+ lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
cleanup_relayd_ht();
* it.
*/
lttng_ht_destroy(consumer_data.stream_list_ht);
+
+ lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
}
/*
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd, unsigned long padding)
+ unsigned long padding)
{
ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
* core function for writing trace buffers to either the local filesystem or
* the network.
*
- * It must be called with the stream lock held.
+ * It must be called with the stream and the channel lock held.
*
* Careful review MUST be put if any changes occur!
*
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ assert(stream->net_seq_idx != (uint64_t) -1ULL ||
+ stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ ret = write_relayd_metadata_id(outfd, stream, padding);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
- if (ret < 0) {
- ERR("Rotating output file");
+ ret = consumer_stream_rotate_output_files(stream);
+ if (ret) {
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = lttng_index_file_create(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current,
- CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
- if (!stream->index_file) {
- goto end;
- }
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
relayd_hang_up = 1;
/* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
+ if (errno == EPIPE) {
/*
* This is possible if the fd is closed on the other side
* (outfd) or any write problem. It can be verbose a bit for a
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
}
end:
}
stream->reset_metadata_flag = 0;
}
- ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ ret = write_relayd_metadata_id(splice_pipe[1], stream,
padding);
if (ret < 0) {
written = ret;
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
+ ret = consumer_stream_rotate_output_files(stream);
if (ret < 0) {
written = ret;
- ERR("Rotating output file");
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = lttng_index_file_create(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current,
- CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
- if (!stream->index_file) {
- goto end;
- }
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
}
return written;
}
+/*
+ * Sample the snapshot positions for a specific fd.
+ *
+ * Dispatches to the domain-specific (kernel or UST) implementation based
+ * on the global consumer type.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_sample_snapshot_positions(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_sample_snapshot_positions(stream);
+ default:
+ /* Unreachable: consumer_data.type is set once at start-up. */
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
/*
* Take a snapshot for a specific fd
*
}
}
+/*
+ * Get the consumed position (free-running counter position in bytes).
+ *
+ * Dispatches to the domain-specific (kernel or UST) implementation based
+ * on the global consumer type; the position is stored through *pos.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+ default:
+ /* Unreachable: consumer_data.type is set once at start-up. */
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
consumer_del_channel(free_chan);
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
consumer_stream_free(stream);
}
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
struct lttng_ht *ht = metadata_ht;
- int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+ lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
&stream->node_channel_id);
/*
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
- return ret;
}
/*
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & LPOLLIN) {
ssize_t pipe_len;
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
+ /* 2 for the consumer_data_pipe and wake up pipe */
+ const int nb_pipes_fd = 2;
+ /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+ int nb_inactive_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
free(local_stream);
local_stream = NULL;
- /*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
- */
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ /* Allocate for all fds */
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
goto end;
}
ret = update_poll_array(ctx, &pollfd, local_stream,
- data_ht);
+ data_ht, &nb_inactive_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+ if (nb_fd == 0 && nb_inactive_fd == 0 &&
+ CMM_LOAD_SHARED(consumer_quit) == 1) {
err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
if (testpoint(consumerd_thread_data_poll)) {
goto end;
}
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == ctx->consumer_channel_pipe[0]) {
if (revents & LPOLLIN) {
enum consumer_channel_action action;
{
ssize_t ret;
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
pthread_mutex_lock(&stream->metadata_rdv_lock);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+
return ret;
}
goto error;
}
+ consumer_data.channels_by_session_id_ht =
+ lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.channels_by_session_id_ht) {
+ goto error;
+ }
+
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!consumer_data.relayd_ht) {
goto error;
goto error;
}
+ consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
+ if (!consumer_data.chunk_registry) {
+ goto error;
+ }
+
return 0;
error:
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+ void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- ret = -ENOMEM;
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
if (ret) {
/* Needing to exit in the middle of a command: error. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
- ret = -EINTR;
goto error_nosignal;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
/*
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
- fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
- fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
- ret = -1;
ret_code = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /*
+ * We gave the ownership of the fd to the relayd structure. Set the
+ * fd to -1 so we don't call close() on it in the error path below.
+ */
+ fd = -1;
/* We successfully added the socket. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
+ relayd->ctx = ctx;
add_relayd(relayd);
/* All good! */
- return 0;
+ return;
error:
if (consumer_send_status_msg(sock, ret_code) < 0) {
if (relayd_created) {
free(relayd);
}
-
- return ret;
-}
-
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret) {
- /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
- ret = 0;
- goto end;
- }
-
- ret = 1;
-
-end:
- return ret;
}
/*
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- relayd = find_relayd_by_session_id(id);
- if (relayd) {
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- /* Communication error thus the relayd so no data pending. */
- goto data_not_pending;
- }
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
- /* If this call fails, the stream is being used hence data pending. */
- ret = stream_try_lock(stream);
- if (!ret) {
- goto data_pending;
- }
+ pthread_mutex_lock(&stream->lock);
/*
* A removed node from the hash table indicates that the stream has
}
}
- /* Relayd check */
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ if (ret < 0) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
+ } else if (ret < 0) {
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ goto data_not_pending;
}
}
- pthread_mutex_unlock(&stream->lock);
- }
-
- if (relayd) {
- unsigned int is_data_inflight = 0;
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Send end command for data pending. */
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
if (is_data_inflight) {
}
return start_pos;
}
+
+/*
+ * Flush a stream's buffer using the domain-specific (kernel or UST)
+ * mechanism. producer_active is only meaningful for the UST path, where
+ * it is forwarded to lttng_ustctl_flush_buffer().
+ *
+ * Returns 0 on success, a negative value on error (kernel domain only;
+ * the UST flush path has no failure reporting here).
+ */
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+ int ret = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustctl_flush_buffer(stream, producer_active);
+ break;
+ default:
+ /* Unreachable: consumer_data.type is set once at start-up. */
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel. If a stream
+ * is already at the rotate position (produced == consumed), we flag it as
+ * ready for rotation. The rotation of ready streams occurs after we have
+ * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, uint64_t relayd_id, uint32_t metadata,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ /* -1ULL relayd_id is the sentinel for "no relayd", i.e. local trace. */
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
+
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
+
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+
+ rcu_read_lock();
+
+ pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ unsigned long consumed_pos;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * If any stream already references the channel's current
+ * chunk, this rotation stays within that chunk rather than
+ * moving to a new one.
+ */
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &stream->rotate_position);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ /*
+ * NOTE(review): the return value of this call is ignored; on
+ * failure, consumed_pos would be read uninitialized below.
+ * Confirm whether this can fail after a successful
+ * sample_snapshot_positions() above.
+ */
+ lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (consumed_pos == stream->rotate_position) {
+ stream->rotate_ready = true;
+ }
+
+ /*
+ * Active flush; has no effect if the production position
+ * is at a packet boundary.
+ */
+ ret = consumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ *
+ * At the moment when the positions of the buffers are
+ * sampled, the production position does not necessarily
+ * sit at a packet boundary. The 'active' flush
+ * operation above will push the production position to
+ * the next packet boundary _if_ it is not already
+ * sitting at such a boundary.
+ *
+ * Assuming a current production position that is not
+ * on the bound of a packet, the 'target' sequence
+ * number is
+ * (consumed_pos / subbuffer_size) + 1
+ * Note the '+ 1' to ensure the current packet is
+ * part of the current trace chunk.
+ *
+ * However, if the production position is already at
+ * a packet boundary, the '+ 1' is not necessary as the
+ * last packet of the current chunk is already
+ * 'complete'.
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
+ !!(stream->rotate_position % stream->max_sb_size),
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ /* Prevent end_unlock_stream from unlocking a stale stream. */
+ stream = NULL;
+ pthread_mutex_unlock(&channel->lock);
+
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end;
+ }
+
+ ret = 0;
+ goto end;
+
+end_unlock_stream:
+ pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
+ return ret;
+}
+
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+	int ret;
+	unsigned long current_consumed;
+
+	/* No rotation was scheduled on this stream. */
+	if (!stream->rotate_ready && !stream->rotate_position) {
+		return 0;
+	}
+
+	/* The rotation was already flagged as ready to perform. */
+	if (stream->rotate_ready) {
+		return 1;
+	}
+
+	/*
+	 * The rotate_ready flag is not set; sample the current consumed
+	 * position and compare it to the position at which the rotation
+	 * was requested.
+	 */
+	ret = lttng_consumer_sample_snapshot_positions(stream);
+	if (ret < 0) {
+		ERR("Taking snapshot positions");
+		return ret;
+	}
+
+	ret = lttng_consumer_get_consumed_snapshot(stream, &current_consumed);
+	if (ret < 0) {
+		ERR("Consumed snapshot position");
+		return ret;
+	}
+
+	/*
+	 * Signed difference tolerates counter wrap-around: a negative delta
+	 * means the rotate position has not been reached yet.
+	 */
+	return ((long) (current_consumed - stream->rotate_position) < 0) ? 0 : 1;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ *
+ * Clears the pending rotation position and readiness flag so that a
+ * subsequent rotation can be scheduled on this stream. The stream lock
+ * is presumably held by the caller (as for the other rotation helpers) —
+ * TODO confirm against call sites.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+	stream->rotate_position = 0;
+	stream->rotate_ready = false;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ *
+ * Closes the stream's current output and index files and, when the stream
+ * is part of a trace chunk, creates the output files of the new chunk.
+ */
+static
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+		struct lttng_consumer_stream *stream)
+{
+	int ret = 0;
+
+	DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
+			stream->key, stream->chan->key);
+
+	/* The new trace file starts from a clean slate. */
+	stream->tracefile_size_current = 0;
+	stream->tracefile_count_current = 0;
+
+	/* Close the current output file, if any. */
+	if (stream->out_fd >= 0) {
+		ret = close(stream->out_fd);
+		if (ret) {
+			PERROR("Failed to close stream out_fd of channel \"%s\"",
+					stream->chan->name);
+		}
+		stream->out_fd = -1;
+	}
+
+	/* Release the current index file, if any. */
+	if (stream->index_file) {
+		lttng_index_file_put(stream->index_file);
+		stream->index_file = NULL;
+	}
+
+	/*
+	 * A stream that is not part of a trace chunk cannot produce output;
+	 * the creation of its output files is deferred until a trace chunk
+	 * becomes available.
+	 */
+	if (stream->trace_chunk) {
+		ret = consumer_stream_create_output_files(stream, true);
+	}
+	return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the channel and stream locks held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+		struct lttng_consumer_stream *stream)
+{
+	int ret;
+
+	DBG("Consumer rotate stream %" PRIu64, stream->key);
+
+	/*
+	 * Update the stream's 'current' chunk to the session's (channel)
+	 * now-current chunk.
+	 *
+	 * The stream's reference to its former chunk is released here; past
+	 * this point the old pointer value is only compared, never
+	 * dereferenced.
+	 */
+	lttng_trace_chunk_put(stream->trace_chunk);
+	if (stream->chan->trace_chunk == stream->trace_chunk) {
+		/*
+		 * A channel can be rotated and not have a "next" chunk
+		 * to transition to. In that case, the channel's "current chunk"
+		 * has not been closed yet, but it has not been updated to
+		 * a "next" trace chunk either. Hence, the stream, like its
+		 * parent channel, becomes part of no chunk and can't output
+		 * anything until a new trace chunk is created.
+		 */
+		stream->trace_chunk = NULL;
+	} else if (stream->chan->trace_chunk &&
+			!lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
+		ret = -1;
+		goto error;
+	} else {
+		/*
+		 * Update the stream's trace chunk to its parent channel's
+		 * current trace chunk.
+		 */
+		stream->trace_chunk = stream->chan->trace_chunk;
+	}
+
+	/* A "-1ULL" net sequence index denotes a purely local stream. */
+	if (stream->net_seq_idx == (uint64_t) -1ULL) {
+		ret = rotate_local_stream(ctx, stream);
+		if (ret < 0) {
+			ERR("Failed to rotate stream, ret = %i", ret);
+			goto error;
+		}
+	}
+
+	if (stream->metadata_flag && stream->trace_chunk) {
+		/*
+		 * If the stream has transitioned to a new trace
+		 * chunk, the metadata should be re-dumped to the
+		 * newest chunk.
+		 *
+		 * However, it is possible for a stream to transition to
+		 * a "no-chunk" state. This can happen if a rotation
+		 * occurs on an inactive session. In such cases, the metadata
+		 * regeneration will happen when the next trace chunk is
+		 * created.
+		 */
+		ret = consumer_metadata_stream_dump(stream);
+		if (ret) {
+			goto error;
+		}
+	}
+	/* Allow a new rotation to be scheduled on this stream. */
+	lttng_consumer_reset_stream_rotate_state(stream);
+
+	ret = 0;
+
+error:
+	return ret;
+}
+
+/*
+ * Rotate all the ready streams now.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+		uint64_t key, struct lttng_consumer_local_data *ctx)
+{
+	int ret;
+	struct lttng_consumer_stream *stream;
+	struct lttng_ht_iter iter;
+	struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+	rcu_read_lock();
+
+	DBG("Consumer rotate ready streams in channel %" PRIu64, key);
+
+	/* Visit every stream published under this channel's key. */
+	cds_lfht_for_each_entry_duplicate(ht->ht,
+			ht->hash_fct(&channel->key, lttng_ht_seed),
+			ht->match_fct, &channel->key, &iter.iter,
+			stream, node_channel_id.node) {
+		health_code_update();
+
+		/*
+		 * Lock ordering: channel lock before stream lock, consistent
+		 * with the channel rotation path above.
+		 */
+		pthread_mutex_lock(&stream->chan->lock);
+		pthread_mutex_lock(&stream->lock);
+
+		if (!stream->rotate_ready) {
+			pthread_mutex_unlock(&stream->lock);
+			pthread_mutex_unlock(&stream->chan->lock);
+			continue;
+		}
+		DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+
+		/* Both locks are held, as required by the rotation. */
+		ret = lttng_consumer_rotate_stream(ctx, stream);
+		pthread_mutex_unlock(&stream->lock);
+		pthread_mutex_unlock(&stream->chan->lock);
+		if (ret) {
+			goto end;
+		}
+	}
+
+	ret = 0;
+
+end:
+	rcu_read_unlock();
+	return ret;
+}
+
+/*
+ * Handle the "init" command: record the UUID of the session daemon that
+ * owns this consumer daemon instance.
+ *
+ * Returns LTTCOMM_CONSUMERD_SUCCESS on success, or
+ * LTTCOMM_CONSUMERD_ALREADY_SET if a UUID was previously recorded.
+ */
+enum lttcomm_return_code lttng_consumer_init_command(
+		struct lttng_consumer_local_data *ctx,
+		const lttng_uuid sessiond_uuid)
+{
+	char uuid_str[UUID_STR_LEN];
+
+	/* The session daemon's UUID may only be set once. */
+	if (ctx->sessiond_uuid.is_set) {
+		return LTTCOMM_CONSUMERD_ALREADY_SET;
+	}
+
+	ctx->sessiond_uuid.is_set = true;
+	memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+
+	lttng_uuid_to_str(sessiond_uuid, uuid_str);
+	DBG("Received session daemon UUID: %s", uuid_str);
+	return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+/*
+ * Create a trace chunk as requested by the session daemon and register it
+ * for the given session.
+ *
+ * The published chunk implicitly holds the session daemon's reference in
+ * this daemon; that reference is released by the "close trace chunk"
+ * command. The new chunk is set on all of the session's channels and, for
+ * streamed traces, announced to the relay daemon.
+ *
+ * Returns LTTCOMM_CONSUMERD_SUCCESS on success, or
+ * LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED on any failure, in which
+ * case the chunk's creation is rolled back.
+ */
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+		const uint64_t *relayd_id, uint64_t session_id,
+		uint64_t chunk_id,
+		time_t chunk_creation_timestamp,
+		const char *chunk_override_name,
+		const struct lttng_credentials *credentials,
+		struct lttng_directory_handle *chunk_directory_handle)
+{
+	int ret;
+	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+	struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+	enum lttng_trace_chunk_status chunk_status;
+	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+	char creation_timestamp_buffer[ISO8601_STR_LEN];
+	const char *relayd_id_str = "(none)";
+	const char *creation_timestamp_str;
+	struct lttng_ht_iter iter;
+	struct lttng_consumer_channel *channel;
+
+	if (relayd_id) {
+		/* Only used for logging purposes. */
+		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+				"%" PRIu64, *relayd_id);
+		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+			relayd_id_str = relayd_id_buffer;
+		} else {
+			relayd_id_str = "(formatting error)";
+		}
+	}
+
+	/* Local protocol error. */
+	assert(chunk_creation_timestamp);
+	ret = time_to_iso8601_str(chunk_creation_timestamp,
+			creation_timestamp_buffer,
+			sizeof(creation_timestamp_buffer));
+	creation_timestamp_str = !ret ? creation_timestamp_buffer :
+			"(formatting error)";
+
+	DBG("Consumer create trace chunk command: relay_id = %s"
+			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+			", chunk_override_name = %s"
+			", chunk_creation_timestamp = %s",
+			relayd_id_str, session_id, chunk_id,
+			chunk_override_name ? : "(none)",
+			creation_timestamp_str);
+
+	/*
+	 * The trace chunk registry, as used by the consumer daemon, implicitly
+	 * owns the trace chunks. This is only needed in the consumer since
+	 * the consumer has no notion of a session beyond session IDs being
+	 * used to identify other objects.
+	 *
+	 * The lttng_trace_chunk_registry_publish() call below provides a
+	 * reference which is not released; it implicitly becomes the session
+	 * daemon's reference to the chunk in the consumer daemon.
+	 *
+	 * The lifetime of trace chunks in the consumer daemon is managed by
+	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+	 */
+	created_chunk = lttng_trace_chunk_create(chunk_id,
+			chunk_creation_timestamp);
+	if (!created_chunk) {
+		ERR("Failed to create trace chunk");
+		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+		goto end;
+	}
+
+	if (chunk_override_name) {
+		chunk_status = lttng_trace_chunk_override_name(created_chunk,
+				chunk_override_name);
+		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+			goto end;
+		}
+	}
+
+	if (chunk_directory_handle) {
+		chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+				credentials);
+		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+			ERR("Failed to set trace chunk credentials");
+			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+			goto end;
+		}
+		/*
+		 * The consumer daemon has no ownership of the chunk output
+		 * directory.
+		 */
+		chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+				chunk_directory_handle);
+		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+			ERR("Failed to set trace chunk's directory handle");
+			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+			goto end;
+		}
+	}
+
+	published_chunk = lttng_trace_chunk_registry_publish_chunk(
+			consumer_data.chunk_registry, session_id,
+			created_chunk);
+	/* The registry holds its own reference from this point on. */
+	lttng_trace_chunk_put(created_chunk);
+	created_chunk = NULL;
+	if (!published_chunk) {
+		ERR("Failed to publish trace chunk");
+		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+		goto end;
+	}
+
+	/* Set the new chunk on all of the session's channels. */
+	rcu_read_lock();
+	cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
+			consumer_data.channels_by_session_id_ht->hash_fct(
+				&session_id, lttng_ht_seed),
+			consumer_data.channels_by_session_id_ht->match_fct,
+			&session_id, &iter.iter, channel,
+			channels_by_session_id_ht_node.node) {
+		ret = lttng_consumer_channel_set_trace_chunk(channel,
+				published_chunk);
+		if (ret) {
+			/*
+			 * Roll-back the creation of this chunk.
+			 *
+			 * This is important since the session daemon will
+			 * assume that the creation of this chunk failed and
+			 * will never ask for it to be closed, resulting
+			 * in a leak and an inconsistent state for some
+			 * channels.
+			 */
+			enum lttcomm_return_code close_ret;
+
+			DBG("Failed to set new trace chunk on existing channels, rolling back");
+			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+					session_id, chunk_id,
+					chunk_creation_timestamp, NULL);
+			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+						session_id, chunk_id);
+			}
+
+			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+			/*
+			 * Skip the relay daemon announcement: the chunk was
+			 * rolled back locally and must not be created
+			 * remotely.
+			 */
+			goto error;
+		}
+	}
+
+	if (relayd_id) {
+		struct consumer_relayd_sock_pair *relayd;
+
+		relayd = consumer_find_relayd(*relayd_id);
+		if (relayd) {
+			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+			ret = relayd_create_trace_chunk(
+					&relayd->control_sock, published_chunk);
+			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+		} else {
+			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+		}
+
+		if (!relayd || ret) {
+			enum lttcomm_return_code close_ret;
+
+			/* Roll-back the locally-created chunk. */
+			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+					session_id,
+					chunk_id,
+					chunk_creation_timestamp,
+					NULL);
+			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+						session_id,
+						chunk_id);
+			}
+
+			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+			goto error;
+		}
+	}
+error:
+	rcu_read_unlock();
+end:
+	/*
+	 * Release the reference returned by the "publish" operation and the
+	 * local reference held on a not-yet-published chunk, if any; both
+	 * puts are no-ops on NULL.
+	 */
+	lttng_trace_chunk_put(published_chunk);
+	lttng_trace_chunk_put(created_chunk);
+	return ret_code;
+}
+
+/*
+ * Close a trace chunk on behalf of the session daemon.
+ *
+ * Marks the chunk identified by (session_id, chunk_id) as closed at
+ * chunk_close_timestamp, detaches it from every channel that still
+ * references it and, when relayd_id is provided, forwards the close
+ * command to the relay daemon.
+ *
+ * Returns LTTCOMM_CONSUMERD_SUCCESS on success, or an error code
+ * describing the failure.
+ */
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+		const uint64_t *relayd_id, uint64_t session_id,
+		uint64_t chunk_id, time_t chunk_close_timestamp,
+		const enum lttng_trace_chunk_command_type *close_command)
+{
+	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+	struct lttng_trace_chunk *chunk;
+	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+	const char *relayd_id_str = "(none)";
+	const char *close_command_name = "none";
+	struct lttng_ht_iter iter;
+	struct lttng_consumer_channel *channel;
+	enum lttng_trace_chunk_status chunk_status;
+
+	if (relayd_id) {
+		int ret;
+
+		/* Only used for logging purposes. */
+		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+				"%" PRIu64, *relayd_id);
+		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+			relayd_id_str = relayd_id_buffer;
+		} else {
+			relayd_id_str = "(formatting error)";
+		}
+	}
+	if (close_command) {
+		close_command_name = lttng_trace_chunk_command_type_get_name(
+				*close_command);
+	}
+
+	DBG("Consumer close trace chunk command: relayd_id = %s"
+			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+			", close command = %s",
+			relayd_id_str, session_id, chunk_id,
+			close_command_name);
+
+	/* The "find" operation returns a new reference to the chunk. */
+	chunk = lttng_trace_chunk_registry_find_chunk(
+			consumer_data.chunk_registry, session_id, chunk_id);
+	if (!chunk) {
+		ERR("Failed to find chunk: session_id = %" PRIu64
+				", chunk_id = %" PRIu64,
+				session_id, chunk_id);
+		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+		goto end;
+	}
+
+	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
+			chunk_close_timestamp);
+	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+		goto end;
+	}
+
+	if (close_command) {
+		chunk_status = lttng_trace_chunk_set_close_command(
+				chunk, *close_command);
+		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+			goto end;
+		}
+	}
+
+	/*
+	 * From this point on, 'chunk' is mostly used to compare it (by
+	 * address) to the current chunk of the session's channels; the
+	 * reference obtained by the "find" operation above is only released
+	 * at the end of this function.
+	 */
+	rcu_read_lock();
+	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
+			channel, node.node) {
+		int ret;
+
+		/*
+		 * Only change the channel's chunk to NULL if it still
+		 * references the chunk being closed. The channel may
+		 * reference a newer channel in the case of a session
+		 * rotation. When a session rotation occurs, the "next"
+		 * chunk is created before the "current" chunk is closed.
+		 */
+		if (channel->trace_chunk != chunk) {
+			continue;
+		}
+		ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
+		if (ret) {
+			/*
+			 * Attempt to close the chunk on as many channels as
+			 * possible.
+			 */
+			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+		}
+	}
+
+	if (relayd_id) {
+		int ret;
+		struct consumer_relayd_sock_pair *relayd;
+
+		relayd = consumer_find_relayd(*relayd_id);
+		if (relayd) {
+			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+			ret = relayd_close_trace_chunk(
+					&relayd->control_sock, chunk);
+			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+		} else {
+			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+					*relayd_id);
+		}
+
+		/* Note: 'ret' is only read when 'relayd' was found. */
+		if (!relayd || ret) {
+			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+			goto error_unlock;
+		}
+	}
+error_unlock:
+	rcu_read_unlock();
+end:
+	/*
+	 * Release the reference returned by the "find" operation and
+	 * the session daemon's implicit reference to the chunk.
+	 */
+	lttng_trace_chunk_put(chunk);
+	lttng_trace_chunk_put(chunk);
+
+	return ret_code;
+}
+
+/*
+ * Check whether a trace chunk exists, either in the local registry or on
+ * the relay daemon associated to the trace.
+ *
+ * Returns LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL,
+ * LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,
+ * LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK, or an error code.
+ */
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+		const uint64_t *relayd_id, uint64_t session_id,
+		uint64_t chunk_id)
+{
+	int ret;
+	enum lttcomm_return_code ret_code;
+	struct lttng_trace_chunk *local_chunk;
+	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+	const char *relayd_id_str = "(none)";
+	const bool is_local_trace = !relayd_id;
+	struct consumer_relayd_sock_pair *relayd = NULL;
+	bool exists_on_relayd;
+
+	/* Format the relayd id for logging purposes only. */
+	if (relayd_id) {
+		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+				"%" PRIu64, *relayd_id);
+		relayd_id_str = (ret > 0 && ret < sizeof(relayd_id_buffer)) ?
+				relayd_id_buffer : "(formatting error)";
+	}
+
+	DBG("Consumer trace chunk exists command: relayd_id = %s"
+			", chunk_id = %" PRIu64, relayd_id_str,
+			chunk_id);
+
+	/* A hit in the local registry settles the question immediately. */
+	local_chunk = lttng_trace_chunk_registry_find_chunk(
+			consumer_data.chunk_registry, session_id,
+			chunk_id);
+	DBG("Trace chunk %s locally", local_chunk ? "exists" : "does not exist");
+	if (local_chunk) {
+		/* Drop the reference taken by the "find" operation. */
+		lttng_trace_chunk_put(local_chunk);
+		ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+		goto end;
+	}
+	if (is_local_trace) {
+		/* No relay daemon to consult; the chunk is simply unknown. */
+		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+		goto end;
+	}
+
+	/* Not found locally; ask the trace's relay daemon. */
+	rcu_read_lock();
+	relayd = consumer_find_relayd(*relayd_id);
+	if (!relayd) {
+		ERR("Failed to find relayd %" PRIu64, *relayd_id);
+		ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+		goto end_rcu_unlock;
+	}
+	DBG("Looking up existence of trace chunk on relay daemon");
+	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+	ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+			&exists_on_relayd);
+	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+	if (ret < 0) {
+		ERR("Failed to look-up the existence of trace chunk on relay daemon");
+		ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+		goto end_rcu_unlock;
+	}
+
+	ret_code = exists_on_relayd ?
+			LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+			LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+	DBG("Trace chunk %s on relay daemon",
+			exists_on_relayd ? "exists" : "does not exist");
+
+end_rcu_unlock:
+	rcu_read_unlock();
+end:
+	return ret_code;
+}