Update version to v2.1.0-rc8
[lttng-tools.git] / src / common / consumer.c
index 41f8ae53fe67adde7552b54dfb31aa89d4abedcb..16b9eb64f88ae006a742ade5d3d05b162d5bb0ef 100644 (file)
@@ -47,9 +47,6 @@ struct lttng_consumer_global_data consumer_data = {
        .type = LTTNG_CONSUMER_UNKNOWN,
 };
 
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
@@ -288,6 +285,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
 
        assert(relayd);
 
+       DBG("Cleaning up relayd sockets");
+
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
 
@@ -1557,7 +1556,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                written = ret_splice;
                        }
                        /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF) {
+                       if (errno == EBADF || errno == EPIPE) {
                                WARN("Remote relayd disconnected. Stopping");
                                relayd_hang_up = 1;
                                goto write_error;
@@ -1941,7 +1940,7 @@ static void validate_endpoint_status_data_stream(void)
        rcu_read_lock();
        cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (!stream->endpoint_status) {
+               if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
                        continue;
                }
                /* Delete it right now */
@@ -2248,7 +2247,7 @@ void *consumer_thread_data_poll(void *data)
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
-               num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+               num_rdy = poll(pollfd, nb_fd + 1, -1);
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2307,6 +2306,9 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if (pollfd[i].revents & POLLPRI) {
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
@@ -2315,6 +2317,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2331,6 +2334,9 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if ((pollfd[i].revents & POLLIN) ||
                                        local_stream[i]->hangup_flush_done) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
@@ -2339,6 +2345,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2347,12 +2354,15 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if (!local_stream[i]->hangup_flush_done
                                        && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
                                        && (consumer_data.type == LTTNG_CONSUMER32_UST
                                                || consumer_data.type == LTTNG_CONSUMER64_UST)) {
                                DBG("fd %d is hup|err|nval. Attempting flush and read.",
-                                       pollfd[i].fd);
+                                               pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
                                local_stream[i]->data_read = 1;
@@ -2366,22 +2376,27 @@ void *consumer_thread_data_poll(void *data)
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
-                       local_stream[i]->data_read = 0;
+                       if (local_stream[i] != NULL) {
+                               local_stream[i]->data_read = 0;
+                       }
                }
        }
 end:
@@ -2516,13 +2531,6 @@ end:
         */
        consumer_quit = 1;
 
-       /*
-        * 2s of grace period, if no polling events occur during
-        * this period, the polling thread will exit even if there
-        * are still open FDs (should not happen, but safety mechanism).
-        */
-       consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
        /*
         * Notify the data poll thread to poll back again and test the
         * consumer_quit state that we just set so to quit gracefully.
@@ -2736,7 +2744,7 @@ int consumer_data_pending(uint64_t id)
        ht = consumer_data.stream_list_ht;
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
This page took 0.02552 seconds and 4 git commands to generate.