consumer_stream_destroy(stream, metadata_ht);
}
+void consumer_stream_update_channel_attributes(
+ struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
+{
+ stream->channel_read_only_attributes.tracefile_size =
+ channel->tracefile_size;
+ memcpy(stream->channel_read_only_attributes.path, channel->pathname,
+ sizeof(stream->channel_read_only_attributes.path));
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
return written;
}
+/*
+ * Sample the snapshot positions for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_sample_snapshot_positions(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_sample_snapshot_positions(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
/*
* Take a snapshot for a specific fd
*
}
}
+/*
+ * Get the consumed position (free-running counter position in bytes).
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+ lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
&stream->node_channel_id);
/*
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
- int nb_fd = 0;
+ int nb_fd = 0, nb_pipes_fd;
/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
int nb_inactive_fd = 0;
struct lttng_consumer_local_data *ctx = data;
local_stream = NULL;
/*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
+ * Allocate for all fds + 2:
+ * +1 for the consumer_data_pipe
+ * +1 for wake up pipe
*/
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ nb_pipes_fd = 2;
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
if (testpoint(consumerd_thread_data_poll)) {
goto end;
}
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
}
return start_pos;
}
+
+static
+int rotate_rename_local(const char *old_path, const char *new_path,
+ uid_t uid, gid_t gid)
+{
+ int ret;
+
+ assert(old_path);
+ assert(new_path);
+
+ ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid);
+ if (ret < 0) {
+ ERR("Create directory on rotate");
+ goto end;
+ }
+
+ ret = rename(old_path, new_path);
+ if (ret < 0 && errno != ENOENT) {
+ PERROR("Rename completed rotation chunk");
+ goto end;
+ }
+
+ ret = 0;
+end:
+ return ret;
+}
+
+static
+int rotate_rename_relay(const char *old_path, const char *new_path,
+ uint64_t relayd_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd while running rotate_rename_relay command");
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end:
+ return ret;
+}
+
+int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
+ uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+ if (relayd_id != -1ULL) {
+ return rotate_rename_relay(old_path, new_path, relayd_id);
+ } else {
+ return rotate_rename_local(old_path, new_path, uid, gid);
+ }
+}
+
+static
+int mkdir_local(const char *path, uid_t uid, gid_t gid)
+{
+ int ret;
+
+ ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid);
+ if (ret < 0) {
+ /* utils_mkdir_recursive logs an error. */
+ goto end;
+ }
+
+ ret = 0;
+end:
+ return ret;
+}
+
+static
+int mkdir_relay(const char *path, uint64_t relayd_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_mkdir(&relayd->control_sock, path);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
+end:
+ return ret;
+
+}
+
+int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
+ uint64_t relayd_id)
+{
+ if (relayd_id != -1ULL) {
+ return mkdir_relay(path, relayd_id);
+ } else {
+ return mkdir_local(path, uid, gid);
+ }
+}