relayd: add health instrumentation to threads
[lttng-tools.git] / src / common / consumer.c
index b8695698da7301b69b114fe373680b248294b486..892c841ba0f8958ae46cff1f4d577f0426df1415 100644 (file)
@@ -520,6 +520,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -3005,6 +3008,7 @@ void *consumer_thread_sessiond_poll(void *data)
                         * ERR() here.
                         */
                        DBG("Communication interrupted on command socket");
+                       err = 0;
                        goto end;
                }
                if (consumer_quit) {
@@ -3062,6 +3066,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3078,6 +3085,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
This page took 0.023548 seconds and 4 git commands to generate.