Add mkdir_recursive function to libcommon utils
[lttng-tools.git] / src / common / consumer.c
index 1045bfb46101d8a6bb63a8c91157f3ffbf5199a2..08592f6c0b8713f1ce1765a7de4d9237fd8bd81a 100644 (file)
@@ -63,21 +63,6 @@ volatile int consumer_quit;
 static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
-/*
- * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond session
- * id.
- *
- * Node can be added when a relayd communication is opened in the sessiond
- * thread.
- *
- * Note that a session id of the session daemon is unique to a tracing session
- * and not to a domain session. However, a domain session has one consumer
- * which forces the 1-1 mapping between a consumer and a domain session (ex:
- * UST). This means that we can't have duplicate in this ht.
- */
-static struct lttng_ht *relayd_session_id_ht;
-
 /*
  * Notify a thread pipe to poll back again. This usually means that some global
  * state has changed so we just send back the thread in a poll wait call.
@@ -231,7 +216,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
 
        if (relayd == NULL) {
                return;
@@ -239,22 +223,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 
        DBG("Consumer destroy and close relayd socket pair");
 
-       /* Loockup for a relayd node in the session id map hash table. */
-       lttng_ht_lookup(relayd_session_id_ht,
-                       (void *)((unsigned long) relayd->sessiond_session_id), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node == NULL) {
-               /* We assume the relayd is being or is destroyed */
-               return;
-       }
-
-       /*
-        * Try to delete it from the relayd session id ht. The return value is of
-        * no importance since either way we are going to try to delete the relayd
-        * from the global relayd_ht.
-        */
-       lttng_ht_del(relayd_session_id_ht, &iter);
-
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
        if (ret != 0) {
@@ -283,8 +251,6 @@ static void cleanup_relayd_ht(void)
        }
 
        lttng_ht_destroy(consumer_data.relayd_ht);
-       /* The destroy_relayd call makes sure that this ht is empty here. */
-       lttng_ht_destroy(relayd_session_id_ht);
 
        rcu_read_unlock();
 }
@@ -789,7 +755,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                 * this next value, 1 should always be substracted in order to compare
                 * the last seen sequence number on the relayd side to the last sent.
                 */
-               data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+               data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
                /* Other fields are zeroed previously */
 
                ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
@@ -798,6 +764,8 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                        goto error;
                }
 
+               ++stream->next_net_seq_num;
+
                /* Set to go on data socket */
                outfd = relayd->data_sock.fd;
        }
@@ -2189,7 +2157,7 @@ restart:
                                        if (stream == NULL) {
                                                /* Check for deleted streams. */
                                                validate_endpoint_status_metadata_stream(&events);
-                                               continue;
+                                               goto restart;
                                        }
 
                                        DBG("Adding metadata stream %d to poll set",
@@ -2315,14 +2283,11 @@ void *consumer_thread_data_poll(void *data)
                 */
                pthread_mutex_lock(&consumer_data.lock);
                if (consumer_data.need_update) {
-                       if (pollfd != NULL) {
-                               free(pollfd);
-                               pollfd = NULL;
-                       }
-                       if (local_stream != NULL) {
-                               free(local_stream);
-                               local_stream = NULL;
-                       }
+                       free(pollfd);
+                       pollfd = NULL;
+
+                       free(local_stream);
+                       local_stream = NULL;
 
                        /* allocate for all fds + 1 for the consumer_data_pipe */
                        pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
@@ -2383,7 +2348,7 @@ void *consumer_thread_data_poll(void *data)
                 * array update over low-priority reads.
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
-                       size_t pipe_readlen;
+                       ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
                        /* Consume 1 byte of pipe data */
@@ -2519,14 +2484,8 @@ void *consumer_thread_data_poll(void *data)
        }
 end:
        DBG("polling thread exiting");
-       if (pollfd != NULL) {
-               free(pollfd);
-               pollfd = NULL;
-       }
-       if (local_stream != NULL) {
-               free(local_stream);
-               local_stream = NULL;
-       }
+       free(pollfd);
+       free(local_stream);
 
        /*
         * Close the write side of the pipe so epoll_wait() in
@@ -2730,7 +2689,6 @@ void lttng_consumer_init(void)
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
 
 /*
@@ -2747,7 +2705,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
        struct consumer_relayd_sock_pair *relayd;
-       struct consumer_relayd_session_id *relayd_id_node;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
@@ -2830,24 +2787,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        goto error;
                }
 
-               /* Set up a relayd session id node. */
-               relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
-               if (!relayd_id_node) {
-                       PERROR("zmalloc relayd id node");
-                       ret = -1;
-                       goto error;
-               }
-
-               relayd_id_node->relayd_id = relayd->relayd_session_id;
-               relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
-
-               /* Indexed by session id of the sessiond. */
-               lttng_ht_node_init_ulong(&relayd_id_node->node,
-                               relayd_id_node->sessiond_id);
-               rcu_read_lock();
-               lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
-               rcu_read_unlock();
-
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
@@ -2941,30 +2880,24 @@ end:
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
        struct consumer_relayd_sock_pair *relayd = NULL;
-       struct consumer_relayd_session_id *session_id_map;
-
-       /* Get the session id map. */
-       lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node == NULL) {
-               goto end;
-       }
-
-       session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
-                       node);
 
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               if (relayd->relayd_session_id == session_id_map->relayd_id) {
+               /*
+                * Check by sessiond id which is unique here where the relayd session
+                * id might not be when having multiple relayd.
+                */
+               if (relayd->sessiond_session_id == id) {
                        /* Found the relayd. There can be only one per id. */
-                       break;
+                       goto found;
                }
        }
 
-end:
+       return NULL;
+
+found:
        return relayd;
 }
 
This page took 0.025511 seconds and 4 git commands to generate.