consumer_stream_destroy(stream, metadata_ht);
}
+void consumer_stream_update_channel_attributes(
+ struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
+{
+ stream->channel_read_only_attributes.tracefile_size =
+ channel->tracefile_size;
+ memcpy(stream->channel_read_only_attributes.path, channel->pathname,
+ sizeof(stream->channel_read_only_attributes.path));
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
return written;
}
+/*
+ * Sample the snapshot positions for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_sample_snapshot_positions(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_sample_snapshot_positions(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
/*
* Take a snapshot for a specific fd
*
}
}
+/*
+ * Get the consumed position (free-running counter position in bytes).
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
- int nb_fd = 0;
+ int nb_fd = 0, nb_pipes_fd;
/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
int nb_inactive_fd = 0;
struct lttng_consumer_local_data *ctx = data;
local_stream = NULL;
/*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
+ * Allocate for all fds + 2:
+ * +1 for the consumer_data_pipe
+ * +1 for wake up pipe
*/
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ nb_pipes_fd = 2;
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
if (testpoint(consumerd_thread_data_poll)) {
goto end;
}
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {