Consumer daemon data available command support
authorDavid Goulet <dgoulet@efficios.com>
Tue, 16 Oct 2012 20:42:40 +0000 (16:42 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 17:12:27 +0000 (13:12 -0400)
Only the UST and kernel consumer daemon supports the data available
command thus it's not yet implemented on the session daemon.

Once implemented in the liblttng-ctl and session daemon, the commit will
explain the purpose of this command.

This is mostly to separate commits so we don't push a 2000 liners
upstream.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel-consumer.c
src/bin/lttng-sessiond/ust-consumer.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-consumer/kernel-consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index d33f85f1bf548659a050c180f517a98c180de7fe..071135bda6975b09abf4f4db7841454dfecdc269 100644 (file)
@@ -519,7 +519,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                int net_index,
                unsigned int metadata_flag,
                const char *name,
-               const char *pathname)
+               const char *pathname,
+               unsigned int session_id)
 {
        assert(msg);
 
@@ -537,6 +538,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.stream.gid = gid;
        msg->u.stream.net_index = net_index;
        msg->u.stream.metadata_flag = metadata_flag;
+       msg->u.stream.session_id = (uint64_t) session_id;
        strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
        msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
        strncpy(msg->u.stream.path_name, pathname,
index 6855068dbedc86162acdeb00b8328db3aa808732..1a1e1c8d8086d4d9155fda1436a955d192d741a8 100644 (file)
@@ -190,7 +190,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                int net_index,
                unsigned int metadata_flag,
                const char *name,
-               const char *pathname);
+               const char *pathname,
+               unsigned int session_id);
 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
                int channel_key,
index 33cbbed3e679a42df9db3951b0a2040b681b481c..c86d52803b80ac19c254eb3a1e267f3223d6c5d9 100644 (file)
@@ -138,7 +138,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
                        consumer->net_seq_index,
                        1, /* Metadata flag set */
                        "metadata",
-                       pathname);
+                       pathname,
+                       session->id);
 
        /* Send stream and file descriptor */
        ret = consumer_send_stream(sock, consumer, &lkm,
@@ -207,7 +208,8 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
                        consumer->net_seq_index,
                        0, /* Metadata flag unset */
                        stream->name,
-                       pathname);
+                       pathname,
+                       session->id);
 
        /* Send stream and file descriptor */
        ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
index 44913cb8fcb763b40b49bc2dc6466d9bd6cef721..465dd07d74489cd26135126f903e1a30aa6070a2 100644 (file)
@@ -101,7 +101,8 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
                        consumer->net_seq_index,
                        0, /* Metadata flag unset */
                        stream->name,
-                       pathname);
+                       pathname,
+                       usess->id);
 
        /* Send stream and file descriptor */
        fds[0] = stream->obj->shm_fd;
@@ -266,7 +267,8 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                        consumer->net_seq_index,
                        1, /* Flag metadata set */
                        "metadata",
-                       pathname);
+                       pathname,
+                       usess->id);
 
        /* Send stream and file descriptor */
        fds[0] = usess->metadata->stream_obj->shm_fd;
index 464198597b8cc9ece7ee00d465077cbe757eb00c..8d1a34025687da37b7c3c1555d80441c93d8761d 100644 (file)
@@ -281,6 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        iter.iter.node = &stream->node.node;
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
+
+       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_session_id.node;
+       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+       assert(!ret);
        rcu_read_unlock();
 
        assert(consumer_data.stream_count > 0);
@@ -462,6 +467,13 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
+       /*
+        * Add stream to the stream_list_ht of the consumer data. No need to steal
+        * the key since the HT does not use it and we allow to add redundant keys
+        * into this table.
+        */
+       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -1606,6 +1618,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        iter.iter.node = &stream->node.node;
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
+
+       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_session_id.node;
+       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+       assert(!ret);
        rcu_read_unlock();
 
        if (stream->out_fd >= 0) {
@@ -1724,6 +1741,14 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        consumer_steal_stream_key(stream->key, ht);
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
+
+       /*
+        * Add stream to the stream_list_ht of the consumer data. No need to steal
+        * the key since the HT does not use it and we allow to add redundant keys
+        * into this table.
+        */
+       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
        rcu_read_unlock();
 
        pthread_mutex_unlock(&consumer_data.lock);
@@ -2413,3 +2438,66 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 error:
        return ret;
 }
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is in fact available to be read or else 0.
+ */
+int consumer_data_available(uint64_t id)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       int (*data_available)(struct lttng_consumer_stream *);
+
+       DBG("Consumer data available command on session id %" PRIu64, id);
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               data_available = lttng_kconsumer_data_available;
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               data_available = lttng_ustconsumer_data_available;
+               break;
+       default:
+               ERR("Unknown consumer data type");
+               assert(0);
+       }
+
+       /* Ease our life a bit */
+       ht = consumer_data.stream_list_ht;
+
+       cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+                       ht->match_fct, (void *)((unsigned long) id),
+                       &iter.iter, stream, node_session_id.node) {
+               /* Check the stream for data. */
+               ret = data_available(stream);
+               if (ret == 0) {
+                       goto data_not_available;
+               }
+       }
+
+       /* TODO: Support to ask the relayd if the streams are remote */
+
+       /*
+        * Finding _no_ node in the hash table means that the stream(s) have been
+        * removed thus data is guaranteed to be available for analysis from the
+        * trace files. This is *only* true for local consumer and not network
+        * streaming.
+        */
+
+       /* Data is available to be read by a viewer. */
+       pthread_mutex_unlock(&consumer_data.lock);
+       return 1;
+
+data_not_available:
+       /* Data is still being extracted from buffers. */
+       pthread_mutex_unlock(&consumer_data.lock);
+       return 0;
+}
index 9981856feeca8734a428951418c34c7e2f380e96..df002f81a606c62f75db59bf23d752ec29743e62 100644 (file)
@@ -414,5 +414,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
 void consumer_flag_relayd_for_destroy(
                struct consumer_relayd_sock_pair *relayd);
+int consumer_data_available(uint64_t id);
 
 #endif /* LIB_CONSUMER_H */
index 2456d3fc91936c69f074bbd038ee227c46e48220..46413eda6e710d9e2f934df9e49336a5fd6fcef0 100644 (file)
@@ -472,3 +472,42 @@ error:
        return ret;
 }
 
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function.
+ *
+ * Return 0 if the traced data are still getting read else 1 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+
+       /*
+        * Try to lock the stream mutex. On failure, we know that the stream is
+        * being used else where hence there is data still being extracted.
+        */
+       ret = pthread_mutex_trylock(&stream->lock);
+       if (ret == EBUSY) {
+               goto data_not_available;
+       }
+       /* The stream is now locked so we can do our ustctl calls */
+
+       ret = kernctl_get_next_subbuf(stream->wait_fd);
+       if (ret == 0) {
+               /* There is still data so let's put back this subbuffer. */
+               ret = kernctl_put_subbuf(stream->wait_fd);
+               assert(ret == 0);
+               pthread_mutex_unlock(&stream->lock);
+               goto data_not_available;
+       }
+
+       /* Data is available to be read for this stream. */
+       pthread_mutex_unlock(&stream->lock);
+       return 1;
+
+data_not_available:
+       return 0;
+}
index fd3e6d1cbe5f49437799403b3c3397b378b5d924..e836a841aa9f91488399d02792e3e21fd7741cb1 100644 (file)
@@ -46,5 +46,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream);
 
 #endif /* _LTTNG_KCONSUMER_H */
index f802c462d4393565d5c3da60be70bd6b8d1cad3e..1bafeee07daccb79ef8f5377256ec6ed78257018 100644 (file)
@@ -312,8 +312,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DATA_AVAILABLE:
        {
-               rcu_read_unlock();
-               return -ENOSYS;
+               int32_t ret;
+               uint64_t id = msg.u.data_available.session_id;
+
+               DBG("UST consumer data available command for id %" PRIu64, id);
+
+               ret = consumer_data_available(id);
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send data available ret code");
+               }
+               break;
        }
        default:
                break;
@@ -512,3 +523,43 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 error:
        return ret;
 }
+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function.
+ *
+ * Return 0 if the traced data are still getting read else 1 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+
+       /*
+        * Try to lock the stream mutex. On failure, we know that the stream is
+        * being used else where hence there is data still being extracted.
+        */
+       ret = pthread_mutex_trylock(&stream->lock);
+       if (ret == EBUSY) {
+               goto data_not_available;
+       }
+       /* The stream is now locked so we can do our ustctl calls */
+
+       ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
+       if (ret == 0) {
+               /* There is still data so let's put back this subbuffer. */
+               ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
+               assert(ret == 0);
+               pthread_mutex_unlock(&stream->lock);
+               goto data_not_available;
+       }
+
+       /* Data is available to be read for this stream. */
+       pthread_mutex_unlock(&stream->lock);
+       return 1;
+
+data_not_available:
+       return 0;
+}
index 6b507ed99f9a9fa1f2640f0d7a06f25315fd46ee..a8a167239f49759c34e2b065ab479ba3a00ad5c3 100644 (file)
@@ -61,6 +61,7 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
 extern int lttng_ustctl_get_mmap_read_offset(
                struct lttng_ust_shm_handle *handle,
                struct lttng_ust_lib_ring_buffer *buf, unsigned long *off);
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -151,6 +152,11 @@ int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle,
 {
        return -ENOSYS;
 }
+static inline
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+       return -ENOSYS;
+}
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
This page took 0.0319 seconds and 4 git commands to generate.