X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=ca6aeba3ffd7a8748256bd9cae356eeff3e977b6;hb=3c14c33fe61a2011881075506e6946ca5cdf23c1;hp=d245ed7eecb385cb72b76841a14944810f9a72c1;hpb=8994307fa7ccf9b61cc0157f2c5d34e248c56641;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index d245ed7ee..ca6aeba3f 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -288,6 +288,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, assert(relayd); + DBG("Cleaning up relayd sockets"); + /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -1941,7 +1943,7 @@ static void validate_endpoint_status_data_stream(void) rcu_read_lock(); cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (!stream->endpoint_status) { + if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) { continue; } /* Delete it right now */ @@ -2307,6 +2309,9 @@ void *consumer_thread_data_poll(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { + if (local_stream[i] == NULL) { + continue; + } if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; @@ -2315,6 +2320,7 @@ void *consumer_thread_data_poll(void *data) if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ consumer_del_stream(local_stream[i], data_ht); + local_stream[i] = NULL; } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2331,6 +2337,9 @@ void *consumer_thread_data_poll(void *data) /* Take care of low priority channels. */ for (i = 0; i < nb_fd; i++) { + if (local_stream[i] == NULL) { + continue; + } if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done) { DBG("Normal read on fd %d", pollfd[i].fd); @@ -2339,6 +2348,7 @@ void *consumer_thread_data_poll(void *data) if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ consumer_del_stream(local_stream[i], data_ht); + local_stream[i] = NULL; } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2347,12 +2357,15 @@ void *consumer_thread_data_poll(void *data) /* Handle hangup and errors */ for (i = 0; i < nb_fd; i++) { + if (local_stream[i] == NULL) { + continue; + } if (!local_stream[i]->hangup_flush_done && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) && (consumer_data.type == LTTNG_CONSUMER32_UST || consumer_data.type == LTTNG_CONSUMER64_UST)) { DBG("fd %d is hup|err|nval. Attempting flush and read.", - pollfd[i].fd); + pollfd[i].fd); lttng_ustconsumer_on_stream_hangup(local_stream[i]); /* Attempt read again, for the data we just flushed. */ local_stream[i]->data_read = 1; @@ -2366,22 +2379,27 @@ void *consumer_thread_data_poll(void *data) DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { consumer_del_stream(local_stream[i], data_ht); + local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { consumer_del_stream(local_stream[i], data_ht); + local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { consumer_del_stream(local_stream[i], data_ht); + local_stream[i] = NULL; num_hup++; } } - local_stream[i]->data_read = 0; + if (local_stream[i] != NULL) { + local_stream[i]->data_read = 0; + } } } end: @@ -2671,33 +2689,61 @@ error: return ret; } +/* + * Try to lock the stream mutex. + * + * On success, 1 is returned else 0 indicating that the mutex is NOT lock. + */ +static int stream_try_lock(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + /* + * Try to lock the stream mutex. On failure, we know that the stream is + * being used else where hence there is data still being extracted. + */ + ret = pthread_mutex_trylock(&stream->lock); + if (ret) { + /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ + ret = 0; + goto end; + } + + ret = 1; + +end: + return ret; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. * - * Return 1 if data is in fact available to be read or else 0. + * Return 1 if data is pending or else 0 meaning ready to be read. */ -int consumer_data_available(uint64_t id) +int consumer_data_pending(uint64_t id) { int ret; struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; struct consumer_relayd_sock_pair *relayd; - int (*data_available)(struct lttng_consumer_stream *); + int (*data_pending)(struct lttng_consumer_stream *); - DBG("Consumer data available command on session id %" PRIu64, id); + DBG("Consumer data pending command on session id %" PRIu64, id); rcu_read_lock(); pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - data_available = lttng_kconsumer_data_available; + data_pending = lttng_kconsumer_data_pending; break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - data_available = lttng_ustconsumer_data_available; + data_pending = lttng_ustconsumer_data_pending; break; default: ERR("Unknown consumer data type"); @@ -2711,30 +2757,56 @@ int consumer_data_available(uint64_t id) ht->hash_fct((void *)((unsigned long) id), 0x42UL), ht->match_fct, (void *)((unsigned long) id), &iter.iter, stream, node_session_id.node) { - /* Check the stream for data. */ - ret = data_available(stream); - if (ret == 0) { - goto data_not_available; + /* If this call fails, the stream is being used hence data pending. */ + ret = stream_try_lock(stream); + if (!ret) { + goto data_not_pending; + } + + /* + * A removed node from the hash table indicates that the stream has + * been deleted thus having a guarantee that the buffers are closed + * on the consumer side. However, data can still be transmitted + * over the network so don't skip the relayd check. + */ + ret = cds_lfht_is_node_deleted(&stream->node.node); + if (!ret) { + /* Check the stream if there is data in the buffers. */ + ret = data_pending(stream); + if (ret == 1) { + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; + } } + /* Relayd check */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); - assert(relayd); + if (!relayd) { + /* + * At this point, if the relayd object is not available for the + * given stream, it is because the relayd is being cleaned up + * so every stream associated with it (for a session id value) + * are or will be marked for deletion hence no data pending. + */ + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; + } - pthread_mutex_lock(&stream->lock); pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock); } else { - ret = relayd_data_available(&relayd->control_sock, + ret = relayd_data_pending(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - pthread_mutex_unlock(&stream->lock); - if (ret == 0) { - goto data_not_available; + if (ret == 1) { + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; } } + pthread_mutex_unlock(&stream->lock); } /* @@ -2747,11 +2819,11 @@ int consumer_data_available(uint64_t id) /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); - return 1; + return 0; -data_not_available: +data_not_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); - return 0; + return 1; }