Fix: relayd: don't call lttng_ht_destroy in RCU read-side C.S.
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 3f76b1e6a63cb6093eafb443689f087d2805fdb1..8825d094ce57b389662667bfd5d567ca0ed77cfd 100644
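
Background for this fix: lttng_ht_destroy wraps liburcu's cds_lfht_destroy, which may block while tearing the hash table down and is documented as unsafe to call from within an RCU read-side critical section. A minimal sketch of the anti-pattern the title refers to (the ht variable is illustrative, not taken from this file):

    rcu_read_lock();
    /* ... look up or iterate hash table entries ... */
    lttng_ht_destroy(ht);   /* BAD: may block on table teardown while
                             * this thread still holds the RCU
                             * read-side lock. */
    rcu_read_unlock();

The patch below moves the hash-table removal into stream_unpublish() and leaves the blocking lttng_ht_destroy() call to stream_destroy(), which runs from the call_rcu worker thread.
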
@@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
+       stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
@@ -137,6 +138,11 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
                ret = -1;
                goto end;
        }
+       stream->tfa = tracefile_array_create(stream->tracefile_count);
+       if (!stream->tfa) {
+               ret = -1;
+               goto end;
+       }
        if (stream->tracefile_size) {
                DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
        } else {
@@ -163,6 +169,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
         * side of the relayd does not have the concept of session.
         */
        lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+       stream->in_stream_ht = true;
 
        DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
                        stream->stream_handle);
@@ -220,27 +227,42 @@ unlock:
 }
 
 /*
- * Only called from destroy. No stream lock needed, since there is a
- * single user at this point. This is ensured by having the refcount
- * reaching 0.
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy, in which case it is guaranteed to be accessed
+ * from a single thread by the reflock.
  */
 static void stream_unpublish(struct relay_stream *stream)
 {
-       if (!stream->published) {
-               return;
+       if (stream->in_stream_ht) {
+               struct lttng_ht_iter iter;
+               int ret;
+
+               iter.iter.node = &stream->node.node;
+               ret = lttng_ht_del(relay_streams_ht, &iter);
+               assert(!ret);
+               stream->in_stream_ht = false;
+       }
+       if (stream->published) {
+               pthread_mutex_lock(&stream->trace->stream_list_lock);
+               cds_list_del_rcu(&stream->stream_node);
+               pthread_mutex_unlock(&stream->trace->stream_list_lock);
+               stream->published = false;
        }
-       pthread_mutex_lock(&stream->trace->stream_list_lock);
-       cds_list_del_rcu(&stream->stream_node);
-       pthread_mutex_unlock(&stream->trace->stream_list_lock);
-
-       stream->published = false;
 }
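
With the in_stream_ht and published flags, stream_unpublish() is now idempotent: it can run early from try_stream_close() (under the stream lock) and again from stream_release(), where the second call finds both flags cleared and does nothing. A simplified sketch of that calling pattern (not literal code from this file):

    pthread_mutex_lock(&stream->lock);
    stream_unpublish(stream);   /* removes node from relay_streams_ht
                                 * and unlinks from the trace list */
    pthread_mutex_unlock(&stream->lock);

    /* ... later, when the last reference is dropped ... */
    stream_unpublish(stream);   /* no-op: flags already cleared */
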
 
 static void stream_destroy(struct relay_stream *stream)
 {
        if (stream->indexes_ht) {
+               /*
+                * stream_destroy is invoked from the call_rcu
+                * worker thread, so the RCU read-side lock is not
+                * held while lttng_ht_destroy runs.
+                */
                lttng_ht_destroy(stream->indexes_ht);
        }
+       if (stream->tfa) {
+               tracefile_array_destroy(stream->tfa);
+       }
        free(stream->path_name);
        free(stream->channel_name);
        free(stream);
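
For reference, the deferral that makes the comment in stream_destroy() hold: liburcu's call_rcu() runs its callback from a worker thread after a grace period, outside any read-side critical section. The field name rcu_node and the wrapper below are assumptions for this sketch; only call_rcu() and caa_container_of() are liburcu API:

    #include <urcu.h>               /* call_rcu(), struct rcu_head */
    #include <urcu/compiler.h>      /* caa_container_of() */

    static void stream_destroy_rcu(struct rcu_head *rcu_head)
    {
            struct relay_stream *stream =
                    caa_container_of(rcu_head, struct relay_stream, rcu_node);

            /* Worker-thread context: no RCU read-side lock is held,
             * so the blocking lttng_ht_destroy() inside
             * stream_destroy() is safe here. */
            stream_destroy(stream);
    }

    /* In the release path, instead of destroying in place: */
    /* call_rcu(&stream->rcu_node, stream_destroy_rcu); */
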
@@ -266,8 +288,6 @@ static void stream_release(struct urcu_ref *ref)
        struct relay_stream *stream =
                caa_container_of(ref, struct relay_stream, ref);
        struct relay_session *session;
-       int ret;
-       struct lttng_ht_iter iter;
 
        session = stream->trace->session;
 
@@ -281,10 +301,6 @@ static void stream_release(struct urcu_ref *ref)
        }
        pthread_mutex_unlock(&session->recv_list_lock);
 
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(relay_streams_ht, &iter);
-       assert(!ret);
-
        stream_unpublish(stream);
 
        if (stream->stream_fd) {
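
stream_release() is the urcu_ref release callback: it runs exactly once, when the last stream_put() drops the count to zero. A self-contained illustration of the same refcounting pattern (toy type; urcu_ref_init/get/put are the real urcu/ref.h API):

    #include <stdio.h>
    #include <stdlib.h>
    #include <urcu/ref.h>
    #include <urcu/compiler.h>      /* caa_container_of() */

    struct obj {
            struct urcu_ref ref;
    };

    static void obj_release(struct urcu_ref *ref)
    {
            struct obj *o = caa_container_of(ref, struct obj, ref);

            printf("released\n");
            free(o);
    }

    int main(void)
    {
            struct obj *o = calloc(1, sizeof(*o));

            if (!o)
                    return 1;
            urcu_ref_init(&o->ref);                 /* count = 1 */
            urcu_ref_get(&o->ref);                  /* count = 2 */
            urcu_ref_put(&o->ref, obj_release);     /* count = 1 */
            urcu_ref_put(&o->ref, obj_release);     /* 0: obj_release() fires */
            return 0;
    }
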
@@ -328,16 +344,104 @@ void stream_put(struct relay_stream *stream)
        rcu_read_unlock();
 }
 
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
 {
-       DBG("closing stream %" PRIu64, stream->stream_handle);
+       DBG("Trying to close stream %" PRIu64, stream->stream_handle);
        pthread_mutex_lock(&stream->lock);
+       /*
+        * Can be called concurrently by connection close and
+        * reception of the last pending data.
+        */
+       if (stream->closed) {
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+               return;
+       }
+
+       stream->close_requested = true;
+
+       if (stream->last_net_seq_num == -1ULL) {
+               /*
+                * Handle connection close without explicit stream close
+                * command.
+                *
+                * We can be clever about indexes partially received in
+                * cases where we received the data socket part, but not
+                * the control socket part: since we're currently closing
+                * the stream on behalf of the control socket, we *know*
+                * there won't be any more control information for this
+                * socket. Therefore, we can destroy all indexes for
+                * which we have received only the file descriptor (from
+                * the data socket). Since the data for a packet is sent
+                * before its control information, this takes care of
+                * consumerd crashes occurring between those two sends.
+                */
+               relay_index_close_partial_fd(stream);
+               /*
+                * Use the highest net_seq_num we currently have pending
+                * as the end-of-stream indicator. Leave last_net_seq_num
+                * at -1ULL if we cannot find any index.
+                */
+               stream->last_net_seq_num = relay_index_find_last(stream);
+               /* Fall-through into the next check. */
+       }
+
+       if (stream->last_net_seq_num != -1ULL &&
+                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+        * Don't close since we still have data pending. This
+        * handles cases where an explicit close command has
+        * been received for this stream, and cases where the
+        * connection has been closed and we are awaiting
+        * index information from the data socket. It is
+        * therefore expected that all the index fd information
+        * we need has already been received on the control
+        * socket. The matching index information from the
+        * data socket should be Expected Soon(TM).
+                *
+                * TODO: We should implement a timer to garbage collect
+                * streams after a timeout to be resilient against a
+                * consumerd implementation that would not match this
+                * expected behavior.
+                */
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+               return;
+       }
+       /*
+        * We received all the indexes we can expect.
+        */
+       stream_unpublish(stream);
        stream->closed = true;
+       /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
        pthread_mutex_unlock(&stream->lock);
+       DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
 }
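
The pending-data check above, (int64_t) (stream->prev_seq - stream->last_net_seq_num) < 0, is serial-number arithmetic: subtract as unsigned, reinterpret the difference as signed, and the sign tells you which sequence number is logically older, even across 64-bit wraparound. A standalone illustration (the helper name is ours, not the relayd's):

    #include <stdint.h>
    #include <stdio.h>

    /* True when sequence number a is logically before b. */
    static int seq_before(uint64_t a, uint64_t b)
    {
            return (int64_t) (a - b) < 0;
    }

    int main(void)
    {
            printf("%d\n", seq_before(5, 7));               /* 1: data still pending */
            printf("%d\n", seq_before(7, 7));               /* 0: caught up */
            printf("%d\n", seq_before(UINT64_MAX, 2));      /* 1: correct across wrap */
            return 0;
    }
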
 
+static void print_stream_indexes(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
+                       index_n.node) {
+               DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
+                               " stream %" PRIu64 " trace %" PRIu64
+                               " session %" PRIu64,
+                               index,
+                               index->index_n.key,
+                               stream->ref.refcount,
+                               index->stream->stream_handle,
+                               index->stream->trace->id,
+                               index->stream->trace->session->id);
+       }
+       rcu_read_unlock();
+}
+
 void print_relay_streams(void)
 {
        struct lttng_ht_iter iter;
@@ -356,6 +460,7 @@ void print_relay_streams(void)
                                stream->stream_handle,
                                stream->trace->id,
                                stream->trace->session->id);
+               print_stream_indexes(stream);
                stream_put(stream);
        }
        rcu_read_unlock();