Fix: Synchronization issue for data available command
[lttng-tools.git] / src / common / consumer.c
index 132e3ec6891f925006b8b43910b2a24b3e22f468..efd9e7eb3d368552685c3e0a7bd3f54362943318 100644 (file)
@@ -69,6 +69,21 @@ volatile int consumer_quit;
 struct lttng_ht *metadata_ht;
 struct lttng_ht *data_ht;
 
+/*
+ * Notify a thread pipe to poll back again. This usually means that some global
+ * state has changed so we just send back the thread in a poll wait call.
+ */
+static void notify_thread_pipe(int wpipe)
+{
+       int ret;
+
+       do {
+               struct lttng_consumer_stream *null_stream = NULL;
+
+               ret = write(wpipe, &null_stream, sizeof(null_stream));
+       } while (ret < 0 && errno == EINTR);
+}
+
 /*
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
@@ -182,6 +197,17 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
        struct consumer_relayd_sock_pair *relayd =
                caa_container_of(node, struct consumer_relayd_sock_pair, node);
 
+       /*
+        * Close all sockets. This is done in the call RCU since we don't want the
+        * socket fds to be reassigned thus potentially creating bad state of the
+        * relayd object.
+        *
+        * We do not have to lock the control socket mutex here since at this stage
+        * there is no one referencing to this relayd object.
+        */
+       (void) relayd_close(&relayd->control_sock);
+       (void) relayd_close(&relayd->data_sock);
+
        free(relayd);
 }
 
@@ -204,20 +230,88 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
        if (ret != 0) {
-               /* We assume the relayd was already destroyed */
+               /* We assume the relayd is being or is destroyed */
                return;
        }
 
-       /* Close all sockets */
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       (void) relayd_close(&relayd->control_sock);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       (void) relayd_close(&relayd->data_sock);
-
        /* RCU free() call */
        call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
 }
 
+/*
+ * Update the end point status of all streams having the given network sequence
+ * index (relayd index).
+ *
+ * It's atomically set without having the stream mutex locked which is fine
+ * because we handle the write/read race with a pipe wakeup for each thread.
+ */
+static void update_endpoint_status_by_netidx(int net_seq_idx,
+               enum consumer_endpoint_status status)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+
+       rcu_read_lock();
+
+       /* Let's begin with metadata */
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+               if (stream->net_seq_idx == net_seq_idx) {
+                       uatomic_set(&stream->endpoint_status, status);
+                       DBG("Delete flag set to metadata stream %d", stream->wait_fd);
+               }
+       }
+
+       /* Follow up by the data streams */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               if (stream->net_seq_idx == net_seq_idx) {
+                       uatomic_set(&stream->endpoint_status, status);
+                       DBG("Delete flag set to data stream %d", stream->wait_fd);
+               }
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Cleanup a relayd object by flagging every associated streams for deletion,
+ * destroying the object meaning removing it from the relayd hash table,
+ * closing the sockets and freeing the memory in a RCU call.
+ *
+ * If a local data context is available, notify the threads that the streams'
+ * state have changed.
+ */
+static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
+               struct lttng_consumer_local_data *ctx)
+{
+       int netidx;
+
+       assert(relayd);
+
+       /* Save the net sequence index before destroying the object */
+       netidx = relayd->net_seq_idx;
+
+       /*
+        * Delete the relayd from the relayd hash table, close the sockets and free
+        * the object in a RCU call.
+        */
+       destroy_relayd(relayd);
+
+       /* Set inactive endpoint to all streams */
+       update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+       /*
+        * With a local data context, notify the threads that the streams' state
+        * have changed. The write() action on the pipe acts as an "implicit"
+        * memory barrier ordering the updates of the end point status from the
+        * read of this status which happens AFTER receiving this notify.
+        */
+       if (ctx) {
+               notify_thread_pipe(ctx->consumer_data_pipe[1]);
+               notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+       }
+}
+
 /*
  * Flag a relayd socket pair for destruction. Destroy it if the refcount
  * reaches zero.
@@ -251,11 +345,14 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
        assert(stream);
 
+       DBG("Consumer del stream %d", stream->wait_fd);
+
        if (ht == NULL) {
                /* Means the stream was allocated but not successfully added */
                goto free_stream;
        }
 
+       pthread_mutex_lock(&stream->lock);
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
@@ -349,6 +446,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 end:
        consumer_data.need_update = 1;
        pthread_mutex_unlock(&consumer_data.lock);
+       pthread_mutex_unlock(&stream->lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -804,7 +902,17 @@ static int consumer_update_poll_array(
        DBG("Updating poll fd array");
        rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+               /*
+                * Only active streams with an active end point can be added to the
+                * poll set and local stream storage of the thread.
+                *
+                * There is a potential race here for endpoint_status to be updated
+                * just after the check. However, this is OK since the stream(s) will
+                * be deleted once the thread is notified that the end point state has
+                * changed where this function will be called back again.
+                */
+               if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
+                               stream->endpoint_status) {
                        continue;
                }
                DBG("Active FD %d", stream->wait_fd);
@@ -923,6 +1031,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        if (ret < 0) {
                PERROR("write consumer quit");
        }
+
+       DBG("Consumer flag that it should quit");
 }
 
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
@@ -1084,6 +1194,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 {
        int ret;
 
+       DBG("Consumer destroying it. Closing everything.");
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1165,6 +1277,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = NULL;
+       unsigned int relayd_hang_up = 0;
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
@@ -1224,11 +1337,22 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
                                if (ret < 0) {
                                        written = ret;
+                                       /* Socket operation failed. We consider the relayd dead */
+                                       if (ret == -EPIPE || ret == -EINVAL) {
+                                               relayd_hang_up = 1;
+                                               goto write_error;
+                                       }
                                        goto end;
                                }
                        }
+               } else {
+                       /* Socket operation failed. We consider the relayd dead */
+                       if (ret == -EPIPE || ret == -EINVAL) {
+                               relayd_hang_up = 1;
+                               goto write_error;
+                       }
+                       /* Else, use the default set before which is the filesystem. */
                }
-               /* Else, use the default set before which is the filesystem. */
        } else {
                /* No streaming, we have to set the len with the full padding */
                len += padding;
@@ -1244,6 +1368,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        if (written == 0) {
                                written = ret;
                        }
+                       /* Socket operation failed. We consider the relayd dead */
+                       if (errno == EPIPE || errno == EINVAL) {
+                               relayd_hang_up = 1;
+                               goto write_error;
+                       }
                        goto end;
                } else if (ret > len) {
                        PERROR("Error in file write (ret %zd > len %lu)", ret, len);
@@ -1265,12 +1394,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
+write_error:
+       /*
+        * This is a special case that the relayd has closed its socket. Let's
+        * cleanup the relayd object and all associated streams.
+        */
+       if (relayd && relayd_hang_up) {
+               cleanup_relayd(relayd, ctx);
+       }
+
 end:
-       pthread_mutex_unlock(&stream->lock);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1294,6 +1432,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = NULL;
        int *splice_pipe;
+       unsigned int relayd_hang_up = 0;
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1346,6 +1485,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        padding);
                        if (ret < 0) {
                                written = ret;
+                               /* Socket operation failed. We consider the relayd dead */
+                               if (ret == -EBADF) {
+                                       WARN("Remote relayd disconnected. Stopping");
+                                       relayd_hang_up = 1;
+                                       goto write_error;
+                               }
                                goto end;
                        }
 
@@ -1357,7 +1502,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        /* Use the returned socket. */
                        outfd = ret;
                } else {
-                       ERR("Remote relayd disconnected. Stopping");
+                       /* Socket operation failed. We consider the relayd dead */
+                       if (ret == -EBADF) {
+                               WARN("Remote relayd disconnected. Stopping");
+                               relayd_hang_up = 1;
+                               goto write_error;
+                       }
                        goto end;
                }
        } else {
@@ -1406,6 +1556,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        if (written == 0) {
                                written = ret_splice;
                        }
+                       /* Socket operation failed. We consider the relayd dead */
+                       if (errno == EBADF) {
+                               WARN("Remote relayd disconnected. Stopping");
+                               relayd_hang_up = 1;
+                               goto write_error;
+                       }
                        ret = errno;
                        goto splice_error;
                } else if (ret_splice > len) {
@@ -1433,12 +1589,20 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 
        goto end;
 
+write_error:
+       /*
+        * This is a special case that the relayd has closed its socket. Let's
+        * cleanup the relayd object and all associated streams.
+        */
+       if (relayd && relayd_hang_up) {
+               cleanup_relayd(relayd, ctx);
+               /* Skip splice error so the consumer does not fail */
+               goto end;
+       }
+
 splice_error:
        /* send the appropriate error description to sessiond */
        switch (ret) {
-       case EBADF:
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
-               break;
        case EINVAL:
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
                break;
@@ -1451,10 +1615,10 @@ splice_error:
        }
 
 end:
-       pthread_mutex_unlock(&stream->lock);
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1600,6 +1764,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
+       pthread_mutex_lock(&stream->lock);
+
        pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1691,6 +1857,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
 end:
        pthread_mutex_unlock(&consumer_data.lock);
+       pthread_mutex_unlock(&stream->lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -1761,6 +1928,59 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+/*
+ * Delete data stream that are flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_data_stream(void)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       DBG("Consumer delete flagged data stream");
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               /* Validate delete flag of the stream */
+               if (!stream->endpoint_status) {
+                       continue;
+               }
+               /* Delete it right now */
+               consumer_del_stream(stream, data_ht);
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Delete metadata stream that are flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_metadata_stream(
+               struct lttng_poll_event *pollset)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       DBG("Consumer delete flagged metadata stream");
+
+       assert(pollset);
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+               /* Validate delete flag of the stream */
+               if (!stream->endpoint_status) {
+                       continue;
+               }
+               /*
+                * Remove from pollset so the metadata thread can continue without
+                * blocking on a deleted stream.
+                */
+               lttng_poll_del(pollset, stream->wait_fd);
+
+               /* Delete it right now */
+               consumer_del_metadata_stream(stream, metadata_ht);
+       }
+       rcu_read_unlock();
+}
+
 /*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
@@ -1852,6 +2072,13 @@ restart:
                                                continue;
                                        }
 
+                                       /* A NULL stream means that the state has changed. */
+                                       if (stream == NULL) {
+                                               /* Check for deleted streams. */
+                                               validate_endpoint_status_metadata_stream(&events);
+                                               continue;
+                                       }
+
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
@@ -1916,8 +2143,9 @@ restart:
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       rcu_read_unlock();
-                                       goto end;
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
                                } else if (len > 0) {
                                        stream->data_read = 1;
                                }
@@ -2058,6 +2286,7 @@ void *consumer_thread_data_poll(void *data)
                         * waking us up to test it.
                         */
                        if (new_stream == NULL) {
+                               validate_endpoint_status_data_stream();
                                continue;
                        }
 
@@ -2084,7 +2313,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2107,7 +2337,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2294,14 +2525,9 @@ end:
 
        /*
         * Notify the data poll thread to poll back again and test the
-        * consumer_quit state to quit gracefully.
+        * consumer_quit state that we just set so to quit gracefully.
         */
-       do {
-               struct lttng_consumer_stream *null_stream = NULL;
-
-               ret = write(ctx->consumer_data_pipe[1], &null_stream,
-                               sizeof(null_stream));
-       } while (ret < 0 && errno == EINTR);
+       notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
        rcu_unregister_thread();
        return NULL;
@@ -2445,6 +2671,34 @@ error:
        return ret;
 }
 
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+
+       /*
+        * Try to lock the stream mutex. On failure, we know that the stream is
+        * being used else where hence there is data still being extracted.
+        */
+       ret = pthread_mutex_trylock(&stream->lock);
+       if (ret) {
+               /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+               ret = 0;
+               goto end;
+       }
+
+       ret = 1;
+
+end:
+       return ret;
+}
+
 /*
  * Check if for a given session id there is still data needed to be extract
  * from the buffers.
@@ -2462,6 +2716,7 @@ int consumer_data_available(uint64_t id)
 
        DBG("Consumer data available command on session id %" PRIu64, id);
 
+       rcu_read_lock();
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
@@ -2477,8 +2732,6 @@ int consumer_data_available(uint64_t id)
                assert(0);
        }
 
-       rcu_read_lock();
-
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
@@ -2486,17 +2739,42 @@ int consumer_data_available(uint64_t id)
                        ht->hash_fct((void *)((unsigned long) id), 0x42UL),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
-               /* Check the stream for data. */
-               ret = data_available(stream);
-               if (ret == 0) {
+               /* If this call fails, the stream is being used hence data pending. */
+               ret = stream_try_lock(stream);
+               if (!ret) {
                        goto data_not_available;
                }
 
+               /*
+                * A removed node from the hash table indicates that the stream has
+                * been deleted thus having a guarantee that the buffers are closed
+                * on the consumer side. However, data can still be transmitted
+                * over the network so don't skip the relayd check.
+                */
+               ret = cds_lfht_is_node_deleted(&stream->node.node);
+               if (!ret) {
+                       /* Check the stream if there is data in the buffers. */
+                       ret = data_available(stream);
+                       if (ret == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_available;
+                       }
+               }
+
+               /* Relayd check */
                if (stream->net_seq_idx != -1) {
                        relayd = consumer_find_relayd(stream->net_seq_idx);
-                       assert(relayd);
+                       if (!relayd) {
+                               /*
+                                * At this point, if the relayd object is not available for the
+                                * given stream, it is because the relayd is being cleaned up
+                                * so every stream associated with it (for a session id value)
+                                * are or will be marked for deletion hence no data pending.
+                                */
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_available;
+                       }
 
-                       pthread_mutex_lock(&stream->lock);
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock);
@@ -2505,11 +2783,12 @@ int consumer_data_available(uint64_t id)
                                                stream->relayd_stream_id, stream->next_net_seq_num);
                        }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       pthread_mutex_unlock(&stream->lock);
                        if (ret == 0) {
+                               pthread_mutex_unlock(&stream->lock);
                                goto data_not_available;
                        }
                }
+               pthread_mutex_unlock(&stream->lock);
        }
 
        /*
This page took 0.029864 seconds and 4 git commands to generate.