From ca22feea083301934d1c8511851c86fb008c0697 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 16 Oct 2012 16:42:40 -0400 Subject: [PATCH] Consumer daemon data available command support Only the UST and kernel consumer daemon supports the data available command thus it's not yet implemented on the session daemon. Once implemented in the liblttng-ctl and session daemon, the commit will explain the purpose of this command. This is mostly to separate commits so we don't push a 2000 liners upstream. Signed-off-by: David Goulet --- src/bin/lttng-sessiond/consumer.c | 4 +- src/bin/lttng-sessiond/consumer.h | 3 +- src/bin/lttng-sessiond/kernel-consumer.c | 6 +- src/bin/lttng-sessiond/ust-consumer.c | 6 +- src/common/consumer.c | 88 ++++++++++++++++++++ src/common/consumer.h | 1 + src/common/kernel-consumer/kernel-consumer.c | 39 +++++++++ src/common/kernel-consumer/kernel-consumer.h | 1 + src/common/ust-consumer/ust-consumer.c | 55 +++++++++++- src/common/ust-consumer/ust-consumer.h | 6 ++ 10 files changed, 201 insertions(+), 8 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index d33f85f1b..071135bda 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -519,7 +519,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, int net_index, unsigned int metadata_flag, const char *name, - const char *pathname) + const char *pathname, + unsigned int session_id) { assert(msg); @@ -537,6 +538,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.gid = gid; msg->u.stream.net_index = net_index; msg->u.stream.metadata_flag = metadata_flag; + msg->u.stream.session_id = (uint64_t) session_id; strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; strncpy(msg->u.stream.path_name, pathname, diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 6855068db..1a1e1c8d8 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -190,7 +190,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, int net_index, unsigned int metadata_flag, const char *name, - const char *pathname); + const char *pathname, + unsigned int session_id); void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 33cbbed3e..c86d52803 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -138,7 +138,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session) consumer->net_seq_index, 1, /* Metadata flag set */ "metadata", - pathname); + pathname, + session->id); /* Send stream and file descriptor */ ret = consumer_send_stream(sock, consumer, &lkm, @@ -207,7 +208,8 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, consumer->net_seq_index, 0, /* Metadata flag unset */ stream->name, - pathname); + pathname, + session->id); /* Send stream and file descriptor */ ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1); diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 44913cb8f..465dd07d7 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -101,7 +101,8 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan, consumer->net_seq_index, 0, /* Metadata flag unset */ stream->name, - pathname); + pathname, + usess->id); /* Send stream and file descriptor */ fds[0] = stream->obj->shm_fd; @@ -266,7 +267,8 @@ static int send_metadata(int sock, struct ust_app_session *usess, consumer->net_seq_index, 1, /* Flag metadata set */ "metadata", - pathname); + pathname, + usess->id); /* Send stream and file descriptor */ fds[0] = usess->metadata->stream_obj->shm_fd; diff --git a/src/common/consumer.c b/src/common/consumer.c index 464198597..8d1a34025 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -281,6 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); + + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); rcu_read_unlock(); assert(consumer_data.stream_count > 0); @@ -462,6 +467,13 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream, lttng_ht_add_unique_ulong(ht, &stream->node); + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { @@ -1606,6 +1618,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); + + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); rcu_read_unlock(); if (stream->out_fd >= 0) { @@ -1724,6 +1741,14 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, consumer_steal_stream_key(stream->key, ht); lttng_ht_add_unique_ulong(ht, &stream->node); + + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -2413,3 +2438,66 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, error: return ret; } + +/* + * Check if for a given session id there is still data needed to be extract + * from the buffers. + * + * Return 1 if data is in fact available to be read or else 0. + */ +int consumer_data_available(uint64_t id) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + int (*data_available)(struct lttng_consumer_stream *); + + DBG("Consumer data available command on session id %" PRIu64, id); + + pthread_mutex_lock(&consumer_data.lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + data_available = lttng_kconsumer_data_available; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + data_available = lttng_ustconsumer_data_available; + break; + default: + ERR("Unknown consumer data type"); + assert(0); + } + + /* Ease our life a bit */ + ht = consumer_data.stream_list_ht; + + cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct, + ht->match_fct, (void *)((unsigned long) id), + &iter.iter, stream, node_session_id.node) { + /* Check the stream for data. */ + ret = data_available(stream); + if (ret == 0) { + goto data_not_available; + } + } + + /* TODO: Support to ask the relayd if the streams are remote */ + + /* + * Finding _no_ node in the hash table means that the stream(s) have been + * removed thus data is guaranteed to be available for analysis from the + * trace files. This is *only* true for local consumer and not network + * streaming. + */ + + /* Data is available to be read by a viewer. */ + pthread_mutex_unlock(&consumer_data.lock); + return 1; + +data_not_available: + /* Data is still being extracted from buffers. */ + pthread_mutex_unlock(&consumer_data.lock); + return 0; +} diff --git a/src/common/consumer.h b/src/common/consumer.h index 9981856fe..df002f81a 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -414,5 +414,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); +int consumer_data_available(uint64_t id); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 2456d3fc9..46413eda6 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -472,3 +472,42 @@ error: return ret; } +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function. + * + * Return 0 if the traced data are still getting read else 1 meaning that the + * data is available for trace viewer reading. + */ +int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + /* + * Try to lock the stream mutex. On failure, we know that the stream is + * being used else where hence there is data still being extracted. + */ + ret = pthread_mutex_trylock(&stream->lock); + if (ret == EBUSY) { + goto data_not_available; + } + /* The stream is now locked so we can do our ustctl calls */ + + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = kernctl_put_subbuf(stream->wait_fd); + assert(ret == 0); + pthread_mutex_unlock(&stream->lock); + goto data_not_available; + } + + /* Data is available to be read for this stream. */ + pthread_mutex_unlock(&stream->lock); + return 1; + +data_not_available: + return 0; +} diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index fd3e6d1cb..e836a841a 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -46,5 +46,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); +int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream); #endif /* _LTTNG_KCONSUMER_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index f802c462d..1bafeee07 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -312,8 +312,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DATA_AVAILABLE: { - rcu_read_unlock(); - return -ENOSYS; + int32_t ret; + uint64_t id = msg.u.data_available.session_id; + + DBG("UST consumer data available command for id %" PRIu64, id); + + ret = consumer_data_available(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data available ret code"); + } + break; } default: break; @@ -512,3 +523,43 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) error: return ret; } + +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function. + * + * Return 0 if the traced data are still getting read else 1 meaning that the + * data is available for trace viewer reading. + */ +int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + /* + * Try to lock the stream mutex. On failure, we know that the stream is + * being used else where hence there is data still being extracted. + */ + ret = pthread_mutex_trylock(&stream->lock); + if (ret == EBUSY) { + goto data_not_available; + } + /* The stream is now locked so we can do our ustctl calls */ + + ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = ustctl_put_subbuf(stream->chan->handle, stream->buf); + assert(ret == 0); + pthread_mutex_unlock(&stream->lock); + goto data_not_available; + } + + /* Data is available to be read for this stream. */ + pthread_mutex_unlock(&stream->lock); + return 1; + +data_not_available: + return 0; +} diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index 6b507ed99..a8a167239 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -61,6 +61,7 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); extern int lttng_ustctl_get_mmap_read_offset( struct lttng_ust_shm_handle *handle, struct lttng_ust_lib_ring_buffer *buf, unsigned long *off); +int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream); #else /* HAVE_LIBLTTNG_UST_CTL */ @@ -151,6 +152,11 @@ int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle, { return -ENOSYS; } +static inline +int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream) +{ + return -ENOSYS; +} #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTTNG_USTCONSUMER_H */ -- 2.34.1