X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=4ac823c017d03754ed545a9a1288df707b84778f;hp=a7517a34083db737d9be2b2b51b61ddb3a60eb36;hb=02b3d1769d5f8a33e4109b1e681141c9295dfda6;hpb=282dadbc28bdf6c8565bd0cabd6a2fbffb6b9396 diff --git a/src/common/consumer.h b/src/common/consumer.h index a7517a340..4ac823c01 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -41,7 +41,7 @@ enum lttng_consumer_command { /* pause, delete, active depending on fd state */ LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ - LTTNG_CONSUMER_STOP, + LTTNG_CONSUMER_STOP, /* deprecated */ LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, @@ -341,6 +341,9 @@ struct lttng_consumer_stream { */ pthread_cond_t metadata_rdv; pthread_mutex_t metadata_rdv_lock; + + /* Indicate if the stream still has some data to be read. */ + unsigned int has_data:1; }; /* @@ -453,6 +456,19 @@ struct lttng_consumer_local_data { int consumer_splice_metadata_pipe[2]; /* Data stream poll thread pipe. To transfer data stream to the thread */ struct lttng_pipe *consumer_data_pipe; + + /* + * Data thread use that pipe to catch wakeup from read subbuffer that + * detects that there is still data to be read for the stream encountered. + * Before doing so, the stream is flagged to indicate that there is still + * data to be read. + * + * Both pipes (read/write) are owned and used inside the data thread. + */ + struct lttng_pipe *consumer_wakeup_pipe; + /* Indicate if the wakeup thread has been notified. */ + unsigned int has_wakeup:1; + /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ @@ -622,6 +638,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data);