relayd_hang_up = 1;
/* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
+ if (errno == EPIPE) {
/*
* This is possible if the fd is closed on the other side
* (outfd) or any write problem. It can be verbose a bit for a
DBG("Failed to set new trace chunk on existing channels, rolling back");
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
session_id, chunk_id,
- chunk_creation_timestamp);
+ chunk_creation_timestamp, NULL);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
session_id, chunk_id);
break;
}
}
- rcu_read_unlock();
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+error:
+ rcu_read_unlock();
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
end:
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp)
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command)
{
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_trace_chunk *chunk;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
enum lttng_trace_chunk_status chunk_status;
} else {
relayd_id_str = "(formatting error)";
}
- }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
DBG("Consumer close trace chunk command: relayd_id = %s"
- ", session_id = %" PRIu64
- ", chunk_id = %" PRIu64, relayd_id_str,
- session_id, chunk_id);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
chunk = lttng_trace_chunk_registry_find_chunk(
- consumer_data.chunk_registry, session_id,
- chunk_id);
- if (!chunk) {
+ consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
ERR("Failed to find chunk: session_id = %" PRIu64
", chunk_id = %" PRIu64,
session_id, chunk_id);
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
}
- /*
- * Release the reference returned by the "find" operation and
- * the session daemon's implicit reference to the chunk.
- */
- lttng_trace_chunk_put(chunk);
- lttng_trace_chunk_put(chunk);
+
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
/*
* chunk is now invalid to access as we no longer hold a reference to
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
}
}
+
+ if (relayd_id) {
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_close_trace_chunk(
+ &relayd->control_sock, chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+ *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
rcu_read_unlock();
end:
+ /*
+ * Release the reference returned by the "find" operation and
+ * the session daemon's implicit reference to the chunk.
+ */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(chunk);
+
return ret_code;
}