X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=5766860a89823bf40c4ebd98448fc233e54c06f2;hp=51f861e23695c642a56a943aa272fb24c89c7a5d;hb=4028eeb991f5fc7e430ce09036f57bfbaa59a664;hpb=d56db448a421370a0f33d6737cb366488d134b26 diff --git a/src/common/consumer.c b/src/common/consumer.c index 51f861e23..5766860a8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -917,7 +917,7 @@ static int consumer_update_poll_array( * changed where this function will be called back again. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status) { + stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { continue; } DBG("Active FD %d", stream->wait_fd); @@ -1267,6 +1267,8 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * + * It must be called with the stream lock held. + * * Careful review MUST be put if any changes occur! * * Returns the number of bytes written @@ -1419,6 +1421,8 @@ end: /* * Splice the data from the ring buffer to the tracefile. * + * It must be called with the stream lock held. + * * Returns the number of bytes spliced. */ ssize_t lttng_consumer_on_read_subbuffer_splice( @@ -1950,7 +1954,7 @@ static void validate_endpoint_status_data_stream(void) rcu_read_lock(); cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* Delete it right now */ @@ -1975,7 +1979,7 @@ static void validate_endpoint_status_metadata_stream( rcu_read_lock(); cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (!stream->endpoint_status) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* @@ -2450,7 +2454,7 @@ end: */ void *consumer_thread_sessiond_poll(void *data) { - int sock, client_socket, ret; + int sock = -1, client_socket, ret; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2510,6 +2514,13 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } + /* This socket is not useful anymore. */ + ret = close(client_socket); + if (ret < 0) { + PERROR("close client_socket"); + } + client_socket = -1; + /* update the polling structure to poll on the established socket */ consumer_sockpoll[1].fd = sock; consumer_sockpoll[1].events = POLLIN | POLLPRI; @@ -2553,6 +2564,20 @@ end: */ notify_thread_pipe(ctx->consumer_data_pipe[1]); + /* Cleaning up possibly open sockets. */ + if (sock >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close sock sessiond poll"); + } + } + if (client_socket >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close client_socket sessiond poll"); + } + } + rcu_unregister_thread(); return NULL; } @@ -2623,7 +2648,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) { - int fd, ret = -1; + int fd = -1, ret = -1; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2650,6 +2675,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); ret = -1; + fd = -1; /* Just in case it gets set with an invalid value. */ goto error; } @@ -2659,14 +2685,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->control_sock.fd >= 0) { + if (close(relayd->control_sock.fd)) { + PERROR("close relayd control socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->control_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2676,14 +2703,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto error; + /* Immediately try to close the created socket if valid. */ + if (relayd->data_sock.fd >= 0) { + if (close(relayd->data_sock.fd)) { + PERROR("close relayd data socket"); + } } - - /* Close the created socket fd which is useless */ - ret = close(relayd->data_sock.fd); + /* Handle create_sock error. */ if (ret < 0) { - PERROR("close relayd control socket"); + goto error; } /* Assign new file descriptor */ @@ -2705,9 +2733,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - ret = 0; + return 0; error: + /* Close received socket if valid. */ + if (fd >= 0) { + if (close(fd)) { + PERROR("close received socket"); + } + } return ret; }