X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=514278d91fd930fac88603286e93fec16c7691a5;hb=58f835e108c72cf830c4bc3d5b6abce80ebb0b6c;hp=0d61b866f46d426c120694d9138f57a83be26a5d;hpb=bbc4768c20f1c552222e1746f9475d145d7bf04e;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0d61b866f..514278d91 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -519,7 +519,8 @@ error_open: if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid); + channel->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -774,10 +775,19 @@ static int flush_channel(uint64_t chan_key) health_code_update(); pthread_mutex_lock(&stream->lock); + + /* + * Protect against concurrent teardown of a stream. + */ + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + if (!stream->quiescent) { ustctl_flush_buffer(stream->ustream, 0); stream->quiescent = true; } +next: pthread_mutex_unlock(&stream->lock); } error: @@ -1590,7 +1600,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; - goto end_msg_sessiond; + goto end_get_channel; } health_code_update(); @@ -1606,13 +1616,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * and the consumer can continue its work. The above call * has sent the error status message to the sessiond. */ - goto end_nosignal; + goto end_get_channel_nosignal; } /* * The communicaton was broken hence there is a bad state between * the consumer and sessiond so stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } health_code_update(); @@ -1622,7 +1632,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * so don't send them to the data thread. */ if (!channel->monitor) { - goto end_msg_sessiond; + goto end_get_channel; } ret = send_streams_to_thread(channel, ctx); @@ -1631,11 +1641,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * If we are unable to send the stream to the thread, there is * a big problem so just stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } /* List MUST be empty after or else it could be reused. */ assert(cds_list_empty(&channel->streams.head)); +end_get_channel: goto end_msg_sessiond; +error_get_channel_fatal: + goto error_fatal; +end_get_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: { @@ -1706,7 +1721,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ DBG("UST consumer push metadata %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } health_code_update(); @@ -1717,14 +1732,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * checked whether the channel can be found. */ ret_code = LTTCOMM_CONSUMERD_SUCCESS; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1734,7 +1749,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); if (ret) { - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1743,11 +1758,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, len, version, channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ - goto error_fatal; + goto error_push_metadata_fatal; } else { ret_code = ret; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } +end_push_metadata_msg_sessiond: + goto end_msg_sessiond; +error_push_metadata_fatal: + goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: { @@ -1974,7 +1993,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_rotate_channel_nosignal; } /* @@ -1992,6 +2011,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } break; +end_rotate_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_INIT: { @@ -2084,6 +2105,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char closed_trace_chunk_path[LTTNG_PATH_MAX]; + int ret; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? @@ -2094,8 +2118,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, (time_t) msg.u.close_trace_chunk.close_timestamp, msg.u.close_trace_chunk.close_command.is_set ? &close_command : - NULL); - goto end_msg_sessiond; + NULL, closed_trace_chunk_path); + reply.ret_code = ret_code; + reply.path_length = strlen(closed_trace_chunk_path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, + reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { @@ -2114,15 +2149,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } end_nosignal: - rcu_read_unlock(); - - health_code_update(); - /* * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ - return 1; + ret = 1; + goto end; end_msg_sessiond: /* @@ -2134,11 +2166,9 @@ end_msg_sessiond: if (ret < 0) { goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; end_channel_error: if (channel) { pthread_mutex_unlock(&channel->lock); @@ -2154,15 +2184,18 @@ end_channel_error: /* Stop everything if session daemon can not be notified. */ goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; error_fatal: - rcu_read_unlock(); /* This will issue a consumer stop. */ - return -1; + ret = -1; + goto end; + +end: + rcu_read_unlock(); + health_code_update(); + return ret; } /* @@ -2349,7 +2382,8 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid); + chan->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -3020,7 +3054,7 @@ end: * Stop a given metadata channel timer if enabled and close the wait fd which * is the poll pipe of the metadata stream. * - * This MUST be called with the metadata channel acquired. + * This MUST be called with the metadata channel lock acquired. */ void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata) {