summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
7f12ef5)
There is a data pending race involving late population of the streams in
the stream hash table, and applying flush on streams that are not yet
globally visible.
This is caused by the fact that streams are added to the hash table only
when received by the data-handling consumer thread.
This results in data_pending() incorrectly returning that there is no
data pending in some cases.
This has been discovered by adding 1s delay in read subbuffer function
for testing.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
consumer_stream_destroy(stream, ht);
}
consumer_stream_destroy(stream, ht);
}
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, metadata_ht);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
/*
* Add a stream to the global list protected by a mutex.
*/
/*
* Add a stream to the global list protected by a mutex.
*/
-static int add_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+ struct lttng_ht *ht = data_ht;
int ret = 0;
assert(stream);
int ret = 0;
assert(stream);
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+ struct lttng_ht *ht = metadata_ht;
int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = add_metadata_stream(stream, metadata_ht);
- if (ret) {
- ERR("Unable to add metadata stream");
- /* Stream was not setup properly. Continuing. */
- consumer_del_metadata_stream(stream, NULL);
- continue;
- }
-
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
- ret = add_stream(new_stream, data_ht);
- if (ret) {
- ERR("Consumer add stream %" PRIu64 " failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- }
-
/* Continue to update the local streams and handle prio ones */
continue;
}
/* Continue to update the local streams and handle prio ones */
continue;
}
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
unsigned long produced_pos, uint64_t max_stream_size);
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
unsigned long produced_pos, uint64_t max_stream_size);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */
#endif /* LIB_CONSUMER_H */
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
stream_pipe = ctx->consumer_data_pipe;
}
+ /* Vitible to other threads */
+ new_stream->globally_visible = 1;
+
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_stream_free(new_stream);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
- /* Successfully sent to the right thread. */
- new_stream->globally_visible = 1;
DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
new_stream->name, fd, new_stream->relayd_stream_id);
DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
new_stream->name, fd, new_stream->relayd_stream_id);
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed.",
+ stream->key);
+ goto error;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
stream_pipe = ctx->consumer_data_pipe;
}
+ /*
+ * From this point on, the stream's ownership has been moved away from
+ * the channel and becomes globally visible.
+ */
+ stream->globally_visible = 1;
+
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
+ if (stream->metadata_flag) {
+ consumer_del_stream_for_metadata(stream);
+ } else {
+ consumer_del_stream_for_data(stream);
+ }
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
* If we are unable to send the stream to the thread, there is
* a big problem so just stop everything.
*/
+ /* Remove node from the channel stream list. */
+ cds_list_del(&stream->send_node);
goto error;
}
/* Remove node from the channel stream list. */
cds_list_del(&stream->send_node);
goto error;
}
/* Remove node from the channel stream list. */
cds_list_del(&stream->send_node);
- /*
- * From this point on, the stream's ownership has been moved away from
- * the channel and becomes globally visible.
- */
- stream->globally_visible = 1;