summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
8726a04)
Fixes a race where data connection can still add indexes after close,
preventing graceful teardown of the stream.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
+
+ /*
+ * Set last_net_seq_num before the close flag. Required by data
+ * pending check.
+ */
pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&stream->lock);
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+
+ stream_close(stream);
+
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
viewer_stream_put(vstream);
}
}
viewer_stream_put(vstream);
}
}
- pthread_mutex_unlock(&stream->lock);
* side of the relayd does not have the concept of session.
*/
lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
* side of the relayd does not have the concept of session.
*/
lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+ stream->in_stream_ht = true;
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
- * Only called from destroy. No stream lock needed, since there is a
- * single user at this point. This is ensured by having the refcount
- * reaching 0.
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy, in which case it is guaranteed to be accessed
+ * from a single thread by the reflock.
*/
static void stream_unpublish(struct relay_stream *stream)
{
*/
static void stream_unpublish(struct relay_stream *stream)
{
- if (!stream->published) {
- return;
+ if (stream->in_stream_ht) {
+ struct lttng_ht_iter iter;
+ int ret;
+
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!ret);
+ stream->in_stream_ht = false;
+ }
+ if (stream->published) {
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+ stream->published = false;
- pthread_mutex_lock(&stream->trace->stream_list_lock);
- cds_list_del_rcu(&stream->stream_node);
- pthread_mutex_unlock(&stream->trace->stream_list_lock);
-
- stream->published = false;
}
static void stream_destroy(struct relay_stream *stream)
}
static void stream_destroy(struct relay_stream *stream)
struct relay_stream *stream =
caa_container_of(ref, struct relay_stream, ref);
struct relay_session *session;
struct relay_stream *stream =
caa_container_of(ref, struct relay_stream, ref);
struct relay_session *session;
- int ret;
- struct lttng_ht_iter iter;
session = stream->trace->session;
session = stream->trace->session;
}
pthread_mutex_unlock(&session->recv_list_lock);
}
pthread_mutex_unlock(&session->recv_list_lock);
- iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
-
stream_unpublish(stream);
if (stream->stream_fd) {
stream_unpublish(stream);
if (stream->stream_fd) {
{
DBG("closing stream %" PRIu64, stream->stream_handle);
pthread_mutex_lock(&stream->lock);
{
DBG("closing stream %" PRIu64, stream->stream_handle);
pthread_mutex_lock(&stream->lock);
+ stream_unpublish(stream);
stream->closed = true;
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
stream->closed = true;
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
* Node of stream within global stream hash table.
*/
struct lttng_ht_node_u64 node;
* Node of stream within global stream hash table.
*/
struct lttng_ht_node_u64 node;
+ bool in_stream_ht; /* is stream in stream hash table. */
struct rcu_head rcu_node; /* For call_rcu teardown. */
};
struct rcu_head rcu_node; /* For call_rcu teardown. */
};