projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
relayd: implement file and session rotation on top of trace chunks
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
viewer-stream.c
diff --git
a/src/bin/lttng-relayd/viewer-stream.c
b/src/bin/lttng-relayd/viewer-stream.c
index 623f94ca7fca4d059254d36bfc902ea030d17a03..60aa4371d5cc446bb4dac4bdd654e13d45adc9ea 100644
(file)
--- a/
src/bin/lttng-relayd/viewer-stream.c
+++ b/
src/bin/lttng-relayd/viewer-stream.c
@@
-116,29
+116,21
@@
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
* the opening of the index, otherwise open it right now.
*/
if (stream->index_received_seqcount == 0) {
* the opening of the index, otherwise open it right now.
*/
if (stream->index_received_seqcount == 0) {
- vstream->index_f
d
= NULL;
+ vstream->index_f
ile
= NULL;
} else {
} else {
- int read_fd;
-
- read_fd = index_open(vstream->path_name, vstream->channel_name,
+ vstream->index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
stream->tracefile_count,
vstream->current_tracefile_id);
stream->tracefile_count,
vstream->current_tracefile_id);
- if (read_fd < 0) {
- goto error_unlock;
- }
- vstream->index_fd = stream_fd_create(read_fd);
- if (!vstream->index_fd) {
- if (close(read_fd)) {
- PERROR("close");
- }
+ if (!vstream->index_file) {
goto error_unlock;
}
}
goto error_unlock;
}
}
- if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_f
d
) {
+ if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_f
ile
) {
off_t lseek_ret;
off_t lseek_ret;
- lseek_ret = lseek(vstream->index_f
d
->fd, 0, SEEK_END);
+ lseek_ret = lseek(vstream->index_f
ile
->fd, 0, SEEK_END);
if (lseek_ret < 0) {
goto error_unlock;
}
if (lseek_ret < 0) {
goto error_unlock;
}
@@
-153,7
+145,6
@@
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
- pthread_mutex_init(&vstream->reflock, NULL);
urcu_ref_init(&vstream->ref);
return vstream;
urcu_ref_init(&vstream->ref);
return vstream;
@@
-192,9
+183,9
@@
static void viewer_stream_release(struct urcu_ref *ref)
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
- if (vstream->index_f
d
) {
-
stream_fd_put(vstream->index_fd
);
- vstream->index_f
d
= NULL;
+ if (vstream->index_f
ile
) {
+
lttng_index_file_put(vstream->index_file
);
+ vstream->index_f
ile
= NULL;
}
if (vstream->stream) {
stream_put(vstream->stream);
}
if (vstream->stream) {
stream_put(vstream->stream);
@@
-206,16
+197,7
@@
static void viewer_stream_release(struct urcu_ref *ref)
/* Must be called with RCU read-side lock held. */
bool viewer_stream_get(struct relay_viewer_stream *vstream)
{
/* Must be called with RCU read-side lock held. */
bool viewer_stream_get(struct relay_viewer_stream *vstream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&vstream->reflock);
- if (vstream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&vstream->ref);
- }
- pthread_mutex_unlock(&vstream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&vstream->ref);
}
/*
}
/*
@@
-248,9
+230,7
@@
end:
void viewer_stream_put(struct relay_viewer_stream *vstream)
{
rcu_read_lock();
void viewer_stream_put(struct relay_viewer_stream *vstream)
{
rcu_read_lock();
- pthread_mutex_lock(&vstream->reflock);
urcu_ref_put(&vstream->ref, viewer_stream_release);
urcu_ref_put(&vstream->ref, viewer_stream_release);
- pthread_mutex_unlock(&vstream->reflock);
rcu_read_unlock();
}
rcu_read_unlock();
}
@@
-305,29
+285,24
@@
int viewer_stream_rotate(struct relay_viewer_stream *vstream)
vstream->index_sent_seqcount = seq_tail;
}
vstream->index_sent_seqcount = seq_tail;
}
- if (vstream->index_f
d
) {
-
stream_fd_put(vstream->index_fd
);
- vstream->index_f
d
= NULL;
+ if (vstream->index_f
ile
) {
+
lttng_index_file_put(vstream->index_file
);
+ vstream->index_f
ile
= NULL;
}
if (vstream->stream_fd) {
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
}
if (vstream->stream_fd) {
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
- ret = index_open(vstream->path_name, vstream->channel_name,
+ vstream->index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
stream->tracefile_count,
vstream->current_tracefile_id);
stream->tracefile_count,
vstream->current_tracefile_id);
- if (ret < 0) {
+ if (!vstream->index_file) {
+ ret = -1;
goto end;
goto end;
- }
- vstream->index_fd = stream_fd_create(ret);
- if (vstream->index_fd) {
- ret = 0;
} else {
} else {
- if (close(ret)) {
- PERROR("close");
- }
- ret = -1;
+ ret = 0;
}
end:
return ret;
}
end:
return ret;
@@
-338,6
+313,10
@@
void print_viewer_streams(void)
struct lttng_ht_iter iter;
struct relay_viewer_stream *vstream;
struct lttng_ht_iter iter;
struct relay_viewer_stream *vstream;
+ if (!viewer_streams_ht) {
+ return;
+ }
+
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
stream_n.node) {
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
stream_n.node) {
This page took
0.029796 seconds
and
4
git commands to generate.