Move add data stream to the data thread
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 76238a087976ff5ca341040ecbafecfcb3d0b0a0..718887971abb7b280565613b0b08643e8232304b 100644 (file)
@@ -234,12 +234,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
-                       free(new_stream);
+                       consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
@@ -247,6 +248,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                }
@@ -259,9 +261,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        } while (ret < 0 && errno == EINTR);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
+                               consumer_del_metadata_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
                } else {
-                       consumer_add_stream(new_stream);
+                       do {
+                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+                                               sizeof(new_stream));
+                       } while (ret < 0 && errno == EINTR);
+                       if (ret < 0) {
+                               PERROR("write data pipe");
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
+                       }
                }
 
                DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
@@ -320,20 +332,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                break;
        }
 
-       /*
-        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
-        * Important note: Because writing into the pipe is non-blocking (and
-        * therefore we allow dropping wakeup data, as long as there is wakeup data
-        * present in the pipe buffer to wake up the other end), the other end
-        * should perform the following sequence for waiting:
-        *
-        * 1) empty the pipe (reads).
-        * 2) perform update operation.
-        * 3) wait on the pipe (poll).
-        */
-       do {
-               ret = write(ctx->consumer_poll_pipe[1], "", 1);
-       } while (ret < 0 && errno == EINTR);
 end_nosignal:
        rcu_read_unlock();
 
@@ -373,7 +371,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
        ustctl_unmap_channel(chan->handle);
 }
 
-int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
 {
        struct lttng_ust_object_data obj;
        int ret;
@@ -385,13 +383,14 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        ret = ustctl_add_stream(stream->chan->handle, &obj);
        if (ret) {
                ERR("UST ctl add_stream failed with ret %d", ret);
-               return ret;
+               goto error;
        }
 
        stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
        if (!stream->buf) {
                ERR("UST ctl open_stream_read failed");
-               return -EBUSY;
+               ret = -EBUSY;
+               goto error;
        }
 
        /* ustctl_open_stream_read has closed the shm fd. */
@@ -401,10 +400,16 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
        if (!stream->mmap_base) {
                ERR("UST ctl get_mmap_base failed");
-               return -EINVAL;
+               ret = -EINVAL;
+               goto mmap_error;
        }
 
        return 0;
+
+mmap_error:
+       ustctl_close_stream_read(stream->chan->handle, stream->buf);
+error:
+       return ret;
 }
 
 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
@@ -507,6 +512,13 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->out_fd = ret;
        }
 
+       ret = lttng_ustconsumer_add_stream(stream);
+       if (ret) {
+               consumer_del_stream(stream, NULL);
+               ret = -1;
+               goto error;
+       }
+
        /* we return 0 to let the library handle the FD internally */
        return 0;
 
This page took 0.024485 seconds and 4 git commands to generate.