*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <limits.h>
#include <common/common.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
+#include <common/compat/endian.h>
#include <common/defaults.h>
#include <common/futex.h>
#include <common/index/index.h>
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
- cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
+ cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
+ &new_conn->qnode);
/*
* Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfq_enqueue.
+ * the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&viewer_conn_queue.futex);
}
{
int err = -1;
ssize_t ret;
- struct cds_wfq_node *node;
+ struct cds_wfcq_node *node;
struct relay_connection *conn = NULL;
DBG("[thread] Live viewer relay dispatcher started");
health_code_update();
/* Dequeue commands */
- node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
+ node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
+ &viewer_conn_queue.tail);
if (node == NULL) {
DBG("Woken up but nothing in the live-viewer "
"relay command queue");
health_code_update();
+ memset(&response, 0, sizeof(response));
+
rcu_read_lock();
session = session_find_by_id(conn->sessions_ht, session_id);
if (!session) {
health_code_update();
+ memset(&response, 0, sizeof(response));
+
if (!conn->viewer_session) {
DBG("Client trying to attach before creating a live viewer session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
+ index->stream_id = htobe64(rstream->ctf_stream_id);
goto index_ready;
} else if (rstream->total_index_received <= vstream->last_sent_index
&& !vstream->close_write_flag) {
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (ret < 0) {
- goto end;
+ goto end_unlock;
} else if (ret == 1) {
/*
* This means the viewer index data structure has been populated by the
}
health_code_update();
+ memset(&reply, 0, sizeof(reply));
+
rcu_read_lock();
stream = viewer_stream_find_by_id(be64toh(request.stream_id));
if (!stream || !stream->metadata_flag) {
DBG("Viewer create session received");
+ memset(&resp, 0, sizeof(resp));
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
if (!conn->viewer_session) {
{
struct lttcomm_relayd_generic_reply reply;
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_ERR_UNK);
(void) send_response(conn->sock, &reply, sizeof(reply));
}
}
/* Init relay command queue. */
- cds_wfq_init(&viewer_conn_queue.queue);
+ cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
/* Set up max poll set size */
lttng_poll_set_max_size();