X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=cdbb5b12bdb873de53d56b617c25f0f95a9bf01b;hb=8bd6c0f2181a1d7049d79b3c001f2bc64c76ae3a;hp=480c459ce6a49724f54d1b6ef71c0b5a6d458a1b;hpb=cef0f7d51b8025d3ba04e6496242c1cca1641aa6;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 480c459ce..cdbb5b12b 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -614,7 +614,7 @@ static int open_index(struct relay_viewer_stream *stream) { int ret; char fullpath[PATH_MAX]; - struct lttng_packet_index_file_hdr hdr; + struct ctf_packet_index_file_hdr hdr; if (stream->tracefile_count > 0) { ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%" @@ -649,13 +649,13 @@ static int open_index(struct relay_viewer_stream *stream) PERROR("Reading index header"); goto error; } - if (strncmp(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic)) != 0) { + if (be32toh(hdr.magic) != CTF_INDEX_MAGIC) { ERR("Invalid header magic"); ret = -1; goto error; } - if (be32toh(hdr.index_major) != INDEX_MAJOR || - be32toh(hdr.index_minor) != INDEX_MINOR) { + if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR || + be32toh(hdr.index_minor) != CTF_INDEX_MINOR) { ERR("Invalid header version"); ret = -1; goto error; @@ -698,6 +698,7 @@ int init_viewer_stream(struct relay_stream *stream, int seek_last) LTTNG_VIEWER_NAME_MAX); viewer_stream->tracefile_count = stream->tracefile_count; viewer_stream->metadata_flag = stream->metadata_flag; + viewer_stream->tracefile_count_last = -1ULL; if (seek_last) { viewer_stream->tracefile_count_current = stream->tracefile_count_current; @@ -746,7 +747,7 @@ int init_viewer_stream(struct relay_stream *stream, int seek_last) if (seek_last && viewer_stream->index_read_fd > 0) { ret = lseek(viewer_stream->index_read_fd, viewer_stream->total_index_received * - sizeof(struct lttng_packet_index), + sizeof(struct ctf_packet_index), SEEK_CUR); if (ret < 0) { goto error; @@ -764,7 +765,7 @@ error: /* * Rotate a stream to the next tracefile. * - * Returns 0 on success, a negative value on error. + * Returns 0 on success, 1 on EOF, a negative value on error. */ static int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, @@ -777,6 +778,15 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, tracefile_id = (viewer_stream->tracefile_count_current + 1) % viewer_stream->tracefile_count; + /* + * Detect the last tracefile to open. + */ + if (viewer_stream->tracefile_count_last != -1ULL && + viewer_stream->tracefile_count_last == + viewer_stream->tracefile_count_current) { + ret = 1; + goto end; + } if (stream) { pthread_mutex_lock(&stream->viewer_stream_rotation_lock); @@ -802,26 +812,22 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, } viewer_stream->tracefile_count_current = tracefile_id; - if (viewer_stream->abort_flag == 0) { - if (viewer_stream->index_read_fd > 0) { - ret = close(viewer_stream->index_read_fd); - if (ret < 0) { - PERROR("close index file %d", - viewer_stream->index_read_fd); - } - viewer_stream->index_read_fd = -1; - } - if (viewer_stream->read_fd > 0) { - ret = close(viewer_stream->read_fd); - if (ret < 0) { - PERROR("close tracefile %d", - viewer_stream->read_fd); - } - viewer_stream->read_fd = -1; - } - } else { - viewer_stream->abort_flag = 0; + ret = close(viewer_stream->index_read_fd); + if (ret < 0) { + PERROR("close index file %d", + viewer_stream->index_read_fd); } + viewer_stream->index_read_fd = -1; + ret = close(viewer_stream->read_fd); + if (ret < 0) { + PERROR("close tracefile %d", + viewer_stream->read_fd); + } + viewer_stream->read_fd = -1; + + pthread_mutex_lock(&viewer_stream->overwrite_lock); + viewer_stream->abort_flag = 0; + pthread_mutex_unlock(&viewer_stream->overwrite_lock); viewer_stream->index_read_fd = -1; viewer_stream->read_fd = -1; @@ -836,6 +842,7 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream, ret = 0; +end: error: return ret; } @@ -1068,7 +1075,7 @@ int viewer_get_next_index(struct relay_command *cmd, int ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; - struct lttng_packet_index packet_index; + struct ctf_packet_index packet_index; struct relay_viewer_stream *vstream; struct relay_stream *rstream; @@ -1134,6 +1141,9 @@ int viewer_get_next_index(struct relay_command *cmd, ret = rotate_viewer_stream(vstream, rstream); if (ret < 0) { goto end_unlock; + } else if (ret == 1) { + viewer_index.status = htobe32(VIEWER_INDEX_HUP); + goto send_reply; } } pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); @@ -1184,6 +1194,9 @@ int viewer_get_next_index(struct relay_command *cmd, ret = rotate_viewer_stream(vstream, rstream); if (ret < 0) { goto end_unlock; + } else if (ret == 1) { + viewer_index.status = htobe32(VIEWER_INDEX_HUP); + goto send_reply; } goto send_reply; } @@ -1200,6 +1213,9 @@ int viewer_get_next_index(struct relay_command *cmd, ret = rotate_viewer_stream(vstream, rstream); if (ret < 0) { goto end_unlock; + } else if (ret == 1) { + viewer_index.status = htobe32(VIEWER_INDEX_HUP); + goto send_reply; } } else { PERROR("Relay reading index file %d",