X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=63a2ab288c865e338582485199a7a268064bb4af;hb=9276e5c88e693249bd31197baecf58310df8167e;hp=3ac00cb030bdebc60a3c864a91107873fb683f63;hpb=b99a8d4211e26a847ca8916884354f159c999ad2;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3ac00cb03..63a2ab288 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -464,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -491,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -562,7 +560,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -589,6 +588,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -806,9 +806,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); + stream->chan->tracefile_size, stream->chan->tracefile_count, + stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -850,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1475,7 +1480,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd, unsigned long padding) + unsigned long padding) { ssize_t ret; struct lttcomm_relayd_metadata_payload hdr; @@ -1610,7 +1615,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + ret = write_relayd_metadata_id(outfd, stream, padding); if (ret < 0) { relayd_hang_up = 1; goto write_error; @@ -1717,7 +1722,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1799,7 +1805,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } stream->reset_metadata_flag = 0; } - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + ret = write_relayd_metadata_id(splice_pipe[1], stream, padding); if (ret < 0) { written = ret; @@ -1943,7 +1949,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -2337,7 +2344,6 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream, stream->chan->name); ret = rotate_notify_sessiond(ctx, stream->chan->key); } - assert(stream->chan->nr_stream_rotate_pending >= 0); pthread_mutex_unlock(&stream->chan->lock); return ret; @@ -2574,7 +2580,9 @@ void *consumer_thread_data_poll(void *data) /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ - int nb_fd = 0, nb_pipes_fd; + int nb_fd = 0; + /* 2 for the consumer_data_pipe and wake up pipe */ + const int nb_pipes_fd = 2; /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; @@ -2614,12 +2622,7 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* - * Allocate for all fds + 2: - * +1 for the consumer_data_pipe - * +1 for wake up pipe - */ - nb_pipes_fd = 2; + /* Allocate for all fds */ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); @@ -3646,6 +3649,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3772,6 +3776,8 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { /* Communication error thus the relayd so no data pending. */ + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } } @@ -3813,6 +3819,13 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } + if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); @@ -3831,6 +3844,8 @@ int consumer_data_pending(uint64_t id) relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { @@ -4389,6 +4404,27 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; +} + static int mkdir_local(const char *path, uid_t uid, gid_t gid) {