X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fconsumer.c;h=f47d8de1b2669665a4a138a8b0fcbcdb65dc3b3d;hb=a4baae1b0463bc4ce65c2a458c4a941e7fabc594;hp=152064078d34b354263a79992e93d965a5b738a4;hpb=0c759fc95033a3d6d7cb939f39dd643ce7e127ee;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 152064078..f47d8de1b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -768,6 +768,44 @@ end: return ret; } +/* + * Find a relayd and send the streams sent message + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(net_seq_idx != -1ULL); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_streams_sent(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else { + ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", + net_seq_idx); + ret = -1; + goto end; + } + + ret = 0; + DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + /* * Find a relayd and close the stream */ @@ -1354,7 +1392,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index) + struct ctf_packet_index *index) { unsigned long mmap_offset; void *mmap_base; @@ -1562,7 +1600,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index) + struct ctf_packet_index *index) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0;