X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=a9de1696e4ce862d52bf58b5c6ab44f73ad76b28;hb=f149493493fbd8a3efa4748832c03278c96c38ca;hp=6166df4ede77b43ac0bddbce4e3ceb94cc602680;hpb=5c5e3d718d8248d5e075f64c34667c4c4617ae63;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 6166df4ed..a9de1696e 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -7,43 +7,42 @@ * */ -#include "common/index/ctf-index.h" -#include #define _LGPL_SOURCE +#include #include #include +#include #include #include #include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include lttng_consumer_global_data the_consumer_data; @@ -53,12 +52,22 @@ enum consumer_channel_action { CONSUMER_CHANNEL_QUIT, }; +namespace { struct consumer_channel_msg { enum consumer_channel_action action; struct lttng_consumer_channel *chan; /* add */ uint64_t key; /* del */ }; +/* + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht; +struct lttng_ht *data_ht; +} /* namespace */ + /* Flag used to temporarily pause data consumption from testpoints. */ int data_consumption_paused; @@ -70,14 +79,6 @@ int data_consumption_paused; */ int consumer_quit; -/* - * Global hash table containing respectively metadata and data streams. The - * stream element in this ht should only be updated by the metadata poll thread - * for the metadata and the data poll thread for the data. - */ -static struct lttng_ht *metadata_ht; -static struct lttng_ht *data_ht; - static const char *get_consumer_domain(void) { switch (the_consumer_data.type) { @@ -663,7 +664,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( goto error; } - obj = (consumer_relayd_sock_pair *) zmalloc(sizeof(struct consumer_relayd_sock_pair)); + obj = zmalloc(); if (obj == NULL) { PERROR("zmalloc relayd sock"); goto error; @@ -1030,7 +1031,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = (lttng_consumer_channel *) zmalloc(sizeof(*channel)); + channel = zmalloc(); if (channel == NULL) { PERROR("malloc struct lttng_consumer_channel"); goto end; @@ -1426,7 +1427,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( the_consumer_data.type == type); the_consumer_data.type = type; - ctx = (lttng_consumer_local_data *) zmalloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(); if (ctx == NULL) { PERROR("allocating context"); goto error; @@ -2562,7 +2563,7 @@ void *consumer_thread_data_poll(void *data) health_code_update(); - local_stream = (lttng_consumer_stream **) zmalloc(sizeof(struct lttng_consumer_stream *)); + local_stream = zmalloc(); if (local_stream == NULL) { PERROR("local_stream malloc"); goto end; @@ -2587,18 +2588,14 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; /* Allocate for all fds */ - pollfd = (struct pollfd *) zmalloc((the_consumer_data.stream_count + - nb_pipes_fd) * - sizeof(struct pollfd)); + pollfd = calloc(the_consumer_data.stream_count + nb_pipes_fd); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&the_consumer_data.lock); goto end; } - local_stream = (lttng_consumer_stream **) zmalloc((the_consumer_data.stream_count + - nb_pipes_fd) * - sizeof(struct lttng_consumer_stream *)); + local_stream = calloc(the_consumer_data.stream_count + nb_pipes_fd); if (local_stream == NULL) { PERROR("local_stream malloc"); pthread_mutex_unlock(&the_consumer_data.lock); @@ -3404,7 +3401,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before consuming data"); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); if (ret < 0) { ERR("Stream rotation error before consuming data"); goto end; @@ -3460,7 +3457,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + rotation_ret = lttng_consumer_rotate_stream(stream); if (rotation_ret < 0) { ret = rotation_ret; ERR("Stream rotation error after consuming data"); @@ -3986,8 +3983,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx) + uint64_t key, uint64_t relayd_id) { int ret; struct lttng_consumer_stream *stream; @@ -4537,8 +4533,7 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre * Perform the rotation a local stream file. */ static -int rotate_local_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int rotate_local_stream(struct lttng_consumer_stream *stream) { int ret = 0; @@ -4577,8 +4572,7 @@ end: * * Return 0 on success, a negative number of error. */ -int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream) { int ret; @@ -4613,7 +4607,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } if (stream->net_seq_idx == (uint64_t) -1ULL) { - ret = rotate_local_stream(ctx, stream); + ret = rotate_local_stream(stream); if (ret < 0) { ERR("Failed to rotate stream, ret = %i", ret); goto error; @@ -4657,7 +4651,7 @@ error: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key, struct lttng_consumer_local_data *ctx) + uint64_t key) { int ret; struct lttng_consumer_stream *stream; @@ -4686,7 +4680,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, } DBG("Consumer rotate ready stream %" PRIu64, stream->key); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); if (ret) {