X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=88e8d9d28017ed5fb0fccbee22fad316f1ed402c;hp=dd8c621f177d56968306fb500ce4546870b5bd2d;hb=00e2e675d54dc726a7c8f8887c889cc8ef022003;hpb=b8aa16822f579a6e15b41d2761801a0a65d5f2a5 diff --git a/src/common/consumer.c b/src/common/consumer.c index dd8c621f1..88e8d9d28 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers + * 2012 - David Goulet * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -29,8 +30,10 @@ #include #include +#include #include #include +#include #include #include "consumer.h" @@ -151,6 +154,45 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } +/* + * RCU protected relayd socket pair free. + */ +static void consumer_rcu_free_relayd(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct consumer_relayd_sock_pair *relayd = + caa_container_of(node, struct consumer_relayd_sock_pair, node); + + free(relayd); +} + +/* + * Destroy and free relayd socket pair object. + * + * This function MUST be called with the consumer_data lock acquired. + */ +void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret; + struct lttng_ht_iter iter; + + DBG("Consumer destroy and close relayd socket pair"); + + iter.iter.node = &relayd->node.node; + ret = lttng_ht_del(consumer_data.relayd_ht, &iter); + assert(!ret); + + /* Close all sockets */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->data_sock); + + /* RCU free() call */ + call_rcu(&relayd->node.head, consumer_rcu_free_relayd); +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -160,6 +202,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) int ret; struct lttng_ht_iter iter; struct lttng_consumer_channel *free_chan = NULL; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); pthread_mutex_lock(&consumer_data.lock); @@ -214,8 +259,23 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) PERROR("close"); } } - if (!--stream->chan->refcount) + + /* Check and cleanup relayd */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* We are about to modify the relayd refcount */ + rcu_read_lock(); + if (!--relayd->refcount) { + /* Refcount of the relayd struct is 0, destroy it */ + consumer_destroy_relayd(relayd); + } + rcu_read_unlock(); + } + + if (!--stream->chan->refcount) { free_chan = stream->chan; + } + call_rcu(&stream->node.head, consumer_free_stream); end: @@ -234,7 +294,9 @@ struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_event_output output, const char *path_name, uid_t uid, - gid_t gid) + gid_t gid, + int net_index, + int metadata_flag) { struct lttng_consumer_stream *stream; int ret; @@ -261,9 +323,12 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->output = output; stream->uid = uid; stream->gid = gid; - strncpy(stream->path_name, path_name, PATH_MAX - 1); - stream->path_name[PATH_MAX - 1] = '\0'; + stream->net_seq_idx = net_index; + stream->metadata_flag = metadata_flag; + strncpy(stream->path_name, path_name, sizeof(stream->path_name)); + stream->path_name[sizeof(stream->path_name) - 1] = '\0'; lttng_ht_node_init_ulong(&stream->node, stream->key); + lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -282,12 +347,13 @@ struct lttng_consumer_stream *consumer_allocate_stream( assert(0); goto end; } - DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)", + DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, - stream->out_fd); + stream->out_fd, + stream->net_seq_idx); end: return stream; } @@ -300,6 +366,7 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) int ret = 0; struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ @@ -317,6 +384,17 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); rcu_read_unlock(); + + /* Check and cleanup relayd */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* We are about to modify the relayd refcount */ + rcu_read_lock(); + relayd->refcount++; + rcu_read_unlock(); + } + + /* Update consumer data */ consumer_data.stream_count++; consumer_data.need_update = 1; @@ -340,6 +418,153 @@ end: return ret; } +/* + * Add relayd socket to global consumer data hashtable. + */ +int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret = 0; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + + if (relayd == NULL) { + ret = -1; + goto end; + } + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.relayd_ht, + (void *)((unsigned long) relayd->net_seq_idx), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + rcu_read_unlock(); + /* Relayd already exist. Ignore the insertion */ + goto end; + } + lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); + + rcu_read_unlock(); + +end: + return ret; +} + +/* + * Allocate and return a consumer relayd socket. + */ +struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( + int net_seq_idx) +{ + struct consumer_relayd_sock_pair *obj = NULL; + + /* Negative net sequence index is a failure */ + if (net_seq_idx < 0) { + goto error; + } + + obj = zmalloc(sizeof(struct consumer_relayd_sock_pair)); + if (obj == NULL) { + PERROR("zmalloc relayd sock"); + goto error; + } + + obj->net_seq_idx = net_seq_idx; + obj->refcount = 0; + lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); + pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); + +error: + return obj; +} + +/* + * Find a relayd socket pair in the global consumer data. + * + * Return the object if found else NULL. + */ +struct consumer_relayd_sock_pair *consumer_find_relayd(int key) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Negative keys are lookup failures */ + if (key < 0) { + goto error; + } + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); + } + + rcu_read_unlock(); + +error: + return relayd; +} + +/* + * Handle stream for relayd transmission if the stream applies for network + * streaming where the net sequence index is set. + * + * Return destination file descriptor or negative value on error. + */ +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, + size_t data_size) +{ + int outfd = -1, ret; + struct consumer_relayd_sock_pair *relayd; + struct lttcomm_relayd_data_hdr data_hdr; + + /* Safety net */ + assert(stream); + + /* Reset data header */ + memset(&data_hdr, 0, sizeof(data_hdr)); + + /* Get relayd reference of the stream. */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + /* Stream is either local or corrupted */ + goto error; + } + + DBG("Consumer found relayd socks with index %d", stream->net_seq_idx); + if (stream->metadata_flag) { + /* Caller MUST acquire the relayd control socket lock */ + ret = relayd_send_metadata(&relayd->control_sock, data_size); + if (ret < 0) { + goto error; + } + + /* Metadata are always sent on the control socket. */ + outfd = relayd->control_sock.fd; + } else { + /* Set header with stream information */ + data_hdr.stream_id = htobe64(stream->relayd_stream_id); + data_hdr.data_size = htobe32(data_size); + /* Other fields are zeroed previously */ + + ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, + sizeof(data_hdr)); + if (ret < 0) { + goto error; + } + + /* Set to go on data socket */ + outfd = relayd->data_sock.fd; + } + +error: + return outfd; +} + /* * Update a stream according to what we just received. */ @@ -464,9 +689,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( goto end; } DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)", - channel->key, - channel->shm_fd, - channel->wait_fd, + channel->key, channel->shm_fd, channel->wait_fd, (unsigned long long) channel->mmap_len, (unsigned long long) channel->max_sb_size); end: @@ -512,7 +735,8 @@ end: */ int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, + struct lttng_ht *metadata_ht) { int i = 0; struct lttng_ht_iter iter; @@ -528,6 +752,10 @@ int consumer_update_poll_array( DBG("Active FD %d", stream->wait_fd); (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; + if (stream->metadata_flag && metadata_ht) { + lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node); + DBG("Active FD added to metadata hash table"); + } local_stream[i] = stream; i++; } @@ -584,7 +812,6 @@ void lttng_consumer_set_error_sock( /* * Set the command socket path. */ - void lttng_consumer_set_command_sock_path( struct lttng_consumer_local_data *ctx, char *sock) { @@ -654,8 +881,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) } } -void lttng_consumer_sync_trace_file( - struct lttng_consumer_stream *stream, off_t orig_offset) +void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, + off_t orig_offset) { int outfd = stream->out_fd; @@ -952,6 +1179,13 @@ void *lttng_consumer_thread_poll_fds(void *data) /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; + struct lttng_ht *metadata_ht; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_stream *metadata_stream; + ssize_t len; + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); rcu_register_thread(); @@ -992,7 +1226,8 @@ void *lttng_consumer_thread_poll_fds(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + metadata_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); @@ -1029,10 +1264,9 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* - * If the consumer_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. + * If the consumer_poll_pipe triggered poll go directly to the + * beginning of the loop to update the array. We want to prioritize + * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; @@ -1048,9 +1282,24 @@ void *lttng_consumer_thread_poll_fds(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents & POLLPRI) { - ssize_t len; - + /* Lookup for metadata which is the highest priority */ + lttng_ht_lookup(metadata_ht, + (void *)((unsigned long) pollfd[i].fd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL && + (pollfd[i].revents & (POLLIN | POLLPRI))) { + DBG("Urgent metadata read on fd %d", pollfd[i].fd); + metadata_stream = caa_container_of(node, + struct lttng_consumer_stream, waitfd_node); + high_prio = 1; + len = ctx->on_buffer_ready(metadata_stream, ctx); + /* it's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN) { + goto end; + } else if (len > 0) { + metadata_stream->data_read = 1; + } + } else if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); @@ -1075,8 +1324,6 @@ void *lttng_consumer_thread_poll_fds(void *data) for (i = 0; i < nb_fd; i++) { if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done) { - ssize_t len; - DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ @@ -1108,18 +1355,33 @@ void *lttng_consumer_thread_poll_fds(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } @@ -1303,5 +1565,5 @@ void lttng_consumer_init(void) { consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } -