int ret;
struct lttcomm_sock *sock = NULL;
- /* Don't resend the sockets to the consumer. */
- if (consumer->dst.net.relayd_socks_sent) {
- ret = LTTNG_OK;
- goto error;
- }
-
/* Set the network sequence index if not set. */
if (consumer->net_seq_index == -1) {
/*
goto close_sock;
}
+ /* Flag that the corresponding socket was sent. */
+ if (relayd_uri->stype == LTTNG_STREAM_CONTROL) {
+ consumer->dst.net.control_sock_sent = 1;
+ } else if (relayd_uri->stype == LTTNG_STREAM_DATA) {
+ consumer->dst.net.data_sock_sent = 1;
+ }
+
ret = LTTNG_OK;
/*
lttcomm_destroy_sock(sock);
}
-error:
return ret;
}
static int send_consumer_relayd_sockets(int domain,
struct ltt_session *session, struct consumer_output *consumer, int fd)
{
- int ret;
+ int ret = LTTNG_OK;
assert(session);
assert(consumer);
- /* Don't resend the sockets to the consumer. */
- if (consumer->dst.net.relayd_socks_sent) {
- ret = LTTNG_OK;
- goto error;
- }
-
/* Sending control relayd socket. */
- ret = send_consumer_relayd_socket(domain, session,
- &consumer->dst.net.control, consumer, fd);
- if (ret != LTTNG_OK) {
- goto error;
+ if (!consumer->dst.net.control_sock_sent) {
+ ret = send_consumer_relayd_socket(domain, session,
+ &consumer->dst.net.control, consumer, fd);
+ if (ret != LTTNG_OK) {
+ goto error;
+ }
}
/* Sending data relayd socket. */
- ret = send_consumer_relayd_socket(domain, session,
- &consumer->dst.net.data, consumer, fd);
- if (ret != LTTNG_OK) {
- goto error;
+ if (!consumer->dst.net.data_sock_sent) {
+ ret = send_consumer_relayd_socket(domain, session,
+ &consumer->dst.net.data, consumer, fd);
+ if (ret != LTTNG_OK) {
+ goto error;
+ }
}
- /* Flag that all relayd sockets were sent to the consumer. */
- consumer->dst.net.relayd_socks_sent = 1;
-
error:
return ret;
}
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
- send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session,
+ ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session,
usess->consumer, socket->fd);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
assert(socket->fd >= 0);
pthread_mutex_lock(socket->lock);
- send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session,
+ ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session,
ksess->consumer, socket->fd);
pthread_mutex_unlock(socket->lock);
if (ret != LTTNG_OK) {
goto error;
}
- if (!session->start_consumer) {
- ret = LTTNG_ERR_NO_CONSUMER;
- goto error;
- }
-
/*
* This case switch makes sure the domain session has a temporary consumer
* so the URL can be set.
/*
* Don't send relayd socket if URI is NOT remote or if the relayd
- * sockets for the session are already sent.
+ * socket for the session was already sent.
*/
if (uris[i].dtype == LTTNG_DST_PATH ||
- consumer->dst.net.relayd_socks_sent) {
+ (uris[i].stype == LTTNG_STREAM_CONTROL &&
+ consumer->dst.net.control_sock_sent) ||
+ (uris[i].stype == LTTNG_STREAM_DATA &&
+ consumer->dst.net.data_sock_sent)) {
continue;
}
goto error;
}
- if (!session->start_consumer) {
- ret = LTTNG_ERR_NO_CONSUMER;
- goto error;
- }
-
switch (domain) {
case 0:
assert(session->consumer);
break;
}
+ session->start_consumer = 1;
+
/* Enable it */
if (consumer) {
consumer->enabled = 1;
return ret;
}
+/*
+ * Command LTTNG_DATA_AVAILABLE returning 0 if the data is NOT ready to be read
+ * or else 1 if the data is available for trace analysis.
+ */
+int cmd_data_available(struct ltt_session *session)
+{
+ int ret;
+ struct ltt_kernel_session *ksess = session->kernel_session;
+ struct ltt_ust_session *usess = session->ust_session;
+
+ assert(session);
+
+ /* Session MUST be stopped to ask for data availability. */
+ if (session->enabled) {
+ ret = LTTNG_ERR_SESSION_STARTED;
+ goto error;
+ }
+
+ if (ksess && ksess->consumer) {
+ ret = consumer_is_data_available(ksess->id, ksess->consumer);
+ if (ret == 0) {
+ /* Data is still being extracted for the kernel. */
+ goto error;
+ }
+ }
+
+ if (usess && usess->consumer) {
+ ret = consumer_is_data_available(usess->id, usess->consumer);
+ if (ret == 0) {
+ /* Data is still being extracted for the kernel. */
+ goto error;
+ }
+ }
+
+ /* Data is ready to be read by a viewer */
+ ret = 1;
+
+error:
+ return ret;
+}
+
/*
* Init command subsystem.
*/