X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=e5c5256c7b767675c7bbb748d1a5e25b84131580;hp=dea92ac2374f3f5127dc97389ed6f054d8f49d69;hb=c617c0c651432f9d5ae7adf4c5c1a5fd92ad828e;hpb=3cc2f24a5cdabfbcb1022c0798f6b4845f72b498 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index dea92ac23..e5c5256c7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -101,6 +101,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; + enum lttng_error_code ret_code = LTTNG_OK; struct lttcomm_consumer_msg msg; ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); @@ -111,6 +112,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + /* + * Notify the session daemon that the command is completed. + * + * On transport layer error, the function call will print an error + * message so handling the returned code is a bit useless since we + * return an error code anyway. + */ + (void) consumer_send_status_msg(sock, ret_code); return -ENOENT; } @@ -120,9 +129,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -133,6 +143,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("UST Consumer adding channel"); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -145,6 +162,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("consumer_add_channel %d", msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, @@ -178,6 +206,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("UST Consumer adding stream"); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -190,6 +225,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("Consumer command ADD_STREAM chan %d stream %d", msg.u.stream.channel_key, msg.u.stream.stream_key); @@ -205,6 +251,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.gid, msg.u.stream.net_index, msg.u.stream.metadata_flag, + msg.u.stream.session_id, &alloc_ret); if (new_stream == NULL) { switch (alloc_ret) { @@ -287,7 +334,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { ERR("Unable to find relayd %" PRIu64, index); - goto end_nosignal; + ret_code = LTTNG_ERR_NO_CONSUMER; } /* @@ -300,7 +347,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * * The destroy can happen either here or when a stream fd hangs up. */ - consumer_flag_relayd_for_destroy(relayd); + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } goto end_nosignal; } @@ -309,6 +364,27 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, rcu_read_unlock(); return -ENOSYS; } + case LTTNG_CONSUMER_DATA_PENDING: + { + int32_t ret; + uint64_t id = msg.u.data_pending.session_id; + + DBG("UST consumer data pending command for id %" PRIu64, id); + + ret = consumer_data_pending(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data pending ret code"); + } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ + break; + } default: break; } @@ -408,13 +484,14 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_ust_shm_handle *handle; struct lttng_ust_lib_ring_buffer *buf; char dummy; - ssize_t readlen; DBG("In read_subbuffer (wait_fd: %d, stream key: %d)", stream->wait_fd, stream->key); /* We can consume the 1 byte written into the wait_fd by UST */ if (!stream->hangup_flush_done) { + ssize_t readlen; + do { readlen = read(stream->wait_fd, &dummy, 1); } while (readlen == -1 && errno == EINTR); @@ -506,3 +583,35 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) error: return ret; } + +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. + * + * Return 1 if the traced data are still getting read else 0 meaning that the + * data is available for trace viewer reading. + */ +int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + DBG("UST consumer checking data pending"); + + ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = ustctl_put_subbuf(stream->chan->handle, stream->buf); + assert(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } + + /* Data is NOT pending so ready to be read. */ + ret = 0; + +end: + return ret; +}