X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=830514659772b0afa6bb5de1ad7be678e033dcfb;hb=de800f526933f12c0bf521b5d6976d7c6d15af2f;hp=53b6151822ba36102b6b6df5a53878ac35b714a7;hpb=63281d5d013d47087217291ba7724d8302fd63e4;p=lttng-tools.git diff --git a/src/common/consumer.h b/src/common/consumer.h index 53b615182..830514659 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -30,21 +30,6 @@ #include #include -/* - * When the receiving thread dies, we need to have a way to make the polling - * thread exit eventually. If all FDs hang up (normal case when the - * lttng-sessiond stops), we can exit cleanly, but if there is a problem and - * for whatever reason some FDs remain open, the consumer should still exit - * eventually. - * - * If the timeout is reached, it means that during this period no events - * occurred on the FDs so we need to force an exit. This case should not happen - * but it is a safety to ensure we won't block the consumer indefinitely. - * - * The value of 2 seconds is an arbitrary choice. - */ -#define LTTNG_CONSUMER_POLL_TIMEOUT 2000 - /* Commands for consumer */ enum lttng_consumer_command { LTTNG_CONSUMER_ADD_CHANNEL, @@ -57,7 +42,7 @@ enum lttng_consumer_command { /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, /* Return to the sessiond if there is data pending for a session */ - LTTNG_CONSUMER_DATA_AVAILABLE, + LTTNG_CONSUMER_DATA_PENDING, }; /* State of each fd in consumer */ @@ -74,6 +59,11 @@ enum lttng_consumer_type { LTTNG_CONSUMER32_UST, }; +enum consumer_endpoint_status { + CONSUMER_ENDPOINT_ACTIVE, + CONSUMER_ENDPOINT_INACTIVE, +}; + struct lttng_consumer_channel { struct lttng_ht_node_ulong node; int key; @@ -139,17 +129,36 @@ struct lttng_consumer_stream { unsigned int metadata_flag; /* Used when the stream is set for network streaming */ uint64_t relayd_stream_id; - /* Next sequence number to use for trace packet */ + /* + * When sending a stream packet to a relayd, this number is used to track + * the packet sent by the consumer and seen by the relayd. When sending the + * data header to the relayd, this number is sent and if the transmission + * was successful, it is incremented. + * + * Even if the full data is not fully transmitted it won't matter since + * only two possible error can happen after that where either the relayd + * died or a read error is detected on the stream making this value useless + * after that. + * + * This value SHOULD be read/updated atomically or with the lock acquired. + */ uint64_t next_net_seq_num; /* - * Lock to use the stream FDs since they are used between threads. Using - * this lock with network streaming, when using the control mutex of a - * consumer_relayd_sock_pair, make sure to acquire this lock BEFORE locking - * it and releasing it AFTER the control mutex unlock. + * Lock to use the stream FDs since they are used between threads. + * + * This is nested INSIDE the consumer_data lock. + * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ pthread_mutex_t lock; /* Tracing session id */ uint64_t session_id; + /* + * Indicates if the stream end point is still active or not (network + * streaming or local file system). The thread "owning" the stream is + * handling this status and can be notified of a state change through the + * consumer data appropriate pipe. + */ + enum consumer_endpoint_status endpoint_status; }; /* @@ -173,6 +182,9 @@ struct consumer_relayd_sock_pair { * between threads sending data to the relayd. Since metadata data is sent * over that socket, at least two sendmsg() are needed (header + data) * creating a race for packets to overlap between threads using it. + * + * This is nested INSIDE the consumer_data lock. + * This is nested INSIDE the stream lock. */ pthread_mutex_t ctrl_sock_mutex; @@ -186,6 +198,10 @@ struct consumer_relayd_sock_pair { */ struct lttcomm_sock data_sock; struct lttng_ht_node_ulong node; + + /* Session id on both sides for the sockets. */ + uint64_t relayd_session_id; + uint64_t sessiond_session_id; }; /* @@ -260,8 +276,8 @@ struct lttng_consumer_global_data { * and number of element in the hash table. It's also a protection for * concurrent read/write between threads. * - * XXX: We need to see if this lock is still needed with the lockless RCU - * hash tables. + * This is nested OUTSIDE the stream lock. + * This is nested OUTSIDE the consumer_relayd_sock_pair lock. */ pthread_mutex_t lock; @@ -296,10 +312,6 @@ struct lttng_consumer_global_data { struct lttng_ht *stream_list_ht; }; -/* Defined in consumer.c and coupled with explanations */ -extern struct lttng_ht *metadata_ht; -extern struct lttng_ht *data_ht; - /* * Init consumer data structures. */ @@ -416,9 +428,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock); + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, + unsigned int sessiond_id); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); -int consumer_data_available(uint64_t id); +int consumer_data_pending(uint64_t id); +int consumer_send_status_msg(int sock, int ret_code); #endif /* LIB_CONSUMER_H */