/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2012 - David Goulet <dgoulet@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
#include "consumer.h"
free(stream);
}
+/*
+ * RCU protected relayd socket pair free.
+ */
+static void consumer_rcu_free_relayd(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct consumer_relayd_sock_pair *relayd =
+ caa_container_of(node, struct consumer_relayd_sock_pair, node);
+
+ free(relayd);
+}
+
+/*
+ * Destroy and free relayd socket pair object.
+ *
+ * This function MUST be called with the consumer_data lock acquired.
+ */
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ DBG("Consumer destroy and close relayd socket pair");
+
+ iter.iter.node = &relayd->node.node;
+ ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
+ assert(!ret);
+
+ /* Close all sockets */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ (void) relayd_close(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ (void) relayd_close(&relayd->data_sock);
+
+ /* RCU free() call */
+ call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_channel *free_chan = NULL;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
pthread_mutex_lock(&consumer_data.lock);
PERROR("close");
}
}
- if (!--stream->chan->refcount)
+
+ /* Check and cleanup relayd */
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ /* We are about to modify the relayd refcount */
+ rcu_read_lock();
+ if (!--relayd->refcount) {
+ /* Refcount of the relayd struct is 0, destroy it */
+ consumer_destroy_relayd(relayd);
+ }
+ rcu_read_unlock();
+ }
+
+ if (!--stream->chan->refcount) {
free_chan = stream->chan;
+ }
+
call_rcu(&stream->node.head, consumer_free_stream);
end:
enum lttng_event_output output,
const char *path_name,
uid_t uid,
- gid_t gid)
+ gid_t gid,
+ int net_index,
+ int metadata_flag)
{
struct lttng_consumer_stream *stream;
int ret;
stream->output = output;
stream->uid = uid;
stream->gid = gid;
- strncpy(stream->path_name, path_name, PATH_MAX - 1);
- stream->path_name[PATH_MAX - 1] = '\0';
+ stream->net_seq_idx = net_index;
+ stream->metadata_flag = metadata_flag;
+ strncpy(stream->path_name, path_name, sizeof(stream->path_name));
+ stream->path_name[sizeof(stream->path_name) - 1] = '\0';
lttng_ht_node_init_ulong(&stream->node, stream->key);
+ lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
assert(0);
goto end;
}
- DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
+ DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
stream->path_name, stream->key,
stream->shm_fd,
stream->wait_fd,
(unsigned long long) stream->mmap_len,
- stream->out_fd);
+ stream->out_fd,
+ stream->net_seq_idx);
end:
return stream;
}
int ret = 0;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd;
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
rcu_read_unlock();
+
+ /* Check and cleanup relayd */
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ /* We are about to modify the relayd refcount */
+ rcu_read_lock();
+ relayd->refcount++;
+ rcu_read_unlock();
+ }
+
+ /* Update consumer data */
consumer_data.stream_count++;
consumer_data.need_update = 1;
return ret;
}
+/*
+ * Add relayd socket to global consumer data hashtable.
+ */
+int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+{
+ int ret = 0;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+
+ if (relayd == NULL) {
+ ret = -1;
+ goto end;
+ }
+
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.relayd_ht,
+ (void *)((unsigned long) relayd->net_seq_idx), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ rcu_read_unlock();
+ /* Relayd already exist. Ignore the insertion */
+ goto end;
+ }
+ lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+
+ rcu_read_unlock();
+
+end:
+ return ret;
+}
+
+/*
+ * Allocate and return a consumer relayd socket.
+ */
+struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
+ int net_seq_idx)
+{
+ struct consumer_relayd_sock_pair *obj = NULL;
+
+ /* Negative net sequence index is a failure */
+ if (net_seq_idx < 0) {
+ goto error;
+ }
+
+ obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
+ if (obj == NULL) {
+ PERROR("zmalloc relayd sock");
+ goto error;
+ }
+
+ obj->net_seq_idx = net_seq_idx;
+ obj->refcount = 0;
+ lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+ pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+
+error:
+ return obj;
+}
+
+/*
+ * Find a relayd socket pair in the global consumer data.
+ *
+ * Return the object if found else NULL.
+ */
+struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Negative keys are lookup failures */
+ if (key < 0) {
+ goto error;
+ }
+
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ }
+
+ rcu_read_unlock();
+
+error:
+ return relayd;
+}
+
+/*
+ * Handle stream for relayd transmission if the stream applies for network
+ * streaming where the net sequence index is set.
+ *
+ * Return destination file descriptor or negative value on error.
+ */
+int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
+ size_t data_size)
+{
+ int outfd = -1, ret;
+ struct consumer_relayd_sock_pair *relayd;
+ struct lttcomm_relayd_data_hdr data_hdr;
+
+ /* Safety net */
+ assert(stream);
+
+ /* Reset data header */
+ memset(&data_hdr, 0, sizeof(data_hdr));
+
+ /* Get relayd reference of the stream. */
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ /* Stream is either local or corrupted */
+ goto error;
+ }
+
+ DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
+ if (stream->metadata_flag) {
+ /* Caller MUST acquire the relayd control socket lock */
+ ret = relayd_send_metadata(&relayd->control_sock, data_size);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Metadata are always sent on the control socket. */
+ outfd = relayd->control_sock.fd;
+ } else {
+ /* Set header with stream information */
+ data_hdr.stream_id = htobe64(stream->relayd_stream_id);
+ data_hdr.data_size = htobe32(data_size);
+ /* Other fields are zeroed previously */
+
+ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
+ sizeof(data_hdr));
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Set to go on data socket */
+ outfd = relayd->data_sock.fd;
+ }
+
+error:
+ return outfd;
+}
+
/*
* Update a stream according to what we just received.
*/
goto end;
}
DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
- channel->key,
- channel->shm_fd,
- channel->wait_fd,
+ channel->key, channel->shm_fd, channel->wait_fd,
(unsigned long long) channel->mmap_len,
(unsigned long long) channel->max_sb_size);
end:
*/
int consumer_update_poll_array(
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream)
+ struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *metadata_ht)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Active FD %d", stream->wait_fd);
(*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
+ if (stream->metadata_flag && metadata_ht) {
+ lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
+ DBG("Active FD added to metadata hash table");
+ }
local_stream[i] = stream;
i++;
}
/*
* Set the command socket path.
*/
-
void lttng_consumer_set_command_sock_path(
struct lttng_consumer_local_data *ctx, char *sock)
{
}
}
-void lttng_consumer_sync_trace_file(
- struct lttng_consumer_stream *stream, off_t orig_offset)
+void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
+ off_t orig_offset)
{
int outfd = stream->out_fd;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
+ struct lttng_ht *metadata_ht;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_stream *metadata_stream;
+ ssize_t len;
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
rcu_register_thread();
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+ ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ metadata_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
}
/*
- * If the consumer_poll_pipe triggered poll go
- * directly to the beginning of the loop to update the
- * array. We want to prioritize array update over
- * low-priority reads.
+ * If the consumer_poll_pipe triggered poll go directly to the
+ * beginning of the loop to update the array. We want to prioritize
+ * array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents & POLLPRI) {
- ssize_t len;
-
+ /* Lookup for metadata which is the highest priority */
+ lttng_ht_lookup(metadata_ht,
+ (void *)((unsigned long) pollfd[i].fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL &&
+ (pollfd[i].revents & (POLLIN | POLLPRI))) {
+ DBG("Urgent metadata read on fd %d", pollfd[i].fd);
+ metadata_stream = caa_container_of(node,
+ struct lttng_consumer_stream, waitfd_node);
+ high_prio = 1;
+ len = ctx->on_buffer_ready(metadata_stream, ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (len < 0 && len != -EAGAIN) {
+ goto end;
+ } else if (len > 0) {
+ metadata_stream->data_read = 1;
+ }
+ } else if (pollfd[i].revents & POLLPRI) {
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
len = ctx->on_buffer_ready(local_stream[i], ctx);
for (i = 0; i < nb_fd; i++) {
if ((pollfd[i].revents & POLLIN) ||
local_stream[i]->hangup_flush_done) {
- ssize_t len;
-
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
+ if (local_stream[i]->metadata_flag) {
+ iter.iter.node = &local_stream[i]->waitfd_node.node;
+ ret = lttng_ht_del(metadata_ht, &iter);
+ assert(!ret);
+ }
consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
+ if (local_stream[i]->metadata_flag) {
+ iter.iter.node = &local_stream[i]->waitfd_node.node;
+ ret = lttng_ht_del(metadata_ht, &iter);
+ assert(!ret);
+ }
consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
+ if (local_stream[i]->metadata_flag) {
+ iter.iter.node = &local_stream[i]->waitfd_node.node;
+ ret = lttng_ht_del(metadata_ht, &iter);
+ assert(!ret);
+ }
consumer_del_stream(local_stream[i]);
num_hup++;
}
{
consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
-