stream->stream_handle = stream_handle;
stream->prev_seq = -1ULL;
+ stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
stream->tracefile_count = tracefile_count;
ret = -1;
goto end;
}
+ stream->tfa = tracefile_array_create(stream->tracefile_count);
+ if (!stream->tfa) {
+ ret = -1;
+ goto end;
+ }
if (stream->tracefile_size) {
DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
} else {
* side of the relayd does not have the concept of session.
*/
lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+ stream->in_stream_ht = true;
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
}
/*
- * Only called from destroy. No stream lock needed, since there is a
- * single user at this point. This is ensured by having the refcount
- * reaching 0.
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy, in which case it is guaranteed to be accessed
+ * from a single thread by the reflock.
*/
static void stream_unpublish(struct relay_stream *stream)
{
- if (!stream->published) {
- return;
+ if (stream->in_stream_ht) {
+ struct lttng_ht_iter iter;
+ int ret;
+
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!ret);
+ stream->in_stream_ht = false;
+ }
+ if (stream->published) {
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+ stream->published = false;
}
- pthread_mutex_lock(&stream->trace->stream_list_lock);
- cds_list_del_rcu(&stream->stream_node);
- pthread_mutex_unlock(&stream->trace->stream_list_lock);
-
- stream->published = false;
}
static void stream_destroy(struct relay_stream *stream)
{
if (stream->indexes_ht) {
+ /*
+ * Calling lttng_ht_destroy in call_rcu worker thread so
+ * we don't hold the RCU read-side lock while calling
+ * it.
+ */
lttng_ht_destroy(stream->indexes_ht);
}
+ if (stream->tfa) {
+ tracefile_array_destroy(stream->tfa);
+ }
free(stream->path_name);
free(stream->channel_name);
free(stream);
struct relay_stream *stream =
caa_container_of(ref, struct relay_stream, ref);
struct relay_session *session;
- int ret;
- struct lttng_ht_iter iter;
session = stream->trace->session;
}
pthread_mutex_unlock(&session->recv_list_lock);
- iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
-
stream_unpublish(stream);
if (stream->stream_fd) {
rcu_read_unlock();
}
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
{
- DBG("closing stream %" PRIu64, stream->stream_handle);
+ DBG("Trying to close stream %" PRIu64, stream->stream_handle);
pthread_mutex_lock(&stream->lock);
+ /*
+ * Can be called concurently by connection close and reception of last
+ * pending data.
+ */
+ if (stream->closed) {
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+ return;
+ }
+
+ stream->close_requested = true;
+
+ if (stream->last_net_seq_num == -1ULL) {
+ /*
+ * Handle connection close without explicit stream close
+ * command.
+ *
+ * We can be clever about indexes partially received in
+ * cases where we received the data socket part, but not
+ * the control socket part: since we're currently closing
+ * the stream on behalf of the control socket, we *know*
+ * there won't be any more control information for this
+ * socket. Therefore, we can destroy all indexes for
+ * which we have received only the file descriptor (from
+ * data socket). This takes care of consumerd crashes
+ * between sending the data and control information for
+ * a packet. Since those are sent in that order, we take
+ * care of consumerd crashes.
+ */
+ relay_index_close_partial_fd(stream);
+ /*
+ * Use the highest net_seq_num we currently have pending
+ * As end of stream indicator. Leave last_net_seq_num
+ * at -1ULL if we cannot find any index.
+ */
+ stream->last_net_seq_num = relay_index_find_last(stream);
+ /* Fall-through into the next check. */
+ }
+
+ if (stream->last_net_seq_num != -1ULL &&
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ /*
+ * Don't close since we still have data pending. This
+ * handles cases where an explicit close command has
+ * been received for this stream, and cases where the
+ * connection has been closed, and we are awaiting for
+ * index information from the data socket. It is
+ * therefore expected that all the index fd information
+ * we need has already been received on the control
+ * socket. Matching index information from data socket
+ * should be Expected Soon(TM).
+ *
+ * TODO: We should implement a timer to garbage collect
+ * streams after a timeout to be resilient against a
+ * consumerd implementation that would not match this
+ * expected behavior.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+ return;
+ }
+ /*
+ * We received all the indexes we can expect.
+ */
+ stream_unpublish(stream);
+ stream->closed = true;
+ /* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
+ DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
}
+static void print_stream_indexes(struct relay_stream *stream)
+{
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
+ index_n.node) {
+ DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
+ " stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ index,
+ index->index_n.key,
+ stream->ref.refcount,
+ index->stream->stream_handle,
+ index->stream->trace->id,
+ index->stream->trace->session->id);
+ }
+ rcu_read_unlock();
+}
+
void print_relay_streams(void)
{
struct lttng_ht_iter iter;
stream->stream_handle,
stream->trace->id,
stream->trace->session->id);
+ print_stream_indexes(stream);
stream_put(stream);
}
rcu_read_unlock();