#include <lttngerr.h>
struct lttng_consumer_global_data consumer_data = {
- .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
- .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
.stream_count = 0,
.need_update = 1,
.type = LTTNG_CONSUMER_UNKNOWN,
*/
static struct lttng_consumer_stream *consumer_find_stream(int key)
{
- struct lttng_consumer_stream *iter;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_stream *stream = NULL;
- cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
- if (iter->key == key) {
- DBG("Found stream key %d", key);
- return iter;
- }
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
+
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
- return NULL;
+
+ rcu_read_unlock();
+
+ return stream;
+}
+
+static void consumer_steal_stream_key(int key)
+{
+ struct lttng_consumer_stream *stream;
+
+ stream = consumer_find_stream(key);
+ if (stream)
+ stream->key = -1;
}
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
- struct lttng_consumer_channel *iter;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_channel *channel = NULL;
- cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
- if (iter->key == key) {
- DBG("Found channel key %d", key);
- return iter;
- }
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
+
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
- return NULL;
+
+ rcu_read_unlock();
+
+ return channel;
+}
+
+static void consumer_steal_channel_key(int key)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (channel)
+ channel->key = -1;
}
/*
void consumer_del_stream(struct lttng_consumer_stream *stream)
{
int ret;
+ struct lttng_ht_iter iter;
struct lttng_consumer_channel *free_chan = NULL;
pthread_mutex_lock(&consumer_data.lock);
}
}
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
lttng_ustconsumer_del_stream(stream);
break;
default:
goto end;
}
- cds_list_del(&stream->list);
+ rcu_read_lock();
+
+ /* Get stream node from hash table */
+ lttng_ht_lookup(consumer_data.stream_ht,
+ (void *)((unsigned long) stream->key), &iter);
+ /* Remove stream node from hash table */
+ ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+ assert(!ret);
+
+ rcu_read_unlock();
+
if (consumer_data.stream_count <= 0) {
goto end;
}
consumer_del_channel(free_chan);
}
+static void consumer_del_stream_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+
+ consumer_del_stream(stream);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
enum lttng_consumer_stream_state state,
uint64_t mmap_len,
enum lttng_event_output output,
- const char *path_name)
+ const char *path_name,
+ uid_t uid,
+ gid_t gid)
{
struct lttng_consumer_stream *stream;
int ret;
- stream = malloc(sizeof(*stream));
+ stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
perror("malloc struct lttng_consumer_stream");
goto end;
stream->mmap_len = mmap_len;
stream->mmap_base = NULL;
stream->output = output;
+ stream->uid = uid;
+ stream->gid = gid;
strncpy(stream->path_name, path_name, PATH_MAX - 1);
stream->path_name[PATH_MAX - 1] = '\0';
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
stream->cpu = stream->chan->cpucount++;
ret = lttng_ustconsumer_allocate_stream(stream);
if (ret) {
int ret = 0;
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_stream(stream->key)) {
- ret = -1;
- goto end;
- }
- cds_list_add(&stream->list, &consumer_data.stream_list.head);
+ /* Steal stream identifier, for UST */
+ consumer_steal_stream_key(stream->key);
+ rcu_read_lock();
+ lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ rcu_read_unlock();
consumer_data.stream_count++;
consumer_data.need_update = 1;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
/* Streams are in CPU number order (we rely on this) */
stream->cpu = stream->chan->nr_streams++;
break;
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
int ret;
+ struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
lttng_ustconsumer_del_channel(channel);
break;
default:
goto end;
}
- cds_list_del(&channel->list);
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.channel_ht,
+ (void *)((unsigned long) channel->key), &iter);
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+
+ rcu_read_unlock();
+
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
pthread_mutex_unlock(&consumer_data.lock);
}
+static void consumer_del_channel_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_channel *channel=
+ caa_container_of(node, struct lttng_consumer_channel, node);
+
+ consumer_del_channel(channel);
+}
+
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
struct lttng_consumer_channel *channel;
int ret;
- channel = malloc(sizeof(*channel));
+ channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
perror("malloc struct lttng_consumer_channel");
goto end;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
channel->nr_streams = 0;
+ lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
channel->mmap_base = NULL;
channel->mmap_len = 0;
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
ret = lttng_ustconsumer_allocate_channel(channel);
if (ret) {
free(channel);
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
- int ret = 0;
-
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_channel(channel->key)) {
- ret = -1;
- goto end;
- }
- cds_list_add(&channel->list, &consumer_data.channel_list.head);
-end:
+ /* Steal channel identifier, for UST */
+ consumer_steal_channel_key(channel->key);
+ rcu_read_lock();
+ lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+ rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
- return ret;
+ return 0;
}
/*
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
struct lttng_consumer_stream **local_stream)
{
- struct lttng_consumer_stream *iter;
int i = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
DBG("Updating poll fd array");
- cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
- if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
continue;
}
- DBG("Active FD %d", iter->wait_fd);
- (*pollfd)[i].fd = iter->wait_fd;
+ DBG("Active FD %d", stream->wait_fd);
+ (*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = iter;
+ local_stream[i] = stream;
i++;
}
*/
void lttng_consumer_cleanup(void)
{
- struct lttng_consumer_stream *iter, *tmp;
- struct lttng_consumer_channel *citer, *ctmp;
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+
+ rcu_read_lock();
/*
- * close all outfd. Called when there are no more threads
- * running (after joining on the threads), no need to protect
- * list iteration with mutex.
+ * close all outfd. Called when there are no more threads running (after
+ * joining on the threads), no need to protect list iteration with mutex.
*/
- cds_list_for_each_entry_safe(iter, tmp,
- &consumer_data.stream_list.head, list) {
- consumer_del_stream(iter);
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
+ node) {
+ ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+ assert(!ret);
+ call_rcu(&node->head, consumer_del_stream_rcu);
}
- cds_list_for_each_entry_safe(citer, ctmp,
- &consumer_data.channel_list.head, list) {
- consumer_del_channel(citer);
+
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
+ node) {
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+ call_rcu(&node->head, consumer_del_channel_rcu);
}
+
+ rcu_read_unlock();
}
/*
consumer_data.type == type);
consumer_data.type = type;
- ctx = malloc(sizeof(struct lttng_consumer_local_data));
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
perror("allocating context");
goto error;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return -ENOSYS;
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_take_snapshot(ctx, stream);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_take_snapshot(ctx, stream);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
default:
ERR("Unknown consumer_data type");
}
/*
- * This thread polls the fds in the ltt_fd_list to consume the data and write
+ * This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
void *lttng_consumer_thread_poll_fds(void *data)
int tmp2;
struct lttng_consumer_local_data *ctx = data;
- local_stream = malloc(sizeof(struct lttng_consumer_stream));
+ rcu_register_thread();
+
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
high_prio = 0;
num_hup = 0;
/*
- * the ltt_fd_list has been updated, we need to update our
+ * the fds set has been updated, we need to update our
* local array as well
*/
pthread_mutex_lock(&consumer_data.lock);
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- local_stream = malloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
perror("local_stream malloc");
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if ((pollfd[i].revents & POLLHUP) &&
!(pollfd[i].revents & POLLIN)) {
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ if (consumer_data.type == LTTNG_CONSUMER32_UST
+ || consumer_data.type == LTTNG_CONSUMER64_UST) {
+ DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+ pollfd[i].fd);
+ if (!local_stream[i]->hangup_flush_done) {
+ lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+ /* read after flush */
+ do {
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ } while (ret == EAGAIN);
+ }
+ } else {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ }
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
}
}
free(local_stream);
local_stream = NULL;
}
+ rcu_unregister_thread();
return NULL;
}
struct pollfd consumer_sockpoll[2];
struct lttng_consumer_local_data *ctx = data;
+ rcu_register_thread();
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
if (ret < 0) {
perror("poll pipe write");
}
+ rcu_unregister_thread();
return NULL;
}
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_read_subbuffer(stream, ctx);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_read_subbuffer(stream, ctx);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_recv_stream(stream);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_on_recv_stream(stream);
default:
ERR("Unknown consumer_data type");
return -ENOSYS;
}
}
+
+/*
+ * Allocate and set consumer data hash tables.
+ */
+void lttng_consumer_init(void)
+{
+ consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+}
+