X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=64057c785f9512d734ccbd78649214125fad824d;hb=128708c34ee7d054755d110df12940155c2dd781;hp=a1c669894bec5e9472e072c9410817a58399ec41;hpb=5f3aff8bbb7dfaa4aa9eb9234b6f2393c40b69bf;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index a1c669894..64057c785 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1,20 +1,10 @@ /* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * 2012 - David Goulet + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2012 David Goulet * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License, version 2 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -1680,12 +1670,12 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, + struct lttng_consumer_stream *stream, + const char *buffer, + unsigned long len, unsigned long padding, struct ctf_packet_index *index) { - unsigned long mmap_offset; - void *mmap_base; ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ @@ -1707,36 +1697,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } } - /* get the offset inside the fd to mmap */ - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - mmap_base = stream->mmap_base; - ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); - if (ret < 0) { - PERROR("tracer ctl get_mmap_read_offset"); - goto end; - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - mmap_base = lttng_ustctl_get_mmap_base(stream); - if (!mmap_base) { - ERR("read mmap get mmap base for stream %s", stream->name); - ret = -EPERM; - goto end; - } - ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); - if (ret != 0) { - PERROR("tracer ctl get_mmap_read_offset"); - ret = -EINVAL; - goto end; - } - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - } - /* Handle stream on the relayd if the output is on the network */ if (relayd) { unsigned long netlen = len; @@ -1813,7 +1773,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * This call guarantee that len or less is returned. It's impossible to * receive a ret value that is bigger than len. */ - ret = lttng_write(outfd, mmap_base + mmap_offset, len); + ret = lttng_write(outfd, buffer, len); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0 || ((size_t) ret != len)) { /* @@ -3963,8 +3923,18 @@ int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_act } else { ret = kernctl_buffer_flush_empty(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect, + * but required as a fall-back when + * "flush_empty" is not implemented by + * lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } } } break; @@ -4084,7 +4054,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, */ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size); if (consumed_pos == produced_pos) { + DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu", + stream->key, produced_pos, consumed_pos); stream->rotate_ready = true; + } else { + DBG("Different consumed and produced positions " + "for stream %" PRIu64 " produced = %lu consumed = %lu", + stream->key, produced_pos, consumed_pos); } /* * The rotation position is based on the packet_seq_num of the @@ -4109,6 +4085,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, } stream->rotate_position = stream->last_sequence_number + 1 + ((produced_pos - consumed_pos) / stream->max_sb_size); + DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64, + stream->key, stream->rotate_position); if (!is_local_trace) { /* @@ -4195,7 +4173,7 @@ int consumer_clear_buffer(struct lttng_consumer_stream *stream) case LTTNG_CONSUMER_KERNEL: ret = kernctl_buffer_clear(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel stream"); + ERR("Failed to clear kernel stream (ret = %d)", ret); goto end; } break; @@ -4274,11 +4252,6 @@ error_unlock: pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&channel->lock); rcu_read_unlock(); - if (ret) { - goto error; - } - ret = LTTCOMM_CONSUMERD_SUCCESS; -error: return ret; } @@ -4290,6 +4263,11 @@ error: */ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) { + DBG("Check is rotate ready for stream %" PRIu64 + " ready %u rotate_position %" PRIu64 + " last_sequence_number %" PRIu64, + stream->key, stream->rotate_ready, + stream->rotate_position, stream->last_sequence_number); if (stream->rotate_ready) { return 1; } @@ -4318,6 +4296,12 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) * but consumerd considers rotation ready when reaching the last * packet of the current chunk, hence the "rotate_position - 1". */ + + DBG("Check is rotate ready for stream %" PRIu64 + " last_sequence_number %" PRIu64 + " rotate_position %" PRIu64, + stream->key, stream->last_sequence_number, + stream->rotate_position); if (stream->last_sequence_number >= stream->rotate_position - 1) { return 1; } @@ -4330,6 +4314,8 @@ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) */ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) { + DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, + stream->key); stream->rotate_position = -1ULL; stream->rotate_ready = false; } @@ -4582,7 +4568,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands. */ created_chunk = lttng_trace_chunk_create(chunk_id, - chunk_creation_timestamp); + chunk_creation_timestamp, NULL); if (!created_chunk) { ERR("Failed to create trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;