Only flush when there are readers active
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index 76f369e9bc7bbee03faf55627c254855ddf54f40..d0e9466810cc0cb45b7ef51e40a67c08e367cf33 100644 (file)
@@ -112,6 +112,12 @@ struct switch_offsets {
 
 DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 
+/*
+ * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
+ * close.
+ */
+static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
                                struct lttng_ust_lib_ring_buffer *buf, int cpu,
@@ -262,29 +268,6 @@ free_chanbuf:
        return ret;
 }
 
-#if 0
-static void switch_buffer_timer(unsigned long data)
-{
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-
-       /*
-        * Only flush buffers periodically if readers are active.
-        */
-       if (uatomic_read(&buf->active_readers))
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
-
-       //TODO timers
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->switch_timer,
-       //                       jiffies + chan->switch_timer_interval);
-       //else
-       //      mod_timer(&buf->switch_timer,
-       //                jiffies + chan->switch_timer_interval);
-}
-#endif //0
-
 static
 void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
 {
@@ -301,21 +284,27 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
 
        DBG("Timer for channel %p\n", chan);
 
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(cpu) {
                        struct lttng_ust_lib_ring_buffer *buf =
                                shmp(handle, chan->backend.buf[cpu].shmp);
-
-                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                               chan->handle);
+                       if (uatomic_read(&buf->active_readers))
+                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                                       chan->handle);
                }
        } else {
                struct lttng_ust_lib_ring_buffer *buf =
                        shmp(handle, chan->backend.buf[0].shmp);
 
-                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                               chan->handle);
+                       if (uatomic_read(&buf->active_readers))
+                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                                       chan->handle);
        }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
        return;
 }
 
@@ -612,7 +601,6 @@ static void channel_print_errors(struct channel *chan,
 static void channel_free(struct channel *chan,
                struct lttng_ust_shm_handle *handle)
 {
-       channel_print_errors(chan, handle);
        channel_backend_free(&chan->backend, handle);
        /* chan is freed by shm teardown */
        shm_object_table_destroy(handle->table);
@@ -754,7 +742,8 @@ error_table_alloc:
 }
 
 struct lttng_ust_shm_handle *channel_handle_create(void *data,
-                                       uint64_t memory_map_size)
+                                       uint64_t memory_map_size,
+                                       int wakeup_fd)
 {
        struct lttng_ust_shm_handle *handle;
        struct shm_object *object;
@@ -769,7 +758,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data,
                goto error_table_alloc;
        /* Add channel object */
        object = shm_object_table_append_mem(handle->table, data,
-                       memory_map_size);
+                       memory_map_size, wakeup_fd);
        if (!object)
                goto error_table_object;
        /* struct channel is at object 0, offset 0 (hardcoded) */
@@ -830,6 +819,10 @@ void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
                 * switching the buffers.
                 */
                channel_unregister_notifiers(chan, handle);
+               /*
+                * The consumer prints errors.
+                */
+               channel_print_errors(chan, handle);
        }
 
        /*
@@ -865,7 +858,27 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
        return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
@@ -882,12 +895,13 @@ int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *con
        return shm_close_wait_fd(handle, ref);
 }
 
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
 {
        struct shm_ref *ref;
+       int ret;
 
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                cpu = 0;
@@ -896,7 +910,10 @@ int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *c
                        return -EINVAL;
        }
        ref = &chan->backend.buf[cpu].shmp._ref;
-       return shm_close_wakeup_fd(handle, ref);
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       ret = shm_close_wakeup_fd(handle, ref);
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+       return ret;
 }
 
 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
This page took 0.026363 seconds and 4 git commands to generate.