}
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t)
+ enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
{
struct relay_viewer_stream *vstream;
assert(stream);
+ assert(ctf_trace);
vstream = zmalloc(sizeof(*vstream));
if (!vstream) {
goto error;
}
- vstream->session_id = stream->session->id;
+ vstream->session_id = stream->session_id;
vstream->stream_handle = stream->stream_handle;
vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
vstream->channel_name = strndup(stream->channel_name,
vstream->tracefile_count_last = -1ULL;
switch (seek_t) {
- case VIEWER_SEEK_BEGINNING:
+ case LTTNG_VIEWER_SEEK_BEGINNING:
vstream->tracefile_count_current = stream->oldest_tracefile_id;
break;
- case VIEWER_SEEK_LAST:
+ case LTTNG_VIEWER_SEEK_LAST:
vstream->tracefile_count_current = stream->tracefile_count_current;
break;
default:
goto error;
}
- vstream->ctf_trace = stream->ctf_trace;
if (vstream->metadata_flag) {
- vstream->ctf_trace->viewer_metadata_stream = vstream;
+ ctf_trace->viewer_metadata_stream = vstream;
}
- uatomic_inc(&vstream->ctf_trace->refcount);
/* Globally visible after the add unique. */
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
vstream->index_read_fd = read_fd;
}
- if (seek_t == VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
+ if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
off_t lseek_ret;
lseek_ret = lseek(vstream->index_read_fd,
assert(!ret);
}
-void viewer_stream_destroy(struct relay_viewer_stream *stream)
+void viewer_stream_destroy(struct ctf_trace *ctf_trace,
+ struct relay_viewer_stream *stream)
{
int ret;
- unsigned long ret_ref;
assert(stream);
- ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1);
- assert(ret_ref >= 0);
+
+ if (ctf_trace) {
+ ctf_trace_put_ref(ctf_trace);
+ }
if (stream->read_fd >= 0) {
ret = close(stream->read_fd);
}
}
- /*
- * If the only stream left in the HT is the metadata stream,
- * we need to remove it because we won't detect a EOF for this
- * stream.
- */
- if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) {
- viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream);
- viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream);
- stream->ctf_trace->metadata_stream = NULL;
- DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id);
- /*
- * The streaming-side is already closed and we can't receive a new
- * stream concurrently at this point (since the session is being
- * destroyed), so when we detect the refcount equals 0, we are the
- * only owners of the ctf_trace and we can free it ourself.
- */
- free(stream->ctf_trace);
- }
-
call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
}
uint64_t tracefile_id;
assert(vstream);
+ assert(stream);
+
+ if (vstream->tracefile_count == 0) {
+ /* Ignore rotation, there is none to do. */
+ ret = 0;
+ goto end;
+ }
tracefile_id = (vstream->tracefile_count_current + 1) %
vstream->tracefile_count;
}
/*
- * If the stream on the streaming side still exists, lock to execute
- * rotation in order to avoid races between a modification on the index
- * values.
+ * Lock to execute rotation in order to avoid races between a modification
+ * on the index values.
*/
- if (stream) {
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- }
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
/*
* The writer and the reader are not working in the same tracefile, we can
* read up to EOF, we don't care about the total_index_received.
*/
- if (!stream || (stream->tracefile_count_current != tracefile_id)) {
+ if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
vstream->close_write_flag = 1;
} else {
/*
* limit our reading to the number of indexes received.
*/
vstream->close_write_flag = 0;
- if (stream) {
+ if (stream->close_flag) {
vstream->total_index_received = stream->total_index_received;
}
}
vstream->abort_flag = 0;
pthread_mutex_unlock(&vstream->overwrite_lock);
- if (stream) {
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- }
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
ret = index_open(vstream->path_name, vstream->channel_name,
vstream->tracefile_count, vstream->tracefile_count_current);