X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=c6ffb12234428b43b985abe6c38a467dea26fa5f;hb=d2dc232d580ae18400445b6e34fabe1023208f20;hp=2948fda5043ad7b8aa4dd177e0beb04b25d38d48;hpb=d85707b0cf4196ea0242f514211a84a144ccea77;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 2948fda50..c6ffb1223 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2017 Jérémie Galarneau * @@ -68,7 +68,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel) health_code_update(); - cds_list_del(&stream->send_node); + cds_list_del_init(&stream->send_node); lttng_ust_ctl_destroy_stream(stream->ustream); lttng_trace_chunk_put(stream->trace_chunk); free(stream); @@ -204,7 +204,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, * global. */ stream->globally_visible = 1; - cds_list_del(&stream->send_node); + cds_list_del_init(&stream->send_node); ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { @@ -949,7 +949,6 @@ error: * will make sure to clean that list. */ consumer_stream_destroy(metadata->metadata_stream, NULL); - cds_list_del(&metadata->metadata_stream->send_node); metadata->metadata_stream = NULL; send_streams_error: error_no_stream: @@ -1005,7 +1004,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); - pthread_mutex_lock(&metadata_stream->lock); + metadata_stream->read_subbuffer_ops.lock(metadata_stream); if (relayd_id != (uint64_t) -1ULL) { metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); @@ -1013,14 +1012,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, ret = consumer_stream_create_output_files(metadata_stream, false); } - pthread_mutex_unlock(&metadata_stream->lock); if (ret < 0) { goto error_stream; } do { health_code_update(); - ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret < 0) { goto error_stream; @@ -1028,12 +1025,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, } while (ret > 0); error_stream: + metadata_stream->read_subbuffer_ops.unlock(metadata_stream); /* - * Clean up the stream completly because the next snapshot will use a new - * metadata stream. + * Clean up the stream completely because the next snapshot will use a + * new metadata stream. */ consumer_stream_destroy(metadata_stream, NULL); - cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; error: @@ -1324,10 +1321,21 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * metadata position to ensure the metadata poll thread consumes * the whole cache. */ - pthread_mutex_lock(&channel->metadata_stream->lock); - metadata_stream_reset_cache_consumed_position( - channel->metadata_stream); - pthread_mutex_unlock(&channel->metadata_stream->lock); + + /* + * channel::metadata_stream can be null when the metadata + * channel is under a snapshot session type. No need to update + * the stream position in that scenario. + */ + if (channel->metadata_stream != NULL) { + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + } else { + /* Validate we are in snapshot mode. */ + assert(!channel->monitor); + } /* Fall-through. */ case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: /* @@ -1418,11 +1426,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + uint32_t major = msg.u.relayd_sock.major; + uint32_t minor = msg.u.relayd_sock.minor; + enum lttcomm_sock_proto protocol = + (enum lttcomm_sock_proto) msg.u.relayd_sock + .relayd_socket_protocol; + /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.type, ctx, sock, + consumer_sockpoll, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, major, + minor, protocol); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -2448,8 +2463,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) stream->quiescent = true; } } - pthread_mutex_unlock(&stream->lock); + stream->hangup_flush_done = 1; + pthread_mutex_unlock(&stream->lock); } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)