return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
{
uint64_t ts, stream_id;
int ret;
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- */
- pthread_mutex_lock(&stream->lock);
ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_unlock;
+ goto end;
}
ret = kernctl_snapshot(stream->wait_fd);
if (ret < 0) {
if (errno != EAGAIN && errno != ENODATA) {
PERROR("live timer kernel snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
}
ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
if (ret < 0) {
PERROR("kernctl_get_stream_id");
- goto error_unlock;
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
-
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
{
- uint64_t ts, stream_id;
int ret;
- assert(stream);
- assert(stream->ustream);
/*
* While holding the stream mutex, try to take a snapshot, if it
* succeeds, it means that data is ready to be sent, just let the data
* thread handle that. Otherwise, if the snapshot returns EAGAIN, it
* means that there is no data to read after the flush, so we can
* safely send the empty index.
+ *
+ * Doing a trylock and checking if waiting on metadata if
+ * trylock fails. Bail out of the stream is indeed waiting for
+ * metadata to be pushed. Busy wait on trylock otherwise.
*/
- pthread_mutex_lock(&stream->lock);
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_kernel_index(stream);
+ pthread_mutex_unlock(&stream->lock);
+end:
+ return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (ret) {
- goto error_unlock;
+ goto end;
}
ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
lttng_ustconsumer_flush_buffer(stream, 1);
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret != -EAGAIN) {
ERR("Taking UST snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
}
ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
if (ret < 0) {
PERROR("ustctl_get_stream_id");
- goto error_unlock;
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
+end:
+ return ret;
+}
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ *
+ * Doing a trylock and checking if waiting on metadata if
+ * trylock fails. Bail out of the stream is indeed waiting for
+ * metadata to be pushed. Busy wait on trylock otherwise.
+ */
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_ust_index(stream);
pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
siginfo_t info;
struct lttng_consumer_local_data *ctx = data;
+ rcu_register_thread();
+
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
if (testpoint(consumerd_thread_metadata_timer)) {
health_error();
health_unregister(health_consumerd);
+ rcu_unregister_thread();
+
/* Never return */
return NULL;
}