X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=70cde9c1028d978d7deb1865ff84fe9455988a56;hp=0e73bc9b8aeda6766953fbd381fcc05c370f795c;hb=228b5bf7fa0bd5cf9e709717e578e28bccd57a73;hpb=74251bb8269c4609c776e28fcd137628225e6ca7 diff --git a/src/common/consumer.c b/src/common/consumer.c index 0e73bc9b8..70cde9c10 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -127,6 +127,12 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht) rcu_read_unlock(); } +/* + * Return a channel object for the given key. + * + * RCU read side lock MUST be acquired before calling this function and + * protects the channel ptr. + */ static struct lttng_consumer_channel *consumer_find_channel(int key) { struct lttng_ht_iter iter; @@ -138,8 +144,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) return NULL; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -147,8 +151,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) channel = caa_container_of(node, struct lttng_consumer_channel, node); } - rcu_read_unlock(); - return channel; } @@ -232,6 +234,27 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Iterate over the relayd hash table and destroy each element. Finally, + * destroy the whole hash table. + */ +static void cleanup_relayd_ht(void) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; + + rcu_read_lock(); + + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + destroy_relayd(relayd); + } + + lttng_ht_destroy(consumer_data.relayd_ht); + + rcu_read_unlock(); +} + /* * Update the end point status of all streams having the given network sequence * index (relayd index). @@ -475,6 +498,8 @@ struct lttng_consumer_stream *consumer_allocate_stream( goto end; } + rcu_read_lock(); + /* * Get stream's channel reference. Needed when adding the stream to the * global hash table. @@ -531,9 +556,12 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, stream->out_fd, stream->net_seq_idx, stream->session_id); + + rcu_read_unlock(); return stream; error: + rcu_read_unlock(); free(stream); end: return NULL; @@ -757,6 +785,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) int ret; struct lttng_ht_iter iter; + DBG("Consumer delete channel key %d", channel->key); + pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { @@ -910,7 +940,7 @@ static int consumer_update_poll_array( * changed where this function will be called back again. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status) { + stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { continue; } DBG("Active FD %d", stream->wait_fd); @@ -994,8 +1024,8 @@ int lttng_consumer_send_error( } /* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. + * Close all the tracefiles and stream fds and MUST be called when all + * instances are destroyed i.e. when all threads were joined and are ended. */ void lttng_consumer_cleanup(void) { @@ -1014,6 +1044,15 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + + cleanup_relayd_ht(); + + /* + * This HT contains streams that are freed by either the metadata thread or + * the data thread so we do *nothing* on the hash table and simply destroy + * it. + */ + lttng_ht_destroy(consumer_data.stream_list_ht); } /* @@ -1260,6 +1299,8 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * + * It must be called with the stream lock held. + * * Careful review MUST be put if any changes occur! * * Returns the number of bytes written @@ -1412,6 +1453,8 @@ end: /* * Splice the data from the ring buffer to the tracefile. * + * It must be called with the stream lock held. + * * Returns the number of bytes spliced. */ ssize_t lttng_consumer_on_read_subbuffer_splice( @@ -1684,7 +1727,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ static void destroy_data_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1694,10 +1736,11 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); } rcu_read_unlock(); @@ -1711,7 +1754,6 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) */ static void destroy_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1721,10 +1763,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); } rcu_read_unlock(); @@ -1943,7 +1986,7 @@ static void validate_endpoint_status_data_stream(void) rcu_read_lock(); cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* Delete it right now */ @@ -1968,7 +2011,7 @@ static void validate_endpoint_status_metadata_stream( rcu_read_lock(); cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (!stream->endpoint_status) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* @@ -2000,6 +2043,12 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!metadata_ht) { + /* ENOMEM at this point. Better to bail out. */ + goto error; + } + DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2166,9 +2215,7 @@ end: DBG("Metadata poll thread exiting"); lttng_poll_clean(&events); - if (metadata_ht) { - destroy_stream_ht(metadata_ht); - } + destroy_stream_ht(metadata_ht); rcu_unregister_thread(); return NULL; @@ -2193,6 +2240,7 @@ void *consumer_thread_data_poll(void *data) data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (data_ht == NULL) { + /* ENOMEM at this point. Better to bail out. */ goto end; } @@ -2429,9 +2477,7 @@ end: PERROR("close data pipe"); } - if (data_ht) { - destroy_data_stream_ht(data_ht); - } + destroy_data_stream_ht(data_ht); rcu_unregister_thread(); return NULL; @@ -2443,7 +2489,7 @@ end: */ void *consumer_thread_sessiond_poll(void *data) { - int sock, client_socket, ret; + int sock = -1, client_socket, ret; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2503,6 +2549,13 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } + /* This socket is not useful anymore. */ + ret = close(client_socket); + if (ret < 0) { + PERROR("close client_socket"); + } + client_socket = -1; + /* update the polling structure to poll on the established socket */ consumer_sockpoll[1].fd = sock; consumer_sockpoll[1].events = POLLIN | POLLPRI; @@ -2546,6 +2599,20 @@ end: */ notify_thread_pipe(ctx->consumer_data_pipe[1]); + /* Cleaning up possibly open sockets. */ + if (sock >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close sock sessiond poll"); + } + } + if (client_socket >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close client_socket sessiond poll"); + } + } + rcu_unregister_thread(); return NULL; } @@ -2599,11 +2666,6 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(metadata_ht); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(data_ht); } /* @@ -2616,7 +2678,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) { - int fd, ret = -1; + int fd = -1, ret = -1; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2643,6 +2705,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); ret = -1; + fd = -1; /* Just in case it gets set with an invalid value. */ goto error; } @@ -2652,14 +2715,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->control_sock.fd >= 0) { + if (close(relayd->control_sock.fd)) { + PERROR("close relayd control socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->control_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2669,14 +2733,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->data_sock.fd >= 0) { + if (close(relayd->data_sock.fd)) { + PERROR("close relayd data socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->data_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2698,9 +2763,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - ret = 0; + return 0; error: + /* Close received socket if valid. */ + if (fd >= 0) { + if (close(fd)) { + PERROR("close received socket"); + } + } return ret; }