/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
*
*/
#include "common/index/ctf-index.h"
+#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <poll.h>
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
- cds_list_del(&stream->send_node);
/*
* Once a stream is added to this list, the buffers were created so we
* have a guarantee that this call will succeed. Setting the monitor
return outfd;
}
+/*
+ * Write a character on the metadata poll pipe to wake the metadata thread.
+ * Returns 0 on success, -1 on error.
+ */
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
+{
+ int ret = 0;
+
+ DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
+ channel->name);
+ if (channel->monitor && channel->metadata_stream) {
+ const char dummy = 'c';
+ const ssize_t write_ret = lttng_write(
+ channel->metadata_stream->ust_metadata_poll_pipe[1],
+ &dummy, 1);
+
+ if (write_ret < 1) {
+ if (errno == EWOULDBLOCK) {
+ /*
+ * This is fine, the metadata poll thread
+ * is having a hard time keeping-up, but
+ * it will eventually wake-up and consume
+ * the available data.
+ */
+ ret = 0;
+ } else {
+ PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
+ ret = -1;
+ goto end;
+ }
+ }
+ }
+
+end:
+ return ret;
+}
+
/*
* Trigger a dump of the metadata content. Following/during the succesful
* completion of this call, the metadata poll thread will start receiving
if (!locked_by_caller) {
stream->read_subbuffer_ops.lock(stream);
+ } else {
+ stream->read_subbuffer_ops.assert_locked(stream);
}
if (stream->read_subbuffer_ops.on_wake_up) {
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
- void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
- struct lttng_consumer_local_data *ctx, int sock,
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
- uint64_t relayd_session_id)
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
- assert(relayd_sock);
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close");
- }
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->control_sock.sock, fd,
+ relayd_socket_protocol);
- /* Assign new file descriptor */
- relayd->control_sock.sock.fd = fd;
/* Assign version values. */
- relayd->control_sock.major = relayd_sock->major;
- relayd->control_sock.minor = relayd_sock->minor;
+ relayd->control_sock.major = relayd_version_major;
+ relayd->control_sock.minor = relayd_version_minor;
relayd->relayd_session_id = relayd_session_id;
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close");
- }
-
- /* Assign new file descriptor */
- relayd->data_sock.sock.fd = fd;
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->data_sock.sock, fd,
+ relayd_socket_protocol);
/* Assign version values. */
- relayd->data_sock.major = relayd_sock->major;
- relayd->data_sock.minor = relayd_sock->minor;
+ relayd->data_sock.major = relayd_version_major;
+ relayd->data_sock.minor = relayd_version_minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
goto error;
}
+ if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
return start_pos;
}
+/* Stream lock must be held by the caller. */
+static int sample_stream_positions(struct lttng_consumer_stream *stream,
+ unsigned long *produced, unsigned long *consumed)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->lock);
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to sample snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, produced);
+ if (ret < 0) {
+ ERR("Failed to sample produced position");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position");
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
/*
* Sample the rotate position for all the streams of a channel. If a stream
* is already at the rotate position (produced == consumed), we flag it as
}
/*
- * Do not flush an empty packet when rotating from a NULL trace
+ * Do not flush a packet when rotating from a NULL trace
* chunk. The stream has no means to output data, and the prior
- * rotation which rotated to NULL performed that side-effect already.
+ * rotation which rotated to NULL performed that side-effect
+ * already. No new data can be produced when a stream has no
+ * associated trace chunk (e.g. a stop followed by a rotate).
*/
if (stream->trace_chunk) {
+ bool flush_active;
+
+ if (stream->metadata_flag) {
+ /*
+ * Don't produce an empty metadata packet,
+ * simply close the current one.
+ *
+ * Metadata is regenerated on every trace chunk
+ * switch; there is no concern that no data was
+ * produced.
+ */
+ flush_active = true;
+ } else {
+ /*
+ * Only flush an empty packet if the "packet
+ * open" could not be performed on transition
+ * to a new trace chunk and no packets were
+ * consumed within the chunk's lifetime.
+ */
+ if (stream->opened_packet_in_current_trace_chunk) {
+ flush_active = true;
+ } else {
+ /*
+ * Stream could have been full at the
+ * time of rotation, but then have had
+ * no activity at all.
+ *
+ * It is important to flush a packet
+ * to prevent 0-length files from being
+ * produced as most viewers choke on
+ * them.
+ *
+ * Unfortunately viewers will not be
+ * able to know that tracing was active
+ * for this stream during this trace
+ * chunk's lifetime.
+ */
+ ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ if (ret) {
+ goto end_unlock_stream;
+ }
+
+ /*
+ * Don't flush an empty packet if data
+ * was produced; it will be consumed
+ * before the rotation completes.
+ */
+ flush_active = produced_pos != consumed_pos;
+ if (!flush_active) {
+ enum lttng_trace_chunk_status chunk_status;
+ const char *trace_chunk_name;
+ uint64_t trace_chunk_id;
+
+ chunk_status = lttng_trace_chunk_get_name(
+ stream->trace_chunk,
+ &trace_chunk_name,
+ NULL);
+ if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+ trace_chunk_name = "none";
+ }
+
+ /*
+ * Consumer trace chunks are
+ * never anonymous.
+ */
+ chunk_status = lttng_trace_chunk_get_id(
+ stream->trace_chunk,
+ &trace_chunk_id);
+ assert(chunk_status ==
+ LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("Unable to open packet for stream during trace chunk's lifetime. "
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key, trace_chunk_name, trace_chunk_id);
+ }
+ }
+ }
+
/*
- * For metadata stream, do an active flush, which does not
- * produce empty packets. For data streams, empty-flush;
- * ensures we have at least one packet in each stream per trace
- * chunk, even if no data was produced.
+ * Close the current packet before sampling the
+ * ring buffer positions.
*/
- ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+ ret = consumer_flush_buffer(stream, flush_active);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);