#include <urcu/uatomic.h>
#include <string>
-#include <common/common.h>
-#include <common/compat/endian.h>
-#include <common/compat/poll.h>
-#include <common/compat/socket.h>
-#include <common/defaults.h>
-#include <common/fd-tracker/utils.h>
-#include <common/fs-handle.h>
-#include <common/futex.h>
-#include <common/index/index.h>
-#include <common/sessiond-comm/inet.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/uri.h>
-#include <common/utils.h>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/compat/socket.hpp>
+#include <common/defaults.hpp>
+#include <common/fd-tracker/utils.hpp>
+#include <common/fs-handle.hpp>
+#include <common/futex.hpp>
+#include <common/index/index.hpp>
+#include <common/sessiond-comm/inet.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/uri.hpp>
+#include <common/utils.hpp>
#include <lttng/lttng.h>
-#include "cmd.h"
-#include "connection.h"
-#include "ctf-trace.h"
-#include "health-relayd.h"
-#include "live.h"
-#include "lttng-relayd.h"
-#include "session.h"
-#include "stream.h"
-#include "testpoint.h"
-#include "utils.h"
-#include "viewer-session.h"
-#include "viewer-stream.h"
+#include "cmd.hpp"
+#include "connection.hpp"
+#include "ctf-trace.hpp"
+#include "health-relayd.hpp"
+#include "live.hpp"
+#include "lttng-relayd.hpp"
+#include "session.hpp"
+#include "stream.hpp"
+#include "testpoint.hpp"
+#include "utils.hpp"
+#include "viewer-session.hpp"
+#include "viewer-stream.hpp"
#define SESSION_BUF_DEFAULT_COUNT 16
}
};
+static
+const char *lttng_viewer_get_packet_return_code_str(
+ enum lttng_viewer_get_packet_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_GET_PACKET_OK:
+ return "GET_PACKET_OK";
+ case LTTNG_VIEWER_GET_PACKET_RETRY:
+ return "GET_PACKET_RETRY";
+ case LTTNG_VIEWER_GET_PACKET_ERR:
+ return "GET_PACKET_ERR";
+ case LTTNG_VIEWER_GET_PACKET_EOF:
+ return "GET_PACKET_EOF";
+ default:
+ abort();
+ }
+};
+
/*
* Cleanup the daemon
*/
* chunk can be used safely.
*/
if ((relay_stream->ongoing_rotation.is_set ||
- relay_session->ongoing_rotation) &&
+ session_has_ongoing_rotation(relay_session)) &&
relay_stream->trace_chunk) {
viewer_stream_trace_chunk = lttng_trace_chunk_copy(
relay_stream->trace_chunk);
}
static
-int close_sock(void *data, int *in_fd)
+int close_sock(void *data, int *in_fd __attribute__((unused)))
{
struct lttcomm_sock *sock = (lttcomm_sock *) data;
* This thread manages the listening for new connections on the network
*/
static
-void *thread_listener(void *data)
+void *thread_listener(void *data __attribute__((unused)))
{
int i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
* This thread manages the dispatching of the requests to worker threads
*/
static
-void *thread_dispatcher(void *data)
+void *thread_dispatcher(void *data __attribute__((unused)))
{
int err = -1;
ssize_t ret;
uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
uint32_t count = 0;
- send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+ send_session_buf = calloc<lttng_viewer_session>(SESSION_BUF_DEFAULT_COUNT);
if (!send_session_buf) {
return -1;
}
* stream, because the chunk can be in an intermediate state
* due to directory renaming.
*/
- if (session->ongoing_rotation) {
+ if (session_has_ongoing_rotation(session)) {
DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
goto send_reply_unlock;
LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
&nb_created, &closed);
if (ret < 0) {
- goto error_unlock_session;
+ /*
+ * This is caused by an internal error; propagate the negative
+ * 'ret' to close the connection.
+ */
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+ goto send_reply_unlock;
}
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
}
error:
return ret;
-error_unlock_session:
- pthread_mutex_unlock(&session->lock);
- session_put(session);
- return ret;
}
/*
* stream, because the chunk can be in an intermediate state
* due to directory renaming.
*/
- if (session->ongoing_rotation) {
+ if (session_has_ongoing_rotation(session)) {
DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
send_streams = 0;
goto send_reply;
goto send_reply;
}
- if (rstream->trace->session->ongoing_rotation) {
+ if (session_has_ongoing_rotation(rstream->trace->session)) {
/* Rotation is ongoing, try again later. */
viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s",
uint32_t packet_data_len = 0;
ssize_t read_len;
uint64_t stream_id;
+ enum lttng_viewer_get_packet_return_code get_packet_status;
health_code_update();
vstream = viewer_stream_get_by_id(stream_id);
if (!vstream) {
- DBG("Client requested packet of unknown stream id %" PRIu64,
- stream_id);
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+ DBG("Client requested packet of unknown stream id %" PRIu64
+ ", returning status=%s", stream_id,
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto send_reply_nolock;
} else {
packet_data_len = be32toh(get_packet_info.len);
reply_size += packet_data_len;
}
- reply = (char *) zmalloc(reply_size);
+ reply = zmalloc<char>(reply_size);
if (!reply) {
- PERROR("packet reply zmalloc");
- reply_size = sizeof(reply_header);
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+ PERROR("Falled to allocate reply, returning status=%s",
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
lseek_ret = fs_handle_seek(vstream->stream_file.handle,
be64toh(get_packet_info.offset), SEEK_SET);
if (lseek_ret < 0) {
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
PERROR("Failed to seek file system handle of viewer stream %" PRIu64
- " to offset %" PRIu64,
- stream_id,
- (uint64_t) be64toh(get_packet_info.offset));
+ " to offset %" PRIu64", returning status=%s", stream_id,
+ (uint64_t) be64toh(get_packet_info.offset),
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
read_len = fs_handle_read(vstream->stream_file.handle,
reply + sizeof(reply_header), packet_data_len);
if (read_len < packet_data_len) {
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
- ", offset: %" PRIu64,
- stream_id,
- (uint64_t) be64toh(get_packet_info.offset));
+ ", offset: %" PRIu64 ", returning status=%s", stream_id,
+ (uint64_t) be64toh(get_packet_info.offset),
+ lttng_viewer_get_packet_return_code_str(get_packet_status));
goto error;
}
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+
+ get_packet_status = LTTNG_VIEWER_GET_PACKET_OK;
reply_header.len = htobe32(packet_data_len);
goto send_reply;
error:
/* No payload to send on error. */
reply_size = sizeof(reply_header);
- reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
if (vstream) {
health_code_update();
+ reply_header.status = htobe32(get_packet_status);
if (reply) {
memcpy(reply, &reply_header, sizeof(reply_header));
ret = send_response(conn->sock, reply, reply_size);
}
reply.len = htobe64(len);
- data = (char *) zmalloc(len);
+ data = zmalloc<char>(len);
if (!data) {
PERROR("viewer metadata zmalloc");
goto error;
* This thread does the actual work
*/
static
-void *thread_worker(void *data)
+void *thread_worker(void *data __attribute__((unused)))
{
int ret, err = -1;
uint32_t nb_fd;