From 878c34cf4708a7b7d52d4e2ea1bdda853ba6a790 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Thu, 13 Feb 2014 13:58:19 -0500 Subject: [PATCH] Fix: simplify get next index in live This fixes some case of unitialized status, use of bad ret value and help modularize the get next index function. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/live.c | 237 ++++++++++++++++++++++++------------ 1 file changed, 159 insertions(+), 78 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 06c1c1513..d29804cd6 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1113,6 +1113,128 @@ error: return ret; } +/* + * Open the index file if needed for the given vstream. + * + * If an index file is successfully opened, the index_read_fd of the stream is + * set with it. + * + * Return 0 on success, a negative value on error (-ENOENT if not ready yet). + */ +static int try_open_index(struct relay_viewer_stream *vstream, + struct relay_stream *rstream) +{ + int ret = 0; + + assert(vstream); + assert(rstream); + + if (vstream->index_read_fd >= 0) { + goto end; + } + + /* + * First time, we open the index file and at least one index is ready. The + * race between the read and write of the total_index_received is + * acceptable here since the client will be notified to simply come back + * and get the next index. + */ + if (rstream->total_index_received <= 0) { + ret = -ENOENT; + goto end; + } + ret = index_open(vstream->path_name, vstream->channel_name, + vstream->tracefile_count, vstream->tracefile_count_current); + if (ret >= 0) { + vstream->index_read_fd = ret; + ret = 0; + goto end; + } + +end: + return ret; +} + +/* + * Check the status of the index for the given stream. This function updates + * the index structure if needed and can destroy the vstream also for the HUP + * situation. + * + * Return 0 means that we can proceed with the index. A value of 1 means that + * the index has been updated and is ready to be send to the client. A negative + * value indicates an error that can't be handled. + */ +static int check_index_status(struct relay_viewer_stream *vstream, + struct relay_stream *rstream, struct ctf_trace *trace, + struct lttng_viewer_index *index) +{ + int ret; + + assert(vstream); + assert(rstream); + assert(index); + assert(trace); + + if (!rstream->close_flag) { + /* Rotate on abort (overwrite). */ + if (vstream->abort_flag) { + DBG("Viewer stream %" PRIu64 " rotate because of overwrite", + vstream->stream_handle); + ret = viewer_stream_rotate(vstream, rstream); + if (ret < 0) { + goto error; + } else if (ret == 1) { + /* EOF */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } + /* ret == 0 means successful so we continue. */ + } + + /* Check if we are in the same trace file at this point. */ + if (rstream->tracefile_count_current == vstream->tracefile_count_current) { + if (rstream->beacon_ts_end != -1ULL && + vstream->last_sent_index == rstream->total_index_received) { + /* + * We've received a synchronization beacon and the last index + * available has been sent, the index for now is inactive. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); + index->timestamp_end = htobe64(rstream->beacon_ts_end); + goto index_ready; + } else if (rstream->total_index_received <= vstream->last_sent_index + && !vstream->close_write_flag) { + /* + * Reader and writer are working in the same tracefile, so we care + * about the number of index received and sent. Otherwise, we read + * up to EOF. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto index_ready; + } + } + /* Nothing to do with the index, continue with it. */ + ret = 0; + } else if (rstream->close_flag && vstream->close_write_flag && + vstream->total_index_received == vstream->last_sent_index) { + /* Last index sent and current tracefile closed in write */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } else { + vstream->close_write_flag = 1; + ret = 0; + } + +error: + return ret; + +hup: + viewer_stream_delete(vstream); + viewer_stream_destroy(trace, vstream); +index_ready: + return 1; +} + /* * Send the next index for a stream. * @@ -1122,6 +1244,7 @@ static int viewer_get_next_index(struct relay_connection *conn) { int ret; + ssize_t read_ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; @@ -1168,78 +1291,39 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - /* First time, we open the index file */ - if (vstream->index_read_fd < 0) { - ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); + assert(rstream); + + /* Try to open an index if one is needed for that stream. */ + ret = try_open_index(vstream, rstream); + if (ret < 0) { if (ret == -ENOENT) { /* * The index is created only when the first data packet arrives, it * might not be ready at the beginning of the session */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - goto send_reply; - } else if (ret < 0) { + } else { + /* Unhandled error. */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; } - vstream->index_read_fd = ret; + goto send_reply; } - rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); - assert(rstream); - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - if (!rstream->close_flag) { - if (vstream->abort_flag) { - /* Rotate on abort (overwrite). */ - DBG("Viewer rotate because of overwrite"); - ret = viewer_stream_rotate(vstream, rstream); - if (ret < 0) { - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - goto end_unlock; - } else if (ret == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - goto send_reply; - } - /* ret == 0 means successful so we continue. */ - } - - if (rstream->tracefile_count_current == vstream->tracefile_count_current) { - if (rstream->beacon_ts_end != -1ULL && - vstream->last_sent_index == rstream->total_index_received) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); - viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - goto send_reply; - } else if (rstream->total_index_received <= vstream->last_sent_index - && !vstream->close_write_flag) { - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - /* No new index to send, retry later. */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - goto send_reply; - } - } - } else if (rstream->close_flag && vstream->close_write_flag && - vstream->total_index_received == vstream->last_sent_index) { - /* Last index sent and current tracefile closed in write */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + if (ret < 0) { + goto end; + } else if (ret == 1) { + /* + * This means the viewer index data structure has been populated by the + * check call thus we now send back the reply to the client. + */ goto send_reply; - } else { - vstream->close_write_flag = 1; } - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + /* At this point, ret MUST be 0 thus we continue with the get. */ + assert(!ret); if (!ctf_trace->metadata_received || ctf_trace->metadata_received > ctf_trace->metadata_sent) { @@ -1255,10 +1339,7 @@ int viewer_get_next_index(struct relay_connection *conn) pthread_mutex_lock(&vstream->overwrite_lock); if (vstream->abort_flag) { - /* - * The file is being overwritten by the writer, we cannot * use it. - */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + /* The file is being overwritten by the writer, we cannot use it. */ pthread_mutex_unlock(&vstream->overwrite_lock); ret = viewer_stream_rotate(vstream, rstream); if (ret < 0) { @@ -1267,39 +1348,39 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; + } else { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); } goto send_reply; } - ret = lttng_read(vstream->index_read_fd, &packet_index, + read_ret = lttng_read(vstream->index_read_fd, &packet_index, sizeof(packet_index)); pthread_mutex_unlock(&vstream->overwrite_lock); - if (ret < sizeof(packet_index)) { - unsigned int close_write_flag; - + if (read_ret < 0) { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + viewer_stream_delete(vstream); + viewer_stream_destroy(ctf_trace, vstream); + goto send_reply; + } else if (read_ret < sizeof(packet_index)) { pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - close_write_flag = vstream->close_write_flag; - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - /* - * The tracefile is closed in write, so we read up to EOF. - */ - if (close_write_flag == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - /* Rotate on normal EOF */ + if (vstream->close_write_flag) { ret = viewer_stream_rotate(vstream, rstream); if (ret < 0) { + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto end_unlock; } else if (ret == 1) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; + } else { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); } } else { - PERROR("Relay reading index file %d", vstream->index_read_fd); + ERR("Relay reading index file %d", vstream->index_read_fd); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); } + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto send_reply; } else { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK); -- 2.34.1