X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=2bcb0db24d88fed6419d21685023bf1dbe0ffccb;hp=d0cd8fd869e2b647581e02e163f75725365124f8;hb=18eace3ba4aeaa6b869c8ad9ec1273381b4cbdee;hpb=e316aad5fbbe3782872083cb68dfdd58bccea811 diff --git a/src/common/consumer.h b/src/common/consumer.h index d0cd8fd86..2bcb0db24 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -30,21 +30,6 @@ #include #include -/* - * When the receiving thread dies, we need to have a way to make the polling - * thread exit eventually. If all FDs hang up (normal case when the - * lttng-sessiond stops), we can exit cleanly, but if there is a problem and - * for whatever reason some FDs remain open, the consumer should still exit - * eventually. - * - * If the timeout is reached, it means that during this period no events - * occurred on the FDs so we need to force an exit. This case should not happen - * but it is a safety to ensure we won't block the consumer indefinitely. - * - * The value of 2 seconds is an arbitrary choice. - */ -#define LTTNG_CONSUMER_POLL_TIMEOUT 2000 - /* Commands for consumer */ enum lttng_consumer_command { LTTNG_CONSUMER_ADD_CHANNEL, @@ -56,6 +41,8 @@ enum lttng_consumer_command { LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, + /* Return to the sessiond if there is data pending for a session */ + LTTNG_CONSUMER_DATA_PENDING, }; /* State of each fd in consumer */ @@ -72,6 +59,11 @@ enum lttng_consumer_type { LTTNG_CONSUMER32_UST, }; +enum consumer_endpoint_status { + CONSUMER_ENDPOINT_ACTIVE, + CONSUMER_ENDPOINT_INACTIVE, +}; + struct lttng_consumer_channel { struct lttng_ht_node_ulong node; int key; @@ -101,8 +93,10 @@ struct lttng_ust_lib_ring_buffer; * uniquely a stream. */ struct lttng_consumer_stream { + /* HT node used by the data_ht and metadata_ht */ struct lttng_ht_node_ulong node; - struct lttng_ht_node_ulong waitfd_node; + /* HT node used in consumer_data.stream_list_ht */ + struct lttng_ht_node_ulong node_session_id; struct lttng_consumer_channel *chan; /* associated channel */ /* * key is the key used by the session daemon to refer to the @@ -137,6 +131,22 @@ struct lttng_consumer_stream { uint64_t relayd_stream_id; /* Next sequence number to use for trace packet */ uint64_t next_net_seq_num; + /* + * Lock to use the stream FDs since they are used between threads. Using + * this lock with network streaming, when using the control mutex of a + * consumer_relayd_sock_pair, make sure to acquire this lock BEFORE locking + * it and releasing it AFTER the control mutex unlock. + */ + pthread_mutex_t lock; + /* Tracing session id */ + uint64_t session_id; + /* + * Indicates if the stream end point is still active or not (network + * streaming or local file system). The thread "owning" the stream is + * handling this status and can be notified of a state change through the + * consumer data appropriate pipe. + */ + enum consumer_endpoint_status endpoint_status; }; /* @@ -230,8 +240,8 @@ struct lttng_consumer_local_data { /* communication with splice */ int consumer_thread_pipe[2]; int consumer_splice_metadata_pipe[2]; - /* pipe to wake the poll thread when necessary */ - int consumer_poll_pipe[2]; + /* Data stream poll thread pipe. To transfer data stream to the thread */ + int consumer_data_pipe[2]; /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ @@ -253,13 +263,12 @@ struct lttng_consumer_global_data { pthread_mutex_t lock; /* - * Number of streams in the hash table. Protected by consumer_data.lock. + * Number of streams in the data stream hash table declared outside. + * Protected by consumer_data.lock. */ int stream_count; - /* - * Hash tables of streams and channels. Protected by consumer_data.lock. - */ - struct lttng_ht *stream_ht; + + /* Channel hash table protected by consumer_data.lock. */ struct lttng_ht *channel_ht; /* * Flag specifying if the local array of FDs needs update in the @@ -273,8 +282,21 @@ struct lttng_consumer_global_data { * stream has an index which associate the right relayd socket to use. */ struct lttng_ht *relayd_ht; + + /* + * This hash table contains all streams (metadata and data) indexed by + * session id. In other words, the ht is indexed by session id and each + * bucket contains the list of associated streams. + * + * This HT uses the "node_session_id" of the consumer stream. + */ + struct lttng_ht *stream_list_ht; }; +/* Defined in consumer.c and coupled with explanations */ +extern struct lttng_ht *metadata_ht; +extern struct lttng_ht *data_ht; + /* * Init consumer data structures. */ @@ -324,10 +346,6 @@ extern void lttng_consumer_sync_trace_file( */ extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); -extern int consumer_update_poll_array( - struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_consumer_streams); - extern struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, int shm_fd, int wait_fd, @@ -339,14 +357,12 @@ extern struct lttng_consumer_stream *consumer_allocate_stream( gid_t gid, int net_index, int metadata_flag, + uint64_t session_id, int *alloc_ret); -extern int consumer_add_stream(struct lttng_consumer_stream *stream); extern void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); -extern void consumer_change_stream_state(int stream_key, - enum lttng_consumer_stream_state state); extern void consumer_del_channel(struct lttng_consumer_channel *channel); extern struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, @@ -362,6 +378,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( struct consumer_relayd_sock_pair *consumer_find_relayd(int key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); +void consumer_steal_stream_key(int key, struct lttng_ht *ht); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, @@ -385,8 +402,9 @@ extern int lttng_consumer_get_produced_snapshot( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long *pos); -extern void *lttng_consumer_thread_poll_fds(void *data); -extern void *lttng_consumer_thread_receive_fds(void *data); +extern void *consumer_thread_metadata_poll(void *data); +extern void *consumer_thread_data_poll(void *data); +extern void *consumer_thread_sessiond_poll(void *data); extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); @@ -398,5 +416,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); +int consumer_data_pending(uint64_t id); #endif /* LIB_CONSUMER_H */