#include "session.h"
#include "stream.h"
#include "connection.h"
+#include "tracefile-array.h"
/* command line options */
char *opt_output_path;
ret = -1;
goto end;
}
+
+ /*
+ * Set last_net_seq_num before the close flag. Required by data
+ * pending check.
+ */
pthread_mutex_lock(&stream->lock);
- stream->closed = true;
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+
+ /*
+ * This is one of the conditions which may trigger a stream close
+ * with the others being:
+ * 1) A close command is received for a stream
+ * 2) The control connection owning the stream is closed
+ * 3) We have received all of the stream's data _after_ a close
+ * request.
+ */
+ try_stream_close(stream);
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
viewer_stream_put(vstream);
}
}
- pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret = htobe32(LTTNG_OK);
+ int ret = 0;
ssize_t size_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
}
memset(data_buffer, 0, data_size);
DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
- ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (ret < 0 || ret != data_size) {
- if (ret == 0) {
+ size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (size_ret < 0 || size_ret != data_size) {
+ if (size_ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
} else {
goto end_put;
}
- ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
be32toh(metadata_struct->padding_size));
- if (ret < 0) {
+ if (size_ret < 0) {
goto end_put;
}
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
-
end:
return ret;
}
* Only flag a stream inactive when it has already
* received data and no indexes are in flight.
*/
- if (stream->total_index_received > 0
+ if (stream->index_received_seqcount > 0
&& stream->indexes_in_flight == 0) {
stream->beacon_ts_end =
be64toh(index_info.timestamp_end);
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- stream->total_index_received++;
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
} else if (ret > 0) {
/* no flush. */
ret = 0;
fd = index_create_file(stream->path_name, stream->channel_name,
-1, -1, stream->tracefile_size,
- stream->current_tracefile_id);
+ tracefile_array_get_file_index_head(stream->tfa));
if (fd < 0) {
ret = -1;
/* Put self-ref for this index due to error. */
ret = relay_index_try_flush(index);
if (ret == 0) {
- stream->total_index_received++;
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
} else if (ret > 0) {
/* No flush. */
ret = 0;
uint64_t net_seq_num;
uint32_t data_size;
struct relay_session *session;
- bool new_stream = false;
+ bool new_stream = false, close_requested = false;
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
stream->tracefile_size) {
- uint64_t new_id;
+ uint64_t old_id, new_id;
+
+ old_id = tracefile_array_get_file_index_head(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa);
+
+ /* new_id is updated by utils_rotate_stream_file. */
+ new_id = old_id;
- new_id = (stream->current_tracefile_id + 1) %
- stream->tracefile_count;
- /*
- * Move viewer oldest available data position forward if
- * we are overwriting a tracefile.
- */
- if (new_id == stream->oldest_tracefile_id) {
- stream->oldest_tracefile_id =
- (stream->oldest_tracefile_id + 1) %
- stream->tracefile_count;
- }
ret = utils_rotate_stream_file(stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_count, -1,
-1, stream->stream_fd->fd,
- &stream->current_tracefile_id,
- &stream->stream_fd->fd);
+ &new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Rotating stream output file");
goto end_stream_unlock;
}
- stream->current_tracefile_seq++;
- if (stream->current_tracefile_seq
- - stream->oldest_tracefile_seq >=
- stream->tracefile_count) {
- stream->oldest_tracefile_seq++;
- }
/*
* Reset current size because we just performed a stream
* rotation.
stream->prev_seq = net_seq_num;
end_stream_unlock:
+ close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
if (new_stream) {
pthread_mutex_lock(&session->lock);
uatomic_set(&session->new_streams, 1);