#include "consumer.h"
#include "consumer-stream.h"
+#include "consumer-testpoint.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
return ret;
}
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(net_seq_idx != -1ULL);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_streams_sent(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+ net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and close the stream
*/
return NULL;
}
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
+/*
+ * Iterate over all streams of the metadata hashtable and free them
+ * properly.
+ */
+static void destroy_metadata_stream_ht(struct lttng_ht *ht)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
/*
* Close all fds associated with the instance and free the context.
*/
DBG("Consumer destroying it. Closing everything.");
+ destroy_data_stream_ht(data_ht);
+ destroy_metadata_stream_ht(metadata_ht);
+
ret = close(ctx->consumer_error_socket);
if (ret) {
PERROR("close");
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
}
}
- while (len > 0) {
- ret = lttng_write(outfd, mmap_base + mmap_offset, len);
- DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
- if (ret < len) {
- /*
- * This is possible if the fd is closed on the other side (outfd)
- * or any write problem. It can be verbose a bit for a normal
- * execution if for instance the relayd is stopped abruptly. This
- * can happen so set this to a DBG statement.
- */
- DBG("Error in file write mmap");
- if (written == 0) {
- written = -errno;
- }
- /* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL) {
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
- } else if (ret > len) {
- PERROR("Error in file write (ret %zd > len %lu)", ret, len);
- written += ret;
- goto end;
+ /*
+ * This call guarantee that len or less is returned. It's impossible to
+ * receive a ret value that is bigger than len.
+ */
+ ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+ if (ret < 0 || ((size_t) ret != len)) {
+ /*
+ * Report error to caller if nothing was written else at least send the
+ * amount written.
+ */
+ if (ret < 0) {
+ written = -errno;
} else {
- len -= ret;
- mmap_offset += ret;
+ written = ret;
}
- /* This call is useless on a socket so better save a syscall. */
- if (!relayd) {
- /* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
- SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EPIPE || errno == EINVAL) {
+ /*
+ * This is possible if the fd is closed on the other side
+ * (outfd) or any write problem. It can be verbose a bit for a
+ * normal execution if for instance the relayd is stopped
+ * abruptly. This can happen so set this to a DBG statement.
+ */
+ DBG("Consumer mmap write detected relayd hang up");
+ relayd_hang_up = 1;
+ goto write_error;
}
- stream->output_written += ret;
- written += ret;
+
+ /* Unhandled error, print it and stop function right now. */
+ PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+ goto end;
+ }
+ stream->output_written += ret;
+ written = ret;
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += len;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
- PERROR("Error in relay splice");
+ ret = errno;
if (written == 0) {
written = ret_splice;
}
- ret = errno;
+ PERROR("Error in relay splice");
goto splice_error;
}
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("Consumer splice pipe to file, ret %zd", ret_splice);
if (ret_splice < 0) {
- PERROR("Error in file splice");
+ ret = errno;
if (written == 0) {
written = ret_splice;
}
/* Socket operation failed. We consider the relayd dead */
- if (errno == EBADF || errno == EPIPE) {
+ if (errno == EBADF || errno == EPIPE || errno == ESPIPE) {
WARN("Remote relayd disconnected. Stopping");
relayd_hang_up = 1;
goto write_error;
}
- ret = errno;
+ PERROR("Error in file splice");
goto splice_error;
} else if (ret_splice > len) {
- errno = EINVAL;
- PERROR("Wrote more data than requested %zd (len: %lu)",
- ret_splice, len);
+ /*
+ * We don't expect this code path to be executed but you never know
+ * so this is an extra protection agains a buggy splice().
+ */
written += ret_splice;
ret = errno;
+ PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
+ len);
goto splice_error;
+ } else {
+ /* All good, update current len and continue. */
+ len -= ret_splice;
}
- len -= ret_splice;
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
written += ret_splice;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
-
- ret = ret_splice;
-
goto end;
write_error:
}
}
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * WARNING: *MUST* be used with data stream only.
- */
-static void destroy_data_stream_ht(struct lttng_ht *ht)
-{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
- if (ht == NULL) {
- return;
- }
-
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
- }
- rcu_read_unlock();
-
- lttng_ht_destroy(ht);
-}
-
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * XXX: Should not be only for metadata stream or else use an other name.
- */
-static void destroy_stream_ht(struct lttng_ht *ht)
-{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
- if (ht == NULL) {
- return;
- }
-
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
- }
- rcu_read_unlock();
-
- lttng_ht_destroy(ht);
-}
-
void lttng_consumer_close_metadata(void)
{
switch (consumer_data.type) {
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
- health_code_update();
-
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!metadata_ht) {
- /* ENOMEM at this point. Better to bail out. */
- goto end_ht;
+ if (testpoint(consumerd_thread_metadata)) {
+ goto error_testpoint;
}
+ health_code_update();
+
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
lttng_poll_clean(&events);
end_poll:
- destroy_stream_ht(metadata_ht);
-end_ht:
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
- health_code_update();
-
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (data_ht == NULL) {
- /* ENOMEM at this point. Better to bail out. */
- goto end;
+ if (testpoint(consumerd_thread_data)) {
+ goto error_testpoint;
}
+ health_code_update();
+
local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
*/
(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
- destroy_data_stream_ht(data_ht);
-
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+ if (testpoint(consumerd_thread_channel)) {
+ goto error_testpoint;
+ }
+
health_code_update();
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
end_poll:
destroy_channel_ht(channel_ht);
end_ht:
+error_testpoint:
DBG("Channel poll thread exiting");
if (err) {
health_error();
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+ if (testpoint(consumerd_thread_sessiond)) {
+ goto error_testpoint;
+ }
+
health_code_update();
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
}
}
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
/*
* Allocate and set consumer data hash tables.
*/
-void lttng_consumer_init(void)
+int lttng_consumer_init(void)
{
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.channel_ht) {
+ goto error;
+ }
+
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.relayd_ht) {
+ goto error;
+ }
+
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.stream_list_ht) {
+ goto error;
+ }
+
consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.stream_per_chan_id_ht) {
+ goto error;
+ }
+
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!data_ht) {
+ goto error;
+ }
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!metadata_ht) {
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return -1;
}
/*