X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=f2ccf9536b147b72574638046135da3e425b771a;hp=e80ac6be751fbba7eebc98404d520f9ea62c4cd5;hb=b83e03c49920557f292d3861f42d0109e6fa03ea;hpb=02b3d1769d5f8a33e4109b1e681141c9295dfda6 diff --git a/src/common/consumer.c b/src/common/consumer.c index e80ac6be7..f2ccf9536 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -18,6 +18,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +48,7 @@ #include "consumer.h" #include "consumer-stream.h" #include "consumer-testpoint.h" +#include "align.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -283,6 +286,17 @@ static void free_channel_rcu(struct rcu_head *head) struct lttng_consumer_channel *channel = caa_container_of(node, struct lttng_consumer_channel, node); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_free_channel(channel); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } free(channel); } @@ -560,6 +574,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->metadata_timer_lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ if (type == CONSUMER_CHANNEL_TYPE_METADATA) { @@ -933,7 +948,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + const char *root_shm_path, + const char *shm_path) { struct lttng_consumer_channel *channel; @@ -990,6 +1007,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, strncpy(channel->name, name, sizeof(channel->name)); channel->name[sizeof(channel->name) - 1] = '\0'; + if (root_shm_path) { + strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path)); + channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0'; + } + if (shm_path) { + strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path)); + channel->shm_path[sizeof(channel->shm_path) - 1] = '\0'; + } + lttng_ht_node_init_u64(&channel->node, channel->key); channel->wait_fd = -1; @@ -1301,12 +1327,6 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_quit_pipe; } - ret = pipe(ctx->consumer_thread_pipe); - if (ret < 0) { - PERROR("Error creating thread pipe"); - goto error_thread_pipe; - } - ret = pipe(ctx->consumer_channel_pipe); if (ret < 0) { PERROR("Error creating channel pipe"); @@ -1318,20 +1338,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } - ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe); - if (ret < 0) { - goto error_splice_pipe; - } - return ctx; -error_splice_pipe: - lttng_pipe_destroy(ctx->consumer_metadata_pipe); error_metadata_pipe: utils_close_pipe(ctx->consumer_channel_pipe); error_channel_pipe: - utils_close_pipe(ctx->consumer_thread_pipe); -error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: lttng_pipe_destroy(ctx->consumer_wakeup_pipe); @@ -1403,6 +1414,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + if (!ctx) { + return; + } + destroy_data_stream_ht(data_ht); destroy_metadata_stream_ht(metadata_ht); @@ -1414,13 +1429,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); - utils_close_pipe(ctx->consumer_splice_metadata_pipe); unlink(ctx->consumer_command_sock_path); free(ctx); @@ -1441,7 +1454,7 @@ static int write_relayd_metadata_id(int fd, ret = lttng_write(fd, (void *) &hdr, sizeof(hdr)); if (ret < sizeof(hdr)) { /* - * This error means that the fd's end is closed so ignore the perror + * This error means that the fd's end is closed so ignore the PERROR * not to clubber the error output since this can happen in a normal * code path. */ @@ -1713,17 +1726,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( goto end; } } - - /* - * Choose right pipe for splice. Metadata and trace data are handled by - * different threads hence the use of two pipes in order not to race or - * corrupt the written data. - */ - if (stream->metadata_flag) { - splice_pipe = ctx->consumer_splice_metadata_pipe; - } else { - splice_pipe = ctx->consumer_thread_pipe; - } + splice_pipe = stream->splice_pipe; /* Write metadata stream id before payload */ if (relayd) { @@ -1829,7 +1832,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("Consumer splice pipe to file, ret %zd", ret_splice); + DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", + outfd, ret_splice); if (ret_splice < 0) { ret = errno; written = -ret; @@ -2201,18 +2205,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - health_code_update(); - - /* Only the metadata pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); health_poll_entry(); + DBG("Metadata poll wait"); ret = lttng_poll_wait(&events, -1); + DBG("Metadata poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2220,7 +2219,10 @@ restart: ERR("Poll EINTR catched"); goto restart; } - goto error; + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } + goto end; } nb_fd = ret; @@ -2232,6 +2234,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -2348,7 +2355,6 @@ restart: /* All is OK */ err = 0; -error: end: DBG("Metadata poll thread exiting"); @@ -2771,18 +2777,13 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { - health_code_update(); - - /* Only the channel pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); + DBG("Channel poll wait"); health_poll_entry(); ret = lttng_poll_wait(&events, -1); + DBG("Channel poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { @@ -2790,6 +2791,9 @@ restart: ERR("Poll EINTR catched"); goto restart; } + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } goto end; } @@ -2802,10 +2806,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ if (!revents) { + /* No activity for this FD (poll implementation). */ continue; } + if (pollfd == ctx->consumer_channel_pipe[0]) { if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Channel thread pipe hung up"); @@ -3568,15 +3573,6 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { - /* - * An empty output file is not valid. We need at least one packet - * generated per stream, even if it contains no event, so it - * contains at least one packet header. - */ - if (stream->output_written == 0) { - pthread_mutex_unlock(&stream->lock); - goto data_pending; - } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) { @@ -3679,22 +3675,19 @@ int consumer_send_status_channel(int sock, return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); } -/* - * Using a maximum stream size with the produced and consumed position of a - * stream, computes the new consumed position to be as close as possible to the - * maximum possible stream size. - * - * If maximum stream size is lower than the possible buffer size (produced - - * consumed), the consumed_pos given is returned untouched else the new value - * is returned. - */ -unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t max_stream_size) +unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t nb_packets_per_stream, + uint64_t max_sb_size) { - if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) { - /* Offset from the produced position to get the latest buffers. */ - return produced_pos - max_stream_size; - } + unsigned long start_pos; - return consumed_pos; + if (!nb_packets_per_stream) { + return consumed_pos; /* Grab everything */ + } + start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size); + start_pos -= max_sb_size * nb_packets_per_stream; + if ((long) (start_pos - consumed_pos) < 0) { + return consumed_pos; /* Grab everything */ + } + return start_pos; }