struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ const struct relay_stream_chunk_id *chunk_id)
{
int ret;
struct relay_stream *stream = NULL;
}
stream->stream_handle = stream_handle;
- stream->prev_seq = -1ULL;
+ stream->prev_data_seq = -1ULL;
+ stream->prev_index_seq = -1ULL;
stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
+ stream->prev_path_name = NULL;
stream->channel_name = channel_name;
stream->rotate_at_seq_num = -1ULL;
+ stream->beacon_ts_end = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
+ stream->current_chunk_id = *chunk_id;
stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!stream->indexes_ht) {
tracefile_array_destroy(stream->tfa);
}
free(stream->path_name);
+ free(stream->prev_path_name);
free(stream->channel_name);
free(stream);
}
* a packet. Since those are sent in that order, we take
* care of consumerd crashes.
*/
+ DBG("relay_index_close_partial_fd");
relay_index_close_partial_fd(stream);
/*
* Use the highest net_seq_num we currently have pending
* at -1ULL if we cannot find any index.
*/
stream->last_net_seq_num = relay_index_find_last(stream);
+ DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
/* Fall-through into the next check. */
}
if (stream->last_net_seq_num != -1ULL &&
- ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
&& !session_aborted) {
/*
* Don't close since we still have data pending. This