/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
if (pollfd[i].revents & POLLPRI) {
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
if (pollfd[i].revents & POLLPRI) {
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
if ((pollfd[i].revents & POLLIN) ||
local_stream[i]->hangup_flush_done) {
DBG("Normal read on fd %d", pollfd[i].fd);
if ((pollfd[i].revents & POLLIN) ||
local_stream[i]->hangup_flush_done) {
DBG("Normal read on fd %d", pollfd[i].fd);
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
if (!local_stream[i]->hangup_flush_done
&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
&& (consumer_data.type == LTTNG_CONSUMER32_UST
|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
if (!local_stream[i]->hangup_flush_done
&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
&& (consumer_data.type == LTTNG_CONSUMER32_UST
|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
local_stream[i]->data_read = 1;
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
local_stream[i]->data_read = 1;
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
- /*
- * 2s of grace period, if no polling events occur during
- * this period, the polling thread will exit even if there
- * are still open FDs (should not happen, but safety mechanism).
- */
- consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */