/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- /*
- * 2s of grace period, if no polling events occur during
- * this period, the polling thread will exit even if there
- * are still open FDs (should not happen, but safety mechanism).
- */
- consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->control_sock.fd >= 0) {
+ if (close(relayd->control_sock.fd)) {
+ PERROR("close relayd control socket");
+ }
+ }
+ /* Handle create_sock error. */
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->data_sock.fd >= 0) {
+ if (close(relayd->data_sock.fd)) {
+ PERROR("close relayd data socket");
+ }
+ }
+ /* Handle create_sock error. */
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */