#include "consumer.h"
#include "health.h"
#include "ust-app.h"
+#include "utils.h"
/*
* Receive a reply command status message from the consumer. Consumer socket
assert(consumer);
/* Destroy any relayd connection */
- if (consumer && consumer->type == CONSUMER_DST_NET) {
+ if (consumer->type == CONSUMER_DST_NET) {
rcu_read_lock();
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
node.node) {
rcu_read_unlock();
}
+ socket->type = data->type;
+
DBG3("Consumer socket created (fd: %d) and added to output",
data->cmd_sock);
rcu_read_unlock();
/* Finally destroy HT */
- lttng_ht_destroy(obj->socks);
+ ht_cleanup_push(obj->socks);
}
free(obj);
*/
struct consumer_output *consumer_copy_output(struct consumer_output *obj)
{
+ int ret;
struct lttng_ht *tmp_ht_ptr;
- struct lttng_ht_iter iter;
- struct consumer_socket *socket, *copy_sock;
struct consumer_output *output;
assert(obj);
/* Putting back the HT pointer and start copying socket(s). */
output->socks = tmp_ht_ptr;
+ ret = consumer_copy_sockets(output, obj);
+ if (ret < 0) {
+ goto malloc_error;
+ }
+
+error:
+ return output;
+
+malloc_error:
+ consumer_destroy_output(output);
+ return NULL;
+}
+
+/*
+ * Copy consumer sockets from src to dst.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_copy_sockets(struct consumer_output *dst,
+ struct consumer_output *src)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct consumer_socket *socket, *copy_sock;
+
+ assert(dst);
+ assert(src);
+
rcu_read_lock();
- cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+ cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) {
+ /* Ignore socket that are already there. */
+ copy_sock = consumer_find_socket(socket->fd, dst);
+ if (copy_sock) {
+ continue;
+ }
+
/* Create new socket object. */
copy_sock = consumer_allocate_socket(socket->fd);
if (copy_sock == NULL) {
rcu_read_unlock();
- goto malloc_error;
+ ret = -ENOMEM;
+ goto error;
}
copy_sock->registered = socket->registered;
+ /*
+ * This is valid because this lock is shared accross all consumer
+ * object being the global lock of the consumer data structure of the
+ * session daemon.
+ */
copy_sock->lock = socket->lock;
- consumer_add_socket(copy_sock, output);
+ consumer_add_socket(copy_sock, dst);
}
rcu_read_unlock();
error:
- return output;
-
-malloc_error:
- consumer_destroy_output(output);
- return NULL;
+ return ret;
}
/*
unsigned char *uuid,
uint32_t chan_id,
uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ unsigned int monitor)
{
assert(msg);
msg->u.ask_channel.chan_id = chan_id;
msg->u.ask_channel.tracefile_size = tracefile_size;
msg->u.ask_channel.tracefile_count = tracefile_count;
+ msg->u.ask_channel.monitor = monitor;
memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
enum lttng_event_output output,
int type,
uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ unsigned int monitor)
{
assert(msg);
msg->u.channel.type = type;
msg->u.channel.tracefile_size = tracefile_size;
msg->u.channel.tracefile_count = tracefile_count;
+ msg->u.channel.monitor = monitor;
strncpy(msg->u.channel.pathname, pathname,
sizeof(msg->u.channel.pathname));
health_code_update();
return ret;
}
+
+/*
+ * Ask the consumer to snapshot a specific channel using the key.
+ *
+ * Return 0 on success or else a negative error.
+ */
+int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
+ struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
+ int wait)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+ assert(output);
+ assert(output->consumer);
+
+ DBG("Consumer snapshot channel key %" PRIu64, key);
+
+ memset(&msg, 0, sizeof(msg));
+
+ msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
+ msg.u.snapshot_channel.key = key;
+ msg.u.snapshot_channel.max_size = output->max_size;
+ msg.u.snapshot_channel.metadata = metadata;
+
+ if (output->consumer->type == CONSUMER_DST_NET) {
+ msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
+ msg.u.snapshot_channel.use_relayd = 1;
+ ret = snprintf(msg.u.snapshot_channel.pathname,
+ sizeof(msg.u.snapshot_channel.pathname), "%s/%s",
+ output->consumer->subdir, DEFAULT_SNAPSHOT_NAME);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ } else {
+ ret = snprintf(msg.u.snapshot_channel.pathname,
+ sizeof(msg.u.snapshot_channel.pathname), "%s/%s",
+ output->consumer->dst.trace_path, DEFAULT_SNAPSHOT_NAME);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ /* Create directory. Ignore if exist. */
+ ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname,
+ S_IRWXU | S_IRWXG, uid, gid);
+ if (ret < 0) {
+ if (ret != -EEXIST) {
+ ERR("Trace directory creation error");
+ goto error;
+ }
+ }
+ }
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ health_code_update();
+ return ret;
+}