There is a data pending race involving late population of the streams in
the stream hash table, and applying flush on streams that are not yet
globally visible.
This is caused by the fact that streams are added to the hash table only
when received by the data-handling consumer thread.
This results in data_pending() incorrectly returning that there is no
data pending in some cases.
This has been discovered by adding 1s delay in read subbuffer function
for testing.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
call_rcu(&stream->node.head, free_stream_rcu);
}
call_rcu(&stream->node.head, free_stream_rcu);
}
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, metadata_ht);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
/*
* Add a stream to the global list protected by a mutex.
*/
/*
* Add a stream to the global list protected by a mutex.
*/
-static int add_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+ struct lttng_ht *ht = data_ht;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+ struct lttng_ht *ht = metadata_ht;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = add_metadata_stream(stream, metadata_ht);
- if (ret) {
- ERR("Unable to add metadata stream");
- /* Stream was not setup properly. Continuing. */
- consumer_del_metadata_stream(stream, NULL);
- continue;
- }
-
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
- ret = add_stream(new_stream, data_ht);
- if (ret) {
- ERR("Consumer add stream %" PRIu64 " failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- }
-
/* Continue to update the local streams and handle prio ones */
continue;
}
/* Continue to update the local streams and handle prio ones */
continue;
}
struct lttng_consumer_channel *channel);
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
uint64_t key);
struct lttng_consumer_channel *channel);
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
uint64_t key);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */
#endif /* LIB_CONSUMER_H */
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
stream_pipe = ctx->consumer_data_pipe;
}
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_del_stream(new_stream, NULL);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
stream_pipe = ctx->consumer_data_pipe;
}
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
+ if (stream->metadata_flag) {
+ consumer_del_stream_for_metadata(stream);
+ } else {
+ consumer_del_stream_for_data(stream);
+ }
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);