Move add data stream to the data thread
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index f910f033d9ea96e283f3ee5705e6cf9d599efef8..1d725c2318b74029feb878289cfdfde55fa082fd 100644 (file)
@@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                -1, -1,
                                msg.u.channel.mmap_len,
-                               msg.u.channel.max_sb_size);
+                               msg.u.channel.max_sb_size,
+                               msg.u.channel.nb_init_streams);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -140,6 +141,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int fd;
                struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
+               int alloc_ret = 0;
 
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
@@ -165,9 +167,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.uid,
                                msg.u.stream.gid,
                                msg.u.stream.net_index,
-                               msg.u.stream.metadata_flag);
+                               msg.u.stream.metadata_flag,
+                               &alloc_ret);
                if (new_stream == NULL) {
-                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+                       switch (alloc_ret) {
+                       case -ENOMEM:
+                       case -EINVAL:
+                       default:
+                               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+                               break;
+                       case -ENOENT:
+                               /*
+                                * We could not find the channel. Can happen if cpu hotplug
+                                * happens while tearing down.
+                                */
+                               DBG3("Could not find channel");
+                               break;
+                       }
                        goto end_nosignal;
                }
 
@@ -190,39 +206,45 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
-                       free(new_stream);
+                       consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
-               /* Send stream to the metadata thread */
-               if (new_stream->metadata_flag) {
-                       if (ctx->on_recv_stream) {
-                               ret = ctx->on_recv_stream(new_stream);
-                               if (ret < 0) {
-                                       goto end_nosignal;
-                               }
+               if (ctx->on_recv_stream) {
+                       ret = ctx->on_recv_stream(new_stream);
+                       if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
+               }
 
+               /* Send stream to the metadata thread */
+               if (new_stream->metadata_flag) {
                        do {
-                               ret = write(ctx->consumer_metadata_pipe[1], new_stream,
-                                               sizeof(struct lttng_consumer_stream));
+                               ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
+                                               sizeof(new_stream));
                        } while (ret < 0 && errno == EINTR);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
                } else {
-                       if (ctx->on_recv_stream) {
-                               ret = ctx->on_recv_stream(new_stream);
-                               if (ret < 0) {
-                                       goto end_nosignal;
-                               }
+                       do {
+                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+                                               sizeof(new_stream));
+                       } while (ret < 0 && errno == EINTR);
+                       if (ret < 0) {
+                               PERROR("write data pipe");
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
-                       consumer_add_stream(new_stream);
                }
 
                DBG("Kernel consumer_add_stream (%d)", fd);
@@ -265,20 +287,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                goto end_nosignal;
        }
 
-       /*
-        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
-        * Important note: Because writing into the pipe is non-blocking (and
-        * therefore we allow dropping wakeup data, as long as there is wakeup data
-        * present in the pipe buffer to wake up the other end), the other end
-        * should perform the following sequence for waiting:
-        *
-        * 1) empty the pipe (reads).
-        * 2) perform update operation.
-        * 3) wait on the pipe (poll).
-        */
-       do {
-               ret = write(ctx->consumer_poll_pipe[1], "", 1);
-       } while (ret < 0 && errno == EINTR);
 end_nosignal:
        rcu_read_unlock();
 
This page took 0.025886 seconds and 4 git commands to generate.