+ memset(&lum, 0, sizeof(lum));
+ lum.handle = session_handle;
+ lum.cmd = LTTNG_UST_CHANNEL;
+ lum.u.channel.len = channel_data->size;
+ lum.u.channel.type = channel_data->u.channel.type;
+ ret = ustcomm_send_app_msg(sock, &lum);
+ if (ret)
+ return ret;
+
+ ret = ustctl_send_channel(sock,
+ channel_data->u.channel.type,
+ channel_data->u.channel.data,
+ channel_data->size,
+ channel_data->u.channel.wakeup_fd,
+ 1);
+ if (ret)
+ return ret;
+ ret = ustcomm_recv_app_reply(sock, &lur, lum.handle, lum.cmd);
+ if (!ret) {
+ channel_data->handle = lur.ret_val;
+ }
+ return ret;
+}
+
+int ustctl_send_stream_to_ust(int sock,
+ struct lttng_ust_object_data *channel_data,
+ struct lttng_ust_object_data *stream_data)
+{
+ struct ustcomm_ust_msg lum;
+ struct ustcomm_ust_reply lur;
+ int ret;
+
+ memset(&lum, 0, sizeof(lum));
+ lum.handle = channel_data->handle;
+ lum.cmd = LTTNG_UST_STREAM;
+ lum.u.stream.len = stream_data->size;
+ lum.u.stream.stream_nr = stream_data->u.stream.stream_nr;
+ ret = ustcomm_send_app_msg(sock, &lum);
+ if (ret)
+ return ret;
+
+ assert(stream_data);
+ assert(stream_data->type == LTTNG_UST_OBJECT_TYPE_STREAM);
+
+ ret = ustctl_send_stream(sock,
+ stream_data->u.stream.stream_nr,
+ stream_data->size,
+ stream_data->u.stream.shm_fd,
+ stream_data->u.stream.wakeup_fd, 1);
+ if (ret)
+ return ret;
+ return ustcomm_recv_app_reply(sock, &lur, lum.handle, lum.cmd);
+}
+
+int ustctl_duplicate_ust_object_data(struct lttng_ust_object_data **dest,
+ struct lttng_ust_object_data *src)
+{
+ struct lttng_ust_object_data *obj;
+ int ret;
+
+ if (src->handle != -1) {
+ ret = -EINVAL;
+ goto error;
+ }
+
+ obj = zmalloc(sizeof(*obj));
+ if (!obj) {
+ ret = -ENOMEM;
+ goto error;
+ }
+
+ obj->type = src->type;
+ obj->handle = src->handle;
+ obj->size = src->size;
+
+ switch (obj->type) {
+ case LTTNG_UST_OBJECT_TYPE_CHANNEL:
+ {
+ obj->u.channel.type = src->u.channel.type;
+ if (src->u.channel.wakeup_fd >= 0) {
+ obj->u.channel.wakeup_fd =
+ dup(src->u.channel.wakeup_fd);
+ if (obj->u.channel.wakeup_fd < 0) {
+ ret = errno;
+ goto chan_error_wakeup_fd;
+ }
+ } else {
+ obj->u.channel.wakeup_fd =
+ src->u.channel.wakeup_fd;
+ }
+ obj->u.channel.data = zmalloc(obj->size);
+ if (!obj->u.channel.data) {
+ ret = -ENOMEM;
+ goto chan_error_alloc;
+ }
+ memcpy(obj->u.channel.data, src->u.channel.data, obj->size);
+ break;
+
+ chan_error_alloc:
+ if (src->u.channel.wakeup_fd >= 0) {
+ int closeret;
+
+ closeret = close(obj->u.channel.wakeup_fd);
+ if (closeret) {
+ PERROR("close");
+ }
+ }
+ chan_error_wakeup_fd:
+ goto error_type;
+
+ }
+
+ case LTTNG_UST_OBJECT_TYPE_STREAM:
+ {
+ obj->u.stream.stream_nr = src->u.stream.stream_nr;
+ if (src->u.stream.wakeup_fd >= 0) {
+ obj->u.stream.wakeup_fd =
+ dup(src->u.stream.wakeup_fd);
+ if (obj->u.stream.wakeup_fd < 0) {
+ ret = errno;
+ goto stream_error_wakeup_fd;
+ }
+ } else {
+ obj->u.stream.wakeup_fd =
+ src->u.stream.wakeup_fd;
+ }
+
+ if (src->u.stream.shm_fd >= 0) {
+ obj->u.stream.shm_fd =
+ dup(src->u.stream.shm_fd);
+ if (obj->u.stream.shm_fd < 0) {
+ ret = errno;
+ goto stream_error_shm_fd;
+ }
+ } else {
+ obj->u.stream.shm_fd =
+ src->u.stream.shm_fd;
+ }
+ break;
+
+ stream_error_shm_fd:
+ if (src->u.stream.wakeup_fd >= 0) {
+ int closeret;
+
+ closeret = close(obj->u.stream.wakeup_fd);
+ if (closeret) {
+ PERROR("close");
+ }
+ }
+ stream_error_wakeup_fd:
+ goto error_type;
+ }
+
+ default:
+ ret = -EINVAL;
+ goto error_type;
+ }
+
+ *dest = obj;
+ return 0;
+
+error_type:
+ free(obj);
+error:
+ return ret;
+}
+
+
+/* Buffer operations */
+
+struct ustctl_consumer_channel *
+ ustctl_create_channel(struct ustctl_consumer_channel_attr *attr)
+{
+ struct ustctl_consumer_channel *chan;
+ const char *transport_name;
+ struct lttng_transport *transport;
+
+ switch (attr->type) {
+ case LTTNG_UST_CHAN_PER_CPU:
+ if (attr->output == LTTNG_UST_MMAP) {
+ if (attr->overwrite) {
+ if (attr->read_timer_interval == 0) {
+ transport_name = "relay-overwrite-mmap";
+ } else {
+ transport_name = "relay-overwrite-rt-mmap";
+ }
+ } else {
+ if (attr->read_timer_interval == 0) {
+ transport_name = "relay-discard-mmap";
+ } else {
+ transport_name = "relay-discard-rt-mmap";
+ }
+ }
+ } else {
+ return NULL;
+ }
+ break;
+ case LTTNG_UST_CHAN_METADATA:
+ if (attr->output == LTTNG_UST_MMAP)
+ transport_name = "relay-metadata-mmap";
+ else
+ return NULL;
+ break;
+ default:
+ transport_name = "<unknown>";
+ return NULL;
+ }
+
+ transport = lttng_transport_find(transport_name);
+ if (!transport) {
+ DBG("LTTng transport %s not found\n",
+ transport_name);
+ return NULL;
+ }
+
+ chan = zmalloc(sizeof(*chan));
+ if (!chan)
+ return NULL;
+
+ chan->chan = transport->ops.channel_create(transport_name, NULL,
+ attr->subbuf_size, attr->num_subbuf,
+ attr->switch_timer_interval,
+ attr->read_timer_interval,
+ attr->uuid, attr->chan_id);
+ if (!chan->chan) {
+ goto chan_error;
+ }
+ chan->chan->ops = &transport->ops;
+ memcpy(&chan->attr, attr, sizeof(chan->attr));
+ chan->wait_fd = ustctl_channel_get_wait_fd(chan);
+ chan->wakeup_fd = ustctl_channel_get_wakeup_fd(chan);
+ return chan;
+
+chan_error:
+ free(chan);
+ return NULL;
+}
+
+void ustctl_destroy_channel(struct ustctl_consumer_channel *chan)
+{
+ (void) ustctl_channel_close_wait_fd(chan);
+ (void) ustctl_channel_close_wakeup_fd(chan);
+ chan->chan->ops->channel_destroy(chan->chan);
+ free(chan);
+}
+
+int ustctl_send_channel_to_sessiond(int sock,
+ struct ustctl_consumer_channel *channel)
+{
+ struct shm_object_table *table;
+
+ table = channel->chan->handle->table;
+ if (table->size <= 0)
+ return -EINVAL;
+ return ustctl_send_channel(sock,
+ channel->attr.type,
+ table->objects[0].memory_map,
+ table->objects[0].memory_map_size,
+ channel->wakeup_fd,
+ 0);
+}
+
+int ustctl_send_stream_to_sessiond(int sock,
+ struct ustctl_consumer_stream *stream)
+{
+ if (!stream)
+ return ustctl_send_stream(sock, -1U, -1U, -1, -1, 0);
+
+ return ustctl_send_stream(sock,
+ stream->cpu,
+ stream->memory_map_size,
+ stream->shm_fd, stream->wakeup_fd,
+ 0);
+}
+
+int ustctl_write_metadata_to_channel(
+ struct ustctl_consumer_channel *channel,
+ const char *metadata_str, /* NOT null-terminated */
+ size_t len) /* metadata length */
+{
+ struct lttng_ust_lib_ring_buffer_ctx ctx;
+ struct lttng_channel *chan = channel->chan;
+ const char *str = metadata_str;
+ int ret = 0, waitret;
+ size_t reserve_len, pos;
+
+ for (pos = 0; pos < len; pos += reserve_len) {
+ reserve_len = min_t(size_t,
+ chan->ops->packet_avail_size(chan->chan, chan->handle),
+ len - pos);
+ lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len,
+ sizeof(char), -1, chan->handle);
+ /*
+ * We don't care about metadata buffer's records lost
+ * count, because we always retry here. Report error if
+ * we need to bail out after timeout or being
+ * interrupted.
+ */
+ waitret = wait_cond_interruptible_timeout(
+ ({
+ ret = chan->ops->event_reserve(&ctx, 0);
+ ret != -ENOBUFS || !ret;
+ }),
+ LTTNG_METADATA_TIMEOUT_MSEC);
+ if (waitret == -ETIMEDOUT || waitret == -EINTR || ret) {
+ DBG("LTTng: Failure to write metadata to buffers (%s)\n",
+ waitret == -EINTR ? "interrupted" :
+ (ret == -ENOBUFS ? "timeout" : "I/O error"));
+ if (waitret == -EINTR)
+ ret = waitret;
+ goto end;
+ }
+ chan->ops->event_write(&ctx, &str[pos], reserve_len);
+ chan->ops->event_commit(&ctx);
+ }
+end:
+ return ret;
+}
+
+/*
+ * Write at most one packet in the channel.
+ * Returns the number of bytes written on success, < 0 on error.
+ */
+ssize_t ustctl_write_one_packet_to_channel(
+ struct ustctl_consumer_channel *channel,
+ const char *metadata_str, /* NOT null-terminated */
+ size_t len) /* metadata length */
+{
+ struct lttng_ust_lib_ring_buffer_ctx ctx;
+ struct lttng_channel *chan = channel->chan;
+ const char *str = metadata_str;
+ ssize_t reserve_len;
+ int ret;
+
+ reserve_len = min_t(ssize_t,
+ chan->ops->packet_avail_size(chan->chan, chan->handle),
+ len);
+ lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len,
+ sizeof(char), -1, chan->handle);
+ ret = chan->ops->event_reserve(&ctx, 0);
+ if (ret != 0) {
+ DBG("LTTng: event reservation failed");
+ assert(ret < 0);
+ reserve_len = ret;
+ goto end;
+ }
+ chan->ops->event_write(&ctx, str, reserve_len);
+ chan->ops->event_commit(&ctx);
+
+end:
+ return reserve_len;
+}
+
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+ int ret;
+
+ chan = consumer_chan->chan->chan;
+ ret = ring_buffer_channel_close_wait_fd(&chan->backend.config,
+ chan, chan->handle);
+ if (!ret)
+ consumer_chan->wait_fd = -1;
+ return ret;
+}
+
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+ int ret;
+
+ chan = consumer_chan->chan->chan;
+ ret = ring_buffer_channel_close_wakeup_fd(&chan->backend.config,
+ chan, chan->handle);
+ if (!ret)
+ consumer_chan->wakeup_fd = -1;
+ return ret;
+}
+
+int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream)
+{
+ struct channel *chan;
+
+ chan = stream->chan->chan->chan;
+ return ring_buffer_stream_close_wait_fd(&chan->backend.config,
+ chan, stream->handle, stream->cpu);
+}
+
+int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream)
+{
+ struct channel *chan;
+
+ chan = stream->chan->chan->chan;
+ return ring_buffer_stream_close_wakeup_fd(&chan->backend.config,
+ chan, stream->handle, stream->cpu);
+}
+
+struct ustctl_consumer_stream *
+ ustctl_create_stream(struct ustctl_consumer_channel *channel,
+ int cpu)
+{
+ struct ustctl_consumer_stream *stream;
+ struct lttng_ust_shm_handle *handle;
+ struct channel *chan;
+ int shm_fd, wait_fd, wakeup_fd;
+ uint64_t memory_map_size;
+ struct lttng_ust_lib_ring_buffer *buf;
+ int ret;
+
+ if (!channel)
+ return NULL;
+ handle = channel->chan->handle;
+ if (!handle)
+ return NULL;
+
+ chan = channel->chan->chan;
+ buf = channel_get_ring_buffer(&chan->backend.config,
+ chan, cpu, handle, &shm_fd, &wait_fd,
+ &wakeup_fd, &memory_map_size);
+ if (!buf)
+ return NULL;
+ ret = lib_ring_buffer_open_read(buf, handle);
+ if (ret)
+ return NULL;
+
+ stream = zmalloc(sizeof(*stream));
+ if (!stream)
+ goto alloc_error;
+ stream->handle = handle;
+ stream->buf = buf;
+ stream->chan = channel;
+ stream->shm_fd = shm_fd;
+ stream->wait_fd = wait_fd;
+ stream->wakeup_fd = wakeup_fd;
+ stream->memory_map_size = memory_map_size;
+ stream->cpu = cpu;
+ return stream;
+
+alloc_error:
+ return NULL;
+}
+
+void ustctl_destroy_stream(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ assert(stream);
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ (void) ustctl_stream_close_wait_fd(stream);
+ (void) ustctl_stream_close_wakeup_fd(stream);
+ lib_ring_buffer_release_read(buf, consumer_chan->chan->handle);
+ free(stream);
+}
+
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wait_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wakeup_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref);
+}
+
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shm_get_wakeup_fd(consumer_chan->chan->handle, &buf->self._ref);
+}
+
+/* For mmap mode, readable without "get" operation */
+
+void *ustctl_get_mmap_base(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return NULL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shmp(consumer_chan->chan->handle, buf->backend.memory_map);
+}
+
+/* returns the length to mmap. */
+int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ unsigned long mmap_buf_len;
+ struct channel *chan;
+
+ if (!stream)
+ return -EINVAL;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ if (chan->backend.config.output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ mmap_buf_len = chan->backend.buf_size;
+ if (chan->backend.extra_reader_sb)
+ mmap_buf_len += chan->backend.subbuf_size;
+ if (mmap_buf_len > INT_MAX)
+ return -EFBIG;
+ *len = mmap_buf_len;
+ return 0;
+}
+
+/* returns the maximum size for sub-buffers. */
+int ustctl_get_max_subbuf_size(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ struct channel *chan;
+
+ if (!stream)
+ return -EINVAL;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ *len = chan->backend.subbuf_size;
+ return 0;