X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=685d55f838120a4b94cf407d9e420e03cf294e32;hb=c9e313bc594f40a86eed237dce222c0fc99c957f;hp=47bb5bd98b1a1fbf0c10a8795672ca1af0a556e9;hpb=97535efaa975ca52bf02c2d5e76351bfd2e3defa;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 47bb5bd98..685d55f83 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2012 David Goulet * @@ -7,42 +7,42 @@ * */ -#include "common/index/ctf-index.h" #define _LGPL_SOURCE +#include #include #include +#include #include #include #include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include lttng_consumer_global_data the_consumer_data; @@ -173,7 +173,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, send_node) { - cds_list_del(&stream->send_node); /* * Once a stream is added to this list, the buffers were created so we * have a guarantee that this call will succeed. Setting the monitor @@ -246,6 +245,8 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) struct lttng_ht_node_u64 *node; struct lttng_consumer_channel *channel = NULL; + ASSERT_RCU_READ_LOCKED(); + /* -1ULL keys are lookup failures */ if (key == (uint64_t) -1ULL) { return NULL; @@ -521,6 +522,7 @@ void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd) { LTTNG_ASSERT(relayd); + ASSERT_RCU_READ_LOCKED(); /* Set destroy flag for this object */ uatomic_set(&relayd->destroy_flag, 1); @@ -633,6 +635,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) struct lttng_ht_iter iter; LTTNG_ASSERT(relayd); + ASSERT_RCU_READ_LOCKED(); lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter); @@ -690,6 +693,8 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) struct lttng_ht_node_u64 *node; struct consumer_relayd_sock_pair *relayd = NULL; + ASSERT_RCU_READ_LOCKED(); + /* Negative keys are lookup failures */ if (key == (uint64_t) -1ULL) { goto error; @@ -3381,6 +3386,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, if (!locked_by_caller) { stream->read_subbuffer_ops.lock(stream); + } else { + stream->read_subbuffer_ops.assert_locked(stream); } if (stream->read_subbuffer_ops.on_wake_up) { @@ -3396,7 +3403,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before consuming data"); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); if (ret < 0) { ERR("Stream rotation error before consuming data"); goto end; @@ -3452,7 +3459,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + rotation_ret = lttng_consumer_rotate_stream(stream); if (rotation_ret < 0) { ret = rotation_ret; ERR("Stream rotation error after consuming data"); @@ -3556,18 +3563,24 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, - struct lttng_consumer_local_data *ctx, int sock, +void consumer_add_relayd_socket(uint64_t net_seq_idx, + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, - uint64_t relayd_session_id) + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol) { int fd = -1, ret = -1, relayd_created = 0; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct consumer_relayd_sock_pair *relayd = NULL; LTTNG_ASSERT(ctx); - LTTNG_ASSERT(relayd_sock); + LTTNG_ASSERT(sock >= 0); + ASSERT_RCU_READ_LOCKED(); DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); @@ -3636,54 +3649,25 @@ error: switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->control_sock.sock.fd)) { - PERROR("close"); - } + ret = lttcomm_populate_sock_from_open_socket( + &relayd->control_sock.sock, fd, + relayd_socket_protocol); - /* Assign new file descriptor */ - relayd->control_sock.sock.fd = fd; /* Assign version values. */ - relayd->control_sock.major = relayd_sock->major; - relayd->control_sock.minor = relayd_sock->minor; + relayd->control_sock.major = relayd_version_major; + relayd->control_sock.minor = relayd_version_minor; relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->data_sock.sock.fd)) { - PERROR("close"); - } - - /* Assign new file descriptor */ - relayd->data_sock.sock.fd = fd; + ret = lttcomm_populate_sock_from_open_socket( + &relayd->data_sock.sock, fd, + relayd_socket_protocol); /* Assign version values. */ - relayd->data_sock.major = relayd_sock->major; - relayd->data_sock.minor = relayd_sock->minor; + relayd->data_sock.major = relayd_version_major; + relayd->data_sock.minor = relayd_version_minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type); @@ -3691,6 +3675,11 @@ error: goto error; } + if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); @@ -3747,6 +3736,8 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd = NULL; + ASSERT_RCU_READ_LOCKED(); + /* Iterate over all relayd since they are indexed by net_seq_idx. */ cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { @@ -3994,8 +3985,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx) + uint64_t key, uint64_t relayd_id) { int ret; struct lttng_consumer_stream *stream; @@ -4011,6 +4001,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, struct lttng_dynamic_pointer_array streams_packet_to_open; size_t stream_idx; + ASSERT_RCU_READ_LOCKED(); + DBG("Consumer sample rotate position for channel %" PRIu64, key); lttng_dynamic_array_init(&stream_rotation_positions, @@ -4543,8 +4535,7 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre * Perform the rotation a local stream file. */ static -int rotate_local_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int rotate_local_stream(struct lttng_consumer_stream *stream) { int ret = 0; @@ -4583,8 +4574,7 @@ end: * * Return 0 on success, a negative number of error. */ -int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream) { int ret; @@ -4619,7 +4609,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } if (stream->net_seq_idx == (uint64_t) -1ULL) { - ret = rotate_local_stream(ctx, stream); + ret = rotate_local_stream(stream); if (ret < 0) { ERR("Failed to rotate stream, ret = %i", ret); goto error; @@ -4663,13 +4653,15 @@ error: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key, struct lttng_consumer_local_data *ctx) + uint64_t key) { int ret; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + ASSERT_RCU_READ_LOCKED(); + rcu_read_lock(); DBG("Consumer rotate ready streams in channel %" PRIu64, key); @@ -4690,7 +4682,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, } DBG("Consumer rotate ready stream %" PRIu64, stream->key); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); if (ret) {