X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=8416bc5a3716fb5400cbeb49ae373858a2137144;hp=5c0dce45d27e87337c3c3190551522f122c4b49f;hb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;hpb=fdf9986c9639bbe13935351a6e3c3137e3160383 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 5c0dce45d..8416bc5a3 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -110,20 +110,21 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, struct lttng_consumer_local_data *ctx) + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) { int ret; unsigned long consumed_pos, produced_pos; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; - DBG("Kernel consumer snapshot channel %lu", key); + DBG("Kernel consumer snapshot channel %" PRIu64, key); rcu_read_lock(); channel = consumer_find_channel(key); if (!channel) { - ERR("No channel found for key %lu", key); + ERR("No channel found for key %" PRIu64, key); ret = -1; goto end; } @@ -203,6 +204,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -346,7 +356,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); if (ret_read < 0) { if (ret_read != -EPERM) { - ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)", + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read); goto error; } @@ -643,17 +653,38 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_data_pipe; } + /* Vitible to other threads */ + new_stream->globally_visible = 1; + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_stream_free(new_stream); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -737,7 +768,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, ctx); + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); if (ret < 0) { ERR("Snapshot channel failed"); ret_code = LTTNG_ERR_KERN_CHAN_FAIL; @@ -993,6 +1026,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */