X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=27dfe32b8592487e6c5e881604a650085240f536;hp=c561b9bd2c935e5e275972aa8c8b9965b950671b;hb=4d86847e8786d4902dceeb1dff91791112d2c396;hpb=04bb2b6422d74dd96498fbdda5fce5cc42cb2f4a diff --git a/src/common/consumer.c b/src/common/consumer.c index c561b9bd2..27dfe32b8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -234,6 +234,27 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Iterate over the relayd hash table and destroy each element. Finally, + * destroy the whole hash table. + */ +static void cleanup_relayd_ht(void) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; + + rcu_read_lock(); + + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + destroy_relayd(relayd); + } + + lttng_ht_destroy(consumer_data.relayd_ht); + + rcu_read_unlock(); +} + /* * Update the end point status of all streams having the given network sequence * index (relayd index). @@ -727,6 +748,13 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); data_hdr.padding_size = htobe32(padding); + /* + * Note that net_seq_num below is assigned with the *current* value of + * next_net_seq_num and only after that the next_net_seq_num will be + * increment. This is why when issuing a command on the relayd using + * this next value, 1 should always be substracted in order to compare + * the last seen sequence number on the relayd side to the last sent. + */ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1003,8 +1031,8 @@ int lttng_consumer_send_error( } /* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. + * Close all the tracefiles and stream fds and MUST be called when all + * instances are destroyed i.e. when all threads were joined and are ended. */ void lttng_consumer_cleanup(void) { @@ -1023,6 +1051,15 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + + cleanup_relayd_ht(); + + /* + * This HT contains streams that are freed by either the metadata thread or + * the data thread so we do *nothing* on the hash table and simply destroy + * it. + */ + lttng_ht_destroy(consumer_data.stream_list_ht); } /* @@ -1035,7 +1072,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) do { ret = write(ctx->consumer_should_quit[1], "4", 1); } while (ret < 0 && errno == EINTR); - if (ret < 0) { + if (ret < 0 || ret != 1) { PERROR("write consumer quit"); } @@ -1253,8 +1290,22 @@ static int write_relayd_metadata_id(int fd, do { ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); + if (ret < 0 || ret != sizeof(hdr)) { + /* + * This error means that the fd's end is closed so ignore the perror + * not to clubber the error output since this can happen in a normal + * code path. + */ + if (errno != EPIPE) { + PERROR("write metadata stream id"); + } + DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno); + /* + * Set ret to a negative value because if ret != sizeof(hdr), we don't + * handle writting the missing part so report that as an error and + * don't lie to the caller. + */ + ret = -1; goto end; } DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", @@ -1371,7 +1422,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } while (ret < 0 && errno == EINTR); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { - PERROR("Error in file write"); + /* + * This is possible if the fd is closed on the other side (outfd) + * or any write problem. It can be verbose a bit for a normal + * execution if for instance the relayd is stopped abruptly. This + * can happen so set this to a DBG statement. + */ + DBG("Error in file write mmap"); if (written == 0) { written = ret; } @@ -2037,17 +2094,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - lttng_poll_reset(&events); - - nb_fd = LTTNG_POLL_GETNB(&events); - /* Only the metadata pipe is set */ - if (nb_fd == 0 && consumer_quit == 1) { + if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { goto end; } restart: - DBG("Metadata poll wait with %d fd(s)", nb_fd); + DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); ret = lttng_poll_wait(&events, -1); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2058,6 +2111,8 @@ restart: goto error; } + nb_fd = ret; + /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { revents = LTTNG_POLL_GETEV(&events, i); @@ -2302,6 +2357,11 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + if (pipe_readlen < 0) { + PERROR("read consumer data pipe"); + /* Continue so we can at least handle the current stream(s). */ + continue; + } /* * If the stream is NULL, just ignore it. It's also possible that @@ -2509,7 +2569,7 @@ void *consumer_thread_sessiond_poll(void *data) /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { + if (sock < 0) { WARN("On accept"); goto end; } @@ -2646,13 +2706,22 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, + unsigned int sessiond_id) { - int fd = -1, ret = -1; + int fd = -1, ret = -1, relayd_created = 0; + enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { @@ -2660,8 +2729,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + ret = -1; goto error; } + relayd->sessiond_session_id = (uint64_t) sessiond_id; + relayd_created = 1; } /* Poll on consumer socket. */ @@ -2679,6 +2751,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } + /* We have the fds without error. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Copy socket information and received FD */ switch (sock_type) { case LTTNG_STREAM_CONTROL: @@ -2698,6 +2777,23 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Assign new file descriptor */ relayd->control_sock.fd = fd; + + /* + * Create a session on the relayd and store the returned id. Lock the + * control socket mutex if the relayd was NOT created before. + */ + if (!relayd_created) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + ret = relayd_create_session(&relayd->control_sock, + &relayd->relayd_session_id); + if (!relayd_created) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + if (ret < 0) { + goto error; + } + break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ @@ -2719,6 +2815,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; default: ERR("Unknown relayd socket type (%d)", sock_type); + ret = -1; goto error; } @@ -2742,6 +2839,14 @@ error: PERROR("close received socket"); } } + + if (relayd_created) { + /* We just want to cleanup. Ignore ret value. */ + (void) relayd_close(&relayd->control_sock); + (void) relayd_close(&relayd->data_sock); + free(relayd); + } + return ret; } @@ -2773,6 +2878,36 @@ end: return ret; } +/* + * Search for a relayd associated to the session id and return the reference. + * + * A rcu read side lock MUST be acquire before calling this function and locked + * until the relayd object is no longer necessary. + */ +static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Iterate over all relayd since they are indexed by net_seq_idx. */ + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + /* + * Check by sessiond id which is unique here where the relayd session + * id might not be when having multiple relayd. + */ + if (relayd->sessiond_session_id == id) { + /* Found the relayd. There can be only one per id. */ + goto found; + } + } + + return NULL; + +found: + return relayd; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. @@ -2785,7 +2920,7 @@ int consumer_data_pending(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; - struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_sock_pair *relayd = NULL; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); @@ -2809,6 +2944,19 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; + relayd = find_relayd_by_session_id(id); + if (relayd) { + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + } + cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed), ht->match_fct, (void *)((unsigned long) id), @@ -2816,7 +2964,7 @@ int consumer_data_pending(uint64_t id) /* If this call fails, the stream is being used hence data pending. */ ret = stream_try_lock(stream); if (!ret) { - goto data_not_pending; + goto data_pending; } /* @@ -2831,55 +2979,75 @@ int consumer_data_pending(uint64_t id) ret = data_pending(stream); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } /* Relayd check */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (!relayd) { - /* - * At this point, if the relayd object is not available for the - * given stream, it is because the relayd is being cleaned up - * so every stream associated with it (for a session id value) - * are or will be marked for deletion hence no data pending. - */ - pthread_mutex_unlock(&stream->lock); - goto data_not_pending; - } - + if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { - ret = relayd_quiescent_control(&relayd->control_sock); + ret = relayd_quiescent_control(&relayd->control_sock, + stream->relayd_stream_id); } else { ret = relayd_data_pending(&relayd->control_sock, - stream->relayd_stream_id, stream->next_net_seq_num); + stream->relayd_stream_id, + stream->next_net_seq_num - 1); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } pthread_mutex_unlock(&stream->lock); } + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_end_data_pending(&relayd->control_sock, + relayd->relayd_session_id, &is_data_inflight); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto data_not_pending; + } + if (is_data_inflight) { + goto data_pending; + } + } + /* - * Finding _no_ node in the hash table means that the stream(s) have been - * removed thus data is guaranteed to be available for analysis from the - * trace files. This is *only* true for local consumer and not network - * streaming. + * Finding _no_ node in the hash table and no inflight data means that the + * stream(s) have been removed thus data is guaranteed to be available for + * analysis from the trace files. */ +data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 0; -data_not_pending: +data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 1; } + +/* + * Send a ret code status message to the sessiond daemon. + * + * Return the sendmsg() return value. + */ +int consumer_send_status_msg(int sock, int ret_code) +{ + struct lttcomm_consumer_status_msg msg; + + msg.ret_code = ret_code; + + return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); +}