X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=8fe3dd96deabe322f7e634bbd51aa78ffcfd70d0;hp=82e2603544c9b770ec0f60a1344f456835ddb76e;hb=2fdaeb7e6ba9952cc083027fbe591fd6b69cbe80;hpb=c35f9726a22f1d93e14589688d830efccda196f3 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 82e260354..8fe3dd96d 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -58,7 +59,7 @@ #include #include #include -#include +#include #include "cmd.h" #include "ctf-trace.h" @@ -2127,6 +2128,8 @@ static int relay_rotate_session_streams( const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams); struct lttng_trace_chunk *next_trace_chunk = NULL; struct lttng_buffer_view stream_positions; + char chunk_id_buf[MAX_INT_DEC_LEN(uint64_t)]; + const char *chunk_id_str = "none"; if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); @@ -2180,8 +2183,20 @@ static int relay_rotate_session_streams( ret = -1; goto end; } + + ret = snprintf(chunk_id_buf, sizeof(chunk_id_buf), "%" PRIu64, + rotate_streams.new_chunk_id.value); + if (ret < 0 || ret >= sizeof(chunk_id_buf)) { + chunk_id_str = "formatting error"; + } else { + chunk_id_str = chunk_id_buf; + } } + DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"", + rotate_streams.stream_count, session->session_name, + chunk_id_str); + stream_positions = lttng_buffer_view_from_view(payload, sizeof(rotate_streams), -1); if (!stream_positions.data || @@ -2237,6 +2252,7 @@ end: ret = -1; } + ret = 0; end_no_reply: lttng_trace_chunk_put(next_trace_chunk); return ret; @@ -2423,11 +2439,22 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&conn->session->lock); - lttng_trace_chunk_put(conn->session->current_trace_chunk); + if (conn->session->pending_closure_trace_chunk) { + /* + * Invalid; this means a second create_trace_chunk command was + * received before a close_trace_chunk. + */ + ERR("Invalid trace chunk close command received; a trace chunk is already waiting for a trace chunk close command"); + reply_code = LTTNG_ERR_INVALID_PROTOCOL; + ret = -1; + goto end_unlock_session; + } + conn->session->pending_closure_trace_chunk = + conn->session->current_trace_chunk; conn->session->current_trace_chunk = published_chunk; published_chunk = NULL; +end_unlock_session: pthread_mutex_unlock(&conn->session->lock); - end: reply.ret_code = htobe32((uint32_t) reply_code); send_ret = conn->sock->ops->sendmsg(conn->sock, @@ -2512,13 +2539,23 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } + pthread_mutex_lock(&session->lock); + if (session->pending_closure_trace_chunk && + session->pending_closure_trace_chunk != chunk) { + ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure", + session->session_name); + reply_code = LTTNG_ERR_INVALID_PROTOCOL; + ret = -1; + goto end_unlock_session; + } + chunk_status = lttng_trace_chunk_set_close_timestamp( chunk, close_timestamp); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk close timestamp"); ret = -1; reply_code = LTTNG_ERR_UNK; - goto end; + goto end_unlock_session; } if (close_command.is_set) { @@ -2527,11 +2564,10 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; reply_code = LTTNG_ERR_INVALID; - goto end; + goto end_unlock_session; } } - pthread_mutex_lock(&session->lock); if (session->current_trace_chunk == chunk) { /* * After a trace chunk close command, no new streams @@ -2544,6 +2580,9 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, lttng_trace_chunk_put(session->current_trace_chunk); session->current_trace_chunk = NULL; } + lttng_trace_chunk_put(session->pending_closure_trace_chunk); + session->pending_closure_trace_chunk = NULL; +end_unlock_session: pthread_mutex_unlock(&session->lock); end: