ret = kernctl_snapshot(infd);
if (ret != 0) {
- errno = -ret;
perror("Getting sub-buffer snapshot.");
+ ret = -errno;
}
return ret;
ret = kernctl_snapshot_get_produced(infd, pos);
if (ret != 0) {
- errno = -ret;
perror("kernctl_snapshot_get_produced");
+ ret = -errno;
}
return ret;
ret = kernctl_snapshot_get_consumed(infd, pos);
if (ret != 0) {
- errno = -ret;
perror("kernctl_snapshot_get_consumed");
+ ret = -errno;
}
return ret;
* Returns 0 on success, < 0 on error
*/
int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
- uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+ uint64_t relayd_id, uint64_t max_stream_size,
+ struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned long consumed_pos, produced_pos;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
- DBG("Kernel consumer snapshot channel %lu", key);
+ DBG("Kernel consumer snapshot channel %" PRIu64, key);
rcu_read_lock();
channel = consumer_find_channel(key);
if (!channel) {
- ERR("No channel found for key %lu", key);
+ ERR("No channel found for key %" PRIu64, key);
ret = -1;
goto end;
}
stream->out_fd = ret;
stream->tracefile_size_current = 0;
- DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
- stream->name, stream->key);
+ DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+ path, stream->name, stream->key);
}
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
+ ret = -errno;
goto end_unlock;
}
&stream->max_sb_size);
if (ret < 0) {
ERR("Getting kernel max_sb_size");
+ ret = -errno;
goto end_unlock;
}
}
+ /*
+ * The original value is sent back if max stream size is larger than
+ * the possible size of the snapshot. Also, we asume that the session
+ * daemon should never send a maximum stream size that is lower than
+ * subbuffer size.
+ */
+ consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+ produced_pos, max_stream_size);
+
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
if (ret < 0) {
if (errno != EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
+ ret = -errno;
goto end_unlock;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
if (ret < 0) {
ERR("Snapshot kernctl_get_subbuf_size");
+ ret = -errno;
goto error_put_subbuf;
}
ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
if (ret < 0) {
ERR("Snapshot kernctl_get_padded_subbuf_size");
+ ret = -errno;
goto error_put_subbuf;
}
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
+ ret = -errno;
goto end_unlock;
}
consumed_pos += stream->max_sb_size;
}
if (relayd_id == (uint64_t) -1ULL) {
- ret = close(stream->out_fd);
- if (ret < 0) {
- PERROR("Kernel consumer snapshot close out_fd");
- goto end_unlock;
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot close out_fd");
+ goto end_unlock;
+ }
+ stream->out_fd = -1;
}
- stream->out_fd = -1;
} else {
close_relayd_stream(stream);
stream->net_seq_idx = (uint64_t) -1ULL;
error_put_subbuf:
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
+ ret = -errno;
ERR("Snapshot kernctl_put_subbuf error path");
}
end_unlock:
do {
ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
if (ret_read < 0) {
- if (ret_read != -EPERM) {
- ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)",
+ if (ret_read != -EAGAIN) {
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
ret_read);
goto error;
}
close_relayd_stream(metadata_stream);
metadata_stream->net_seq_idx = (uint64_t) -1ULL;
} else {
- ret = close(metadata_stream->out_fd);
- if (ret < 0) {
- PERROR("Kernel consumer snapshot metadata close out_fd");
- /*
- * Don't go on error here since the snapshot was successful at this
- * point but somehow the close failed.
- */
+ if (metadata_stream->out_fd >= 0) {
+ ret = close(metadata_stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot metadata close out_fd");
+ /*
+ * Don't go on error here since the snapshot was successful at this
+ * point but somehow the close failed.
+ */
+ }
+ metadata_stream->out_fd = -1;
}
- metadata_stream->out_fd = -1;
}
ret = 0;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
if (ret > 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
ret = -1;
}
return ret;
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
+ /* Vitible to other threads */
+ new_stream->globally_visible = 1;
+
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_stream_free(new_stream);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
goto end_nosignal;
}
} else {
ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id, ctx);
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.max_stream_size,
+ ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
- ret = err;
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
*/
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
+ ret = -errno;
goto end;
}
/* Get the full subbuffer size including padding */
err = kernctl_get_padded_subbuf_size(infd, &len);
if (err != 0) {
- errno = -err;
perror("Getting sub-buffer len failed.");
- ret = err;
+ ret = -errno;
goto end;
}
/* Get subbuffer size without padding */
err = kernctl_get_subbuf_size(infd, &subbuf_size);
if (err != 0) {
- errno = -err;
perror("Getting sub-buffer len failed.");
- ret = err;
+ ret = -errno;
goto end;
}
break;
default:
ERR("Unknown output method");
- ret = -1;
+ ret = -EPERM;
}
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
- errno = -err;
if (errno == EFAULT) {
perror("Error in unreserving sub buffer\n");
} else if (errno == EIO) {
/* Should never happen with newer LTTng versions */
perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
-
- ret = -err;
+ ret = -errno;
goto end;
}
ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
if (ret != 0) {
- errno = -ret;
PERROR("kernctl_get_mmap_len");
+ ret = -errno;
goto error_close_fd;
}
stream->mmap_len = (size_t) mmap_len;
assert(stream);
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+ ret = 0;
+ goto end;
+ }
+
ret = kernctl_get_next_subbuf(stream->wait_fd);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */