/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Live viewer relay dispatcher started");
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Live viewer relay dispatcher started");
/* Get version from the other side. */
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
if (ret < 0 || ret != sizeof(msg)) {
/* Get version from the other side. */
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
if (ret < 0 || ret != sizeof(msg)) {
if (cmd->type == RELAY_VIEWER_COMMAND) {
reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
}
if (cmd->type == RELAY_VIEWER_COMMAND) {
reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttng_viewer_connect), 0);
if (ret < 0) {
ERR("Relay sending version");
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttng_viewer_connect), 0);
if (ret < 0) {
ERR("Relay sending version");
}
cds_lfht_count_nodes(sessions_ht->ht, &approx_before, &count, &approx_after);
session_list.sessions_count = htobe32(count);
cds_lfht_count_nodes(sessions_ht->ht, &approx_before, &count, &approx_after);
session_list.sessions_count = htobe32(count);
ret = cmd->sock->ops->sendmsg(cmd->sock, &session_list,
sizeof(session_list), 0);
if (ret < 0) {
ret = cmd->sock->ops->sendmsg(cmd->sock, &session_list,
sizeof(session_list), 0);
if (ret < 0) {
cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, node, node) {
cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, node, node) {
send_session.live_timer = htobe32(session->live_timer);
send_session.clients = htobe32(session->viewer_attached);
send_session.live_timer = htobe32(session->live_timer);
send_session.clients = htobe32(session->viewer_attached);
ret = cmd->sock->ops->sendmsg(cmd->sock, &send_session,
sizeof(send_session), 0);
if (ret < 0) {
ret = cmd->sock->ops->sendmsg(cmd->sock, &send_session,
sizeof(send_session), 0);
if (ret < 0) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0);
if (ret < 0 || ret != sizeof(request)) {
if (ret == 0) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0);
if (ret < 0 || ret != sizeof(request)) {
if (ret == 0) {
rcu_read_lock();
lttng_ht_lookup(sessions_ht,
(void *)((unsigned long) be64toh(request.session_id)), &iter);
rcu_read_lock();
lttng_ht_lookup(sessions_ht,
(void *)((unsigned long) be64toh(request.session_id)), &iter);
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
struct relay_viewer_stream *vstream;
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
struct relay_viewer_stream *vstream;
ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0);
if (ret < 0) {
ERR("Relay sending viewer attach response");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0);
if (ret < 0) {
ERR("Relay sending viewer attach response");
goto end_unlock;
}
/* We should only be there if we have a session to attach to. */
assert(session);
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
/* We should only be there if we have a session to attach to. */
assert(session);
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
do {
DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
do {
ret = read(stream->index_read_fd, &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
ret = read(stream->index_read_fd, &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &request_index,
sizeof(request_index), 0);
if (ret < 0 || ret != sizeof(request_index)) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &request_index,
sizeof(request_index), 0);
if (ret < 0 || ret != sizeof(request_index)) {
ret = read(vstream->index_read_fd, &packet_index,
sizeof(packet_index));
} while (ret < 0 && errno == EINTR);
ret = read(vstream->index_read_fd, &packet_index,
sizeof(packet_index));
} while (ret < 0 && errno == EINTR);
ret = cmd->sock->ops->sendmsg(cmd->sock, &viewer_index,
sizeof(viewer_index), 0);
if (ret < 0) {
ERR("Relay index to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &viewer_index,
sizeof(viewer_index), 0);
if (ret < 0) {
ERR("Relay index to viewer");
goto end_unlock;
}
DBG("Index %" PRIu64 "for stream %" PRIu64 "sent",
vstream->last_sent_index, vstream->stream_handle);
DBG("Index %" PRIu64 "for stream %" PRIu64 "sent",
vstream->last_sent_index, vstream->stream_handle);
ret = cmd->sock->ops->recvmsg(cmd->sock, &get_packet_info,
sizeof(get_packet_info), 0);
if (ret < 0 || ret != sizeof(get_packet_info)) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &get_packet_info,
sizeof(get_packet_info), 0);
if (ret < 0 || ret != sizeof(get_packet_info)) {
if (!stream->ctf_trace->metadata_received ||
stream->ctf_trace->metadata_received >
stream->ctf_trace->metadata_sent) {
reply.status = htobe32(VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
if (!stream->ctf_trace->metadata_received ||
stream->ctf_trace->metadata_received >
stream->ctf_trace->metadata_sent) {
reply.status = htobe32(VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
ERR("Relay data header to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
ERR("Relay data header to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0);
if (ret < 0) {
ERR("Relay send data to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0);
if (ret < 0) {
ERR("Relay send data to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->recvmsg(cmd->sock, &request,
sizeof(request), 0);
if (ret < 0 || ret != sizeof(request)) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &request,
sizeof(request), 0);
if (ret < 0 || ret != sizeof(request)) {
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
ERR("Relay data header to viewer");
goto end_unlock;
}
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
ERR("Relay data header to viewer");
goto end_unlock;
}
if (len > 0) {
ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0);
if (len > 0) {
ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0);
ret = read(fd, relay_connection, sizeof(*relay_connection));
} while (ret < 0 && errno == EINTR);
if (ret < 0 || ret < sizeof(*relay_connection)) {
ret = read(fd, relay_connection, sizeof(*relay_connection));
} while (ret < 0 && errno == EINTR);
if (ret < 0 || ret < sizeof(*relay_connection)) {
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd live viewer worker thread polling...");
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd live viewer worker thread polling...");
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {