X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=3c20c57bda9d68ac1967de0cf433901ab8cce7e6;hp=afc346333c5dc780a57c2bb86a548a9b0c60d82f;hb=fb9a95c4d6242bd8336b638c90a7d8f846125659;hpb=0b50e4b3fb9859af7072adcca784684834e5f8d1 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index afc346333..3c20c57bd 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -392,6 +393,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) iter.iter.node = &channel->node.node; ret = lttng_ht_del(consumer_data.channel_ht, &iter); assert(!ret); + + iter.iter.node = &channel->channels_by_session_id_ht_node.node; + ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, &iter); + assert(!ret); rcu_read_unlock(); call_rcu(&channel->node.head, free_channel_rcu); @@ -464,14 +469,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -491,10 +495,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -812,6 +814,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -853,6 +857,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1033,6 +1039,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } lttng_ht_node_init_u64(&channel->node, channel->key); + lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, + channel->session_id); channel->wait_fd = -1; @@ -1065,6 +1073,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, rcu_read_lock(); lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node); + lttng_ht_add_u64(consumer_data.channels_by_session_id_ht, + &channel->channels_by_session_id_ht_node); rcu_read_unlock(); pthread_mutex_unlock(&channel->timer_lock); @@ -1223,6 +1233,7 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + lttng_ht_destroy(consumer_data.channels_by_session_id_ht); cleanup_relayd_ht(); @@ -1234,6 +1245,8 @@ void lttng_consumer_cleanup(void) * it. */ lttng_ht_destroy(consumer_data.stream_list_ht); + + lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry); } /* @@ -1720,7 +1733,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1946,7 +1960,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -2279,26 +2294,6 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } -static -int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, - uint64_t key) -{ - ssize_t ret; - - do { - ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - PERROR("Failed to write to the channel rotation pipe"); - } else { - DBG("Sent channel rotation notification for channel key %" - PRIu64, key); - ret = 0; - } - - return (int) ret; -} - /* * Perform operations that need to be done after a stream has * rotated and released the stream lock. @@ -2335,14 +2330,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream, abort(); } - if (--stream->chan->nr_stream_rotate_pending == 0) { - DBG("Rotation of channel \"%s\" completed, notifying the session daemon", - stream->chan->name); - ret = rotate_notify_sessiond(ctx, stream->chan->key); - } - assert(stream->chan->nr_stream_rotate_pending >= 0); pthread_mutex_unlock(&stream->chan->lock); - return ret; } @@ -2419,11 +2407,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & LPOLLIN) { ssize_t pipe_len; @@ -2577,7 +2560,9 @@ void *consumer_thread_data_poll(void *data) /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ - int nb_fd = 0, nb_pipes_fd; + int nb_fd = 0; + /* 2 for the consumer_data_pipe and wake up pipe */ + const int nb_pipes_fd = 2; /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; @@ -2617,12 +2602,7 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* - * Allocate for all fds + 2: - * +1 for the consumer_data_pipe - * +1 for wake up pipe - */ - nb_pipes_fd = 2; + /* Allocate for all fds */ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); @@ -3016,11 +2996,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - if (pollfd == ctx->consumer_channel_pipe[0]) { if (revents & LPOLLIN) { enum consumer_channel_action action; @@ -3459,6 +3434,12 @@ int lttng_consumer_init(void) goto error; } + consumer_data.channels_by_session_id_ht = + lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.channels_by_session_id_ht) { + goto error; + } + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!consumer_data.relayd_ht) { goto error; @@ -3484,6 +3465,11 @@ int lttng_consumer_init(void) goto error; } + consumer_data.chunk_registry = lttng_trace_chunk_registry_create(); + if (!consumer_data.chunk_registry) { + goto error; + } + return 0; error: @@ -3594,7 +3580,6 @@ error: /* Assign new file descriptor */ relayd->control_sock.sock.fd = fd; - fd = -1; /* For error path */ /* Assign version values. */ relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; @@ -3622,7 +3607,6 @@ error: /* Assign new file descriptor */ relayd->data_sock.sock.fd = fd; - fd = -1; /* for eventual error paths */ /* Assign version values. */ relayd->data_sock.major = relayd_sock->major; relayd->data_sock.minor = relayd_sock->minor; @@ -3636,6 +3620,11 @@ error: DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); + /* + * We gave the ownership of the fd to the relayd structure. Set the + * fd to -1 so we don't call close() on it in the error path below. + */ + fd = -1; /* We successfully added the socket. Send status back. */ ret = consumer_send_status_msg(sock, ret_code); @@ -3649,6 +3638,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3672,34 +3662,6 @@ error_nosignal: } } -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; -} - /* * Search for a relayd associated to the session id and return the reference. * @@ -3766,28 +3728,11 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3805,9 +3750,27 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3816,24 +3779,25 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret == 1) { - pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; + } else if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + goto data_not_pending; } } - pthread_mutex_unlock(&stream->lock); - } - if (relayd) { - unsigned int is_data_inflight = 0; - - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { @@ -3947,15 +3911,16 @@ end: * is already at the rotate position (produced == consumed), we flag it as * ready for rotation. The rotation of ready streams occurs after we have * replied to the session daemon that we have finished sampling the positions. + * Must be called with RCU read-side lock held to ensure existence of channel. * * Returns 0 on success, < 0 on error */ -int lttng_consumer_rotate_channel(uint64_t key, const char *path, - uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, +int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, + uint64_t key, const char *path, uint64_t relayd_id, + uint32_t metadata, uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx) { int ret; - struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; @@ -3964,13 +3929,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path, rcu_read_lock(); - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - pthread_mutex_lock(&channel->lock); channel->current_chunk_id = new_chunk_id; @@ -4035,7 +3993,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path, if (consumed_pos == stream->rotate_position) { stream->rotate_ready = true; } - channel->nr_stream_rotate_pending++; ret = consumer_flush_buffer(stream, 1); if (ret < 0) { @@ -4204,6 +4161,10 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, stream->chan->current_chunk_id, stream->last_sequence_number); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } if (ret) { ERR("Rotate relay stream"); } @@ -4230,8 +4191,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } else { ret = rotate_local_stream(ctx, stream); } + stream->trace_archive_id++; if (ret < 0) { - ERR("Rotate stream"); + ERR("Failed to rotate stream, ret = %i", ret); goto error; } @@ -4279,14 +4241,15 @@ error: * This is especially important for low throughput streams that have already * been consumed, we cannot wait for their next packet to perform the * rotation. + * Need to be called with RCU read-side lock held to ensure existence of + * channel. * * Returns 0 on success, < 0 on error */ -int lttng_consumer_rotate_ready_streams(uint64_t key, - struct lttng_consumer_local_data *ctx) +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, + uint64_t key, struct lttng_consumer_local_data *ctx) { int ret; - struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; @@ -4295,13 +4258,6 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, DBG("Consumer rotate ready streams in channel %" PRIu64, key); - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, @@ -4377,6 +4333,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); + if (ret < 0) { + ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: return ret; @@ -4392,7 +4352,64 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } -int lttng_consumer_rotate_pending_relay(uint64_t session_id, +/* Stream lock must be acquired by the caller. */ +static +bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream, + uint64_t session_id, uint64_t chunk_id) +{ + bool pending = false; + + if (stream->session_id != session_id) { + /* Skip. */ + goto end; + } + + /* + * If the stream's archive_id belongs to the chunk being rotated (or an + * even older one), it means that the consumer has not consumed all the + * buffers that belong to the chunk being rotated. Therefore, the + * rotation is considered as ongoing/pending. + */ + pending = stream->trace_archive_id <= chunk_id; +end: + return pending; +} + +/* RCU read lock must be acquired by the caller. */ +int lttng_consumer_check_rotation_pending_local(uint64_t session_id, + uint64_t chunk_id) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool rotation_pending = false; + + /* Start with the metadata streams... */ + cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { + pthread_mutex_lock(&stream->lock); + rotation_pending = check_stream_rotation_pending(stream, + session_id, chunk_id); + pthread_mutex_unlock(&stream->lock); + if (rotation_pending) { + goto end; + } + } + + /* ... followed by the data streams. */ + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { + pthread_mutex_lock(&stream->lock); + rotation_pending = check_stream_rotation_pending(stream, + session_id, chunk_id); + pthread_mutex_unlock(&stream->lock); + if (rotation_pending) { + goto end; + } + } + +end: + return !!rotation_pending; +} + +int lttng_consumer_check_rotation_pending_relay(uint64_t session_id, uint64_t relayd_id, uint64_t chunk_id) { int ret; @@ -4400,13 +4417,17 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id, relayd = consumer_find_relayd(relayd_id); if (!relayd) { - ERR("Failed to find relayd"); + ERR("Failed to find relayd id %" PRIu64, relayd_id); ret = -1; goto end; } pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + if (ret < 0) { + ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: @@ -4444,6 +4465,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id) pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_mkdir(&relayd->control_sock, path); + if (ret < 0) { + ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: @@ -4460,3 +4485,24 @@ int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, return mkdir_local(path, uid, gid); } } + +enum lttcomm_return_code lttng_consumer_init_command( + struct lttng_consumer_local_data *ctx, + const lttng_uuid sessiond_uuid) +{ + enum lttcomm_return_code ret; + char uuid_str[UUID_STR_LEN]; + + if (ctx->sessiond_uuid.is_set) { + ret = LTTCOMM_CONSUMERD_ALREADY_SET; + goto end; + } + + ctx->sessiond_uuid.is_set = true; + memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); + ret = LTTCOMM_CONSUMERD_SUCCESS; + lttng_uuid_to_str(sessiond_uuid, uuid_str); + DBG("Received session daemon UUID: %s", uuid_str); +end: + return ret; +}