#include <common/common.h>
#include <common/defaults.h>
#include <common/uri.h>
+#include <common/utils.h>
#include "consumer.h"
#include "health.h"
#include "ust-app.h"
+#include "utils.h"
/*
* Receive a reply command status message from the consumer. Consumer socket
assert(consumer);
/* Destroy any relayd connection */
- if (consumer && consumer->type == CONSUMER_DST_NET) {
+ if (consumer->type == CONSUMER_DST_NET) {
rcu_read_lock();
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
node.node) {
rcu_read_unlock();
}
+ socket->type = data->type;
+
DBG3("Consumer socket created (fd: %d) and added to output",
data->cmd_sock);
/*
* Delete the consumer_output object from the list and free the ptr.
+ *
+ * Should *NOT* be called with RCU read-side lock held.
*/
void consumer_destroy_output(struct consumer_output *obj)
{
rcu_read_unlock();
/* Finally destroy HT */
- lttng_ht_destroy(obj->socks);
+ ht_cleanup_push(obj->socks);
}
free(obj);
/*
* Copy consumer output and returned the newly allocated copy.
+ *
+ * Should *NOT* be called with RCU read-side lock held.
*/
struct consumer_output *consumer_copy_output(struct consumer_output *obj)
{
+ int ret;
struct lttng_ht *tmp_ht_ptr;
- struct lttng_ht_iter iter;
- struct consumer_socket *socket, *copy_sock;
struct consumer_output *output;
assert(obj);
/* Putting back the HT pointer and start copying socket(s). */
output->socks = tmp_ht_ptr;
+ ret = consumer_copy_sockets(output, obj);
+ if (ret < 0) {
+ goto malloc_error;
+ }
+
+error:
+ return output;
+
+malloc_error:
+ consumer_destroy_output(output);
+ return NULL;
+}
+
+/*
+ * Copy consumer sockets from src to dst.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_copy_sockets(struct consumer_output *dst,
+ struct consumer_output *src)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct consumer_socket *socket, *copy_sock;
+
+ assert(dst);
+ assert(src);
+
rcu_read_lock();
- cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+ cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) {
+ /* Ignore socket that are already there. */
+ copy_sock = consumer_find_socket(socket->fd, dst);
+ if (copy_sock) {
+ continue;
+ }
+
/* Create new socket object. */
copy_sock = consumer_allocate_socket(socket->fd);
if (copy_sock == NULL) {
rcu_read_unlock();
- goto malloc_error;
+ ret = -ENOMEM;
+ goto error;
}
copy_sock->registered = socket->registered;
+ /*
+ * This is valid because this lock is shared accross all consumer
+ * object being the global lock of the consumer data structure of the
+ * session daemon.
+ */
copy_sock->lock = socket->lock;
- consumer_add_socket(copy_sock, output);
+ consumer_add_socket(copy_sock, dst);
}
rcu_read_unlock();
error:
- return output;
-
-malloc_error:
- consumer_destroy_output(output);
- return NULL;
+ return ret;
}
/*
if (uri->port == 0) {
/* Assign default port. */
uri->port = DEFAULT_NETWORK_CONTROL_PORT;
+ } else {
+ if (obj->dst.net.data_isset && uri->port ==
+ obj->dst.net.data.port) {
+ ret = -LTTNG_ERR_INVALID;
+ goto error;
+ }
}
DBG3("Consumer control URI set with port %d", uri->port);
break;
if (uri->port == 0) {
/* Assign default port. */
uri->port = DEFAULT_NETWORK_DATA_PORT;
+ } else {
+ if (obj->dst.net.control_isset && uri->port ==
+ obj->dst.net.control.port) {
+ ret = -LTTNG_ERR_INVALID;
+ goto error;
+ }
}
DBG3("Consumer data URI set with port %d", uri->port);
break;
default:
ERR("Set network uri type unknown %d", uri->stype);
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
}
if (ret < 0) {
PERROR("snprintf set consumer uri subdir");
+ ret = -LTTNG_ERR_NOMEM;
goto error;
}
equal:
return 1;
error:
- return -1;
+ return ret;
}
/*
uint64_t relayd_id,
uint64_t key,
unsigned char *uuid,
- uint32_t chan_id)
+ uint32_t chan_id,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ unsigned int monitor)
{
assert(msg);
msg->u.ask_channel.relayd_id = relayd_id;
msg->u.ask_channel.key = key;
msg->u.ask_channel.chan_id = chan_id;
+ msg->u.ask_channel.tracefile_size = tracefile_size;
+ msg->u.ask_channel.tracefile_count = tracefile_count;
+ msg->u.ask_channel.monitor = monitor;
memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
const char *name,
unsigned int nb_init_streams,
enum lttng_event_output output,
- int type)
+ int type,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ unsigned int monitor)
{
assert(msg);
msg->u.channel.nb_init_streams = nb_init_streams;
msg->u.channel.output = output;
msg->u.channel.type = type;
+ msg->u.channel.tracefile_size = tracefile_size;
+ msg->u.channel.tracefile_count = tracefile_count;
+ msg->u.channel.monitor = monitor;
strncpy(msg->u.channel.pathname, pathname,
sizeof(msg->u.channel.pathname));
* On success return positive value. On error, negative value.
*/
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
- struct lttcomm_sock *sock, struct consumer_output *consumer,
+ struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
assert(consumer);
assert(consumer_sock);
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
msg.u.relayd_sock.session_id = session_id;
- memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
+ memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
if (ret < 0) {
/* The above call will print a PERROR on error. */
- DBG("Error when sending relayd sockets on sock %d", sock->fd);
+ DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
goto error;
}
}
DBG3("Sending relayd socket file descriptor to consumer");
- ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
+ ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1);
if (ret < 0) {
goto error;
}
goto end;
}
- DBG3("Consumer pushing metadata on sock %d of len %lu", socket->fd, len);
+ DBG3("Consumer pushing metadata on sock %d of len %zu", socket->fd, len);
ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
if (ret < 0) {
health_code_update();
return ret;
}
+
+/*
+ * Ask the consumer to snapshot a specific channel using the key.
+ *
+ * Return 0 on success or else a negative error.
+ */
+int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
+ struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
+ const char *session_path, int wait)
+{
+ int ret;
+ char datetime[16];
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+ assert(output);
+ assert(output->consumer);
+
+ DBG("Consumer snapshot channel key %" PRIu64, key);
+
+ ret = utils_get_current_time_str("%Y%m%d-%H%M%S", datetime,
+ sizeof(datetime));
+ if (!ret) {
+ ret = -EINVAL;
+ goto error;
+ }
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
+ msg.u.snapshot_channel.key = key;
+ msg.u.snapshot_channel.max_size = output->max_size;
+ msg.u.snapshot_channel.metadata = metadata;
+
+ if (output->consumer->type == CONSUMER_DST_NET) {
+ msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
+ msg.u.snapshot_channel.use_relayd = 1;
+ ret = snprintf(msg.u.snapshot_channel.pathname,
+ sizeof(msg.u.snapshot_channel.pathname), "%s/%s-%s%s",
+ output->consumer->subdir, output->name, datetime,
+ session_path);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ } else {
+ ret = snprintf(msg.u.snapshot_channel.pathname,
+ sizeof(msg.u.snapshot_channel.pathname), "%s/%s-%s%s",
+ output->consumer->dst.trace_path, output->name, datetime,
+ session_path);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+ msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
+
+ /* Create directory. Ignore if exist. */
+ ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname,
+ S_IRWXU | S_IRWXG, uid, gid);
+ if (ret < 0) {
+ if (ret != -EEXIST) {
+ ERR("Trace directory creation error");
+ goto error;
+ }
+ }
+ }
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ health_code_update();
+ return ret;
+}