#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
+#include <urcu/rculist.h>
#include <unistd.h>
#include <fcntl.h>
#include <common/config/session-config.h>
#include <common/dynamic-buffer.h>
#include <common/buffer-view.h>
-#include <urcu/rculist.h>
+#include <common/string-utils/format.h>
#include "cmd.h"
#include "ctf-trace.h"
const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams);
struct lttng_trace_chunk *next_trace_chunk = NULL;
struct lttng_buffer_view stream_positions;
+ char chunk_id_buf[MAX_INT_DEC_LEN(uint64_t)];
+ const char *chunk_id_str = "none";
if (!session || !conn->version_check_done) {
ERR("Trying to rotate a stream before version check");
ret = -1;
goto end;
}
+
+ ret = snprintf(chunk_id_buf, sizeof(chunk_id_buf), "%" PRIu64,
+ rotate_streams.new_chunk_id.value);
+ if (ret < 0 || ret >= sizeof(chunk_id_buf)) {
+ chunk_id_str = "formatting error";
+ } else {
+ chunk_id_str = chunk_id_buf;
+ }
}
+ DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"",
+ rotate_streams.stream_count, session->session_name,
+ chunk_id_str);
+
stream_positions = lttng_buffer_view_from_view(payload,
sizeof(rotate_streams), -1);
if (!stream_positions.data ||
ret = -1;
}
+ ret = 0;
end_no_reply:
lttng_trace_chunk_put(next_trace_chunk);
return ret;
}
pthread_mutex_lock(&conn->session->lock);
- lttng_trace_chunk_put(conn->session->current_trace_chunk);
+ if (conn->session->pending_closure_trace_chunk) {
+ /*
+ * Invalid; this means a second create_trace_chunk command was
+ * received before a close_trace_chunk.
+ */
+ ERR("Invalid trace chunk close command received; a trace chunk is already waiting for a trace chunk close command");
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ conn->session->pending_closure_trace_chunk =
+ conn->session->current_trace_chunk;
conn->session->current_trace_chunk = published_chunk;
published_chunk = NULL;
+end_unlock_session:
pthread_mutex_unlock(&conn->session->lock);
-
end:
reply.ret_code = htobe32((uint32_t) reply_code);
send_ret = conn->sock->ops->sendmsg(conn->sock,
goto end;
}
+ pthread_mutex_lock(&session->lock);
+ if (session->pending_closure_trace_chunk &&
+ session->pending_closure_trace_chunk != chunk) {
+ ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure",
+ session->session_name);
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ ret = -1;
+ goto end_unlock_session;
+ }
+
chunk_status = lttng_trace_chunk_set_close_timestamp(
chunk, close_timestamp);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk close timestamp");
ret = -1;
reply_code = LTTNG_ERR_UNK;
- goto end;
+ goto end_unlock_session;
}
if (close_command.is_set) {
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
reply_code = LTTNG_ERR_INVALID;
- goto end;
+ goto end_unlock_session;
}
}
- pthread_mutex_lock(&session->lock);
if (session->current_trace_chunk == chunk) {
/*
* After a trace chunk close command, no new streams
lttng_trace_chunk_put(session->current_trace_chunk);
session->current_trace_chunk = NULL;
}
+ lttng_trace_chunk_put(session->pending_closure_trace_chunk);
+ session->pending_closure_trace_chunk = NULL;
+end_unlock_session:
pthread_mutex_unlock(&session->lock);
end: