DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
+/*
+ * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
+ * close.
+ */
+static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
+
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lttng_ust_lib_ring_buffer *buf, int cpu,
DBG("Timer for channel %p\n", chan);
+ pthread_mutex_lock(&wakeup_fd_mutex);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
for_each_possible_cpu(cpu) {
struct lttng_ust_lib_ring_buffer *buf =
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
chan->handle);
}
+ pthread_mutex_unlock(&wakeup_fd_mutex);
return;
}
static void channel_free(struct channel *chan,
struct lttng_ust_shm_handle *handle)
{
- channel_print_errors(chan, handle);
channel_backend_free(&chan->backend, handle);
/* chan is freed by shm teardown */
shm_object_table_destroy(handle->table);
}
struct lttng_ust_shm_handle *channel_handle_create(void *data,
- uint64_t memory_map_size)
+ uint64_t memory_map_size,
+ int wakeup_fd)
{
struct lttng_ust_shm_handle *handle;
struct shm_object *object;
goto error_table_alloc;
/* Add channel object */
object = shm_object_table_append_mem(handle->table, data,
- memory_map_size);
+ memory_map_size, wakeup_fd);
if (!object)
goto error_table_object;
/* struct channel is at object 0, offset 0 (hardcoded) */
* switching the buffers.
*/
channel_unregister_notifiers(chan, handle);
+ /*
+ * The consumer prints errors.
+ */
+ channel_print_errors(chan, handle);
}
/*
return shmp(handle, chan->backend.buf[cpu].shmp);
}
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
return shm_close_wait_fd(handle, ref);
}
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
{
struct shm_ref *ref;
+ int ret;
if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
cpu = 0;
return -EINVAL;
}
ref = &chan->backend.buf[cpu].shmp._ref;
- return shm_close_wakeup_fd(handle, ref);
+ pthread_mutex_lock(&wakeup_fd_mutex);
+ ret = shm_close_wakeup_fd(handle, ref);
+ pthread_mutex_unlock(&wakeup_fd_mutex);
+ return ret;
}
int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,