X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=55f5c80355afb7535e36056352218bc66967ecaa;hp=34114ad33872c88e7dfee1d3fc53b59a572ba18d;hb=9911d21b154b53ed659415e6baa56d5fb0863716;hpb=151b6b1ac2ac1a85b587e578e69b48f6e5feab6a diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 34114ad33..55f5c8035 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -93,7 +95,7 @@ static uint64_t last_relay_viewer_session_id; * Cleanup the daemon */ static -void cleanup(void) +void cleanup_relayd_live(void) { DBG("Cleaning up"); @@ -324,40 +326,12 @@ error_unlock: return ret; } -/* - * Write to writable pipe used to notify a thread. - */ -static -int notify_thread_pipe(int wpipe) -{ - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - } - - return (int) ret; -} - -/* - * Stop all threads by closing the thread quit pipe. - */ -static -void stop_threads(void) +int relayd_live_stop(void) { - int ret; - - /* Stopping all threads */ - DBG("Terminating all live threads"); - ret = notify_thread_pipe(thread_quit_pipe[1]); - if (ret < 0) { - ERR("write error on thread quit pipe"); - } - - /* Dispatch thread */ + /* Stop dispatch thread */ CMM_STORE_SHARED(live_dispatch_thread_exit, 1); futex_nto1_wake(&viewer_conn_queue.futex); + return 0; } /* @@ -514,6 +488,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -557,11 +536,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); } @@ -588,7 +568,9 @@ error_sock_control: } health_unregister(health_relayd); DBG("Live viewer listener thread cleanup complete"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } return NULL; } @@ -600,7 +582,7 @@ void *thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *conn = NULL; DBG("[thread] Live viewer relay dispatcher started"); @@ -623,7 +605,8 @@ void *thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head, + &viewer_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the live-viewer " "relay command queue"); @@ -664,7 +647,9 @@ error_testpoint: } health_unregister(health_relayd); DBG("Live viewer dispatch thread dying"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } return NULL; } @@ -694,7 +679,7 @@ int viewer_connect(struct relay_connection *conn) health_code_update(); - memset(&msg, 0, sizeof(msg)); + memset(&reply, 0, sizeof(reply)); reply.major = RELAYD_VERSION_COMM_MAJOR; reply.minor = RELAYD_VERSION_COMM_MINOR; @@ -800,14 +785,9 @@ int viewer_list_sessions(struct relay_connection *conn) } health_code_update(); - rcu_read_unlock(); ret = 0; - goto end; - end_unlock: rcu_read_unlock(); - -end: return ret; } @@ -931,6 +911,8 @@ int viewer_get_new_streams(struct relay_connection *conn) health_code_update(); + memset(&response, 0, sizeof(response)); + rcu_read_lock(); session = session_find_by_id(conn->sessions_ht, session_id); if (!session) { @@ -1032,6 +1014,8 @@ int viewer_attach_session(struct relay_connection *conn) health_code_update(); + memset(&response, 0, sizeof(response)); + if (!conn->viewer_session) { DBG("Client trying to attach before creating a live viewer session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION); @@ -1113,6 +1097,129 @@ error: return ret; } +/* + * Open the index file if needed for the given vstream. + * + * If an index file is successfully opened, the index_read_fd of the stream is + * set with it. + * + * Return 0 on success, a negative value on error (-ENOENT if not ready yet). + */ +static int try_open_index(struct relay_viewer_stream *vstream, + struct relay_stream *rstream) +{ + int ret = 0; + + assert(vstream); + assert(rstream); + + if (vstream->index_read_fd >= 0) { + goto end; + } + + /* + * First time, we open the index file and at least one index is ready. The + * race between the read and write of the total_index_received is + * acceptable here since the client will be notified to simply come back + * and get the next index. + */ + if (rstream->total_index_received <= 0) { + ret = -ENOENT; + goto end; + } + ret = index_open(vstream->path_name, vstream->channel_name, + vstream->tracefile_count, vstream->tracefile_count_current); + if (ret >= 0) { + vstream->index_read_fd = ret; + ret = 0; + goto end; + } + +end: + return ret; +} + +/* + * Check the status of the index for the given stream. This function updates + * the index structure if needed and can destroy the vstream also for the HUP + * situation. + * + * Return 0 means that we can proceed with the index. A value of 1 means that + * the index has been updated and is ready to be send to the client. A negative + * value indicates an error that can't be handled. + */ +static int check_index_status(struct relay_viewer_stream *vstream, + struct relay_stream *rstream, struct ctf_trace *trace, + struct lttng_viewer_index *index) +{ + int ret; + + assert(vstream); + assert(rstream); + assert(index); + assert(trace); + + if (!rstream->close_flag) { + /* Rotate on abort (overwrite). */ + if (vstream->abort_flag) { + DBG("Viewer stream %" PRIu64 " rotate because of overwrite", + vstream->stream_handle); + ret = viewer_stream_rotate(vstream, rstream); + if (ret < 0) { + goto error; + } else if (ret == 1) { + /* EOF */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } + /* ret == 0 means successful so we continue. */ + } + + /* Check if we are in the same trace file at this point. */ + if (rstream->tracefile_count_current == vstream->tracefile_count_current) { + if (rstream->beacon_ts_end != -1ULL && + vstream->last_sent_index == rstream->total_index_received) { + /* + * We've received a synchronization beacon and the last index + * available has been sent, the index for now is inactive. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); + index->timestamp_end = htobe64(rstream->beacon_ts_end); + index->stream_id = htobe64(rstream->ctf_stream_id); + goto index_ready; + } else if (rstream->total_index_received <= vstream->last_sent_index + && !vstream->close_write_flag) { + /* + * Reader and writer are working in the same tracefile, so we care + * about the number of index received and sent. Otherwise, we read + * up to EOF. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto index_ready; + } + } + /* Nothing to do with the index, continue with it. */ + ret = 0; + } else if (rstream->close_flag && vstream->close_write_flag && + vstream->total_index_received == vstream->last_sent_index) { + /* Last index sent and current tracefile closed in write */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } else { + vstream->close_write_flag = 1; + ret = 0; + } + +error: + return ret; + +hup: + viewer_stream_delete(vstream); + viewer_stream_destroy(trace, vstream); +index_ready: + return 1; +} + /* * Send the next index for a stream. * @@ -1122,6 +1229,7 @@ static int viewer_get_next_index(struct relay_connection *conn) { int ret; + ssize_t read_ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; @@ -1168,75 +1276,39 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - /* First time, we open the index file */ - if (vstream->index_read_fd < 0) { - ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); + assert(rstream); + + /* Try to open an index if one is needed for that stream. */ + ret = try_open_index(vstream, rstream); + if (ret < 0) { if (ret == -ENOENT) { /* * The index is created only when the first data packet arrives, it * might not be ready at the beginning of the session */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - goto send_reply; - } else if (ret < 0) { + } else { + /* Unhandled error. */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; } - vstream->index_read_fd = ret; + goto send_reply; } - rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); - assert(rstream); - - if (!rstream->close_flag) { - if (vstream->abort_flag) { - /* Rotate on abort (overwrite). */ - DBG("Viewer rotate because of overwrite"); - ret = viewer_stream_rotate(vstream, rstream); - if (ret < 0) { - goto end_unlock; - } else if (ret == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; - } - /* ret == 0 means successful so we continue. */ - } - - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - if (rstream->tracefile_count_current == vstream->tracefile_count_current) { - if (rstream->beacon_ts_end != -1ULL && - vstream->last_sent_index == rstream->total_index_received) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); - viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - goto send_reply; - } else if (rstream->total_index_received <= vstream->last_sent_index - && !vstream->close_write_flag) { - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - /* No new index to send, retry later. */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - goto send_reply; - } - } - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - } else if (rstream->close_flag && vstream->close_write_flag && - vstream->total_index_received == vstream->last_sent_index) { - /* Last index sent and current tracefile closed in write */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); + ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + if (ret < 0) { + goto end_unlock; + } else if (ret == 1) { + /* + * This means the viewer index data structure has been populated by the + * check call thus we now send back the reply to the client. + */ goto send_reply; - } else { - vstream->close_write_flag = 1; } + /* At this point, ret MUST be 0 thus we continue with the get. */ + assert(!ret); if (!ctf_trace->metadata_received || ctf_trace->metadata_received > ctf_trace->metadata_sent) { @@ -1250,48 +1322,53 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); pthread_mutex_lock(&vstream->overwrite_lock); if (vstream->abort_flag) { - /* - * The file is being overwritten by the writer, we cannot * use it. - */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + /* The file is being overwritten by the writer, we cannot use it. */ pthread_mutex_unlock(&vstream->overwrite_lock); ret = viewer_stream_rotate(vstream, rstream); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (ret < 0) { goto end_unlock; } else if (ret == 1) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; + } else { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); } goto send_reply; } - ret = lttng_read(vstream->index_read_fd, &packet_index, + read_ret = lttng_read(vstream->index_read_fd, &packet_index, sizeof(packet_index)); pthread_mutex_unlock(&vstream->overwrite_lock); - if (ret < sizeof(packet_index)) { - /* - * The tracefile is closed in write, so we read up to EOF. - */ - if (vstream->close_write_flag == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - /* Rotate on normal EOF */ + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + if (read_ret < 0) { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + viewer_stream_delete(vstream); + viewer_stream_destroy(ctf_trace, vstream); + goto send_reply; + } else if (read_ret < sizeof(packet_index)) { + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); + if (vstream->close_write_flag) { ret = viewer_stream_rotate(vstream, rstream); if (ret < 0) { + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto end_unlock; } else if (ret == 1) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; + } else { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); } } else { - PERROR("Relay reading index file %d", vstream->index_read_fd); + ERR("Relay reading index file %d", vstream->index_read_fd); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); } + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto send_reply; } else { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK); @@ -1526,6 +1603,8 @@ int viewer_get_metadata(struct relay_connection *conn) } health_code_update(); + memset(&reply, 0, sizeof(reply)); + rcu_read_lock(); stream = viewer_stream_find_by_id(be64toh(request.stream_id)); if (!stream || !stream->metadata_flag) { @@ -1626,6 +1705,7 @@ int viewer_create_session(struct relay_connection *conn) DBG("Viewer create session received"); + memset(&resp, 0, sizeof(resp)); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); conn->viewer_session = zmalloc(sizeof(*conn->viewer_session)); if (!conn->viewer_session) { @@ -1657,6 +1737,7 @@ void live_relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; + memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_ERR_UNK); (void) send_response(conn->sock, &reply, sizeof(reply)); } @@ -1846,6 +1927,11 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -1932,7 +2018,9 @@ error_testpoint: ERR("Health error occurred in %s", __func__); } health_unregister(health_relayd); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } rcu_unregister_thread(); return NULL; } @@ -1943,55 +2031,54 @@ error_testpoint: */ static int create_conn_pipe(void) { - int ret; - - ret = utils_create_pipe_cloexec(live_conn_pipe); - - return ret; + return utils_create_pipe_cloexec(live_conn_pipe); } -void live_stop_threads(void) +int relayd_live_join(void) { - int ret; + int ret, retval = 0; void *status; - stop_threads(); - ret = pthread_join(live_listener_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live listener"); - goto error; /* join error, exit without cleanup */ + retval = -1; } ret = pthread_join(live_worker_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live worker"); - goto error; /* join error, exit without cleanup */ + retval = -1; } ret = pthread_join(live_dispatcher_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live dispatcher"); - goto error; /* join error, exit without cleanup */ + retval = -1; } - cleanup(); + cleanup_relayd_live(); -error: - return; + return retval; } /* * main */ -int live_start_threads(struct lttng_uri *uri, +int relayd_live_create(struct lttng_uri *uri, struct relay_local_data *relay_ctx) { - int ret = 0; + int ret = 0, retval = 0; void *status; int is_root; - assert(uri); + if (!uri) { + retval = -1; + goto exit_init_data; + } live_uri = uri; /* Check if daemon is UID = 0 */ @@ -2000,74 +2087,86 @@ int live_start_threads(struct lttng_uri *uri, if (!is_root) { if (live_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); - ret = -1; - goto exit; + retval = -1; + goto exit_init_data; } } /* Setup the thread apps communication pipe. */ - if ((ret = create_conn_pipe()) < 0) { - goto exit; + if (create_conn_pipe()) { + retval = -1; + goto exit_init_data; } /* Init relay command queue. */ - cds_wfq_init(&viewer_conn_queue.queue); + cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail); /* Set up max poll set size */ - lttng_poll_set_max_size(); + if (lttng_poll_set_max_size()) { + retval = -1; + goto exit_init_data; + } /* Setup the dispatcher thread */ ret = pthread_create(&live_dispatcher_thread, NULL, thread_dispatcher, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer dispatcher"); - goto exit_dispatcher; + retval = -1; + goto exit_dispatcher_thread; } /* Setup the worker thread */ ret = pthread_create(&live_worker_thread, NULL, thread_worker, relay_ctx); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer worker"); - goto exit_worker; + retval = -1; + goto exit_worker_thread; } /* Setup the listener thread */ ret = pthread_create(&live_listener_thread, NULL, thread_listener, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer listener"); - goto exit_listener; + retval = -1; + goto exit_listener_thread; } - ret = 0; - goto end; + /* + * All OK, started all threads. + */ + return retval; -exit_listener: - ret = pthread_join(live_listener_thread, &status); - if (ret != 0) { - PERROR("pthread_join live listener"); - goto error; /* join error, exit without cleanup */ - } + /* + * Join on the live_listener_thread should anything be added after + * the live_listener thread's creation. + */ + +exit_listener_thread: -exit_worker: ret = pthread_join(live_worker_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live worker"); - goto error; /* join error, exit without cleanup */ + retval = -1; } +exit_worker_thread: -exit_dispatcher: ret = pthread_join(live_dispatcher_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live dispatcher"); - goto error; /* join error, exit without cleanup */ + retval = -1; } +exit_dispatcher_thread: -exit: - cleanup(); +exit_init_data: + cleanup_relayd_live(); -end: -error: - return ret; + return retval; }