Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index d79dee3cafd99c1232f2458a985561fb0e7a1875..8806e9c322bef43b818975bee8f44bc520e3aa53 100644 (file)
 #include <common/common.h>
 #include <common/defaults.h>
 #include <common/uri.h>
+#include <common/relayd/relayd.h>
 
 #include "consumer.h"
-#include "health.h"
+#include "health-sessiond.h"
 #include "ust-app.h"
 #include "utils.h"
 
@@ -62,8 +63,8 @@ int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len)
                /* The above call will print a PERROR on error. */
                DBG("Error when sending data to consumer on sock %d", fd);
                /*
-                * At this point, the socket is not usable anymore thus flagging it
-                * invalid and closing it.
+                * At this point, the socket is not usable anymore thus closing it and
+                * setting the file descriptor to -1 so it is not reused.
                 */
 
                /* This call will PERROR on error. */
@@ -106,8 +107,8 @@ int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len)
                /* The above call will print a PERROR on error. */
                DBG("Error when receiving data from the consumer socket %d", fd);
                /*
-                * At this point, the socket is not usable anymore thus flagging it
-                * invalid and closing it.
+                * At this point, the socket is not usable anymore thus closing it and
+                * setting the file descriptor to -1 so it is not reused.
                 */
 
                /* This call will PERROR on error. */
@@ -141,7 +142,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock)
                goto end;
        }
 
-       if (reply.ret_code == LTTNG_OK) {
+       if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) {
                /* All good. */
                ret = 0;
        } else {
@@ -177,13 +178,14 @@ int consumer_recv_status_channel(struct consumer_socket *sock,
        }
 
        /* An error is possible so don't touch the key and stream_count. */
-       if (reply.ret_code != LTTNG_OK) {
+       if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                ret = -1;
                goto end;
        }
 
        *key = reply.key;
        *stream_count = reply.stream_count;
+       ret = 0;
 
 end:
        return ret;
@@ -781,6 +783,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int overwrite,
                unsigned int switch_timer_interval,
                unsigned int read_timer_interval,
+               unsigned int live_timer_interval,
                int output,
                int type,
                uint64_t session_id,
@@ -809,6 +812,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.overwrite = overwrite;
        msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
        msg->u.ask_channel.read_timer_interval = read_timer_interval;
+       msg->u.ask_channel.live_timer_interval = live_timer_interval;
        msg->u.ask_channel.output = output;
        msg->u.ask_channel.type = type;
        msg->u.ask_channel.session_id = session_id;
@@ -852,7 +856,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int type,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        assert(msg);
 
@@ -872,6 +877,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.tracefile_size = tracefile_size;
        msg->u.channel.tracefile_count = tracefile_count;
        msg->u.channel.monitor = monitor;
+       msg->u.channel.live_timer_interval = live_timer_interval;
 
        strncpy(msg->u.channel.pathname, pathname,
                        sizeof(msg->u.channel.pathname));
@@ -900,6 +906,19 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.stream.cpu = cpu;
 }
 
+void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
+               enum lttng_consumer_command cmd,
+               uint64_t channel_key, uint64_t net_seq_idx)
+{
+       assert(msg);
+
+       memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
+
+       msg->cmd_type = cmd;
+       msg->u.sent_streams.channel_key = channel_key;
+       msg->u.sent_streams.net_seq_idx = net_seq_idx;
+}
+
 /*
  * Send stream communication structure to the consumer.
  */
@@ -935,7 +954,8 @@ error:
  */
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
-               enum lttng_stream_type type, uint64_t session_id)
+               enum lttng_stream_type type, uint64_t session_id,
+               char *session_name, char *hostname, int session_live_timer)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -951,6 +971,18 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                goto error;
        }
 
+       if (type == LTTNG_STREAM_CONTROL) {
+               ret = relayd_create_session(rsock,
+                               &msg.u.relayd_sock.relayd_session_id,
+                               session_name, hostname, session_live_timer,
+                               consumer->snapshot);
+               if (ret < 0) {
+                       /* Close the control socket. */
+                       (void) relayd_close(rsock);
+                       goto error;
+               }
+       }
+
        msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
        /*
         * Assign network consumer output index using the temporary consumer since
This page took 0.025203 seconds and 4 git commands to generate.