#include "../libringbuffer/backend.h"
#include "../libringbuffer/frontend.h"
+#include "../liblttng-ust/wait.h"
+
+/*
+ * Number of milliseconds to retry before failing metadata writes on
+ * buffer full condition. (10 seconds)
+ */
+#define LTTNG_METADATA_TIMEOUT_MSEC 10000
/*
* Channel representation within consumer.
/* initial attributes */
struct ustctl_consumer_channel_attr attr;
+ int wait_fd; /* monitor close() */
+ int wakeup_fd; /* monitor close() */
};
/*
switch (data->type) {
case LTTNG_UST_OBJECT_TYPE_CHANNEL:
+ if (data->u.channel.wakeup_fd >= 0) {
+ ret = close(data->u.channel.wakeup_fd);
+ if (ret < 0) {
+ ret = -errno;
+ return ret;
+ }
+ }
free(data->u.channel.data);
break;
case LTTNG_UST_OBJECT_TYPE_STREAM:
ret = ustcomm_send_unix_sock(sock, bytecode->data,
bytecode->len);
if (ret < 0) {
- if (ret == -ECONNRESET)
- fprintf(stderr, "remote end closed connection\n");
return ret;
}
if (ret != bytecode->len)
enum lttng_ust_chan_type type,
void *data,
uint64_t size,
+ int wakeup_fd,
int send_fd_only)
{
ssize_t len;
return -EIO;
}
+ /* Send wakeup fd */
+ len = ustcomm_send_fds_unix_sock(sock, &wakeup_fd, 1);
+ if (len <= 0) {
+ if (len < 0)
+ return len;
+ else
+ return -EIO;
+ }
return 0;
}
{
struct lttng_ust_object_data *channel_data;
ssize_t len;
+ int wakeup_fd;
int ret;
channel_data = zmalloc(sizeof(*channel_data));
ret = -EINVAL;
goto error_recv_data;
}
-
+ /* recv wakeup fd */
+ len = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1);
+ if (len <= 0) {
+ if (len < 0) {
+ ret = len;
+ goto error_recv_data;
+ } else {
+ ret = -EIO;
+ goto error_recv_data;
+ }
+ }
+ channel_data->u.channel.wakeup_fd = wakeup_fd;
*_channel_data = channel_data;
return 0;
channel_data->u.channel.type,
channel_data->u.channel.data,
channel_data->size,
+ channel_data->u.channel.wakeup_fd,
1);
if (ret)
return ret;
channel->attr.type,
table->objects[0].memory_map,
table->objects[0].memory_map_size,
+ channel->wakeup_fd,
0);
}
0);
}
+int ustctl_write_metadata_to_channel(
+ struct ustctl_consumer_channel *channel,
+ const char *metadata_str, /* NOT null-terminated */
+ size_t len) /* metadata length */
+{
+ struct lttng_ust_lib_ring_buffer_ctx ctx;
+ struct lttng_channel *chan = channel->chan;
+ const char *str = metadata_str;
+ int ret = 0, waitret;
+ size_t reserve_len, pos;
+
+ for (pos = 0; pos < len; pos += reserve_len) {
+ reserve_len = min_t(size_t,
+ chan->ops->packet_avail_size(chan->chan, chan->handle),
+ len - pos);
+ lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len,
+ sizeof(char), -1, chan->handle);
+ /*
+ * We don't care about metadata buffer's records lost
+ * count, because we always retry here. Report error if
+ * we need to bail out after timeout or being
+ * interrupted.
+ */
+ waitret = wait_cond_interruptible_timeout(
+ ({
+ ret = chan->ops->event_reserve(&ctx, 0);
+ ret != -ENOBUFS || !ret;
+ }),
+ LTTNG_METADATA_TIMEOUT_MSEC);
+ if (waitret == -ETIMEDOUT || waitret == -EINTR || ret) {
+ DBG("LTTng: Failure to write metadata to buffers (%s)\n",
+ waitret == -EINTR ? "interrupted" :
+ (ret == -ENOBUFS ? "timeout" : "I/O error"));
+ if (waitret == -EINTR)
+ ret = waitret;
+ goto end;
+ }
+ chan->ops->event_write(&ctx, &str[pos], reserve_len);
+ chan->ops->event_commit(&ctx);
+ }
+end:
+ return ret;
+}
+
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+
+ chan = consumer_chan->chan->chan;
+ return ring_buffer_channel_close_wait_fd(&chan->backend.config,
+ chan, chan->handle);
+}
+
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+
+ chan = consumer_chan->chan->chan;
+ return ring_buffer_channel_close_wakeup_fd(&chan->backend.config,
+ chan, chan->handle);
+}
+
int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream)
{
struct channel *chan;
chan = stream->chan->chan->chan;
- return ring_buffer_close_wait_fd(&chan->backend.config,
+ return ring_buffer_stream_close_wait_fd(&chan->backend.config,
chan, stream->handle, stream->cpu);
}
struct channel *chan;
chan = stream->chan->chan->chan;
- return ring_buffer_close_wakeup_fd(&chan->backend.config,
+ return ring_buffer_stream_close_wakeup_fd(&chan->backend.config,
chan, stream->handle, stream->cpu);
}
free(stream);
}
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream)
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wait_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wakeup_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream)
{
struct lttng_ust_lib_ring_buffer *buf;
struct ustctl_consumer_channel *consumer_chan;
return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref);
}
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream)
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream)
{
struct lttng_ust_lib_ring_buffer *buf;
struct ustctl_consumer_channel *consumer_chan;
lttng_ring_buffer_metadata_client_init();
lttng_ring_buffer_client_overwrite_init();
lttng_ring_buffer_client_discard_init();
+ lib_ringbuffer_signal_init();
}
static __attribute__((destructor))