X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=bf477933515502510c284c58762b56d2b4399f75;hp=071135bda6975b09abf4f4db7841454dfecdc269;hb=e3e57ea7c5b6379b0a2df41acbcacb731a6acdd9;hpb=ca22feea083301934d1c8511851c86fb008c0697 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 071135bda..bf4779335 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -314,6 +314,7 @@ void consumer_destroy_output(struct consumer_output *obj) */ struct consumer_output *consumer_copy_output(struct consumer_output *obj) { + struct lttng_ht *tmp_ht_ptr; struct lttng_ht_iter iter; struct consumer_socket *socket, *copy_sock; struct consumer_output *output; @@ -324,11 +325,13 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) if (output == NULL) { goto error; } + /* Avoid losing the HT reference after the memcpy() */ + tmp_ht_ptr = output->socks; memcpy(output, obj, sizeof(struct consumer_output)); - /* Copy sockets */ - output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + /* Putting back the HT pointer and start copying socket(s). */ + output->socks = tmp_ht_ptr; cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { /* Create new socket object. */ @@ -337,6 +340,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) goto malloc_error; } + copy_sock->registered = socket->registered; copy_sock->lock = socket->lock; consumer_add_socket(copy_sock, output); } @@ -695,3 +699,63 @@ int consumer_set_subdir(struct consumer_output *consumer, error: return ret; } + +/* + * Ask the consumer if the data is ready to read (NOT pending) for the specific + * session id. + * + * This function has a different behavior with the consumer i.e. that it waits + * for a reply from the consumer if yes or no the data is pending. + */ +int consumer_is_data_pending(unsigned int id, + struct consumer_output *consumer) +{ + int ret; + int32_t ret_code = 0; /* Default is that the data is NOT pending */ + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; + + msg.u.data_pending.session_id = (uint64_t) id; + + DBG3("Consumer data pending for id %u", id); + + /* Send command for each consumer */ + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + + ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer data pending command"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code)); + if (ret < 0) { + PERROR("recv consumer data pending status"); + pthread_mutex_unlock(socket->lock); + goto error; + } + + pthread_mutex_unlock(socket->lock); + + if (ret_code == 1) { + break; + } + } + + DBG("Consumer data pending ret %d", ret_code); + return ret_code; + +error: + return -1; +}