X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=01e27bf666d25d09ff1ed9843df119253ff59971;hp=0856cf0b9d3abf0858ca693b664ab5e542e5bbfb;hb=ecd1a12fac39784bded85c0f06e47ace2dc98cde;hpb=e5add6d004793894ef4c7e047bc0f8885763b205 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0856cf0b9..01e27bf66 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -519,7 +519,8 @@ error_open: if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid); + channel->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -1590,7 +1591,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; - goto end_msg_sessiond; + goto end_get_channel; } health_code_update(); @@ -1606,13 +1607,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * and the consumer can continue its work. The above call * has sent the error status message to the sessiond. */ - goto end_nosignal; + goto end_get_channel_nosignal; } /* * The communicaton was broken hence there is a bad state between * the consumer and sessiond so stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } health_code_update(); @@ -1622,7 +1623,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * so don't send them to the data thread. */ if (!channel->monitor) { - goto end_msg_sessiond; + goto end_get_channel; } ret = send_streams_to_thread(channel, ctx); @@ -1631,11 +1632,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * If we are unable to send the stream to the thread, there is * a big problem so just stop everything. */ - goto error_fatal; + goto error_get_channel_fatal; } /* List MUST be empty after or else it could be reused. */ assert(cds_list_empty(&channel->streams.head)); +end_get_channel: goto end_msg_sessiond; +error_get_channel_fatal: + goto error_fatal; +end_get_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: { @@ -1706,7 +1712,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ DBG("UST consumer push metadata %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } health_code_update(); @@ -1717,14 +1723,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * checked whether the channel can be found. */ ret_code = LTTCOMM_CONSUMERD_SUCCESS; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1734,7 +1740,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); if (ret) { - goto error_fatal; + goto error_push_metadata_fatal; } health_code_update(); @@ -1743,11 +1749,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, len, version, channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ - goto error_fatal; + goto error_push_metadata_fatal; } else { ret_code = ret; - goto end_msg_sessiond; + goto end_push_metadata_msg_sessiond; } +end_push_metadata_msg_sessiond: + goto end_msg_sessiond; +error_push_metadata_fatal: + goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: { @@ -1974,7 +1984,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_rotate_channel_nosignal; } /* @@ -1992,6 +2002,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } break; +end_rotate_channel_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_INIT: { @@ -2080,16 +2092,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char closed_trace_chunk_path[LTTNG_PATH_MAX]; + int ret; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : NULL, + &relayd_id : + NULL, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp); - goto end_msg_sessiond; + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL, closed_trace_chunk_path); + reply.ret_code = ret_code; + reply.path_length = strlen(closed_trace_chunk_path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, + reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { @@ -2108,15 +2140,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } end_nosignal: - rcu_read_unlock(); - - health_code_update(); - /* * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ - return 1; + ret = 1; + goto end; end_msg_sessiond: /* @@ -2128,11 +2157,9 @@ end_msg_sessiond: if (ret < 0) { goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; end_channel_error: if (channel) { pthread_mutex_unlock(&channel->lock); @@ -2148,15 +2175,18 @@ end_channel_error: /* Stop everything if session daemon can not be notified. */ goto error_fatal; } - rcu_read_unlock(); - - health_code_update(); + ret = 1; + goto end; - return 1; error_fatal: - rcu_read_unlock(); /* This will issue a consumer stop. */ - return -1; + ret = -1; + goto end; + +end: + rcu_read_unlock(); + health_code_update(); + return ret; } /* @@ -2343,7 +2373,8 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid); + chan->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -3014,7 +3045,7 @@ end: * Stop a given metadata channel timer if enabled and close the wait fd which * is the poll pipe of the metadata stream. * - * This MUST be called with the metadata channel acquired. + * This MUST be called with the metadata channel lock acquired. */ void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata) {