X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=27dfe32b8592487e6c5e881604a650085240f536;hp=4f83639d5d8ded41abaa03cb0c649f907bd0df1a;hb=4d86847e8786d4902dceeb1dff91791112d2c396;hpb=cd2b09ed75c28ef5e82698972582c99e6b423134 diff --git a/src/common/consumer.c b/src/common/consumer.c index 4f83639d5..27dfe32b8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -63,21 +63,6 @@ volatile int consumer_quit; static struct lttng_ht *metadata_ht; static struct lttng_ht *data_ht; -/* - * This hash table contains the mapping between the session id of the sessiond - * and the relayd session id. Element of the ht are indexed by sessiond session - * id. - * - * Node can be added when a relayd communication is opened in the sessiond - * thread. - * - * Note that a session id of the session daemon is unique to a tracing session - * and not to a domain session. However, a domain session has one consumer - * which forces the 1-1 mapping between a consumer and a domain session (ex: - * UST). This means that we can't have duplicate in this ht. - */ -static struct lttng_ht *relayd_session_id_ht; - /* * Notify a thread pipe to poll back again. This usually means that some global * state has changed so we just send back the thread in a poll wait call. @@ -231,7 +216,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) { int ret; struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; if (relayd == NULL) { return; @@ -239,22 +223,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); - /* Loockup for a relayd node in the session id map hash table. */ - lttng_ht_lookup(relayd_session_id_ht, - (void *)((unsigned long) relayd->sessiond_session_id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node != NULL) { - /* We assume the relayd is being or is destroyed */ - return; - } - - /* - * Try to delete it from the relayd session id ht. The return value is of - * no importance since either way we are going to try to delete the relayd - * from the global relayd_ht. - */ - lttng_ht_del(relayd_session_id_ht, &iter); - iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); if (ret != 0) { @@ -283,8 +251,6 @@ static void cleanup_relayd_ht(void) } lttng_ht_destroy(consumer_data.relayd_ht); - /* The destroy_relayd call makes sure that this ht is empty here. */ - lttng_ht_destroy(relayd_session_id_ht); rcu_read_unlock(); } @@ -782,6 +748,13 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); data_hdr.padding_size = htobe32(padding); + /* + * Note that net_seq_num below is assigned with the *current* value of + * next_net_seq_num and only after that the next_net_seq_num will be + * increment. This is why when issuing a command on the relayd using + * this next value, 1 should always be substracted in order to compare + * the last seen sequence number on the relayd side to the last sent. + */ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1099,7 +1072,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) do { ret = write(ctx->consumer_should_quit[1], "4", 1); } while (ret < 0 && errno == EINTR); - if (ret < 0) { + if (ret < 0 || ret != 1) { PERROR("write consumer quit"); } @@ -1317,8 +1290,22 @@ static int write_relayd_metadata_id(int fd, do { ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); + if (ret < 0 || ret != sizeof(hdr)) { + /* + * This error means that the fd's end is closed so ignore the perror + * not to clubber the error output since this can happen in a normal + * code path. + */ + if (errno != EPIPE) { + PERROR("write metadata stream id"); + } + DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno); + /* + * Set ret to a negative value because if ret != sizeof(hdr), we don't + * handle writting the missing part so report that as an error and + * don't lie to the caller. + */ + ret = -1; goto end; } DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", @@ -1435,7 +1422,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } while (ret < 0 && errno == EINTR); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { - PERROR("Error in file write"); + /* + * This is possible if the fd is closed on the other side (outfd) + * or any write problem. It can be verbose a bit for a normal + * execution if for instance the relayd is stopped abruptly. This + * can happen so set this to a DBG statement. + */ + DBG("Error in file write mmap"); if (written == 0) { written = ret; } @@ -2101,17 +2094,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - lttng_poll_reset(&events); - - nb_fd = LTTNG_POLL_GETNB(&events); - /* Only the metadata pipe is set */ - if (nb_fd == 0 && consumer_quit == 1) { + if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { goto end; } restart: - DBG("Metadata poll wait with %d fd(s)", nb_fd); + DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); ret = lttng_poll_wait(&events, -1); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2122,6 +2111,8 @@ restart: goto error; } + nb_fd = ret; + /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { revents = LTTNG_POLL_GETEV(&events, i); @@ -2366,6 +2357,11 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + if (pipe_readlen < 0) { + PERROR("read consumer data pipe"); + /* Continue so we can at least handle the current stream(s). */ + continue; + } /* * If the stream is NULL, just ignore it. It's also possible that @@ -2573,7 +2569,7 @@ void *consumer_thread_sessiond_poll(void *data) /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { + if (sock < 0) { WARN("On accept"); goto end; } @@ -2700,7 +2696,6 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } /* @@ -2717,7 +2712,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; - struct consumer_relayd_session_id *relayd_id_node; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2735,6 +2729,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + ret = -1; goto error; } relayd->sessiond_session_id = (uint64_t) sessiond_id; @@ -2784,32 +2779,21 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, relayd->control_sock.fd = fd; /* - * Create a session on the relayd and store the returned id. No need to - * grab the socket lock since the relayd object is not yet visible. + * Create a session on the relayd and store the returned id. Lock the + * control socket mutex if the relayd was NOT created before. */ + if (!relayd_created) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } ret = relayd_create_session(&relayd->control_sock, &relayd->relayd_session_id); - if (ret < 0) { - goto error; + if (!relayd_created) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - - /* Set up a relayd session id node. */ - relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id)); - if (!relayd_id_node) { - PERROR("zmalloc relayd id node"); + if (ret < 0) { goto error; } - relayd_id_node->relayd_id = relayd->relayd_session_id; - relayd_id_node->sessiond_id = (uint64_t) sessiond_id; - - /* Indexed by session id of the sessiond. */ - lttng_ht_node_init_ulong(&relayd_id_node->node, - relayd_id_node->sessiond_id); - rcu_read_lock(); - lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node); - rcu_read_unlock(); - break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ @@ -2831,6 +2815,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; default: ERR("Unknown relayd socket type (%d)", sock_type); + ret = -1; goto error; } @@ -2902,30 +2887,24 @@ end: static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) { struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; struct consumer_relayd_sock_pair *relayd = NULL; - struct consumer_relayd_session_id *session_id_map; - - /* Get the session id map. */ - lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - goto end; - } - - session_id_map = caa_container_of(node, struct consumer_relayd_session_id, - node); /* Iterate over all relayd since they are indexed by net_seq_idx. */ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - if (relayd->relayd_session_id == session_id_map->relayd_id) { + /* + * Check by sessiond id which is unique here where the relayd session + * id might not be when having multiple relayd. + */ + if (relayd->sessiond_session_id == id) { /* Found the relayd. There can be only one per id. */ - break; + goto found; } } -end: + return NULL; + +found: return relayd; } @@ -3008,10 +2987,12 @@ int consumer_data_pending(uint64_t id) if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { - ret = relayd_quiescent_control(&relayd->control_sock); + ret = relayd_quiescent_control(&relayd->control_sock, + stream->relayd_stream_id); } else { ret = relayd_data_pending(&relayd->control_sock, - stream->relayd_stream_id, stream->next_net_seq_num); + stream->relayd_stream_id, + stream->next_net_seq_num - 1); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { @@ -3030,10 +3011,12 @@ int consumer_data_pending(uint64_t id) ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0 || !is_data_inflight) { - /* On error or if NO data inflight, no data is pending. */ + if (ret < 0) { goto data_not_pending; } + if (is_data_inflight) { + goto data_pending; + } } /*