Add consumer socket object and relayd commands
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 7ce03ad87dde9b442015f3dc5af9b05697e2d8a5..07a68d8f755da7813d765357b5a2379f86b0b589 100644 (file)
@@ -212,6 +212,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
+               DBG("Consumer received unexpected message size %zd (expects %zu)",
+                       ret, sizeof(msg));
                lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
                return ret;
        }
@@ -307,6 +309,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int fds[1];
                size_t nb_fd = 1;
 
+               DBG("UST Consumer adding channel");
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        return -EINTR;
@@ -346,6 +350,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                size_t nb_fd = 2;
                struct consumer_relayd_sock_pair *relayd = NULL;
 
+               DBG("UST Consumer adding stream");
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        return -EINTR;
@@ -356,8 +362,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               DBG("consumer_add_stream chan %d stream %d",
+                               msg.u.stream.channel_key,
+                               msg.u.stream.stream_key);
+
                assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
-               new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
+               new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
                                msg.u.stream.stream_key,
                                fds[0], fds[1],
                                msg.u.stream.state,
@@ -408,6 +418,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                new_stream->relayd_stream_id);
                break;
        }
+       case LTTNG_CONSUMER_DESTROY_RELAYD:
+       {
+               struct consumer_relayd_sock_pair *relayd;
+
+               DBG("UST consumer destroying relayd %zu",
+                               msg.u.destroy_relayd.net_seq_idx);
+
+               /* Get relayd reference if exists. */
+               relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+               if (relayd == NULL) {
+                       ERR("Unable to find relayd %zu",
+                                       msg.u.destroy_relayd.net_seq_idx);
+               }
+
+               /* Set destroy flag for this object */
+               uatomic_set(&relayd->destroy_flag, 1);
+
+               /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+               if (uatomic_read(&relayd->refcount) == 0) {
+                       consumer_destroy_relayd(relayd);
+               }
+               break;
+       }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
                return -ENOSYS;
This page took 0.024616 seconds and 4 git commands to generate.