Fix: Stream allocation and insertion consistency
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 8ab2b819dcee52a237518b1ce6f8cba5f3e1aa1f..11706877a7f5b3f147034abcf106a0bb5e77c5d2 100644 (file)
@@ -150,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                fds[0], -1,
                                msg.u.channel.mmap_len,
-                               msg.u.channel.max_sb_size);
+                               msg.u.channel.max_sb_size,
+                               msg.u.channel.nb_init_streams);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -173,6 +174,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int fds[2];
                size_t nb_fd = 2;
                struct consumer_relayd_sock_pair *relayd = NULL;
+               int alloc_ret = 0;
 
                DBG("UST Consumer adding stream");
 
@@ -188,9 +190,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
-               DBG("consumer_add_stream chan %d stream %d",
-                               msg.u.stream.channel_key,
-                               msg.u.stream.stream_key);
+               DBG("Consumer command ADD_STREAM chan %d stream %d",
+                               msg.u.stream.channel_key, msg.u.stream.stream_key);
 
                assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
                new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
@@ -203,9 +204,23 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.uid,
                                msg.u.stream.gid,
                                msg.u.stream.net_index,
-                               msg.u.stream.metadata_flag);
+                               msg.u.stream.metadata_flag,
+                               &alloc_ret);
                if (new_stream == NULL) {
-                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+                       switch (alloc_ret) {
+                       case -ENOMEM:
+                       case -EINVAL:
+                       default:
+                               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+                               break;
+                       case -ENOENT:
+                               /*
+                                * We could not find the channel. Can happen if cpu hotplug
+                                * happens while tearing down.
+                                */
+                               DBG3("Could not find channel");
+                               break;
+                       }
                        goto end_nosignal;
                }
 
@@ -219,39 +234,48 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
-                       free(new_stream);
+                       consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
-               /* Send stream to the metadata thread */
-               if (new_stream->metadata_flag) {
-                       if (ctx->on_recv_stream) {
-                               ret = ctx->on_recv_stream(new_stream);
-                               if (ret < 0) {
-                                       goto end_nosignal;
-                               }
+               /* Do actions once stream has been received. */
+               if (ctx->on_recv_stream) {
+                       ret = ctx->on_recv_stream(new_stream);
+                       if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
+               }
 
+               /* Send stream to the metadata thread */
+               if (new_stream->metadata_flag) {
                        do {
-                               ret = write(ctx->consumer_metadata_pipe[1], new_stream,
-                                               sizeof(struct lttng_consumer_stream));
+                               ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
+                                               sizeof(new_stream));
                        } while (ret < 0 && errno == EINTR);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
+                               consumer_del_metadata_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
                } else {
-                       if (ctx->on_recv_stream) {
-                               ret = ctx->on_recv_stream(new_stream);
-                               if (ret < 0) {
-                                       goto end_nosignal;
-                               }
+                       ret = consumer_add_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %d failed. Continuing",
+                                               new_stream->key);
+                               /*
+                                * At this point, if the add_stream fails, it is not in the
+                                * hash table thus passing the NULL value here.
+                                */
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
-                       consumer_add_stream(new_stream);
                }
 
                DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
@@ -363,7 +387,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
        ustctl_unmap_channel(chan->handle);
 }
 
-int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
 {
        struct lttng_ust_object_data obj;
        int ret;
@@ -373,21 +397,35 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        obj.wait_fd = stream->wait_fd;
        obj.memory_map_size = stream->mmap_len;
        ret = ustctl_add_stream(stream->chan->handle, &obj);
-       if (ret)
-               return ret;
+       if (ret) {
+               ERR("UST ctl add_stream failed with ret %d", ret);
+               goto error;
+       }
+
        stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
-       if (!stream->buf)
-               return -EBUSY;
+       if (!stream->buf) {
+               ERR("UST ctl open_stream_read failed");
+               ret = -EBUSY;
+               goto error;
+       }
+
        /* ustctl_open_stream_read has closed the shm fd. */
        stream->wait_fd_is_copy = 1;
        stream->shm_fd = -1;
 
        stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
        if (!stream->mmap_base) {
-               return -EINVAL;
+               ERR("UST ctl get_mmap_base failed");
+               ret = -EINVAL;
+               goto mmap_error;
        }
 
        return 0;
+
+mmap_error:
+       ustctl_close_stream_read(stream->chan->handle, stream->buf);
+error:
+       return ret;
 }
 
 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
@@ -465,7 +503,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                ERR("Error writing to tracefile "
                                "(ret: %zd != len: %lu != subbuf_size: %lu)",
                                ret, len, subbuf_size);
-
        }
        err = ustctl_put_next_subbuf(handle, buf);
        assert(err == 0);
This page took 0.025153 seconds and 4 git commands to generate.