#define _LGPL_SOURCE
#include <assert.h>
#include <lttng/ust-ctl.h>
+#include <lttng/ust-sigbus.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;
+DEFINE_LTTNG_UST_SIGBUS_STATE();
+
/*
* Free channel object and all streams associated with it. This MUST be used
* only and only if the channel has _NEVER_ been added to the global channel
}
if (!stream->quiescent) {
- lttng_ust_ctl_flush_buffer(stream->ustream, 0);
+ ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
+ if (ret) {
+ ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'",
+ chan_key, channel->name);
+ ret = LTTNG_ERR_BUFFER_FLUSH_FAILED;
+ pthread_mutex_unlock(&stream->lock);
+ goto error;
+ }
stream->quiescent = true;
}
next:
* Else, if quiescent, it has already been done by the prior stop.
*/
if (!stream->quiescent) {
- lttng_ust_ctl_flush_buffer(stream->ustream, 0);
+ ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
+ if (ret < 0) {
+ ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'",
+ channel->key, channel->name);
+ goto error_unlock;
+ }
}
ret = lttng_ustconsumer_take_snapshot(stream);
return ret_func;
}
-void lttng_lttng_ust_ctl_flush_buffer(struct lttng_consumer_stream *stream,
+int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream,
int producer_active)
{
assert(stream);
assert(stream->ustream);
- lttng_ust_ctl_flush_buffer(stream->ustream, producer_active);
+ return lttng_ust_ctl_flush_buffer(stream->ustream, producer_active);
}
/*
return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
}
-void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
+int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
int producer)
{
assert(stream);
assert(stream->ustream);
- lttng_ust_ctl_flush_buffer(stream->ustream, producer);
+ return lttng_ust_ctl_flush_buffer(stream->ustream, producer);
}
-void lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
{
assert(stream);
assert(stream->ustream);
- lttng_ust_ctl_clear_buffer(stream->ustream);
+ return lttng_ust_ctl_clear_buffer(stream->ustream);
}
int lttng_ustconsumer_get_current_timestamp(
pthread_mutex_lock(&stream->lock);
if (!stream->quiescent) {
- lttng_ust_ctl_flush_buffer(stream->ustream, 0);
- stream->quiescent = true;
+ if (lttng_ust_ctl_flush_buffer(stream->ustream, 0) < 0) {
+ ERR("Failed to flush buffer on stream hang-up");
+ } else {
+ stream->quiescent = true;
+ }
}
pthread_mutex_unlock(&stream->lock);
stream->hangup_flush_done = 1;
* a metadata packet. Since the subbuffer is fully filled (with padding,
* if needed), the stream is "quiescent" after this commit.
*/
- lttng_ust_ctl_flush_buffer(stream->ustream, 1);
- stream->quiescent = true;
+ if (lttng_ust_ctl_flush_buffer(stream->ustream, 1)) {
+ ERR("Failed to flush buffer while commiting one metadata packet");
+ ret = -EIO;
+ } else {
+ stream->quiescent = true;
+ }
end:
pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
return ret;
return ret;
}
-static int get_next_subbuffer(struct lttng_consumer_stream *stream,
+static enum get_next_subbuffer_status get_next_subbuffer(
+ struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
int ret;
+ enum get_next_subbuffer_status status;
ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
- if (ret) {
+ switch (ret) {
+ case 0:
+ status = GET_NEXT_SUBBUFFER_STATUS_OK;
+ break;
+ case -ENODATA:
+ case -EAGAIN:
+ /*
+ * The caller only expects -ENODATA when there is no data to
+ * read, but the kernel tracer returns -EAGAIN when there is
+ * currently no data for a non-finalized stream, and -ENODATA
+ * when there is no data for a finalized stream. Those can be
+ * combined into a -ENODATA return value.
+ */
+ status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
+ goto end;
+ default:
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
ret = get_next_subbuffer_common(stream, subbuffer);
if (ret) {
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
end:
- return ret;
+ return status;
}
-static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
+static enum get_next_subbuffer_status get_next_subbuffer_metadata(
+ struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
int ret;
bool coherent;
bool buffer_empty;
unsigned long consumed_pos, produced_pos;
+ enum get_next_subbuffer_status status;
do {
ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
got_subbuffer = false;
if (ret != -EAGAIN) {
/* Fatal error. */
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
}
if (!got_subbuffer) {
ret = commit_one_metadata_packet(stream);
if (ret < 0 && ret != -ENOBUFS) {
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
} else if (ret == 0) {
/* Not an error, the cache is empty. */
cache_empty = true;
- ret = -ENODATA;
+ status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
goto end;
} else {
cache_empty = false;
/* Populate sub-buffer infos and view. */
ret = get_next_subbuffer_common(stream, subbuffer);
if (ret) {
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
* pushed the consumption position yet (on put_next).
*/
PERROR("Failed to take a snapshot of metadata buffer positions");
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret) {
PERROR("Failed to get metadata consumed position");
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret) {
PERROR("Failed to get metadata produced position");
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
coherent = got_subbuffer && cache_empty && buffer_empty;
LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+ status = GET_NEXT_SUBBUFFER_STATUS_OK;
end:
- return ret;
+ return status;
}
static int put_next_subbuffer(struct lttng_consumer_stream *stream,
return lttng_ust_ctl_get_stream_id(stream->ustream, stream_id);
}
+
+void lttng_ustconsumer_sigbus_handle(void *addr)
+{
+ lttng_ust_ctl_sigbus_handle(addr);
+}