From ff0f57289ff0e6be25424081fabbbfc0e3b1b565 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 6 Mar 2013 16:11:44 -0500 Subject: [PATCH] Add channel wakeup fd to monitor close Add channel wakeup fd, so consumer can keep its handle on the stream wakeup_fd (for periodic timer flush), and yet still discover that an application has closed a channel or exited. Requires to be updated in locked-step with lttng-tools "Add channel wakeup fd to monitor close" Signed-off-by: Mathieu Desnoyers --- include/lttng/ust-abi.h | 2 + include/lttng/ust-ctl.h | 9 ++-- include/ust-comm.h | 2 +- liblttng-ust-comm/lttng-ust-comm.c | 18 ++++++- liblttng-ust-ctl/ustctl.c | 76 ++++++++++++++++++++++++++-- liblttng-ust/lttng-ust-abi.c | 24 ++++++++- liblttng-ust/lttng-ust-comm.c | 5 +- libringbuffer/frontend.h | 12 ++++- libringbuffer/ring_buffer_frontend.c | 29 +++++++++-- libringbuffer/shm.c | 70 +++++++++++++++++++++++-- libringbuffer/shm.h | 4 +- 11 files changed, 224 insertions(+), 27 deletions(-) diff --git a/include/lttng/ust-abi.h b/include/lttng/ust-abi.h index 8f2233a5..df61cde5 100644 --- a/include/lttng/ust-abi.h +++ b/include/lttng/ust-abi.h @@ -191,6 +191,7 @@ struct lttng_ust_object_data { struct { void *data; enum lttng_ust_chan_type type; + int wakeup_fd; } channel; struct { int shm_fd; @@ -279,6 +280,7 @@ struct lttng_ust_obj; union ust_args { struct { void *chan_data; + int wakeup_fd; } channel; struct { int shm_fd; diff --git a/include/lttng/ust-ctl.h b/include/lttng/ust-ctl.h index 3171b315..ed2b513d 100644 --- a/include/lttng/ust-ctl.h +++ b/include/lttng/ust-ctl.h @@ -146,6 +146,10 @@ void ustctl_destroy_channel(struct ustctl_consumer_channel *chan); int ustctl_send_channel_to_sessiond(int sock, struct ustctl_consumer_channel *channel); +int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *consumer_chan); int ustctl_write_metadata_to_channel( struct ustctl_consumer_channel *channel, @@ -160,6 +164,8 @@ int ustctl_send_stream_to_sessiond(int sock, struct ustctl_consumer_stream *stream); int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream); int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream); +int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream); +int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream); /* Create/destroy stream buffers for read */ struct ustctl_consumer_stream * @@ -167,9 +173,6 @@ struct ustctl_consumer_stream * int cpu); void ustctl_destroy_stream(struct ustctl_consumer_stream *stream); -int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream); -int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream); - /* For mmap mode, readable without "get" operation */ int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream, unsigned long *len); diff --git a/include/ust-comm.h b/include/ust-comm.h index c3564dc6..ba800900 100644 --- a/include/ust-comm.h +++ b/include/ust-comm.h @@ -185,7 +185,7 @@ extern int ustcomm_send_app_cmd(int sock, int ustcomm_recv_fd(int sock); ssize_t ustcomm_recv_channel_from_sessiond(int sock, - void **chan_data, uint64_t len); + void **chan_data, uint64_t len, int *wakeup_fd); int ustcomm_recv_stream_from_sessiond(int sock, uint64_t *memory_map_size, int *shm_fd, int *wakeup_fd); diff --git a/liblttng-ust-comm/lttng-ust-comm.c b/liblttng-ust-comm/lttng-ust-comm.c index a666ab27..00b1d43a 100644 --- a/liblttng-ust-comm/lttng-ust-comm.c +++ b/liblttng-ust-comm/lttng-ust-comm.c @@ -546,10 +546,12 @@ int ustcomm_send_app_cmd(int sock, * expected var_len. */ ssize_t ustcomm_recv_channel_from_sessiond(int sock, - void **_chan_data, uint64_t var_len) + void **_chan_data, uint64_t var_len, + int *_wakeup_fd) { void *chan_data; - ssize_t len; + ssize_t len, nr_fd; + int wakeup_fd; if (var_len > LTTNG_UST_CHANNEL_DATA_MAX_LEN) { len = -EINVAL; @@ -565,6 +567,18 @@ ssize_t ustcomm_recv_channel_from_sessiond(int sock, if (len != var_len) { goto error_recv; } + /* recv wakeup fd */ + nr_fd = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1); + if (nr_fd <= 0) { + if (nr_fd < 0) { + len = nr_fd; + goto error_recv; + } else { + len = -EIO; + goto error_recv; + } + } + *_wakeup_fd = wakeup_fd; *_chan_data = chan_data; return len; diff --git a/liblttng-ust-ctl/ustctl.c b/liblttng-ust-ctl/ustctl.c index fb4330e4..641c508b 100644 --- a/liblttng-ust-ctl/ustctl.c +++ b/liblttng-ust-ctl/ustctl.c @@ -46,6 +46,8 @@ struct ustctl_consumer_channel { /* initial attributes */ struct ustctl_consumer_channel_attr attr; + int wait_fd; /* monitor close() */ + int wakeup_fd; /* monitor close() */ }; /* @@ -95,6 +97,13 @@ int ustctl_release_object(int sock, struct lttng_ust_object_data *data) switch (data->type) { case LTTNG_UST_OBJECT_TYPE_CHANNEL: + if (data->u.channel.wakeup_fd >= 0) { + ret = close(data->u.channel.wakeup_fd); + if (ret < 0) { + ret = -errno; + return ret; + } + } free(data->u.channel.data); break; case LTTNG_UST_OBJECT_TYPE_STREAM: @@ -469,6 +478,7 @@ int ustctl_send_channel(int sock, enum lttng_ust_chan_type type, void *data, uint64_t size, + int wakeup_fd, int send_fd_only) { ssize_t len; @@ -502,6 +512,14 @@ int ustctl_send_channel(int sock, return -EIO; } + /* Send wakeup fd */ + len = ustcomm_send_fds_unix_sock(sock, &wakeup_fd, 1); + if (len <= 0) { + if (len < 0) + return len; + else + return -EIO; + } return 0; } @@ -569,6 +587,7 @@ int ustctl_recv_channel_from_consumer(int sock, { struct lttng_ust_object_data *channel_data; ssize_t len; + int wakeup_fd; int ret; channel_data = zmalloc(sizeof(*channel_data)); @@ -615,7 +634,18 @@ int ustctl_recv_channel_from_consumer(int sock, ret = -EINVAL; goto error_recv_data; } - + /* recv wakeup fd */ + len = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1); + if (len <= 0) { + if (len < 0) { + ret = len; + goto error_recv_data; + } else { + ret = -EIO; + goto error_recv_data; + } + } + channel_data->u.channel.wakeup_fd = wakeup_fd; *_channel_data = channel_data; return 0; @@ -715,6 +745,7 @@ int ustctl_send_channel_to_ust(int sock, int session_handle, channel_data->u.channel.type, channel_data->u.channel.data, channel_data->size, + channel_data->u.channel.wakeup_fd, 1); if (ret) return ret; @@ -833,6 +864,7 @@ int ustctl_send_channel_to_sessiond(int sock, channel->attr.type, table->objects[0].memory_map, table->objects[0].memory_map_size, + channel->wakeup_fd, 0); } @@ -893,12 +925,30 @@ end: return ret; } +int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan) +{ + struct channel *chan; + + chan = consumer_chan->chan->chan; + return ring_buffer_channel_close_wait_fd(&chan->backend.config, + chan, chan->handle); +} + +int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan) +{ + struct channel *chan; + + chan = consumer_chan->chan->chan; + return ring_buffer_channel_close_wakeup_fd(&chan->backend.config, + chan, chan->handle); +} + int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream) { struct channel *chan; chan = stream->chan->chan->chan; - return ring_buffer_close_wait_fd(&chan->backend.config, + return ring_buffer_stream_close_wait_fd(&chan->backend.config, chan, stream->handle, stream->cpu); } @@ -907,7 +957,7 @@ int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream) struct channel *chan; chan = stream->chan->chan->chan; - return ring_buffer_close_wakeup_fd(&chan->backend.config, + return ring_buffer_stream_close_wakeup_fd(&chan->backend.config, chan, stream->handle, stream->cpu); } @@ -968,7 +1018,23 @@ void ustctl_destroy_stream(struct ustctl_consumer_stream *stream) free(stream); } -int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream) +int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan) +{ + if (!chan) + return -EINVAL; + return shm_get_wait_fd(chan->chan->handle, + &chan->chan->handle->chan._ref); +} + +int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan) +{ + if (!chan) + return -EINVAL; + return shm_get_wakeup_fd(chan->chan->handle, + &chan->chan->handle->chan._ref); +} + +int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream) { struct lttng_ust_lib_ring_buffer *buf; struct ustctl_consumer_channel *consumer_chan; @@ -980,7 +1046,7 @@ int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream) return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref); } -int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream) +int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream) { struct lttng_ust_lib_ring_buffer *buf; struct ustctl_consumer_channel *consumer_chan; diff --git a/liblttng-ust/lttng-ust-abi.c b/liblttng-ust/lttng-ust-abi.c index 70ec22aa..f26ab5c4 100644 --- a/liblttng-ust/lttng-ust-abi.c +++ b/liblttng-ust/lttng-ust-abi.c @@ -403,11 +403,13 @@ int lttng_abi_map_channel(int session_objd, struct channel *chan; struct lttng_ust_lib_ring_buffer_config *config; void *chan_data; + int wakeup_fd; uint64_t len; int ret; enum lttng_ust_chan_type type; chan_data = uargs->channel.chan_data; + wakeup_fd = uargs->channel.wakeup_fd; len = ust_chan->len; type = ust_chan->type; @@ -415,7 +417,8 @@ int lttng_abi_map_channel(int session_objd, case LTTNG_UST_CHAN_PER_CPU: break; default: - return -EINVAL; + ret = -EINVAL; + goto invalid; } if (session->been_active) { @@ -423,7 +426,7 @@ int lttng_abi_map_channel(int session_objd, goto active; /* Refuse to add channel to active session */ } - channel_handle = channel_handle_create(chan_data, len); + channel_handle = channel_handle_create(chan_data, len, wakeup_fd); if (!channel_handle) { ret = -EINVAL; goto handle_error; @@ -496,13 +499,30 @@ int lttng_abi_map_channel(int session_objd, objd_ref(session_objd); return chan_objd; + /* error path after channel was created */ objd_error: notransport: free(lttng_chan); alloc_error: channel_destroy(chan, channel_handle, 0); + return ret; + + /* + * error path before channel creation (owning chan_data and + * wakeup_fd). + */ handle_error: active: +invalid: + { + int close_ret; + + close_ret = close(wakeup_fd); + if (close_ret) { + PERROR("close"); + } + } + free(chan_data); return ret; } diff --git a/liblttng-ust/lttng-ust-comm.c b/liblttng-ust/lttng-ust-comm.c index 096a22fc..d91f4bd2 100644 --- a/liblttng-ust/lttng-ust-comm.c +++ b/liblttng-ust/lttng-ust-comm.c @@ -467,9 +467,11 @@ int handle_message(struct sock_info *sock_info, case LTTNG_UST_CHANNEL: { void *chan_data; + int wakeup_fd; len = ustcomm_recv_channel_from_sessiond(sock, - &chan_data, lum->u.channel.len); + &chan_data, lum->u.channel.len, + &wakeup_fd); switch (len) { case 0: /* orderly shutdown */ ret = 0; @@ -494,6 +496,7 @@ int handle_message(struct sock_info *sock_info, } } args.channel.chan_data = chan_data; + args.channel.wakeup_fd = wakeup_fd; if (ops->cmd) ret = ops->cmd(lum->handle, lum->cmd, (unsigned long) &lum->u, diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index 2eda6e94..89613d4c 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -94,12 +94,20 @@ extern struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( int *wakeup_fd, uint64_t *memory_map_size); extern -int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, +int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, + struct channel *chan, + struct lttng_ust_shm_handle *handle); +extern +int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, + struct channel *chan, + struct lttng_ust_shm_handle *handle); +extern +int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, struct lttng_ust_shm_handle *handle, int cpu); extern -int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, +int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, struct lttng_ust_shm_handle *handle, int cpu); diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index a01ebbbf..5d1bc4ad 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -753,7 +753,8 @@ error_table_alloc: } struct lttng_ust_shm_handle *channel_handle_create(void *data, - uint64_t memory_map_size) + uint64_t memory_map_size, + int wakeup_fd) { struct lttng_ust_shm_handle *handle; struct shm_object *object; @@ -768,7 +769,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data, goto error_table_alloc; /* Add channel object */ object = shm_object_table_append_mem(handle->table, data, - memory_map_size); + memory_map_size, wakeup_fd); if (!object) goto error_table_object; /* struct channel is at object 0, offset 0 (hardcoded) */ @@ -868,7 +869,27 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( return shmp(handle, chan->backend.buf[cpu].shmp); } -int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, +int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, + struct channel *chan, + struct lttng_ust_shm_handle *handle) +{ + struct shm_ref *ref; + + ref = &handle->chan._ref; + return shm_close_wait_fd(handle, ref); +} + +int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, + struct channel *chan, + struct lttng_ust_shm_handle *handle) +{ + struct shm_ref *ref; + + ref = &handle->chan._ref; + return shm_close_wakeup_fd(handle, ref); +} + +int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, struct lttng_ust_shm_handle *handle, int cpu) @@ -885,7 +906,7 @@ int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *con return shm_close_wait_fd(handle, ref); } -int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, +int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, struct lttng_ust_shm_handle *handle, int cpu) diff --git a/libringbuffer/shm.c b/libringbuffer/shm.c index 85b1e4b7..90160ce1 100644 --- a/libringbuffer/shm.c +++ b/libringbuffer/shm.c @@ -231,6 +231,7 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table, { struct shm_object *obj; void *memory_map; + int waitfd[2], i, ret; if (table->allocated_len >= table->size) return NULL; @@ -240,8 +241,28 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table, if (!memory_map) goto alloc_error; - obj->wait_fd[0] = -1; - obj->wait_fd[1] = -1; + /* wait_fd: create pipe */ + ret = pipe(waitfd); + if (ret < 0) { + PERROR("pipe"); + goto error_pipe; + } + for (i = 0; i < 2; i++) { + ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC); + if (ret < 0) { + PERROR("fcntl"); + goto error_fcntl; + } + } + /* The write end of the pipe needs to be non-blocking */ + ret = fcntl(waitfd[1], F_SETFL, O_NONBLOCK); + if (ret < 0) { + PERROR("fcntl"); + goto error_fcntl; + } + memcpy(obj->wait_fd, waitfd, sizeof(waitfd)); + + /* no shm_fd */ obj->shm_fd = -1; obj->type = SHM_OBJECT_MEM; @@ -252,6 +273,16 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table, return obj; +error_fcntl: + for (i = 0; i < 2; i++) { + ret = close(waitfd[i]); + if (ret) { + PERROR("close"); + assert(0); + } + } +error_pipe: + free(memory_map); alloc_error: return NULL; } @@ -328,18 +359,31 @@ error_mmap: * Passing ownership of mem to object. */ struct shm_object *shm_object_table_append_mem(struct shm_object_table *table, - void *mem, size_t memory_map_size) + void *mem, size_t memory_map_size, int wakeup_fd) { struct shm_object *obj; + int ret; if (table->allocated_len >= table->size) return NULL; obj = &table->objects[table->allocated_len]; - obj->wait_fd[0] = -1; - obj->wait_fd[1] = -1; + obj->wait_fd[0] = -1; /* read end is unset */ + obj->wait_fd[1] = wakeup_fd; obj->shm_fd = -1; + ret = fcntl(obj->wait_fd[1], F_SETFD, FD_CLOEXEC); + if (ret < 0) { + PERROR("fcntl"); + goto error_fcntl; + } + /* The write end of the pipe needs to be non-blocking */ + ret = fcntl(obj->wait_fd[1], F_SETFL, O_NONBLOCK); + if (ret < 0) { + PERROR("fcntl"); + goto error_fcntl; + } + obj->type = SHM_OBJECT_MEM; obj->memory_map = mem; obj->memory_map_size = memory_map_size; @@ -347,6 +391,9 @@ struct shm_object *shm_object_table_append_mem(struct shm_object_table *table, obj->index = table->allocated_len++; return obj; + +error_fcntl: + return NULL; } static @@ -379,8 +426,21 @@ void shmp_object_destroy(struct shm_object *obj) break; } case SHM_OBJECT_MEM: + { + int ret, i; + + for (i = 0; i < 2; i++) { + if (obj->wait_fd[i] < 0) + continue; + ret = close(obj->wait_fd[i]); + if (ret) { + PERROR("close"); + assert(0); + } + } free(obj->memory_map); break; + } default: assert(0); } diff --git a/libringbuffer/shm.h b/libringbuffer/shm.h index a1ddc4cd..8d9d1136 100644 --- a/libringbuffer/shm.h +++ b/libringbuffer/shm.h @@ -29,7 +29,7 @@ /* channel_handle_create - for UST. */ extern struct lttng_ust_shm_handle *channel_handle_create(void *data, - uint64_t memory_map_size); + uint64_t memory_map_size, int wakeup_fd); /* channel_handle_add_stream - for UST. */ extern int channel_handle_add_stream(struct lttng_ust_shm_handle *handle, @@ -94,7 +94,7 @@ struct shm_object *shm_object_table_append_shm(struct shm_object_table *table, size_t memory_map_size); /* mem ownership is passed to shm_object_table_append_mem(). */ struct shm_object *shm_object_table_append_mem(struct shm_object_table *table, - void *mem, size_t memory_map_size); + void *mem, size_t memory_map_size, int wakeup_fd); void shm_object_table_destroy(struct shm_object_table *table); /* -- 2.34.1