Fix: consumer should await for initial streams
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 28 Sep 2012 19:39:42 +0000 (15:39 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 28 Sep 2012 20:00:56 +0000 (16:00 -0400)
lttng-sessiond need to let the consumer know how many streams are sent
initially, so that for very short traces (short-lived apps, short kernel
trace), the consumerd don't run into the scenario where it deletes the
channel when there are still pending streams to receive for this
channel.

Fixes #355

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel-consumer.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-consumer.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index b69df16fdb43fcf29c4aa60098924b3c8a675681..d33f85f1bf548659a050c180f517a98c180de7fe 100644 (file)
@@ -486,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int channel_key,
                uint64_t max_sb_size,
                uint64_t mmap_len,
-               const char *name)
+               const char *name,
+               unsigned int nb_init_streams)
 {
        assert(msg);
 
@@ -500,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.channel_key = channel_key;
        msg->u.channel.max_sb_size = max_sb_size;
        msg->u.channel.mmap_len = mmap_len;
+       msg->u.channel.nb_init_streams = nb_init_streams;
 }
 
 /*
index 5e8ad9b9e7c006bb1999d09a9ba7777c98ff0d94..ec4ef3f3162533be8b0f98880eac41dde943e013 100644 (file)
@@ -195,6 +195,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int channel_key,
                uint64_t max_sb_size,
                uint64_t mmap_len,
-               const char *name);
+               const char *name,
+               unsigned int nb_init_streams);
 
 #endif /* _CONSUMER_H */
index 825121382529fcd3d8504ee3aeead9403cef838a..33cbbed3e679a42df9db3951b0a2040b681b481c 100644 (file)
@@ -48,7 +48,8 @@ int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
                        channel->fd,
                        channel->channel->attr.subbuf_size,
                        0, /* Kernel */
-                       channel->channel->name);
+                       channel->channel->name,
+                       channel->stream_count);
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
@@ -116,7 +117,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
                        session->metadata->fd,
                        session->metadata->conf->attr.subbuf_size,
                        0, /* for kernel */
-                       "metadata");
+                       "metadata",
+                       1);
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
index 3202cd4e04c12a49a8c6db284def42ae8a08a77f..fc8728dd21da9b4d68a417323f41fbe0c7a6f225 100644 (file)
@@ -2239,7 +2239,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
                        /* Order is important */
                        cds_list_add_tail(&ustream->list, &ua_chan->streams.head);
                        ret = snprintf(ustream->name, sizeof(ustream->name), "%s_%u",
-                                       ua_chan->name, ua_chan->streams.count++);
+                                       ua_chan->name, ua_chan->streams.count);
+                       ua_chan->streams.count++;
                        if (ret < 0) {
                                PERROR("asprintf UST create stream");
                                /*
index aabe49403fbc1215d423986cb7dfa71301da2a47..44913cb8fcb763b40b49bc2dc6466d9bd6cef721 100644 (file)
@@ -52,7 +52,8 @@ static int send_channel(int sock, struct ust_app_channel *uchan)
                        uchan->obj->shm_fd,
                        uchan->attr.subbuf_size,
                        uchan->obj->memory_map_size,
-                       uchan->name);
+                       uchan->name,
+                       uchan->streams.count);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
@@ -208,7 +209,8 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                        usess->metadata->obj->shm_fd,
                        usess->metadata->attr.subbuf_size,
                        usess->metadata->obj->memory_map_size,
-                       "metadata");
+                       "metadata",
+                       1);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
index a2980e77d29c33b57c4a12889cb82db019956fa0..f4eaf705f0b811230d0a9bb1f2f74458c7f7e6d0 100644 (file)
@@ -319,18 +319,20 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        }
        rcu_read_unlock();
 
-       if (!--stream->chan->refcount) {
+       uatomic_dec(&stream->chan->refcount);
+       if (!uatomic_read(&stream->chan->refcount)
+                       && !uatomic_read(&stream->chan->nb_init_streams)) {
                free_chan = stream->chan;
        }
 
-
        call_rcu(&stream->node.head, consumer_free_stream);
 end:
        consumer_data.need_update = 1;
        pthread_mutex_unlock(&consumer_data.lock);
 
-       if (free_chan)
+       if (free_chan) {
                consumer_del_channel(free_chan);
+       }
 }
 
 struct lttng_consumer_stream *consumer_allocate_stream(
@@ -394,13 +396,24 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                assert(0);
                goto end;
        }
-       DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
-                       stream->path_name, stream->key,
-                       stream->shm_fd,
-                       stream->wait_fd,
-                       (unsigned long long) stream->mmap_len,
-                       stream->out_fd,
+
+       /*
+        * When nb_init_streams reaches 0, we don't need to trigger any action in
+        * terms of destroying the associated channel, because the action that
+        * causes the count to become 0 also causes a stream to be added. The
+        * channel deletion will thus be triggered by the following removal of this
+        * stream.
+        */
+       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+               uatomic_dec(&stream->chan->nb_init_streams);
+       }
+
+       DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+                       " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+                       stream->shm_fd, stream->wait_fd,
+                       (unsigned long long) stream->mmap_len, stream->out_fd,
                        stream->net_seq_idx);
+
 end:
        return stream;
 }
@@ -671,7 +684,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
                int shm_fd, int wait_fd,
                uint64_t mmap_len,
-               uint64_t max_sb_size)
+               uint64_t max_sb_size,
+               unsigned int nb_init_streams)
 {
        struct lttng_consumer_channel *channel;
        int ret;
@@ -687,6 +701,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        channel->mmap_len = mmap_len;
        channel->max_sb_size = max_sb_size;
        channel->refcount = 0;
+       channel->nb_init_streams = nb_init_streams;
        lttng_ht_node_init_ulong(&channel->node, channel->key);
 
        switch (consumer_data.type) {
@@ -1520,7 +1535,6 @@ static void destroy_stream_ht(struct lttng_ht *ht)
 static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
-       struct lttng_consumer_channel *free_chan = NULL;
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
@@ -1602,12 +1616,10 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
 
        /* Atomically decrement channel refcount since other threads can use it. */
        uatomic_dec(&stream->chan->refcount);
-       if (!uatomic_read(&stream->chan->refcount)) {
-               free_chan = stream->chan;
-       }
-
-       if (free_chan) {
-               consumer_del_channel(free_chan);
+       if (!uatomic_read(&stream->chan->refcount)
+                       && !uatomic_read(&stream->chan->nb_init_streams)) {
+               /* Go for channel deletion! */
+               consumer_del_channel(stream->chan);
        }
 
        free(stream);
index dba7765772859bd703adb5ba9e0889759d5fb757..0f82a10865f18b2960900db6e937b3768618d1d0 100644 (file)
@@ -77,6 +77,12 @@ struct lttng_consumer_channel {
        int key;
        uint64_t max_sb_size; /* the subbuffer size for this channel */
        int refcount; /* Number of streams referencing this channel */
+       /*
+        * The number of streams to receive initially. Used to guarantee that we do
+        * not destroy a channel before receiving all its associated streams.
+        */
+       unsigned int nb_init_streams;
+
        /* For UST */
        int shm_fd;
        int wait_fd;
@@ -342,7 +348,8 @@ extern struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
                int shm_fd, int wait_fd,
                uint64_t mmap_len,
-               uint64_t max_sb_size);
+               uint64_t max_sb_size,
+               unsigned int nb_init_streams);
 int consumer_add_channel(struct lttng_consumer_channel *channel);
 
 /* lttng-relayd consumer command */
index f910f033d9ea96e283f3ee5705e6cf9d599efef8..5a219fc0b6c543d5c0f5270c3febd4ccfb4adb3a 100644 (file)
@@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                -1, -1,
                                msg.u.channel.mmap_len,
-                               msg.u.channel.max_sb_size);
+                               msg.u.channel.max_sb_size,
+                               msg.u.channel.nb_init_streams);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
index 62205f4c91d0b001ce63bd772707da65135a84f5..6d796efe42e650b20683c58e74be725db4716ba0 100644 (file)
@@ -255,6 +255,8 @@ struct lttcomm_consumer_msg {
                        uint64_t max_sb_size; /* the subbuffer size for this channel */
                        /* shm_fd and wait_fd are sent as ancillary data */
                        uint64_t mmap_len;
+                       /* nb_init_streams is the number of streams open initially. */
+                       unsigned int nb_init_streams;
                        char name[LTTNG_SYMBOL_NAME_LEN];
                } channel;
                struct {
index 8ab2b819dcee52a237518b1ce6f8cba5f3e1aa1f..ad4b014c6f08dc09f4e47b22e19cbd47330ec5ae 100644 (file)
@@ -150,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                fds[0], -1,
                                msg.u.channel.mmap_len,
-                               msg.u.channel.max_sb_size);
+                               msg.u.channel.max_sb_size,
+                               msg.u.channel.nb_init_streams);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
This page took 0.032472 seconds and 4 git commands to generate.