{
DBG("Cleaning up");
- /* Close thread quit pipes */
- utils_close_pipe(live_thread_quit_pipe);
free(live_uri);
}
futex_nto1_wake(&viewer_cmd_queue.futex);
}
-/*
- * Init thread quit pipe.
- *
- * Return -1 on error or 0 if all pipes are created.
- */
-static
-int init_thread_quit_pipe(void)
-{
- int ret;
-
- ret = utils_create_pipe_cloexec(live_thread_quit_pipe);
-
- return ret;
-}
-
/*
* Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
*/
* Returns 0 on success or a negative value on error.
*/
static
-int init_viewer_stream(struct relay_stream *stream,
- struct lttng_ht *viewer_streams_ht)
+int init_viewer_stream(struct relay_stream *stream)
{
int ret;
struct relay_viewer_stream *viewer_stream;
assert(stream);
- assert(viewer_streams_ht);
viewer_stream = zmalloc(sizeof(*viewer_stream));
if (!viewer_stream) {
*/
static
int viewer_attach_session(struct relay_command *cmd,
- struct lttng_ht *sessions_ht,
- struct lttng_ht *viewer_streams_ht)
+ struct lttng_ht *sessions_ht)
{
int ret, send_streams = 0, nb_streams = 0;
struct lttng_viewer_attach_session_request request;
assert(cmd);
assert(sessions_ht);
- assert(viewer_streams_ht);
DBG("Attach session received");
}
session = caa_container_of(node, struct relay_session, session_n);
- if (cmd->session == session) {
+ if (cmd->session_id == session->id) {
/* Same viewer already attached, just send the stream list. */
send_streams = 1;
response.status = htobe32(VIEWER_ATTACH_OK);
session->viewer_attached++;
send_streams = 1;
response.status = htobe32(VIEWER_ATTACH_OK);
+ cmd->session_id = session->id;
cmd->session = session;
}
continue;
}
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
- ret = init_viewer_stream(stream, viewer_streams_ht);
+ ret = init_viewer_stream(stream);
if (ret < 0) {
goto end_unlock;
}
*
* RCU read side lock MUST be acquired.
*/
-struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id,
- struct lttng_ht *viewer_streams_ht)
+struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
struct relay_viewer_stream *stream = NULL;
- assert(viewer_streams_ht);
-
lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node == NULL) {
*/
static
int viewer_get_next_index(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht)
+ struct lttng_ht *sessions_ht)
{
int ret;
struct lttng_viewer_get_next_index request_index;
struct relay_stream *rstream;
assert(cmd);
- assert(viewer_streams_ht);
assert(sessions_ht);
DBG("Viewer get next index");
}
rcu_read_lock();
- vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id),
- viewer_streams_ht);
+ vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
goto end_unlock;
* Return 0 on success or else a negative value.
*/
static
-int viewer_get_packet(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht)
+int viewer_get_packet(struct relay_command *cmd)
{
int ret, send_data = 0;
char *data = NULL;
struct relay_viewer_stream *stream;
assert(cmd);
- assert(viewer_streams_ht);
DBG2("Relay get data packet");
}
rcu_read_lock();
- stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id),
- viewer_streams_ht);
+ stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id));
if (!stream) {
goto error;
}
* Return 0 on success else a negative value.
*/
static
-int viewer_get_metadata(struct relay_command *cmd,
- struct lttng_ht *viewer_streams_ht)
+int viewer_get_metadata(struct relay_command *cmd)
{
int ret = 0;
ssize_t read_len;
struct relay_viewer_stream *stream;
assert(cmd);
- assert(viewer_streams_ht);
DBG("Relay get metadata");
}
rcu_read_lock();
- stream = live_find_viewer_stream_by_id(be64toh(request.stream_id),
- viewer_streams_ht);
+ stream = live_find_viewer_stream_by_id(be64toh(request.stream_id));
if (!stream || !stream->metadata_flag) {
ERR("Invalid metadata stream");
goto error;
*/
static
int process_control(struct lttng_viewer_cmd *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *sessions_ht,
- struct lttng_ht *viewer_streams_ht)
+ struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
int ret = 0;
ret = viewer_list_sessions(cmd, sessions_ht);
break;
case VIEWER_ATTACH_SESSION:
- ret = viewer_attach_session(cmd, sessions_ht,
- viewer_streams_ht);
+ ret = viewer_attach_session(cmd, sessions_ht);
break;
case VIEWER_GET_NEXT_INDEX:
- ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht);
+ ret = viewer_get_next_index(cmd, sessions_ht);
break;
case VIEWER_GET_PACKET:
- ret = viewer_get_packet(cmd, viewer_streams_ht);
+ ret = viewer_get_packet(cmd);
break;
case VIEWER_GET_METADATA:
- ret = viewer_get_metadata(cmd, viewer_streams_ht);
+ ret = viewer_get_metadata(cmd);
break;
default:
ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
}
static
-void viewer_del_streams(struct lttng_ht *viewer_streams_ht,
- struct relay_session *session)
+void viewer_del_streams(uint64_t session_id)
{
int ret;
struct relay_viewer_stream *stream;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- assert(viewer_streams_ht);
- assert(session);
-
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_u64(&iter);
}
stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
- if (stream->session_id != session->id) {
+ if (stream->session_id != session_id) {
continue;
}
*/
static
void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection,
- struct lttng_ht *viewer_streams_ht)
+ struct lttng_ht_iter *iter, struct relay_command *relay_connection)
{
int ret;
assert(relay_connections_ht);
assert(iter);
assert(relay_connection);
- assert(viewer_streams_ht);
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
- if (relay_connection->session) {
- viewer_del_streams(viewer_streams_ht, relay_connection->session);
- }
+ viewer_del_streams(relay_connection->session_id);
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
struct lttng_viewer_cmd recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
- struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht;
DBG("[thread] Live viewer relay worker started");
sock_n);
if (revents & (LPOLLERR)) {
- ERR("VIEWER POLL ERROR");
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Viewer socket %d hung up", pollfd);
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
} else if (revents & LPOLLIN) {
ret = relay_connection->sock->ops->recvmsg(
relay_connection->sock, &recv_hdr,
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
del_connection( relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
DBG("Viewer control connection closed with %d",
pollfd);
} else {
relay_connection->session->id);
}
ret = process_control(&recv_hdr, relay_connection,
- sessions_ht, viewer_streams_ht);
+ sessions_ht);
if (ret < 0) {
/* Clear the session on error. */
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection, viewer_streams_ht);
+ relay_connection);
DBG("Viewer connection closed with %d", pollfd);
}
}
relay_connection = caa_container_of(node, struct relay_command,
sock_n);
- del_connection(relay_connections_ht, &iter, relay_connection,
- viewer_streams_ht);
+ del_connection(relay_connections_ht, &iter, relay_connection);
}
rcu_read_unlock();
error_poll_create:
* main
*/
int live_start_threads(struct lttng_uri *uri,
- struct relay_local_data *relay_ctx)
+ struct relay_local_data *relay_ctx, int quit_pipe[2])
{
int ret = 0;
void *status;
assert(uri);
live_uri = uri;
- /* Create thread quit pipe */
- if ((ret = init_thread_quit_pipe()) < 0) {
- goto error;
- }
+ live_thread_quit_pipe[0] = quit_pipe[0];
+ live_thread_quit_pipe[1] = quit_pipe[1];
/* Check if daemon is UID = 0 */
is_root = !getuid();