UST consumer: perform buffer flush on hang up
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 8 Nov 2011 14:42:17 +0000 (09:42 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 8 Nov 2011 14:42:17 +0000 (09:42 -0500)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
include/lttng/lttng-consumer.h
include/lttng/lttng-ustconsumer.h
liblttng-consumer/lttng-consumer.c
liblttng-ustconsumer/lttng-ustconsumer.c

index 256293b811001f18fe6c3fc865749eed16d7b689..6f893a53a67a3d01095c2221a09850e086675b85 100644 (file)
@@ -118,6 +118,7 @@ struct lttng_consumer_stream {
        /* For UST */
        struct lttng_ust_lib_ring_buffer *buf;
        int cpu;
        /* For UST */
        struct lttng_ust_lib_ring_buffer *buf;
        int cpu;
+       int hangup_flush_done;
 };
 
 /*
 };
 
 /*
index 90d6588b90e7a75a5da244b4f13f9f8bd5b3438b..217c592d7c9adebaa22892d38c3a20e7b171586c 100644 (file)
@@ -70,6 +70,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
                struct lttng_consumer_local_data *ctx);
 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
+
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 static inline
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 static inline
@@ -146,6 +148,11 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return -ENOSYS;
 }
 
        return -ENOSYS;
 }
 
+static inline
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+}
+
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
index de15dd3b72379276cd0d934e79f2435ef67dbb8a..54338e800619a728f70ca44673c6f346a0ebecc0 100644 (file)
@@ -847,7 +847,21 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                num_hup++;
                        } else if ((pollfd[i].revents & POLLHUP) &&
                                        !(pollfd[i].revents & POLLIN)) {
                                num_hup++;
                        } else if ((pollfd[i].revents & POLLHUP) &&
                                        !(pollfd[i].revents & POLLIN)) {
-                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               if (consumer_data.type == LTTNG_CONSUMER_UST) {
+                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+                                               pollfd[i].fd);
+                                       if (!local_stream[i]->hangup_flush_done) {
+                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                                               /* try reading after flush */
+                                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               /* it's ok to have an unavailable sub-buffer */
+                                               if (ret == EAGAIN) {
+                                                       ret = 0;
+                                               }
+                                       }
+                               } else {
+                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               }
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
                        }
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
                        }
index 29f735249417709191f344974decbb1f3676fdb3..806ebd3c03977f340245ed41e98047a7255461fc 100644 (file)
@@ -175,6 +175,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.mmap_len,
                                msg.u.channel.max_sb_size);
                if (new_channel == NULL) {
                                msg.u.channel.mmap_len,
                                msg.u.channel.max_sb_size);
                if (new_channel == NULL) {
+                       fprintf(stderr, "AAAAA\n");
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
                }
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
                }
@@ -217,6 +218,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.output,
                                msg.u.stream.path_name);
                if (new_stream == NULL) {
                                msg.u.stream.output,
                                msg.u.stream.path_name);
                if (new_stream == NULL) {
+                       fprintf(stderr, "BBBBBB\n");
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
                        goto end;
                }
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
                        goto end;
                }
@@ -281,6 +283,11 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
        return 0;
 }
 
        return 0;
 }
 
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+       ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+}
+
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 {
        ustctl_unmap_channel(chan->handle);
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 {
        ustctl_unmap_channel(chan->handle);
This page took 0.027139 seconds and 4 git commands to generate.