X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=f8706d0ba88528e3dbf1e698de96eec9c4d5073d;hp=ff9d31d5287a92b29bf6ac16bff0e6a17876d837;hb=16aa001d9a342adc6b913f854c0cee7a896a7e03;hpb=0d88e04674ead21c741c6f4ed7fadf666c5e7bce diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ff9d31d52..f8706d0ba 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -519,7 +519,8 @@ error_open: if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid); + channel->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -774,10 +775,19 @@ static int flush_channel(uint64_t chan_key) health_code_update(); pthread_mutex_lock(&stream->lock); + + /* + * Protect against concurrent teardown of a stream. + */ + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + if (!stream->quiescent) { ustctl_flush_buffer(stream->ustream, 0); stream->quiescent = true; } +next: pthread_mutex_unlock(&stream->lock); } error: @@ -1069,7 +1079,6 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. */ - pthread_mutex_lock(&metadata_stream->lock); consumer_stream_destroy(metadata_stream, NULL); cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; @@ -1590,7 +1599,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; - goto end_msg_sessiond; + goto end_get_channel; } health_code_update(); @@ -1606,13 +1615,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * and the consumer can continue its work. The above call * has sent the error status message to the sessiond. */ - goto end_nosignal; + goto end_get_channel_nosignal; } /* * The communicaton was broken hence there is a bad state between * the consumer and sessiond so stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } health_code_update(); @@ -1622,7 +1631,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * so don't send them to the data thread. */ if (!channel->monitor) { - goto end_msg_sessiond; + goto end_get_channel; } ret = send_streams_to_thread(channel, ctx); @@ -1631,11 +1640,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * If we are unable to send the stream to the thread, there is * a big problem so just stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } /* List MUST be empty after or else it could be reused. */ assert(cds_list_empty(&channel->streams.head)); +end_get_channel: goto end_msg_sessiond; +error_get_channel_fatal: + goto error_fatal; +end_get_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: { @@ -1706,7 +1720,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ DBG("UST consumer push metadata %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } health_code_update(); @@ -1717,14 +1731,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * checked whether the channel can be found. */ ret_code = LTTCOMM_CONSUMERD_SUCCESS; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1734,7 +1748,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); if (ret) { - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1743,11 +1757,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, len, version, channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ - goto error_fatal; + goto error_push_metadata_fatal; } else { ret_code = ret; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } +end_push_metadata_msg_sessiond: + goto end_msg_sessiond; +error_push_metadata_fatal: + goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: { @@ -1974,7 +1992,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_rotate_channel_nosignal; } /* @@ -1992,6 +2010,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } break; +end_rotate_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_INIT: { @@ -2008,8 +2028,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.uid, - .gid = msg.u.create_trace_chunk.credentials.gid, + .uid = msg.u.create_trace_chunk.credentials.value.uid, + .gid = msg.u.create_trace_chunk.credentials.value.gid, }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -2062,9 +2082,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, !is_local_trace ? &relayd_id : NULL, msg.u.create_trace_chunk.session_id, msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk.creation_timestamp, + (time_t) msg.u.create_trace_chunk + .creation_timestamp, chunk_override_name, - &credentials, + msg.u.create_trace_chunk.credentials.is_set ? + &credentials : + NULL, chunk_directory_handle.is_set ? &chunk_directory_handle.value : NULL); @@ -2077,16 +2100,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char closed_trace_chunk_path[LTTNG_PATH_MAX]; + int ret; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : NULL, + &relayd_id : + NULL, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp); - goto end_msg_sessiond; + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL, closed_trace_chunk_path); + reply.ret_code = ret_code; + reply.path_length = strlen(closed_trace_chunk_path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, + reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { @@ -2105,15 +2148,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } end_nosignal: - rcu_read_unlock(); - - health_code_update(); - /* * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ - return 1; + ret = 1; + goto end; end_msg_sessiond: /* @@ -2125,11 +2165,9 @@ end_msg_sessiond: if (ret < 0) { goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; end_channel_error: if (channel) { pthread_mutex_unlock(&channel->lock); @@ -2145,15 +2183,18 @@ end_channel_error: /* Stop everything if session daemon can not be notified. */ goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; error_fatal: - rcu_read_unlock(); /* This will issue a consumer stop. */ - return -1; + ret = -1; + goto end; + +end: + rcu_read_unlock(); + health_code_update(); + return ret; } /* @@ -2340,7 +2381,8 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid); + chan->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -2381,62 +2423,69 @@ static int get_index_values(struct ctf_packet_index *index, struct ustctl_consumer_stream *ustream) { int ret; + uint64_t packet_size, content_size, timestamp_begin, timestamp_end, + events_discarded, stream_id, stream_instance_id, + packet_seq_num; - ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin); + ret = ustctl_get_timestamp_begin(ustream, ×tamp_begin); if (ret < 0) { PERROR("ustctl_get_timestamp_begin"); goto error; } - index->timestamp_begin = htobe64(index->timestamp_begin); - ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end); + ret = ustctl_get_timestamp_end(ustream, ×tamp_end); if (ret < 0) { PERROR("ustctl_get_timestamp_end"); goto error; } - index->timestamp_end = htobe64(index->timestamp_end); - ret = ustctl_get_events_discarded(ustream, &index->events_discarded); + ret = ustctl_get_events_discarded(ustream, &events_discarded); if (ret < 0) { PERROR("ustctl_get_events_discarded"); goto error; } - index->events_discarded = htobe64(index->events_discarded); - ret = ustctl_get_content_size(ustream, &index->content_size); + ret = ustctl_get_content_size(ustream, &content_size); if (ret < 0) { PERROR("ustctl_get_content_size"); goto error; } - index->content_size = htobe64(index->content_size); - ret = ustctl_get_packet_size(ustream, &index->packet_size); + ret = ustctl_get_packet_size(ustream, &packet_size); if (ret < 0) { PERROR("ustctl_get_packet_size"); goto error; } - index->packet_size = htobe64(index->packet_size); - ret = ustctl_get_stream_id(ustream, &index->stream_id); + ret = ustctl_get_stream_id(ustream, &stream_id); if (ret < 0) { PERROR("ustctl_get_stream_id"); goto error; } - index->stream_id = htobe64(index->stream_id); - ret = ustctl_get_instance_id(ustream, &index->stream_instance_id); + ret = ustctl_get_instance_id(ustream, &stream_instance_id); if (ret < 0) { PERROR("ustctl_get_instance_id"); goto error; } - index->stream_instance_id = htobe64(index->stream_instance_id); - ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num); + ret = ustctl_get_sequence_number(ustream, &packet_seq_num); if (ret < 0) { PERROR("ustctl_get_sequence_number"); goto error; } - index->packet_seq_num = htobe64(index->packet_seq_num); + + *index = (typeof(*index)) { + .offset = index->offset, + .packet_size = htobe64(packet_size), + .content_size = htobe64(content_size), + .timestamp_begin = htobe64(timestamp_begin), + .timestamp_end = htobe64(timestamp_end), + .events_discarded = htobe64(events_discarded), + .stream_id = htobe64(stream_id), + .stream_instance_id = htobe64(stream_instance_id), + .packet_seq_num = htobe64(packet_seq_num), + }; error: return ret; @@ -3011,7 +3060,7 @@ end: * Stop a given metadata channel timer if enabled and close the wait fd which * is the poll pipe of the metadata stream. * - * This MUST be called with the metadata channel acquired. + * This MUST be called with the metadata channel lock acquired. */ void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata) {