X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=061ec1219c90ad5ef3bd0ea4e217424927864358;hp=b69df16fdb43fcf29c4aa60098924b3c8a675681;hb=c8f59ee5fc11492ef472dc5cfd2fd2c4926b1787;hpb=f73fabfda365d22e7dd180fb1614e37c446fbd9e diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index b69df16fd..061ec1219 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -486,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int channel_key, uint64_t max_sb_size, uint64_t mmap_len, - const char *name) + const char *name, + unsigned int nb_init_streams) { assert(msg); @@ -500,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.channel_key = channel_key; msg->u.channel.max_sb_size = max_sb_size; msg->u.channel.mmap_len = mmap_len; + msg->u.channel.nb_init_streams = nb_init_streams; } /* @@ -517,7 +519,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, int net_index, unsigned int metadata_flag, const char *name, - const char *pathname) + const char *pathname, + unsigned int session_id) { assert(msg); @@ -535,6 +538,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.gid = gid; msg->u.stream.net_index = net_index; msg->u.stream.metadata_flag = metadata_flag; + msg->u.stream.session_id = (uint64_t) session_id; strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; strncpy(msg->u.stream.path_name, pathname, @@ -691,3 +695,67 @@ int consumer_set_subdir(struct consumer_output *consumer, error: return ret; } + +/* + * Ask the consumer if the data is ready to bread (available) for the specific + * session id. + * + * This function has a different behavior with the consumer i.e. that it waits + * for a reply from the consumer if yes or no the data is available. + */ +int consumer_is_data_available(unsigned int id, + struct consumer_output *consumer) +{ + int ret; + int32_t ret_code; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE; + + msg.u.data_available.session_id = (uint64_t) id; + + DBG3("Consumer data available for id %u", id); + + /* Send command for each consumer */ + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + + ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer data available command"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + /* + * Waiting for the reply code where 0 the data is not available and 1 + * it is for trace reading. + */ + ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code)); + if (ret < 0) { + PERROR("recv consumer data available status"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + pthread_mutex_unlock(socket->lock); + + if (ret_code == 0) { + break; + } + } + + DBG("Consumer data available ret %d", ret_code); + return ret_code; + +error: + return -1; +}