Send indexes in streaming mode
[lttng-tools.git] / src / common / relayd / relayd.c
index bed0933d1102ae4d13454ba3d152fd86d831f377..bb20a64a0cc5d6cb8ffc04f2439c0c581f71803b 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/common.h>
 #include <common/defaults.h>
 #include <common/sessiond-comm/relayd.h>
+#include <common/index/lttng-index.h>
 
 #include "relayd.h"
 
@@ -41,6 +42,10 @@ static int send_command(struct lttcomm_relayd_sock *rsock,
        char *buf;
        uint64_t buf_size = sizeof(header);
 
+       if (rsock->sock.fd < 0) {
+               return -ECONNRESET;
+       }
+
        if (data) {
                buf_size += size;
        }
@@ -87,7 +92,11 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size
 {
        int ret;
 
-       DBG3("Relayd waiting for reply of size %ld", size);
+       if (rsock->sock.fd < 0) {
+               return -ECONNRESET;
+       }
+
+       DBG3("Relayd waiting for reply of size %zu", size);
 
        ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
        if (ret <= 0 || ret != size) {
@@ -95,7 +104,7 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size
                        /* Orderly shutdown. */
                        DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
                } else {
-                       DBG("Receiving reply failed on sock %d for size %lu with ret %d",
+                       DBG("Receiving reply failed on sock %d for size %zu with ret %d",
                                        rsock->sock.fd, size, ret);
                }
                /* Always return -1 here and the caller can use errno. */
@@ -161,10 +170,12 @@ error:
  * On success return 0 else return ret_code negative value.
  */
 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
-               const char *pathname, uint64_t *stream_id)
+               const char *pathname, uint64_t *stream_id,
+               uint64_t tracefile_size, uint64_t tracefile_count)
 {
        int ret;
        struct lttcomm_relayd_add_stream msg;
+       struct lttcomm_relayd_add_stream_2_2 msg_2_2;
        struct lttcomm_relayd_status_stream reply;
 
        /* Code flow error. Safety net. */
@@ -174,13 +185,28 @@ int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_nam
 
        DBG("Relayd adding stream for channel name %s", channel_name);
 
-       strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name));
-       strncpy(msg.pathname, pathname, sizeof(msg.pathname));
+       /* Compat with relayd 2.1 */
+       if (rsock->minor == 1) {
+               strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name));
+               strncpy(msg.pathname, pathname, sizeof(msg.pathname));
 
-       /* Send command */
-       ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
-       if (ret < 0) {
-               goto error;
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+               if (ret < 0) {
+                       goto error;
+               }
+       } else {
+               /* Compat with relayd 2.2+ */
+               strncpy(msg_2_2.channel_name, channel_name, sizeof(msg_2_2.channel_name));
+               strncpy(msg_2_2.pathname, pathname, sizeof(msg_2_2.pathname));
+               msg_2_2.tracefile_size = htobe64(tracefile_size);
+               msg_2_2.tracefile_count = htobe64(tracefile_count);
+
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0);
+               if (ret < 0) {
+                       goto error;
+               }
        }
 
        /* Waiting for reply */
@@ -320,6 +346,13 @@ int relayd_connect(struct lttcomm_relayd_sock *rsock)
        /* Code flow error. Safety net. */
        assert(rsock);
 
+       if (!rsock->sock.ops) {
+               /*
+                * Attempting a connect on a non-initialized socket.
+                */
+               return -ECONNRESET;
+       }
+
        DBG3("Relayd connect ...");
 
        return rsock->sock.ops->connect(&rsock->sock);
@@ -362,6 +395,7 @@ int relayd_close(struct lttcomm_relayd_sock *rsock)
                        PERROR("relayd_close default close");
                }
        }
+       rsock->sock.fd = -1;
 
 end:
        return ret;
@@ -379,7 +413,11 @@ int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
        assert(rsock);
        assert(hdr);
 
-       DBG3("Relayd sending data header of size %ld", size);
+       if (rsock->sock.fd < 0) {
+               return -ECONNRESET;
+       }
+
+       DBG3("Relayd sending data header of size %zu", size);
 
        /* Again, safety net */
        if (size == 0) {
@@ -634,3 +672,63 @@ int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
 error:
        return ret;
 }
+
+/*
+ * Send index to the relayd.
+ */
+int relayd_send_index(struct lttcomm_relayd_sock *rsock,
+               struct lttng_packet_index *index, uint64_t relay_stream_id,
+               uint64_t net_seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_index msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       if (rsock->minor < 4) {
+               DBG("Not sending indexes before protocol 2.4");
+               ret = 0;
+               goto error;
+       }
+
+       DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
+
+       msg.relay_stream_id = htobe64(relay_stream_id);
+       msg.net_seq_num = htobe64(net_seq_num);
+
+       /* The index is already in big endian. */
+       msg.packet_size = index->packet_size;
+       msg.content_size = index->content_size;
+       msg.timestamp_begin = index->timestamp_begin;
+       msg.timestamp_end = index->timestamp_end;
+       msg.events_discarded = index->events_discarded;
+       msg.stream_id = index->stream_id;
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Receive response */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd send index replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+       }
+
+error:
+       return ret;
+}
This page took 0.026737 seconds and 4 git commands to generate.