X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=40d0f71960fb6204019b508bd0ca1caffb4f838c;hb=f50f23d9f80ed9fae7fe5c49aee65e813e0031c8;hp=249df8a47bc4789ae9aedee05300e248c52e926e;hpb=6efae65ee1d5c6b048f0f4cd778d784d3857e459;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 249df8a47..40d0f7196 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -88,6 +88,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; + enum lttng_error_code ret_code = LTTNG_OK; struct lttcomm_consumer_msg msg; ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); @@ -96,6 +97,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + /* + * Notify the session daemon that the command is completed. + * + * On transport layer error, the function call will print an error + * message so handling the returned code is a bit useless since we + * return an error code anyway. + */ + (void) consumer_send_status_msg(sock, ret_code); return -ENOENT; } @@ -105,6 +114,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock); @@ -114,6 +124,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { struct lttng_consumer_channel *new_channel; + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("consumer_add_channel %d", msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, @@ -143,6 +160,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int alloc_ret = 0; + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); @@ -157,6 +181,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -263,7 +298,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { ERR("Unable to find relayd %" PRIu64, index); - goto end_nosignal; + ret_code = LTTNG_ERR_NO_CONSUMER; } /* @@ -276,24 +311,37 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * * The destroy can happen either here or when a stream fd hangs up. */ - consumer_flag_relayd_for_destroy(relayd); + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } goto end_nosignal; } - case LTTNG_CONSUMER_DATA_AVAILABLE: + case LTTNG_CONSUMER_DATA_PENDING: { int32_t ret; - uint64_t id = msg.u.data_available.session_id; + uint64_t id = msg.u.data_pending.session_id; - DBG("Kernel consumer data available command for id %" PRIu64, id); + DBG("Kernel consumer data pending command for id %" PRIu64, id); - ret = consumer_data_available(id); + ret = consumer_data_pending(id); /* Send back returned value to session daemon */ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { - PERROR("send data available ret code"); + PERROR("send data pending ret code"); } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ break; } default: @@ -485,42 +533,30 @@ error: /* * Check if data is still being extracted from the buffers for a specific - * stream. Consumer data lock MUST be acquired before calling this function. + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. * - * Return 0 if the traced data are still getting read else 1 meaning that the + * Return 1 if the traced data are still getting read else 0 meaning that the * data is available for trace viewer reading. */ -int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream) +int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) { int ret; assert(stream); - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret == EBUSY) { - /* Data not available */ - ret = 0; - goto end; - } - /* The stream is now locked so we can do our ustctl calls */ - ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */ ret = kernctl_put_subbuf(stream->wait_fd); assert(ret == 0); - goto end_unlock; + ret = 1; /* Data is pending */ + goto end; } - /* Data is available to be read for this stream. */ - ret = 1; + /* Data is NOT pending and ready to be read. */ + ret = 0; -end_unlock: - pthread_mutex_unlock(&stream->lock); end: return ret; }