X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=b26cc19f923d23da7806be92069c395b7a99aaa3;hb=337d64f0c475bd9617f2fb86417071b65cae8330;hp=5e1ff896c4e37102bf895819717eda89aefbbba3;hpb=62c43103c60bd704cd8ed7acaaa22465802f5673;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5e1ff896c..b26cc19f9 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -1959,6 +1960,45 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_ROTATE_CHANNEL: + { + /* + * Sample the rotate position of all the streams in this channel. + */ + ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, + msg.u.rotate_channel.pathname, + msg.u.rotate_channel.relayd_id, + msg.u.rotate_channel.metadata, + msg.u.rotate_channel.new_chunk_id, + ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * Rotate the streams that are ready right now. + * FIXME: this is a second consecutive iteration over the + * streams in a channel, there is probably a better way to + * handle this, but it needs to be after the + * consumer_send_status_msg() call. + */ + ret = lttng_consumer_rotate_ready_streams( + msg.u.rotate_channel.key, ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + break; + } case LTTNG_CONSUMER_ROTATE_RENAME: { DBG("Consumer rename session %" PRIu64 " after rotation", @@ -1970,7 +2010,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_rename.relayd_id); if (ret < 0) { ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -1982,6 +2022,37 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + uint32_t pending; + + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + pending = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (pending < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } + break; + } case LTTNG_CONSUMER_MKDIR: { DBG("Consumer mkdir %s in session %" PRIu64, @@ -1993,7 +2064,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.mkdir.relayd_id); if (ret < 0) { ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -2594,10 +2665,10 @@ end: * Return 0 on success else a negative value. */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2629,7 +2700,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, readlen = lttng_read(stream->wait_fd, &dummy, 1); if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; - goto end; + goto error; + } + } + + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; } } @@ -2644,7 +2729,7 @@ retry: if (stream->metadata_flag) { ret = commit_one_metadata_packet(stream); if (ret <= 0) { - goto end; + goto error; } ustctl_flush_buffer(stream->ustream, 1); goto retry; @@ -2659,7 +2744,7 @@ retry: */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency) [ret: %d]", err); - goto end; + goto error; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); @@ -2669,7 +2754,7 @@ retry: if (ret < 0) { err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } /* Update the stream's sequence and discarded events count. */ @@ -2678,7 +2763,7 @@ retry: PERROR("kernctl_get_events_discarded"); err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } } else { write_index = 0; @@ -2696,6 +2781,7 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* @@ -2727,13 +2813,13 @@ retry: if (!stream->metadata_flag) { ret = notify_if_more_data(stream, ctx); if (ret < 0) { - goto end; + goto error; } } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2758,17 +2844,35 @@ retry: } if (err < 0) { - goto end; + goto error; } } assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } +error: return ret; }