X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=70cde9c1028d978d7deb1865ff84fe9455988a56;hp=51f861e23695c642a56a943aa272fb24c89c7a5d;hb=228b5bf7fa0bd5cf9e709717e578e28bccd57a73;hpb=d56db448a421370a0f33d6737cb366488d134b26 diff --git a/src/common/consumer.c b/src/common/consumer.c index 51f861e23..70cde9c10 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -234,6 +234,27 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Iterate over the relayd hash table and destroy each element. Finally, + * destroy the whole hash table. + */ +static void cleanup_relayd_ht(void) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; + + rcu_read_lock(); + + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + destroy_relayd(relayd); + } + + lttng_ht_destroy(consumer_data.relayd_ht); + + rcu_read_unlock(); +} + /* * Update the end point status of all streams having the given network sequence * index (relayd index). @@ -764,6 +785,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) int ret; struct lttng_ht_iter iter; + DBG("Consumer delete channel key %d", channel->key); + pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { @@ -917,7 +940,7 @@ static int consumer_update_poll_array( * changed where this function will be called back again. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status) { + stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { continue; } DBG("Active FD %d", stream->wait_fd); @@ -1001,8 +1024,8 @@ int lttng_consumer_send_error( } /* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. + * Close all the tracefiles and stream fds and MUST be called when all + * instances are destroyed i.e. when all threads were joined and are ended. */ void lttng_consumer_cleanup(void) { @@ -1021,6 +1044,15 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + + cleanup_relayd_ht(); + + /* + * This HT contains streams that are freed by either the metadata thread or + * the data thread so we do *nothing* on the hash table and simply destroy + * it. + */ + lttng_ht_destroy(consumer_data.stream_list_ht); } /* @@ -1267,6 +1299,8 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * + * It must be called with the stream lock held. + * * Careful review MUST be put if any changes occur! * * Returns the number of bytes written @@ -1419,6 +1453,8 @@ end: /* * Splice the data from the ring buffer to the tracefile. * + * It must be called with the stream lock held. + * * Returns the number of bytes spliced. */ ssize_t lttng_consumer_on_read_subbuffer_splice( @@ -1691,7 +1727,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ static void destroy_data_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1701,10 +1736,11 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); } rcu_read_unlock(); @@ -1718,7 +1754,6 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) */ static void destroy_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1728,10 +1763,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); } rcu_read_unlock(); @@ -1950,7 +1986,7 @@ static void validate_endpoint_status_data_stream(void) rcu_read_lock(); cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* Delete it right now */ @@ -1975,7 +2011,7 @@ static void validate_endpoint_status_metadata_stream( rcu_read_lock(); cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (!stream->endpoint_status) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* @@ -2007,6 +2043,12 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!metadata_ht) { + /* ENOMEM at this point. Better to bail out. */ + goto error; + } + DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2173,9 +2215,7 @@ end: DBG("Metadata poll thread exiting"); lttng_poll_clean(&events); - if (metadata_ht) { - destroy_stream_ht(metadata_ht); - } + destroy_stream_ht(metadata_ht); rcu_unregister_thread(); return NULL; @@ -2200,6 +2240,7 @@ void *consumer_thread_data_poll(void *data) data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (data_ht == NULL) { + /* ENOMEM at this point. Better to bail out. */ goto end; } @@ -2436,9 +2477,7 @@ end: PERROR("close data pipe"); } - if (data_ht) { - destroy_data_stream_ht(data_ht); - } + destroy_data_stream_ht(data_ht); rcu_unregister_thread(); return NULL; @@ -2450,7 +2489,7 @@ end: */ void *consumer_thread_sessiond_poll(void *data) { - int sock, client_socket, ret; + int sock = -1, client_socket, ret; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2510,6 +2549,13 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } + /* This socket is not useful anymore. */ + ret = close(client_socket); + if (ret < 0) { + PERROR("close client_socket"); + } + client_socket = -1; + /* update the polling structure to poll on the established socket */ consumer_sockpoll[1].fd = sock; consumer_sockpoll[1].events = POLLIN | POLLPRI; @@ -2553,6 +2599,20 @@ end: */ notify_thread_pipe(ctx->consumer_data_pipe[1]); + /* Cleaning up possibly open sockets. */ + if (sock >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close sock sessiond poll"); + } + } + if (client_socket >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close client_socket sessiond poll"); + } + } + rcu_unregister_thread(); return NULL; } @@ -2606,11 +2666,6 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(metadata_ht); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(data_ht); } /* @@ -2623,7 +2678,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) { - int fd, ret = -1; + int fd = -1, ret = -1; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2650,6 +2705,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); ret = -1; + fd = -1; /* Just in case it gets set with an invalid value. */ goto error; } @@ -2659,14 +2715,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->control_sock.fd >= 0) { + if (close(relayd->control_sock.fd)) { + PERROR("close relayd control socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->control_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2676,14 +2733,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->data_sock.fd >= 0) { + if (close(relayd->data_sock.fd)) { + PERROR("close relayd data socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->data_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2705,9 +2763,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - ret = 0; + return 0; error: + /* Close received socket if valid. */ + if (fd >= 0) { + if (close(fd)) { + PERROR("close received socket"); + } + } return ret; }