X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=831592a1e96ef4a8a22b64fdc29ffeaa75f62786;hp=05bf85b3dc8d8e73e0bbe62497a64bf0f0bc3e26;hb=f64161251bd649abe5b6a473531adfa3af9bd6b6;hpb=a186a15913c34e8adc83ea71565d3b0eec296774 diff --git a/src/common/consumer.c b/src/common/consumer.c index 05bf85b3d..831592a1e 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1,20 +1,20 @@ /* * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers + * 2012 - David Goulet * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _GNU_SOURCE @@ -30,8 +30,10 @@ #include #include +#include #include #include +#include #include #include "consumer.h" @@ -85,9 +87,18 @@ static void consumer_steal_stream_key(int key) { struct lttng_consumer_stream *stream; + rcu_read_lock(); stream = consumer_find_stream(key); - if (stream) + if (stream) { stream->key = -1; + /* + * We don't want the lookup to match, but we still need + * to iterate on this stream when iterating over the hash table. Just + * change the node key. + */ + stream->node.key = -1; + } + rcu_read_unlock(); } static struct lttng_consumer_channel *consumer_find_channel(int key) @@ -118,9 +129,18 @@ static void consumer_steal_channel_key(int key) { struct lttng_consumer_channel *channel; + rcu_read_lock(); channel = consumer_find_channel(key); - if (channel) + if (channel) { channel->key = -1; + /* + * We don't want the lookup to match, but we still need + * to iterate on this channel when iterating over the hash table. Just + * change the node key. + */ + channel->node.key = -1; + } + rcu_read_unlock(); } static @@ -134,6 +154,45 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } +/* + * RCU protected relayd socket pair free. + */ +static void consumer_rcu_free_relayd(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct consumer_relayd_sock_pair *relayd = + caa_container_of(node, struct consumer_relayd_sock_pair, node); + + free(relayd); +} + +/* + * Destroy and free relayd socket pair object. + * + * This function MUST be called with the consumer_data lock acquired. + */ +void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret; + struct lttng_ht_iter iter; + + DBG("Consumer destroy and close relayd socket pair"); + + iter.iter.node = &relayd->node.node; + ret = lttng_ht_del(consumer_data.relayd_ht, &iter); + assert(!ret); + + /* Close all sockets */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->data_sock); + + /* RCU free() call */ + call_rcu(&relayd->node.head, consumer_rcu_free_relayd); +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -143,6 +202,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) int ret; struct lttng_ht_iter iter; struct lttng_consumer_channel *free_chan = NULL; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); pthread_mutex_lock(&consumer_data.lock); @@ -166,15 +228,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) } rcu_read_lock(); - - /* Get stream node from hash table */ - lttng_ht_lookup(consumer_data.stream_ht, - (void *)((unsigned long) stream->key), &iter); - /* - * Remove stream node from hash table. It can fail if it's been - * replaced due to key reuse. - */ - (void) lttng_ht_del(consumer_data.stream_ht, &iter); + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(consumer_data.stream_ht, &iter); + assert(!ret); rcu_read_unlock(); @@ -203,8 +259,24 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) PERROR("close"); } } - if (!--stream->chan->refcount) + + /* Check and cleanup relayd */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + uatomic_dec(&relayd->refcount); + assert(uatomic_read(&relayd->refcount) >= 0); + if (uatomic_read(&relayd->refcount) == 0) { + /* Refcount of the relayd struct is 0, destroy it */ + consumer_destroy_relayd(relayd); + } + } + rcu_read_unlock(); + + if (!--stream->chan->refcount) { free_chan = stream->chan; + } + call_rcu(&stream->node.head, consumer_free_stream); end: @@ -223,7 +295,9 @@ struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_event_output output, const char *path_name, uid_t uid, - gid_t gid) + gid_t gid, + int net_index, + int metadata_flag) { struct lttng_consumer_stream *stream; int ret; @@ -250,9 +324,12 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->output = output; stream->uid = uid; stream->gid = gid; - strncpy(stream->path_name, path_name, PATH_MAX - 1); - stream->path_name[PATH_MAX - 1] = '\0'; + stream->net_seq_idx = net_index; + stream->metadata_flag = metadata_flag; + strncpy(stream->path_name, path_name, sizeof(stream->path_name)); + stream->path_name[sizeof(stream->path_name) - 1] = '\0'; lttng_ht_node_init_ulong(&stream->node, stream->key); + lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -271,12 +348,13 @@ struct lttng_consumer_stream *consumer_allocate_stream( assert(0); goto end; } - DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)", + DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, - stream->out_fd); + stream->out_fd, + stream->net_seq_idx); end: return stream; } @@ -287,18 +365,34 @@ end: int consumer_add_stream(struct lttng_consumer_stream *stream) { int ret = 0; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ consumer_steal_stream_key(stream->key); + rcu_read_lock(); - /* - * We simply remove the old channel from the hash table. It's - * ok, since we know for sure the sessiond wants to replace it - * with the new version, because the key has been reused. - */ - (void) lttng_ht_add_replace_ulong(consumer_data.stream_ht, &stream->node); + lttng_ht_lookup(consumer_data.stream_ht, + (void *)((unsigned long) stream->key), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + rcu_read_unlock(); + /* Stream already exist. Ignore the insertion */ + goto end; + } + + lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + + /* Check and cleanup relayd */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + uatomic_inc(&relayd->refcount); + } rcu_read_unlock(); + + /* Update consumer data */ consumer_data.stream_count++; consumer_data.need_update = 1; @@ -322,6 +416,153 @@ end: return ret; } +/* + * Add relayd socket to global consumer data hashtable. + */ +int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret = 0; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + + if (relayd == NULL) { + ret = -1; + goto end; + } + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.relayd_ht, + (void *)((unsigned long) relayd->net_seq_idx), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + rcu_read_unlock(); + /* Relayd already exist. Ignore the insertion */ + goto end; + } + lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); + + rcu_read_unlock(); + +end: + return ret; +} + +/* + * Allocate and return a consumer relayd socket. + */ +struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( + int net_seq_idx) +{ + struct consumer_relayd_sock_pair *obj = NULL; + + /* Negative net sequence index is a failure */ + if (net_seq_idx < 0) { + goto error; + } + + obj = zmalloc(sizeof(struct consumer_relayd_sock_pair)); + if (obj == NULL) { + PERROR("zmalloc relayd sock"); + goto error; + } + + obj->net_seq_idx = net_seq_idx; + obj->refcount = 0; + lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); + pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); + +error: + return obj; +} + +/* + * Find a relayd socket pair in the global consumer data. + * + * Return the object if found else NULL. + * RCU read-side lock must be held across this call and while using the + * returned object. + */ +struct consumer_relayd_sock_pair *consumer_find_relayd(int key) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Negative keys are lookup failures */ + if (key < 0) { + goto error; + } + + lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); + } + +error: + return relayd; +} + +/* + * Handle stream for relayd transmission if the stream applies for network + * streaming where the net sequence index is set. + * + * Return destination file descriptor or negative value on error. + */ +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, + size_t data_size) +{ + int outfd = -1, ret; + struct consumer_relayd_sock_pair *relayd; + struct lttcomm_relayd_data_hdr data_hdr; + + /* Safety net */ + assert(stream); + + /* Reset data header */ + memset(&data_hdr, 0, sizeof(data_hdr)); + + rcu_read_lock(); + /* Get relayd reference of the stream. */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + /* Stream is either local or corrupted */ + goto error; + } + + DBG("Consumer found relayd socks with index %d", stream->net_seq_idx); + if (stream->metadata_flag) { + /* Caller MUST acquire the relayd control socket lock */ + ret = relayd_send_metadata(&relayd->control_sock, data_size); + if (ret < 0) { + goto error; + } + + /* Metadata are always sent on the control socket. */ + outfd = relayd->control_sock.fd; + } else { + /* Set header with stream information */ + data_hdr.stream_id = htobe64(stream->relayd_stream_id); + data_hdr.data_size = htobe32(data_size); + /* Other fields are zeroed previously */ + + ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, + sizeof(data_hdr)); + if (ret < 0) { + goto error; + } + + /* Set to go on data socket */ + outfd = relayd->data_sock.fd; + } + +error: + rcu_read_unlock(); + return outfd; +} + /* * Update a stream according to what we just received. */ @@ -375,16 +616,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) } rcu_read_lock(); - - lttng_ht_lookup(consumer_data.channel_ht, - (void *)((unsigned long) channel->key), &iter); - - /* - * Remove channel node from hash table. It can fail if it's been - * replaced due to key reuse. - */ - (void) lttng_ht_del(consumer_data.channel_ht, &iter); - + iter.iter.node = &channel->node.node; + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); rcu_read_unlock(); if (channel->mmap_base != NULL) { @@ -453,9 +687,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( goto end; } DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)", - channel->key, - channel->shm_fd, - channel->wait_fd, + channel->key, channel->shm_fd, channel->wait_fd, (unsigned long long) channel->mmap_len, (unsigned long long) channel->max_sb_size); end: @@ -467,16 +699,25 @@ end: */ int consumer_add_channel(struct lttng_consumer_channel *channel) { + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + pthread_mutex_lock(&consumer_data.lock); /* Steal channel identifier, for UST */ consumer_steal_channel_key(channel->key); rcu_read_lock(); - /* - * We simply remove the old channel from the hash table. It's - * ok, since we know for sure the sessiond wants to replace it - * with the new version, because the key has been reused. - */ - (void) lttng_ht_add_replace_ulong(consumer_data.channel_ht, &channel->node); + + lttng_ht_lookup(consumer_data.channel_ht, + (void *)((unsigned long) channel->key), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + /* Channel already exist. Ignore the insertion */ + goto end; + } + + lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); + +end: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -492,13 +733,15 @@ int consumer_add_channel(struct lttng_consumer_channel *channel) */ int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, + struct lttng_ht *metadata_ht) { int i = 0; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; DBG("Updating poll fd array"); + rcu_read_lock(); cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, node.node) { if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { @@ -507,16 +750,21 @@ int consumer_update_poll_array( DBG("Active FD %d", stream->wait_fd); (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; + if (stream->metadata_flag && metadata_ht) { + lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node); + DBG("Active FD added to metadata hash table"); + } local_stream[i] = stream; i++; } + rcu_read_unlock(); /* * Insert the consumer_poll_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ (*pollfd)[i].fd = ctx->consumer_poll_pipe[0]; - (*pollfd)[i].events = POLLIN; + (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -540,7 +788,7 @@ restart: perror("Poll error"); goto exit; } - if (consumer_sockpoll[0].revents == POLLIN) { + if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) { DBG("consumer_should_quit wake up"); goto exit; } @@ -562,7 +810,6 @@ void lttng_consumer_set_error_sock( /* * Set the command socket path. */ - void lttng_consumer_set_command_sock_path( struct lttng_consumer_local_data *ctx, char *sock) { @@ -614,6 +861,9 @@ void lttng_consumer_cleanup(void) } rcu_read_unlock(); + + lttng_ht_destroy(consumer_data.stream_ht); + lttng_ht_destroy(consumer_data.channel_ht); } /* @@ -623,14 +873,16 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) { int ret; consumer_quit = 1; - ret = write(ctx->consumer_should_quit[1], "4", 1); + do { + ret = write(ctx->consumer_should_quit[1], "4", 1); + } while (ret < 0 && errno == EINTR); if (ret < 0) { perror("write consumer quit"); } } -void lttng_consumer_sync_trace_file( - struct lttng_consumer_stream *stream, off_t orig_offset) +void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, + off_t orig_offset) { int outfd = stream->out_fd; @@ -714,6 +966,20 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + /* set read end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + + /* set write end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { perror("Error creating recv pipe"); @@ -738,6 +1004,7 @@ error_thread_pipe: PERROR("close"); } } +error_poll_fcntl: error_quit_pipe: for (i = 0; i < 2; i++) { int err; @@ -911,9 +1178,14 @@ void *lttng_consumer_thread_poll_fds(void *data) struct lttng_consumer_stream **local_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; - char tmp; - int tmp2; struct lttng_consumer_local_data *ctx = data; + struct lttng_ht *metadata_ht; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_stream *metadata_stream; + ssize_t len; + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); rcu_register_thread(); @@ -954,7 +1226,8 @@ void *lttng_consumer_thread_poll_fds(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + metadata_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); @@ -991,25 +1264,42 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* - * If the consumer_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. + * If the consumer_poll_pipe triggered poll go directly to the + * beginning of the loop to update the array. We want to prioritize + * array update over low-priority reads. */ - if (pollfd[nb_fd].revents & POLLIN) { + if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { + size_t pipe_readlen; + char tmp; + DBG("consumer_poll_pipe wake up"); - tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read consumer poll"); - } + /* Consume 1 byte of pipe data */ + do { + pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + } while (pipe_readlen == -1 && errno == EINTR); continue; } /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents & POLLPRI) { - ssize_t len; - + /* Lookup for metadata which is the highest priority */ + lttng_ht_lookup(metadata_ht, + (void *)((unsigned long) pollfd[i].fd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL && + (pollfd[i].revents & (POLLIN | POLLPRI))) { + DBG("Urgent metadata read on fd %d", pollfd[i].fd); + metadata_stream = caa_container_of(node, + struct lttng_consumer_stream, waitfd_node); + high_prio = 1; + len = ctx->on_buffer_ready(metadata_stream, ctx); + /* it's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN) { + goto end; + } else if (len > 0) { + metadata_stream->data_read = 1; + } + } else if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); @@ -1034,10 +1324,6 @@ void *lttng_consumer_thread_poll_fds(void *data) for (i = 0; i < nb_fd; i++) { if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done) { - ssize_t len; - - assert(!(pollfd[i].revents & POLLERR)); - assert(!(pollfd[i].revents & POLLNVAL)); DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ @@ -1069,18 +1355,33 @@ void *lttng_consumer_thread_poll_fds(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } @@ -1208,11 +1509,20 @@ end: */ consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; - /* wake up the polling thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret < 0 && errno == EINTR); rcu_unregister_thread(); return NULL; } @@ -1255,5 +1565,5 @@ void lttng_consumer_init(void) { consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } -