#include <common/uri.h>
#include "consumer.h"
+#include "health.h"
+#include "ust-app.h"
/*
* Receive a reply command status message from the consumer. Consumer socket
return ret;
}
+/*
+ * Return the consumer socket from the given consumer output with the right
+ * bitness. On error, returns NULL.
+ *
+ * The caller MUST acquire a rcu read side lock and keep it until the socket
+ * object reference is not needed anymore.
+ */
+struct consumer_socket *consumer_find_socket_by_bitness(int bits,
+ struct consumer_output *consumer)
+{
+ int consumer_fd;
+ struct consumer_socket *socket = NULL;
+
+ switch (bits) {
+ case 64:
+ consumer_fd = uatomic_read(&ust_consumerd64_fd);
+ break;
+ case 32:
+ consumer_fd = uatomic_read(&ust_consumerd32_fd);
+ break;
+ default:
+ assert(0);
+ goto end;
+ }
+
+ socket = consumer_find_socket(consumer_fd, consumer);
+ if (!socket) {
+ ERR("Consumer socket fd %d not found in consumer obj %p",
+ consumer_fd, consumer);
+ }
+
+end:
+ return socket;
+}
+
/*
* Find a consumer_socket in a consumer_output hashtable. Read side lock must
* be acquired before calling this function and across use of the
gid_t gid,
uint64_t relayd_id,
uint64_t key,
- unsigned char *uuid)
+ unsigned char *uuid,
+ uint32_t chan_id)
{
assert(msg);
msg->u.ask_channel.gid = gid;
msg->u.ask_channel.relayd_id = relayd_id;
msg->u.ask_channel.key = key;
+ msg->u.ask_channel.chan_id = chan_id;
memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
rcu_read_unlock();
return -1;
}
+
+/*
+ * Send a flush command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer flush channel key %" PRIu64, key);
+
+ msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
+ msg.u.flush_channel.key = key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send a close metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_close_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
+
+ msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
+ msg.u.close_metadata.key = metadata_key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send a setup metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_setup_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
+
+ msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
+ msg.u.setup_metadata.key = metadata_key;
+
+ pthread_mutex_lock(socket->lock);
+ health_code_update();
+
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ pthread_mutex_unlock(socket->lock);
+ return ret;
+}
+
+/*
+ * Send metadata string to consumer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_push_metadata(struct consumer_socket *socket,
+ uint64_t metadata_key, char *metadata_str, size_t len,
+ size_t target_offset)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(socket);
+ assert(socket->fd >= 0);
+
+ DBG2("Consumer push metadata to consumer socket %d", socket->fd);
+
+ msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
+ msg.u.push_metadata.key = metadata_key;
+ msg.u.push_metadata.target_offset = target_offset;
+ msg.u.push_metadata.len = len;
+
+ /*
+ * TODO: reenable these locks when the consumerd gets the ability to
+ * reorder the metadata it receives. This fits with locking in
+ * src/bin/lttng-sessiond/ust-app.c:push_metadata()
+ *
+ * pthread_mutex_lock(socket->lock);
+ */
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto end;
+ }
+
+ DBG3("Consumer pushing metadata on sock %d of len %lu", socket->fd, len);
+
+ ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
+ if (ret < 0) {
+ goto end;
+ }
+
+ health_code_update();
+ ret = consumer_recv_status_reply(socket);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ health_code_update();
+ /*
+ * pthread_mutex_unlock(socket->lock);
+ */
+ return ret;
+}