X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=1d6f8b4cc4ce7cba975c85eb6a2c01fcb6e19e5b;hb=bb586a6e7c1337041aab59ec04824ce9bf002d8c;hp=e0d6ea496ade52af826cf02b0fc9fe0054b64895;hpb=3a84e2f3c3df5b461d9621b64f14abc3b8c3c29c;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e0d6ea496..1d6f8b4cc 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -465,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -492,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -813,6 +810,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -854,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1721,7 +1722,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1947,7 +1949,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -3646,6 +3649,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3669,34 +3673,6 @@ error_nosignal: } } -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; -} - /* * Search for a relayd associated to the session id and return the reference. * @@ -3772,6 +3748,8 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { /* Communication error thus the relayd so no data pending. */ + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } } @@ -3780,11 +3758,7 @@ int consumer_data_pending(uint64_t id) ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3813,6 +3787,13 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } + if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); @@ -3831,6 +3812,8 @@ int consumer_data_pending(uint64_t id) relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { @@ -4201,6 +4184,10 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, stream->chan->current_chunk_id, stream->last_sequence_number); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } if (ret) { ERR("Rotate relay stream"); } @@ -4374,6 +4361,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); + if (ret < 0) { + ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: return ret; @@ -4404,6 +4395,10 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + if (ret < 0) { + ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: @@ -4441,6 +4436,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id) pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_mkdir(&relayd->control_sock, path); + if (ret < 0) { + ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); end: