X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=61acafec2098ecd70a5b1889309992e858a0e3cb;hp=a1fbbbe566fde08576e78d7ebefeb07503ad9d7e;hb=797bc362b6845f7e8f50922f53fc4683c573fc55;hpb=d812ecb94c72ebe8fd5640de53a628cfa9f5b3e8 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index a1fbbbe56..61acafec2 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -17,7 +17,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -40,7 +39,6 @@ #include #include #include -#include #include #include @@ -232,10 +230,21 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, send_stream.ctf_trace_id = htobe64(ctf_trace->id); send_stream.metadata_flag = htobe32( vstream->stream->is_metadata); - strncpy(send_stream.path_name, vstream->path_name, - sizeof(send_stream.path_name)); - strncpy(send_stream.channel_name, vstream->channel_name, - sizeof(send_stream.channel_name)); + if (lttng_strncpy(send_stream.path_name, vstream->path_name, + sizeof(send_stream.path_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end_unlock; + } + if (lttng_strncpy(send_stream.channel_name, + vstream->channel_name, + sizeof(send_stream.channel_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end_unlock; + } DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); @@ -330,7 +339,10 @@ int make_viewer_streams(struct relay_session *session, * Ensure a self-reference is preserved even * after we have put our local reference. */ - viewer_stream_get(vstream); + if (!viewer_stream_get(vstream)) { + ERR("Unable to get self-reference on viewer stream, logic error."); + abort(); + } } else { if (!vstream->sent_flag && nb_unsent) { /* Update number of unsent stream counter. */ @@ -542,10 +554,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * viewer connection is allocated in this @@ -588,6 +597,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -793,7 +808,7 @@ end: static int viewer_list_sessions(struct relay_connection *conn) { - int ret; + int ret = 0; struct lttng_viewer_list_sessions session_list; struct lttng_ht_iter iter; struct relay_session *session; @@ -823,17 +838,23 @@ int viewer_list_sessions(struct relay_connection *conn) new_buf_count * sizeof(*send_session_buf)); if (!newbuf) { ret = -1; - rcu_read_unlock(); - goto end_free; + break; } send_session_buf = newbuf; buf_count = new_buf_count; } send_session = &send_session_buf[count]; - strncpy(send_session->session_name, session->session_name, - sizeof(send_session->session_name)); - strncpy(send_session->hostname, session->hostname, - sizeof(send_session->hostname)); + if (lttng_strncpy(send_session->session_name, + session->session_name, + sizeof(send_session->session_name))) { + ret = -1; + break; + } + if (lttng_strncpy(send_session->hostname, session->hostname, + sizeof(send_session->hostname))) { + ret = -1; + break; + } send_session->id = htobe64(session->id); send_session->live_timer = htobe32(session->live_timer); if (session->viewer_attached) { @@ -845,6 +866,9 @@ int viewer_list_sessions(struct relay_connection *conn) count++; } rcu_read_unlock(); + if (ret < 0) { + goto end_free; + } session_list.sessions_count = htobe32(count); @@ -1098,8 +1122,8 @@ error: /* * Open the index file if needed for the given vstream. * - * If an index file is successfully opened, the vstream index_fd set with - * it. + * If an index file is successfully opened, the vstream will set it as its + * current index file. * * Return 0 on success, a negative value on error (-ENOENT if not ready yet). * @@ -1110,7 +1134,7 @@ static int try_open_index(struct relay_viewer_stream *vstream, { int ret = 0; - if (vstream->index_fd) { + if (vstream->index_file) { goto end; } @@ -1121,20 +1145,12 @@ static int try_open_index(struct relay_viewer_stream *vstream, ret = -ENOENT; goto end; } - ret = index_open(vstream->path_name, vstream->channel_name, + vstream->index_file = lttng_index_file_open(vstream->path_name, + vstream->channel_name, vstream->stream->tracefile_count, vstream->current_tracefile_id); - if (ret >= 0) { - vstream->index_fd = stream_fd_create(ret); - if (!vstream->index_fd) { - if (close(ret)) { - PERROR("close"); - } - ret = -1; - } else { - ret = 0; - } - goto end; + if (!vstream->index_file) { + ret = -1; } end: @@ -1158,10 +1174,13 @@ static int check_index_status(struct relay_viewer_stream *vstream, { int ret; - if (trace->session->connection_closed + if ((trace->session->connection_closed || rstream->closed) && rstream->index_received_seqcount == vstream->index_sent_seqcount) { - /* Last index sent and session connection is closed. */ + /* + * Last index sent and session connection or relay + * stream are closed. + */ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); goto hup; } else if (rstream->beacon_ts_end != -1ULL && @@ -1253,7 +1272,6 @@ static int viewer_get_next_index(struct relay_connection *conn) { int ret; - ssize_t read_ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; @@ -1376,11 +1394,10 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } - read_ret = lttng_read(vstream->index_fd->fd, &packet_index, - sizeof(packet_index)); - if (read_ret < sizeof(packet_index)) { - ERR("Relay reading index file %d returned %zd", - vstream->index_fd->fd, read_ret); + ret = lttng_index_file_read(vstream->index_file, &packet_index); + if (ret) { + ERR("Relay error reading index file %d", + vstream->index_file->fd); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { @@ -1733,6 +1750,78 @@ end: return ret; } +/* + * Detach a viewer session. + * + * Return 0 on success or else a negative value. + */ +static +int viewer_detach_session(struct relay_connection *conn) +{ + int ret; + struct lttng_viewer_detach_session_response response; + struct lttng_viewer_detach_session_request request; + struct relay_session *session = NULL; + uint64_t viewer_session_to_close; + + DBG("Viewer detach session received"); + + assert(conn); + + health_code_update(); + + /* Receive the request from the connected client. */ + ret = recv_request(conn->sock, &request, sizeof(request)); + if (ret < 0) { + goto end; + } + viewer_session_to_close = be64toh(request.session_id); + + if (!conn->viewer_session) { + DBG("Client trying to detach before creating a live viewer session"); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR); + goto send_reply; + } + + health_code_update(); + + memset(&response, 0, sizeof(response)); + DBG("Detaching from session ID %" PRIu64, viewer_session_to_close); + + session = session_get_by_id(be64toh(request.session_id)); + if (!session) { + DBG("Relay session %" PRIu64 " not found", + be64toh(request.session_id)); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK); + goto send_reply; + } + + ret = viewer_session_is_attached(conn->viewer_session, session); + if (ret != 1) { + DBG("Not attached to this session"); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR); + goto send_reply_put; + } + + viewer_session_close_one_session(conn->viewer_session, session); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_OK); + DBG("Session %" PRIu64 " detached.", viewer_session_to_close); + +send_reply_put: + session_put(session); + +send_reply: + health_code_update(); + ret = send_response(conn->sock, &response, sizeof(response)); + if (ret < 0) { + goto end; + } + health_code_update(); + ret = 0; + +end: + return ret; +} /* * live_relay_unknown_command: send -1 if received unknown command @@ -1794,6 +1883,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, case LTTNG_VIEWER_CREATE_SESSION: ret = viewer_create_session(conn); break; + case LTTNG_VIEWER_DETACH_SESSION: + ret = viewer_detach_session(conn); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); @@ -1908,10 +2000,7 @@ restart: /* Inspect the relay conn pipe for new connection. */ if (pollfd == live_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay live pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(live_conn_pipe[0], @@ -1923,6 +2012,12 @@ restart: LPOLLIN | LPOLLRDHUP); connection_ht_add(viewer_connections_ht, conn); DBG("Connection socket %d added to poll", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay live pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { /* Connection activity. */ @@ -1933,11 +2028,7 @@ restart: continue; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - cleanup_connection_pollfd(&events, pollfd); - /* Put "create" ownership reference. */ - connection_put(conn); - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { @@ -1956,6 +2047,14 @@ restart: DBG("Viewer connection closed with %d", pollfd); } } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + cleanup_connection_pollfd(&events, pollfd); + /* Put "create" ownership reference. */ + connection_put(conn); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + connection_put(conn); + goto error; } /* Put local "get_by_sock" reference. */ connection_put(conn); @@ -2080,7 +2179,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the dispatcher thread */ - ret = pthread_create(&live_dispatcher_thread, NULL, + ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(), thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2090,7 +2189,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the worker thread */ - ret = pthread_create(&live_worker_thread, NULL, + ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, NULL); if (ret) { errno = ret; @@ -2100,7 +2199,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the listener thread */ - ret = pthread_create(&live_listener_thread, NULL, + ret = pthread_create(&live_listener_thread, default_pthread_attr(), thread_listener, (void *) NULL); if (ret) { errno = ret;