#include <urcu/futex.h>
#include <urcu/rculist.h>
#include <urcu/uatomic.h>
-
-#include <common/common.h>
-#include <common/compat/endian.h>
-#include <common/compat/poll.h>
-#include <common/compat/socket.h>
-#include <common/defaults.h>
-#include <common/fd-tracker/utils.h>
-#include <common/fs-handle.h>
-#include <common/futex.h>
-#include <common/index/index.h>
-#include <common/sessiond-comm/inet.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/uri.h>
-#include <common/utils.h>
+#include <string>
+
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/compat/socket.hpp>
+#include <common/defaults.hpp>
+#include <common/fd-tracker/utils.hpp>
+#include <common/fs-handle.hpp>
+#include <common/futex.hpp>
+#include <common/index/index.hpp>
+#include <common/sessiond-comm/inet.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/uri.hpp>
+#include <common/utils.hpp>
#include <lttng/lttng.h>
-#include "cmd.h"
-#include "connection.h"
-#include "ctf-trace.h"
-#include "health-relayd.h"
-#include "live.h"
-#include "lttng-relayd.h"
-#include "session.h"
-#include "stream.h"
-#include "testpoint.h"
-#include "utils.h"
-#include "viewer-session.h"
-#include "viewer-stream.h"
+#include "cmd.hpp"
+#include "connection.hpp"
+#include "ctf-trace.hpp"
+#include "health-relayd.hpp"
+#include "live.hpp"
+#include "lttng-relayd.hpp"
+#include "session.hpp"
+#include "stream.hpp"
+#include "testpoint.hpp"
+#include "utils.hpp"
+#include "viewer-session.hpp"
+#include "viewer-stream.hpp"
#define SESSION_BUF_DEFAULT_COUNT 16
static pthread_mutex_t last_relay_viewer_session_id_lock =
PTHREAD_MUTEX_INITIALIZER;
+static
+const char *lttng_viewer_command_str(lttng_viewer_command cmd)
+{
+ switch (cmd) {
+ case LTTNG_VIEWER_CONNECT:
+ return "CONNECT";
+ case LTTNG_VIEWER_LIST_SESSIONS:
+ return "LIST_SESSIONS";
+ case LTTNG_VIEWER_ATTACH_SESSION:
+ return "ATTACH_SESSION";
+ case LTTNG_VIEWER_GET_NEXT_INDEX:
+ return "GET_NEXT_INDEX";
+ case LTTNG_VIEWER_GET_PACKET:
+ return "GET_PACKET";
+ case LTTNG_VIEWER_GET_METADATA:
+ return "GET_METADATA";
+ case LTTNG_VIEWER_GET_NEW_STREAMS:
+ return "GET_NEW_STREAMS";
+ case LTTNG_VIEWER_CREATE_SESSION:
+ return "CREATE_SESSION";
+ case LTTNG_VIEWER_DETACH_SESSION:
+ return "DETACH_SESSION";
+ default:
+ abort();
+ }
+}
+
+static
+const char *lttng_viewer_next_index_return_code_str(
+ enum lttng_viewer_next_index_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_INDEX_OK:
+ return "INDEX_OK";
+ case LTTNG_VIEWER_INDEX_RETRY:
+ return "INDEX_RETRY";
+ case LTTNG_VIEWER_INDEX_HUP:
+ return "INDEX_HUP";
+ case LTTNG_VIEWER_INDEX_ERR:
+ return "INDEX_ERR";
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ return "INDEX_INACTIVE";
+ case LTTNG_VIEWER_INDEX_EOF:
+ return "INDEX_EOF";
+ default:
+ abort();
+ }
+}
+
+static
+const char *lttng_viewer_attach_return_code_str(
+ enum lttng_viewer_attach_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_ATTACH_OK:
+ return "ATTACH_OK";
+ case LTTNG_VIEWER_ATTACH_ALREADY:
+ return "ATTACH_ALREADY";
+ case LTTNG_VIEWER_ATTACH_UNK:
+ return "ATTACH_UNK";
+ case LTTNG_VIEWER_ATTACH_NOT_LIVE:
+ return "ATTACH_NOT_LIVE";
+ case LTTNG_VIEWER_ATTACH_SEEK_ERR:
+ return "ATTACH_SEEK_ERR";
+ case LTTNG_VIEWER_ATTACH_NO_SESSION:
+ return "ATTACH_NO_SESSION";
+ default:
+ abort();
+ }
+};
+
+static
+const char *lttng_viewer_get_packet_return_code_str(
+ enum lttng_viewer_get_packet_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_GET_PACKET_OK:
+ return "GET_PACKET_OK";
+ case LTTNG_VIEWER_GET_PACKET_RETRY:
+ return "GET_PACKET_RETRY";
+ case LTTNG_VIEWER_GET_PACKET_ERR:
+ return "GET_PACKET_ERR";
+ case LTTNG_VIEWER_GET_PACKET_EOF:
+ return "GET_PACKET_EOF";
+ default:
+ abort();
+ }
+};
+
/*
* Cleanup the daemon
*/
* chunk can be used safely.
*/
if ((relay_stream->ongoing_rotation.is_set ||
- relay_session->ongoing_rotation) &&
+ session_has_ongoing_rotation(relay_session)) &&
relay_stream->trace_chunk) {
viewer_stream_trace_chunk = lttng_trace_chunk_copy(
relay_stream->trace_chunk);
}
static
-int close_sock(void *data, int *in_fd)
+int close_sock(void *data, int *in_fd __attribute__((unused)))
{
struct lttcomm_sock *sock = (lttcomm_sock *) data;
* This thread manages the listening for new connections on the network
*/
static
-void *thread_listener(void *data)
+void *thread_listener(void *data __attribute__((unused)))
{
int i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
* This thread manages the dispatching of the requests to worker threads
*/
static
-void *thread_dispatcher(void *data)
+void *thread_dispatcher(void *data __attribute__((unused)))
{
int err = -1;
ssize_t ret;
/* Continue thread execution */
break;
}
- conn = caa_container_of(node, struct relay_connection, qnode);
+ conn = lttng::utils::container_of(node, &relay_connection::qnode);
DBG("Dispatching viewer request waiting on sock %d",
conn->sock->fd);
health_code_update();
- DBG("Viewer is establishing a connection to the relayd.");
-
ret = recv_request(conn->sock, &msg, sizeof(msg));
if (ret < 0) {
goto end;
uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
uint32_t count = 0;
- DBG("List sessions received");
-
- send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+ send_session_buf = calloc<lttng_viewer_session>(SESSION_BUF_DEFAULT_COUNT);
if (!send_session_buf) {
return -1;
}
LTTNG_ASSERT(conn);
- DBG("Get new streams received");
-
health_code_update();
/* Receive the request from the connected client. */
* the viewer's point of view.
*/
pthread_mutex_lock(&session->lock);
+ /*
+ * If a session rotation is ongoing, do not attempt to open any
+ * stream, because the chunk can be in an intermediate state
+ * due to directory renaming.
+ */
+ if (session_has_ongoing_rotation(session)) {
+ DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+ goto send_reply_unlock;
+ }
ret = make_viewer_streams(session,
conn->viewer_session,
LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
&nb_created, &closed);
if (ret < 0) {
- goto error_unlock_session;
+ /*
+ * This is caused by an internal error; propagate the negative
+ * 'ret' to close the connection.
+ */
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+ goto send_reply_unlock;
}
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
}
error:
return ret;
-error_unlock_session:
- pthread_mutex_unlock(&session->lock);
- session_put(session);
- return ret;
}
/*
}
session_id = be64toh(request.session_id);
+
health_code_update();
memset(&response, 0, sizeof(response));
if (!conn->viewer_session) {
- DBG("Client trying to attach before creating a live viewer session");
- response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION;
+ DBG("Client trying to attach before creating a live viewer session, returning status=%s",
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
goto send_reply;
}
session = session_get_by_id(session_id);
if (!session) {
- DBG("Relay session %" PRIu64 " not found", session_id);
- response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
+ DBG("Relay session %" PRIu64 " not found, returning status=%s",
+ session_id,
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
goto send_reply;
}
- DBG("Attach session ID %" PRIu64 " received", session_id);
+ DBG("Attach relay session ID %" PRIu64 " received", session_id);
pthread_mutex_lock(&session->lock);
if (session->live_timer == 0) {
- DBG("Not live session");
- response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE;
+ DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s",
+ session_id,
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
goto send_reply;
}
viewer_attach_status = viewer_session_attach(conn->viewer_session,
session);
if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
- response.status = htobe32(viewer_attach_status);
+ DBG("Error attaching to relay session %" PRIu64 ", returning status=%s",
+ session_id,
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
goto send_reply;
}
switch (be32toh(request.seek)) {
case LTTNG_VIEWER_SEEK_BEGINNING:
case LTTNG_VIEWER_SEEK_LAST:
- response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_OK;
seek_type = (lttng_viewer_seek) be32toh(request.seek);
break;
default:
- ERR("Wrong seek parameter");
- response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
+ ERR("Wrong seek parameter for relay session %" PRIu64
+ ", returning status=%s", session_id,
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR;
+ send_streams = 0;
+ goto send_reply;
+ }
+
+ /*
+ * If a session rotation is ongoing, do not attempt to open any
+ * stream, because the chunk can be in an intermediate state
+ * due to directory renaming.
+ */
+ if (session_has_ongoing_rotation(session)) {
+ DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
send_streams = 0;
goto send_reply;
}
if (closed) {
send_streams = 0;
response.streams_count = 0;
- response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+ viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
+ ERR("Session %" PRIu64 " is closed, returning status=%s",
+ session_id,
+ lttng_viewer_attach_return_code_str(viewer_attach_status));
goto send_reply;
}
send_reply:
health_code_update();
+
+ response.status = htobe32((uint32_t) viewer_attach_status);
+
ret = send_response(conn->sock, &response, sizeof(response));
if (ret < 0) {
goto end_put_session;
* Last index sent and session connection or relay
* stream are closed.
*/
- index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ index->status = LTTNG_VIEWER_INDEX_HUP;
+ DBG("Check index status: Connection or stream are closed, stream %" PRIu64
+ ",connection-closed=%d, relay-stream-closed=%d, returning status=%s",
+ vstream->stream->stream_handle,
+ trace->session->connection_closed, rstream->closed,
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) index->status));
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
(rstream->index_received_seqcount == 0 ||
* viewer_stream_sync_tracefile_array_tail) and skip over
* packet sequence numbers.
*/
- index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
+ index->status = LTTNG_VIEWER_INDEX_INACTIVE;
index->timestamp_end = htobe64(rstream->beacon_ts_end);
index->stream_id = htobe64(rstream->ctf_stream_id);
- DBG("Check index status: inactive with beacon, for stream %" PRIu64,
- vstream->stream->stream_handle);
+ DBG("Check index status: inactive with beacon, for stream %" PRIu64
+ ", returning status=%s",
+ vstream->stream->stream_handle,
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) index->status));
goto index_ready;
} else if (rstream->index_received_seqcount == 0 ||
(vstream->index_sent_seqcount != 0 &&
* viewer_stream_sync_tracefile_array_tail) and skip over
* packet sequence numbers.
*/
- index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- DBG("Check index status: retry for stream %" PRIu64,
- vstream->stream->stream_handle);
+ index->status = LTTNG_VIEWER_INDEX_RETRY;
+ DBG("Check index status:"
+ "did not received beacon for stream %" PRIu64
+ ", returning status=%s",
+ vstream->stream->stream_handle,
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) index->status));
goto index_ready;
} else if (!tracefile_array_seq_in_file(rstream->tfa,
vstream->current_tracefile_id,
ret = viewer_stream_rotate(vstream);
if (ret == 1) {
/* EOF across entire stream. */
- index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ index->status = LTTNG_VIEWER_INDEX_HUP;
+ DBG("Check index status:"
+ "reached end of file for stream %" PRIu64
+ ", returning status=%s",
+ vstream->stream->stream_handle,
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) index->status));
goto hup;
}
/*
rstream->tfa,
vstream->current_tracefile_id,
vstream->index_sent_seqcount)) {
- index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- DBG("Check index status: retry: "
+ index->status = LTTNG_VIEWER_INDEX_RETRY;
+ DBG("Check index status:"
"tracefile array sequence number %" PRIu64
- " not in file for stream %" PRIu64,
+ " not in file for stream %" PRIu64
+ ", returning status=%s",
vstream->index_sent_seqcount,
- vstream->stream->stream_handle);
+ vstream->stream->stream_handle,
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) index->status));
goto index_ready;
}
LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
struct relay_stream *rstream = NULL;
struct ctf_trace *ctf_trace = NULL;
struct relay_viewer_stream *metadata_viewer_stream = NULL;
+ bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
+ uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
+ enum lttng_trace_chunk_status status;
LTTNG_ASSERT(conn);
- DBG("Viewer get next index");
-
memset(&viewer_index, 0, sizeof(viewer_index));
health_code_update();
vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
- DBG("Client requested index of unknown stream id %" PRIu64,
- (uint64_t) be64toh(request_index.stream_id));
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
metadata_viewer_stream =
ctf_trace_get_viewer_metadata_stream(ctf_trace);
+ /*
+ * Hold the session lock to protect against concurrent changes
+ * to the chunk files (e.g. rename done by clear), which are
+ * protected by the session ongoing rotation state. Those are
+ * synchronized with the session lock.
+ */
+ pthread_mutex_lock(&rstream->trace->session->lock);
pthread_mutex_lock(&rstream->lock);
/*
* The viewer should not ask for index on metadata stream.
*/
if (rstream->is_metadata) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+ DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
if (rstream->ongoing_rotation.is_set) {
/* Rotation is ongoing, try again later. */
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+ DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
- if (rstream->trace->session->ongoing_rotation) {
+ if (session_has_ongoing_rotation(rstream->trace->session)) {
/* Rotation is ongoing, try again later. */
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+ DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
conn->viewer_session,
rstream->trace_chunk);
if (ret) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Error copying trace chunk for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
}
* This allows clients to consume all the packets of a trace chunk
* after a session's destruction.
*/
- if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
- !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
- DBG("Viewer session and viewer stream chunk differ: "
- "vsession chunk %p vstream chunk %p",
+ if (vstream->stream_file.trace_chunk) {
+ status = lttng_trace_chunk_get_id(
+ vstream->stream_file.trace_chunk,
+ &stream_file_chunk_id);
+ LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ }
+ if (conn->viewer_session->current_trace_chunk) {
+ status = lttng_trace_chunk_get_id(
conn->viewer_session->current_trace_chunk,
- vstream->stream_file.trace_chunk);
+ &viewer_session_chunk_id);
+ LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ }
+
+ viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
+ conn->viewer_session->current_trace_chunk,
+ vstream->stream_file.trace_chunk);
+ viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
+ vstream->last_seen_rotation_count + 1;
+
+ if (viewer_stream_and_session_in_same_chunk) {
+ DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
+ vstream->stream_file.trace_chunk ?
+ std::to_string(stream_file_chunk_id).c_str() :
+ "None",
+ conn->viewer_session->current_trace_chunk ?
+ std::to_string(viewer_session_chunk_id).c_str() :
+ "None");
+ } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
+ DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
+ vstream->stream_file.trace_chunk ?
+ std::to_string(stream_file_chunk_id).c_str() :
+ "None",
+ conn->viewer_session->current_trace_chunk ?
+ std::to_string(viewer_session_chunk_id).c_str() :
+ "None");
+ } else {
+ DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
+ vstream->stream_file.trace_chunk ?
+ std::to_string(stream_file_chunk_id).c_str() :
+ "None",
+ conn->viewer_session->current_trace_chunk ?
+ std::to_string(viewer_session_chunk_id).c_str() :
+ "None");
+
viewer_stream_rotate_to_trace_chunk(vstream,
conn->viewer_session->current_trace_chunk);
vstream->last_seen_rotation_count =
ret = try_open_index(vstream, rstream);
if (ret == -ENOENT) {
if (rstream->closed) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+ DBG("Cannot open index for stream id %" PRIu64
+ "stream is closed, returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
} else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+ DBG("Cannot open index for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
}
if (ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Error opening index for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
*/
if (!vstream->stream_file.handle) {
char file_path[LTTNG_PATH_MAX];
- enum lttng_trace_chunk_status status;
struct fs_handle *fs_handle;
ret = utils_stream_file_path(rstream->path_name,
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
rstream->closed) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+ DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
}
PERROR("Failed to open trace file for viewer stream");
ret = check_new_streams(conn);
if (ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Error checking for new streams before sending new index to stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
} else if (ret == 1) {
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
ret = lttng_index_file_read(vstream->index_file, &packet_index);
if (ret) {
- ERR("Relay error reading index file");
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Relay error reading index file for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
goto send_reply;
} else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
+ viewer_index.status = LTTNG_VIEWER_INDEX_OK;
+ DBG("Read index file for stream id %" PRIu64
+ ", returning status=%s",
+ (uint64_t) be64toh(request_index.stream_id),
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
vstream->index_sent_seqcount++;
}
send_reply:
if (rstream) {
pthread_mutex_unlock(&rstream->lock);
+ pthread_mutex_unlock(&rstream->trace->session->lock);
}
if (metadata_viewer_stream) {
}
viewer_index.flags = htobe32(viewer_index.flags);
+ viewer_index.status = htobe32(viewer_index.status);
health_code_update();
ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
error_put:
pthread_mutex_unlock(&rstream->lock);
+ pthread_mutex_unlock(&rstream->trace->session->lock);
if (metadata_viewer_stream) {
viewer_stream_put(metadata_viewer_stream);
}
uint32_t packet_data_len = 0;
ssize_t read_len;
uint64_t stream_id;
-
- DBG2("Relay get data packet");
+ enum lttng_viewer_get_packet_return_code get_packet_status;
health_code_update();
vstream = viewer_stream_get_by_id(stream_id);
if (!vstream) {
- DBG("Client requested packet of unknown stream id %" PRIu64,
- stream_id);
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+ DBG("Client requested packet of unknown stream id %" PRIu64
+ ", returning status=%s", stream_id,
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto send_reply_nolock;
} else {
packet_data_len = be32toh(get_packet_info.len);
reply_size += packet_data_len;
}
- reply = (char *) zmalloc(reply_size);
+ reply = zmalloc<char>(reply_size);
if (!reply) {
- PERROR("packet reply zmalloc");
- reply_size = sizeof(reply_header);
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+ PERROR("Falled to allocate reply, returning status=%s",
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
lseek_ret = fs_handle_seek(vstream->stream_file.handle,
be64toh(get_packet_info.offset), SEEK_SET);
if (lseek_ret < 0) {
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
PERROR("Failed to seek file system handle of viewer stream %" PRIu64
- " to offset %" PRIu64,
- stream_id,
- (uint64_t) be64toh(get_packet_info.offset));
+ " to offset %" PRIu64", returning status=%s", stream_id,
+ (uint64_t) be64toh(get_packet_info.offset),
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
read_len = fs_handle_read(vstream->stream_file.handle,
reply + sizeof(reply_header), packet_data_len);
if (read_len < packet_data_len) {
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
- ", offset: %" PRIu64,
- stream_id,
- (uint64_t) be64toh(get_packet_info.offset));
+ ", offset: %" PRIu64 ", returning status=%s", stream_id,
+ (uint64_t) be64toh(get_packet_info.offset),
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_OK;
reply_header.len = htobe32(packet_data_len);
goto send_reply;
error:
/* No payload to send on error. */
reply_size = sizeof(reply_header);
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
if (vstream) {
health_code_update();
+ reply_header.status = htobe32(get_packet_status);
if (reply) {
memcpy(reply, &reply_header, sizeof(reply_header));
ret = send_response(conn->sock, reply, reply_size);
LTTNG_ASSERT(conn);
- DBG("Relay get metadata");
-
health_code_update();
ret = recv_request(conn->sock, &request, sizeof(request));
* an error.
*/
if (vstream->metadata_sent > 0) {
- vstream->stream->no_new_metadata_notified = true;
- if (vstream->stream->closed) {
- /* Release ownership for the viewer metadata stream. */
+ if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
+ /*
+ * Release ownership for the viewer metadata
+ * stream. Note that this reference is the
+ * viewer's reference. The vstream still exists
+ * until the end of the function as
+ * viewer_stream_get_by_id() took a reference.
+ */
viewer_stream_put(vstream);
}
+
+ vstream->stream->no_new_metadata_notified = true;
}
goto send_reply;
}
}
if (conn->viewer_session->current_trace_chunk &&
- conn->viewer_session->current_trace_chunk !=
- vstream->stream_file.trace_chunk) {
+ !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+ vstream->stream_file.trace_chunk)) {
bool acquired_reference;
DBG("Viewer session and viewer stream chunk differ: "
}
reply.len = htobe64(len);
- data = (char *) zmalloc(len);
+ data = zmalloc<char>(len);
if (!data) {
PERROR("viewer metadata zmalloc");
goto error;
int ret;
struct lttng_viewer_create_session_response resp;
- DBG("Viewer create session received");
-
memset(&resp, 0, sizeof(resp));
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
conn->viewer_session = viewer_session_create();
struct relay_session *session = NULL;
uint64_t viewer_session_to_close;
- DBG("Viewer detach session received");
-
LTTNG_ASSERT(conn);
health_code_update();
struct relay_connection *conn)
{
int ret = 0;
- uint32_t msg_value;
-
- msg_value = be32toh(recv_hdr->cmd);
+ lttng_viewer_command cmd =
+ (lttng_viewer_command) be32toh(recv_hdr->cmd);
/*
- * Make sure we've done the version check before any command other then a
- * new client connection.
+ * Make sure we've done the version check before any command other then
+ * a new client connection.
*/
- if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
- ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
+ if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
+ ERR("Viewer on connection %d requested %s command before version check",
+ conn->sock->fd, lttng_viewer_command_str(cmd));
ret = -1;
goto end;
}
- switch (msg_value) {
+ DBG("Processing %s viewer command from connection %d",
+ lttng_viewer_command_str(cmd), conn->sock->fd);
+
+ switch (cmd) {
case LTTNG_VIEWER_CONNECT:
ret = viewer_connect(conn);
break;
* This thread does the actual work
*/
static
-void *thread_worker(void *data)
+void *thread_worker(void *data __attribute__((unused)))
{
int ret, err = -1;
uint32_t nb_fd;