*/
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
+ struct lttng_consumer_stream *stream,
+ const char *buffer,
+ unsigned long len,
unsigned long padding,
struct ctf_packet_index *index)
{
- unsigned long mmap_offset;
- void *mmap_base;
ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
}
}
- /* get the offset inside the fd to mmap */
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- mmap_base = stream->mmap_base;
- ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
- if (ret < 0) {
- PERROR("tracer ctl get_mmap_read_offset");
- goto end;
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- mmap_base = lttng_ustctl_get_mmap_base(stream);
- if (!mmap_base) {
- ERR("read mmap get mmap base for stream %s", stream->name);
- ret = -EPERM;
- goto end;
- }
- ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
- if (ret != 0) {
- PERROR("tracer ctl get_mmap_read_offset");
- ret = -EINVAL;
- goto end;
- }
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- }
-
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
unsigned long netlen = len;
* This call guarantee that len or less is returned. It's impossible to
* receive a ret value that is bigger than len.
*/
- ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+ ret = lttng_write(outfd, buffer, len);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0 || ((size_t) ret != len)) {
/*
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
+ struct lttng_consumer_stream *stream,
+ const char *buffer,
+ unsigned long len,
unsigned long padding,
struct ctf_packet_index *index);
ssize_t lttng_consumer_on_read_subbuffer_splice(
*
*/
+#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <poll.h>
return ret;
}
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+ const char **addr)
+{
+ int ret;
+ unsigned long mmap_offset;
+ const char *mmap_base = stream->mmap_base;
+
+ ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ if (ret < 0) {
+ PERROR("Failed to get mmap read offset");
+ goto error;
+ }
+
+ *addr = mmap_base + mmap_offset;
+error:
+ return ret;
+}
+
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;
+ const char *subbuf_addr;
health_code_update();
goto error_put_subbuf;
}
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
+ read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ stream, subbuf_addr, len,
padded_len - len, NULL);
/*
* We write the padded len in local tracefiles but the data len
}
break;
case CONSUMER_CHANNEL_MMAP:
+ {
+ const char *subbuf_addr;
+
/* Get subbuffer size without padding */
err = kernctl_get_subbuf_size(infd, &subbuf_size);
if (err != 0) {
goto error;
}
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
/* Make sure the tracer is not gone mad on us! */
assert(len >= subbuf_size);
padding = len - subbuf_size;
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
+ subbuf_addr,
+ subbuf_size,
padding, &index);
/*
* The mmap operation should write subbuf_size amount of data when
write_index = 0;
}
break;
+ }
default:
ERR("Unknown output method");
ret = -EPERM;
}
-
+error_put_subbuf:
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
if (err == -EFAULT) {
*
*/
+#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <lttng/ust-ctl.h>
return ret;
}
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+ const char **addr)
+{
+ int ret;
+ unsigned long mmap_offset;
+ const char *mmap_base;
+
+ mmap_base = ustctl_get_mmap_base(stream->ustream);
+ if (!mmap_base) {
+ ERR("Failed to get mmap base for stream `%s`",
+ stream->name);
+ ret = -EPERM;
+ goto error;
+ }
+
+ ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
+ if (ret != 0) {
+ ERR("Failed to get mmap offset for stream `%s`", stream->name);
+ ret = -EINVAL;
+ goto error;
+ }
+
+ *addr = mmap_base + mmap_offset;
+error:
+ return ret;
+
+}
+
/*
* Take a snapshot of all the stream of a channel.
* RCU read-side lock and the channel lock must be held by the caller.
while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;
+ const char *subbuf_addr;
health_code_update();
goto error_put_subbuf;
}
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
+ read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ stream, subbuf_addr, len,
padded_len - len, NULL);
if (use_relayd) {
if (read_len != len) {
return ret;
}
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
- unsigned long *off)
-{
- assert(stream);
- assert(stream->ustream);
-
- return ustctl_get_mmap_read_offset(stream->ustream, off);
-}
-
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
-{
- assert(stream);
- assert(stream->ustream);
-
- return ustctl_get_mmap_base(stream->ustream);
-}
-
void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
int producer_active)
{
long ret = 0;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
+ const char *subbuf_addr;
assert(stream);
assert(stream->ustream);
padding = len - subbuf_size;
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ write_index = 0;
+ goto error_put_subbuf;
+ }
+
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
+ ret = lttng_consumer_on_read_subbuffer_mmap(
+ ctx, stream, subbuf_addr, subbuf_size, padding, &index);
/*
- * The mmap operation should write subbuf_size amount of data when network
- * streaming or the full padding (len) size when we are _not_ streaming.
+ * The mmap operation should write subbuf_size amount of data when
+ * network streaming or the full padding (len) size when we are _not_
+ * streaming.
*/
if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
(ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
ret, len, subbuf_size);
write_index = 0;
}
+error_put_subbuf:
err = ustctl_put_next_subbuf(ustream);
assert(err == 0);
void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
-int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
- unsigned long *off);
-void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream);
void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
int producer_active);
int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,