X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=bf477933515502510c284c58762b56d2b4399f75;hp=c9197bd8ecf9afc682f3b9e43740cf7fe0298aca;hb=e3e57ea7c5b6379b0a2df41acbcacb731a6acdd9;hpb=2f77fc4b3720dc8f75847130498c2d4aad7c03ec diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index c9197bd8e..bf4779335 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -48,7 +48,7 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, /* Bail out if consumer is disabled */ if (!consumer->enabled) { - ret = LTTCOMM_OK; + ret = LTTNG_OK; DBG3("Consumer is disabled"); goto error; } @@ -314,6 +314,7 @@ void consumer_destroy_output(struct consumer_output *obj) */ struct consumer_output *consumer_copy_output(struct consumer_output *obj) { + struct lttng_ht *tmp_ht_ptr; struct lttng_ht_iter iter; struct consumer_socket *socket, *copy_sock; struct consumer_output *output; @@ -324,11 +325,13 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) if (output == NULL) { goto error; } + /* Avoid losing the HT reference after the memcpy() */ + tmp_ht_ptr = output->socks; memcpy(output, obj, sizeof(struct consumer_output)); - /* Copy sockets */ - output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + /* Putting back the HT pointer and start copying socket(s). */ + output->socks = tmp_ht_ptr; cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { /* Create new socket object. */ @@ -337,6 +340,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) goto malloc_error; } + copy_sock->registered = socket->registered; copy_sock->lock = socket->lock; consumer_add_socket(copy_sock, output); } @@ -486,7 +490,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int channel_key, uint64_t max_sb_size, uint64_t mmap_len, - const char *name) + const char *name, + unsigned int nb_init_streams) { assert(msg); @@ -500,6 +505,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.channel_key = channel_key; msg->u.channel.max_sb_size = max_sb_size; msg->u.channel.mmap_len = mmap_len; + msg->u.channel.nb_init_streams = nb_init_streams; } /* @@ -517,7 +523,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, int net_index, unsigned int metadata_flag, const char *name, - const char *pathname) + const char *pathname, + unsigned int session_id) { assert(msg); @@ -535,6 +542,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.gid = gid; msg->u.stream.net_index = net_index; msg->u.stream.metadata_flag = metadata_flag; + msg->u.stream.session_id = (uint64_t) session_id; strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; strncpy(msg->u.stream.path_name, pathname, @@ -560,9 +568,12 @@ int consumer_send_stream(int sock, struct consumer_output *dst, break; case CONSUMER_DST_LOCAL: /* Add stream file name to stream path */ - strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name)); + strncat(msg->u.stream.path_name, "/", + sizeof(msg->u.stream.path_name) - + strlen(msg->u.stream.path_name) - 1); strncat(msg->u.stream.path_name, msg->u.stream.name, - sizeof(msg->u.stream.path_name)); + sizeof(msg->u.stream.path_name) - + strlen(msg->u.stream.path_name) - 1); msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; /* Indicate that the stream is NOT network */ msg->u.stream.net_index = -1; @@ -608,7 +619,7 @@ int consumer_send_relayd_socket(int consumer_sock, /* Bail out if consumer is disabled */ if (!consumer->enabled) { - ret = LTTCOMM_OK; + ret = LTTNG_OK; goto error; } @@ -688,3 +699,63 @@ int consumer_set_subdir(struct consumer_output *consumer, error: return ret; } + +/* + * Ask the consumer if the data is ready to read (NOT pending) for the specific + * session id. + * + * This function has a different behavior with the consumer i.e. that it waits + * for a reply from the consumer if yes or no the data is pending. + */ +int consumer_is_data_pending(unsigned int id, + struct consumer_output *consumer) +{ + int ret; + int32_t ret_code = 0; /* Default is that the data is NOT pending */ + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; + + msg.u.data_pending.session_id = (uint64_t) id; + + DBG3("Consumer data pending for id %u", id); + + /* Send command for each consumer */ + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + + ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer data pending command"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code)); + if (ret < 0) { + PERROR("recv consumer data pending status"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + pthread_mutex_unlock(socket->lock); + + if (ret_code == 1) { + break; + } + } + + DBG("Consumer data pending ret %d", ret_code); + return ret_code; + +error: + return -1; +}