Fix: Wait for in-flight data before closing a stream
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 5 Sep 2015 02:04:12 +0000 (22:04 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 5 Sep 2015 15:23:29 +0000 (11:23 -0400)
A stream's closing conditions are evaluated in three places:
    1) When a close command is received
    2) When the control connection owning it is closed
    3) The stream has received all of its data following
       a close request.

These checks are performed in try_stream_close().

A known downside of this approach is that a stream will never
be closed if it has not received all of its data.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/ctf-trace.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/session.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h

index d965cec8caa72aa9dcf771282f399a63868a27ab..7c882620e36fb11d8e2030b724df7ac64b4b2f89 100644 (file)
@@ -183,9 +183,10 @@ int ctf_trace_close(struct ctf_trace *trace)
        cds_list_for_each_entry_rcu(stream, &trace->stream_list,
                        stream_node) {
                /*
        cds_list_for_each_entry_rcu(stream, &trace->stream_list,
                        stream_node) {
                /*
-                * Close the stream.
+                * Close stream since the connection owning the trace is being
+                * torn down.
                 */
                 */
-               stream_close(stream);
+               try_stream_close(stream);
        }
        rcu_read_unlock();
        /*
        }
        rcu_read_unlock();
        /*
index fe52702c77489209dcb43208206bb6e7dd1fb04c..e769daf88fe99b6c0e01d2006861ea8c6a804957 100644 (file)
@@ -1275,8 +1275,15 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
        pthread_mutex_unlock(&stream->lock);
 
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
        pthread_mutex_unlock(&stream->lock);
 
-       stream_close(stream);
-
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -2156,7 +2163,7 @@ static int relay_process_data(struct relay_connection *conn)
        uint64_t net_seq_num;
        uint32_t data_size;
        struct relay_session *session;
        uint64_t net_seq_num;
        uint32_t data_size;
        struct relay_session *session;
-       bool new_stream = false;
+       bool new_stream = false, close_requested = false;
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2275,7 +2282,12 @@ static int relay_process_data(struct relay_connection *conn)
        stream->prev_seq = net_seq_num;
 
 end_stream_unlock:
        stream->prev_seq = net_seq_num;
 
 end_stream_unlock:
+       close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->lock);
+       if (close_requested) {
+               try_stream_close(stream);
+       }
+
        if (new_stream) {
                pthread_mutex_lock(&session->lock);
                uatomic_set(&session->new_streams, 1);
        if (new_stream) {
                pthread_mutex_lock(&session->lock);
                uatomic_set(&session->new_streams, 1);
index 33d7d43c0f67b5814c02c5fabe1db15c69e4c059..14ae874ea782ef45624447659d988c1bd3a05258 100644 (file)
@@ -212,7 +212,8 @@ unlock:
        }
        cds_list_for_each_entry_rcu(stream, &session->recv_list,
                        recv_node) {
        }
        cds_list_for_each_entry_rcu(stream, &session->recv_list,
                        recv_node) {
-               stream_close(stream);
+               /* Close streams which have not been published yet. */
+               try_stream_close(stream);
        }
 rcu_unlock:
        rcu_read_unlock();
        }
 rcu_unlock:
        rcu_read_unlock();
index 1d19759023264cbd6d4817cf989a669cef3ac217..2a59d1ed3e166edd319209ea97065eebf81e3876 100644 (file)
@@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
+       stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
@@ -338,14 +339,47 @@ void stream_put(struct relay_stream *stream)
        rcu_read_unlock();
 }
 
        rcu_read_unlock();
 }
 
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
 {
 {
-       DBG("closing stream %" PRIu64, stream->stream_handle);
+       DBG("Trying to close stream %" PRIu64, stream->stream_handle);
        pthread_mutex_lock(&stream->lock);
        pthread_mutex_lock(&stream->lock);
+       /*
+        * Can be called concurently by connection close and reception of last
+        * pending data.
+        */
+       if (stream->closed) {
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+               return;
+       }
+
+       stream->close_requested = true;
+       /*
+        * We shortcut the data pending check if no bound is known for this
+        * stream. This prevents us from never closing the stream in the case
+        * where a connection would be closed before a "close" command has
+        * been received.
+        *
+        * TODO
+        * This still leaves open the question of handling missing data after
+        * a bound has been set by a stream close command. Since we have no
+        * way of pairing data and control connection, and that a data
+        * connection has no ownership of a stream, it is likely that a
+        * timeout approach would be appropriate to handle dangling streams.
+        */
+       if (stream->last_net_seq_num != -1ULL &&
+                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+               /* Don't close since we still have data pending. */
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+               return;
+       }
        stream_unpublish(stream);
        stream->closed = true;
        stream_unpublish(stream);
        stream->closed = true;
+       /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
        pthread_mutex_unlock(&stream->lock);
        relay_index_close_all(stream);
        pthread_mutex_unlock(&stream->lock);
+       DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
 }
 
        stream_put(stream);
 }
 
index 542e05c0fb9e36b8fca3ec6b0995d8d20edb1b18..5030e5d4c47e03291b22102aaef1b50a093bd33a 100644 (file)
@@ -85,7 +85,8 @@ struct relay_stream {
         */
        struct tracefile_array *tfa;
 
         */
        struct tracefile_array *tfa;
 
-       bool closed;    /* Stream is closed. */
+       bool closed;            /* Stream is closed. */
+       bool close_requested;   /* Close command has been received. */
 
        /*
         * Counts number of indexes in indexes_ht. Redundant info.
 
        /*
         * Counts number of indexes in indexes_ht. Redundant info.
@@ -144,7 +145,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 struct relay_stream *stream_get_by_id(uint64_t stream_id);
 bool stream_get(struct relay_stream *stream);
 void stream_put(struct relay_stream *stream);
 struct relay_stream *stream_get_by_id(uint64_t stream_id);
 bool stream_get(struct relay_stream *stream);
 void stream_put(struct relay_stream *stream);
-void stream_close(struct relay_stream *stream);
+void try_stream_close(struct relay_stream *stream);
 void stream_publish(struct relay_stream *stream);
 void print_relay_streams(void);
 
 void stream_publish(struct relay_stream *stream);
 void print_relay_streams(void);
 
This page took 0.030047 seconds and 4 git commands to generate.