Fix: RCU hash table seed
[lttng-tools.git] / src / common / consumer.c
index efd9e7eb3d368552685c3e0a7bd3f54362943318..e66edee2b79609cdcb76c77bca74b04d97b50da9 100644 (file)
@@ -288,6 +288,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
 
        assert(relayd);
 
+       DBG("Cleaning up relayd sockets");
+
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
 
@@ -1557,7 +1559,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                written = ret_splice;
                        }
                        /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF) {
+                       if (errno == EBADF || errno == EPIPE) {
                                WARN("Remote relayd disconnected. Stopping");
                                relayd_hang_up = 1;
                                goto write_error;
@@ -1941,7 +1943,7 @@ static void validate_endpoint_status_data_stream(void)
        rcu_read_lock();
        cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (!stream->endpoint_status) {
+               if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
                        continue;
                }
                /* Delete it right now */
@@ -2307,6 +2309,9 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if (pollfd[i].revents & POLLPRI) {
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
@@ -2315,6 +2320,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2331,6 +2337,9 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if ((pollfd[i].revents & POLLIN) ||
                                        local_stream[i]->hangup_flush_done) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
@@ -2339,6 +2348,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2347,12 +2357,15 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       if (local_stream[i] == NULL) {
+                               continue;
+                       }
                        if (!local_stream[i]->hangup_flush_done
                                        && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
                                        && (consumer_data.type == LTTNG_CONSUMER32_UST
                                                || consumer_data.type == LTTNG_CONSUMER64_UST)) {
                                DBG("fd %d is hup|err|nval. Attempting flush and read.",
-                                       pollfd[i].fd);
+                                               pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
                                local_stream[i]->data_read = 1;
@@ -2366,22 +2379,27 @@ void *consumer_thread_data_poll(void *data)
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
                                        consumer_del_stream(local_stream[i], data_ht);
+                                       local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
-                       local_stream[i]->data_read = 0;
+                       if (local_stream[i] != NULL) {
+                               local_stream[i]->data_read = 0;
+                       }
                }
        }
 end:
@@ -2703,29 +2721,29 @@ end:
  * Check if for a given session id there is still data needed to be extract
  * from the buffers.
  *
- * Return 1 if data is in fact available to be read or else 0.
+ * Return 1 if data is pending or else 0 meaning ready to be read.
  */
-int consumer_data_available(uint64_t id)
+int consumer_data_pending(uint64_t id)
 {
        int ret;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
        struct consumer_relayd_sock_pair *relayd;
-       int (*data_available)(struct lttng_consumer_stream *);
+       int (*data_pending)(struct lttng_consumer_stream *);
 
-       DBG("Consumer data available command on session id %" PRIu64, id);
+       DBG("Consumer data pending command on session id %" PRIu64, id);
 
        rcu_read_lock();
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               data_available = lttng_kconsumer_data_available;
+               data_pending = lttng_kconsumer_data_pending;
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               data_available = lttng_ustconsumer_data_available;
+               data_pending = lttng_ustconsumer_data_pending;
                break;
        default:
                ERR("Unknown consumer data type");
@@ -2736,13 +2754,13 @@ int consumer_data_available(uint64_t id)
        ht = consumer_data.stream_list_ht;
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
                ret = stream_try_lock(stream);
                if (!ret) {
-                       goto data_not_available;
+                       goto data_not_pending;
                }
 
                /*
@@ -2754,10 +2772,10 @@ int consumer_data_available(uint64_t id)
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
                        /* Check the stream if there is data in the buffers. */
-                       ret = data_available(stream);
-                       if (ret == 0) {
+                       ret = data_pending(stream);
+                       if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_available;
+                               goto data_not_pending;
                        }
                }
 
@@ -2772,20 +2790,20 @@ int consumer_data_available(uint64_t id)
                                 * are or will be marked for deletion hence no data pending.
                                 */
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_available;
+                               goto data_not_pending;
                        }
 
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock);
                        } else {
-                               ret = relayd_data_available(&relayd->control_sock,
+                               ret = relayd_data_pending(&relayd->control_sock,
                                                stream->relayd_stream_id, stream->next_net_seq_num);
                        }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       if (ret == 0) {
+                       if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_available;
+                               goto data_not_pending;
                        }
                }
                pthread_mutex_unlock(&stream->lock);
@@ -2801,11 +2819,11 @@ int consumer_data_available(uint64_t id)
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
-       return 1;
+       return 0;
 
-data_not_available:
+data_not_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
-       return 0;
+       return 1;
 }
This page took 0.026602 seconds and 4 git commands to generate.