/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
}
/* Try to rmdir all directories under shm_path root. */
if (channel->root_shm_path[0]) {
- (void) run_as_recursive_rmdir(channel->root_shm_path,
+ (void) run_as_rmdir_recursive(channel->root_shm_path,
channel->uid, channel->gid);
}
free(stream_fds);
consumer_timer_switch_start(channel, attr.switch_timer_interval);
attr.switch_timer_interval = 0;
} else {
+ int monitor_start_ret;
+
consumer_timer_live_start(channel,
msg.u.ask_channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ channel,
+ msg.u.ask_channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_channel_error;
+ }
}
health_code_update();
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
goto end_channel_error;
}
break;
}
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ goto end_msg_sessiond;
+ }
default:
break;
}
}
/*
- * Take a snapshot for a specific fd
+ * Take a snapshot for a specific stream.
*
* Returns 0 on success, < 0 on error
*/
return ustctl_snapshot(stream->ustream);
}
+/*
+ * Sample consumed and produced positions for a specific stream.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_ustconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_snapshot_sample_positions(stream->ustream);
+}
+
/*
* Get the produced position
*
ustctl_destroy_channel(chan->uchan);
/* Try to rmdir all directories under shm_path root. */
if (chan->root_shm_path[0]) {
- (void) run_as_recursive_rmdir(chan->root_shm_path,
+ (void) run_as_rmdir_recursive(chan->root_shm_path,
chan->uid, chan->gid);
}
free(chan->stream_fds);
* because we locked the metadata thread.
*/
ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ pthread_mutex_lock(&metadata->lock);
if (ret < 0) {
goto end;
}
- pthread_mutex_lock(&metadata->lock);
ret = commit_one_metadata_packet(metadata);
if (ret <= 0) {
index.offset = htobe64(stream->out_fd_offset);
ret = get_index_values(&index, ustream);
if (ret < 0) {
+ err = ustctl_put_subbuf(ustream);
+ assert(err == 0);
goto end;
}
ret = update_stream_stats(stream);
if (ret < 0) {
PERROR("kernctl_get_events_discarded");
+ err = ustctl_put_subbuf(ustream);
+ assert(err == 0);
goto end;
}
} else {
stream->tracefile_size_current = 0;
if (!stream->metadata_flag) {
- ret = index_create_file(stream->chan->pathname,
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
goto error;
}
- stream->index_fd = ret;
+ assert(!stream->index_file);
+ stream->index_file = index_file;
}
}
ret = 0;