X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=3896875f50e999076b2296829e19eab5f665ad5c;hb=e2b6b28eb86b7d0391a514f174a566a459b41136;hp=efafeddbc3543e0d26c5cd2f66f046663c5b8572;hpb=fcf0f774e2b567e5dcf9190a84973b7a4e06eb7d;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index efafeddbc..3896875f5 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4497,7 +4497,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( DBG("Failed to set new trace chunk on existing channels, rolling back"); close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp); + chunk_creation_timestamp, NULL); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4507,8 +4507,40 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( break; } } - rcu_read_unlock(); + if (relayd_id) { + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(*relayd_id); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_create_trace_chunk( + &relayd->control_sock, published_chunk); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id); + } + + if (!relayd || ret) { + enum lttcomm_return_code close_ret; + + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + NULL); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, + session_id, + chunk_id); + } + + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto error; + } + } +error: + rcu_read_unlock(); /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); end: @@ -4517,12 +4549,14 @@ end: enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp) + uint64_t chunk_id, time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command) { enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_trace_chunk *chunk; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; + const char *close_command_name = "none"; struct lttng_ht_iter iter; struct lttng_consumer_channel *channel; enum lttng_trace_chunk_status chunk_status; @@ -4538,16 +4572,21 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( } else { relayd_id_str = "(formatting error)"; } - } + } + if (close_command) { + close_command_name = lttng_trace_chunk_command_type_get_name( + *close_command); + } DBG("Consumer close trace chunk command: relayd_id = %s" - ", session_id = %" PRIu64 - ", chunk_id = %" PRIu64, relayd_id_str, - session_id, chunk_id); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", close command = %s", + relayd_id_str, session_id, chunk_id, + close_command_name); + chunk = lttng_trace_chunk_registry_find_chunk( - consumer_data.chunk_registry, session_id, - chunk_id); - if (!chunk) { + consumer_data.chunk_registry, session_id, chunk_id); + if (!chunk) { ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4561,12 +4600,15 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; goto end; } - /* - * Release the reference returned by the "find" operation and - * the session daemon's implicit reference to the chunk. - */ - lttng_trace_chunk_put(chunk); - lttng_trace_chunk_put(chunk); + + if (close_command) { + chunk_status = lttng_trace_chunk_set_close_command( + chunk, *close_command); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto end; + } + } /* * chunk is now invalid to access as we no longer hold a reference to @@ -4597,8 +4639,37 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; } } + + if (relayd_id) { + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(*relayd_id); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_close_trace_chunk( + &relayd->control_sock, chunk); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, + *relayd_id); + } + + if (!relayd || ret) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto error_unlock; + } + } +error_unlock: rcu_read_unlock(); end: + /* + * Release the reference returned by the "find" operation and + * the session daemon's implicit reference to the chunk. + */ + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(chunk); + return ret_code; }