Fix a typo in lttng-probe-module name
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index c9197bd8ecf9afc682f3b9e43740cf7fe0298aca..bf477933515502510c284c58762b56d2b4399f75 100644 (file)
@@ -48,7 +48,7 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                DBG3("Consumer is disabled");
                goto error;
        }
@@ -314,6 +314,7 @@ void consumer_destroy_output(struct consumer_output *obj)
  */
 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
 {
+       struct lttng_ht *tmp_ht_ptr;
        struct lttng_ht_iter iter;
        struct consumer_socket *socket, *copy_sock;
        struct consumer_output *output;
@@ -324,11 +325,13 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
        if (output == NULL) {
                goto error;
        }
+       /* Avoid losing the HT reference after the memcpy() */
+       tmp_ht_ptr = output->socks;
 
        memcpy(output, obj, sizeof(struct consumer_output));
 
-       /* Copy sockets */
-       output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       /* Putting back the HT pointer and start copying socket(s). */
+       output->socks = tmp_ht_ptr;
 
        cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
                /* Create new socket object. */
@@ -337,6 +340,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
                        goto malloc_error;
                }
 
+               copy_sock->registered = socket->registered;
                copy_sock->lock = socket->lock;
                consumer_add_socket(copy_sock, output);
        }
@@ -486,7 +490,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int channel_key,
                uint64_t max_sb_size,
                uint64_t mmap_len,
-               const char *name)
+               const char *name,
+               unsigned int nb_init_streams)
 {
        assert(msg);
 
@@ -500,6 +505,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.channel_key = channel_key;
        msg->u.channel.max_sb_size = max_sb_size;
        msg->u.channel.mmap_len = mmap_len;
+       msg->u.channel.nb_init_streams = nb_init_streams;
 }
 
 /*
@@ -517,7 +523,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                int net_index,
                unsigned int metadata_flag,
                const char *name,
-               const char *pathname)
+               const char *pathname,
+               unsigned int session_id)
 {
        assert(msg);
 
@@ -535,6 +542,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.stream.gid = gid;
        msg->u.stream.net_index = net_index;
        msg->u.stream.metadata_flag = metadata_flag;
+       msg->u.stream.session_id = (uint64_t) session_id;
        strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
        msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
        strncpy(msg->u.stream.path_name, pathname,
@@ -560,9 +568,12 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
                break;
        case CONSUMER_DST_LOCAL:
                /* Add stream file name to stream path */
-               strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
+               strncat(msg->u.stream.path_name, "/",
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                strncat(msg->u.stream.path_name, msg->u.stream.name,
-                               sizeof(msg->u.stream.path_name));
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
                /* Indicate that the stream is NOT network */
                msg->u.stream.net_index = -1;
@@ -608,7 +619,7 @@ int consumer_send_relayd_socket(int consumer_sock,
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -688,3 +699,63 @@ int consumer_set_subdir(struct consumer_output *consumer,
 error:
        return ret;
 }
+
+/*
+ * Ask the consumer if the data is ready to read (NOT pending) for the specific
+ * session id.
+ *
+ * This function has a different behavior with the consumer i.e. that it waits
+ * for a reply from the consumer if yes or no the data is pending.
+ */
+int consumer_is_data_pending(unsigned int id,
+               struct consumer_output *consumer)
+{
+       int ret;
+       int32_t ret_code = 0;  /* Default is that the data is NOT pending */
+       struct consumer_socket *socket;
+       struct lttng_ht_iter iter;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+
+       msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
+
+       msg.u.data_pending.session_id = (uint64_t) id;
+
+       DBG3("Consumer data pending for id %u", id);
+
+       /* Send command for each consumer */
+       cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                       node.node) {
+               /* Code flow error */
+               assert(socket->fd >= 0);
+
+               pthread_mutex_lock(socket->lock);
+
+               ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
+               if (ret < 0) {
+                       PERROR("send consumer data pending command");
+                       pthread_mutex_unlock(socket->lock);
+                       goto error;
+               }
+
+               ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
+               if (ret < 0) {
+                       PERROR("recv consumer data pending status");
+                       pthread_mutex_unlock(socket->lock);
+                       goto error;
+               }
+
+               pthread_mutex_unlock(socket->lock);
+
+               if (ret_code == 1) {
+                       break;
+               }
+       }
+
+       DBG("Consumer data pending ret %d", ret_code);
+       return ret_code;
+
+error:
+       return -1;
+}
This page took 0.024916 seconds and 4 git commands to generate.