X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=6030ca182fd19e5c3078473611ffe1e1ab1c6f8d;hb=02d02e31d47c091a38154c9c188c08387902d97b;hp=ec078fb477a39eec0988c3075d8f3d86cad91ea4;hpb=00fb02ace5151a6546f4e97e5439512913a50e68;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ec078fb47..6030ca182 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -184,6 +185,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, goto error; } + consumer_stream_update_channel_attributes(stream, channel); stream->chan = channel; error: @@ -1917,6 +1919,47 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_msg_sessiond; } + case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE: + { + int channel_rotate_pipe; + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1); + if (ret != sizeof(channel_rotate_pipe)) { + ERR("Failed to receive channel rotate pipe"); + goto error_fatal; + } + + DBG("Received channel rotate pipe (%d)", channel_rotate_pipe); + ctx->channel_rotate_pipe = channel_rotate_pipe; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_rotate_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel rotate pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe"); + goto error_fatal; + } + DBG("Channel rotate pipe set as non-blocking"); + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + break; + } case LTTNG_CONSUMER_ROTATE_RENAME: { DBG("Consumer rename session %" PRIu64 " after rotation", @@ -2043,6 +2086,15 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) return ustctl_get_mmap_base(stream->ustream); } +void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) +{ + assert(stream); + assert(stream->ustream); + + ustctl_flush_buffer(stream->ustream, producer_active); +} + /* * Take a snapshot for a specific stream. * @@ -2543,10 +2595,10 @@ end: * Return 0 on success else a negative value. */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2578,7 +2630,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, readlen = lttng_read(stream->wait_fd, &dummy, 1); if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; - goto end; + goto error; + } + } + + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; } } @@ -2593,7 +2659,7 @@ retry: if (stream->metadata_flag) { ret = commit_one_metadata_packet(stream); if (ret <= 0) { - goto end; + goto error; } ustctl_flush_buffer(stream->ustream, 1); goto retry; @@ -2608,7 +2674,7 @@ retry: */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency) [ret: %d]", err); - goto end; + goto error; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); @@ -2618,7 +2684,7 @@ retry: if (ret < 0) { err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } /* Update the stream's sequence and discarded events count. */ @@ -2627,7 +2693,7 @@ retry: PERROR("kernctl_get_events_discarded"); err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } } else { write_index = 0; @@ -2645,6 +2711,7 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* @@ -2676,13 +2743,13 @@ retry: if (!stream->metadata_flag) { ret = notify_if_more_data(stream, ctx); if (ret < 0) { - goto end; + goto error; } } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2707,17 +2774,35 @@ retry: } if (err < 0) { - goto end; + goto error; } } assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } +error: return ret; }