.type = LTTNG_CONSUMER_UNKNOWN,
};
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
assert(relayd);
+ DBG("Cleaning up relayd sockets");
+
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
written = ret_splice;
}
/* Socket operation failed. We consider the relayd dead */
- if (errno == EBADF) {
+ if (errno == EBADF || errno == EPIPE) {
WARN("Remote relayd disconnected. Stopping");
relayd_hang_up = 1;
goto write_error;
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (!stream->endpoint_status) {
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
continue;
}
/* Delete it right now */
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
- num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+ num_rdy = poll(pollfd, nb_fd + 1, -1);
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if (pollfd[i].revents & POLLPRI) {
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
} else if (len > 0) {
local_stream[i]->data_read = 1;
}
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if ((pollfd[i].revents & POLLIN) ||
local_stream[i]->hangup_flush_done) {
DBG("Normal read on fd %d", pollfd[i].fd);
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
} else if (len > 0) {
local_stream[i]->data_read = 1;
}
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if (!local_stream[i]->hangup_flush_done
&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
&& (consumer_data.type == LTTNG_CONSUMER32_UST
|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
- pollfd[i].fd);
+ pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
local_stream[i]->data_read = 1;
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
}
- local_stream[i]->data_read = 0;
+ if (local_stream[i] != NULL) {
+ local_stream[i]->data_read = 0;
+ }
}
}
end:
*/
consumer_quit = 1;
- /*
- * 2s of grace period, if no polling events occur during
- * this period, the polling thread will exit even if there
- * are still open FDs (should not happen, but safety mechanism).
- */
- consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
* Check if for a given session id there is still data needed to be extract
* from the buffers.
*
- * Return 1 if data is in fact available to be read or else 0.
+ * Return 1 if data is pending or else 0 meaning ready to be read.
*/
-int consumer_data_available(uint64_t id)
+int consumer_data_pending(uint64_t id)
{
int ret;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
struct consumer_relayd_sock_pair *relayd;
- int (*data_available)(struct lttng_consumer_stream *);
+ int (*data_pending)(struct lttng_consumer_stream *);
- DBG("Consumer data available command on session id %" PRIu64, id);
+ DBG("Consumer data pending command on session id %" PRIu64, id);
rcu_read_lock();
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- data_available = lttng_kconsumer_data_available;
+ data_pending = lttng_kconsumer_data_pending;
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- data_available = lttng_ustconsumer_data_available;
+ data_pending = lttng_ustconsumer_data_pending;
break;
default:
ERR("Unknown consumer data type");
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+ ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_available;
+ goto data_not_pending;
}
/*
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (!ret) {
/* Check the stream if there is data in the buffers. */
- ret = data_available(stream);
- if (ret == 0) {
+ ret = data_pending(stream);
+ if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
}
* are or will be marked for deletion hence no data pending.
*/
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock);
} else {
- ret = relayd_data_available(&relayd->control_sock,
+ ret = relayd_data_pending(&relayd->control_sock,
stream->relayd_stream_id, stream->next_net_seq_num);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret == 0) {
+ if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ goto data_not_pending;
}
}
pthread_mutex_unlock(&stream->lock);
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
- return 1;
+ return 0;
-data_not_available:
+data_not_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
- return 0;
+ return 1;
}