projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: code refactoring of viewer streams in relayd
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index fe4a898bc6d349dfd866218e597d77ef12a746aa..02f676df8bc0e1bf56542e2c2394a5cabb59185c 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-62,6
+62,7
@@
#include "live.h"
#include "health-relayd.h"
#include "testpoint.h"
#include "live.h"
#include "health-relayd.h"
#include "testpoint.h"
+#include "viewer-stream.h"
/* command line options */
char *opt_output_path;
/* command line options */
char *opt_output_path;
@@
-1034,7
+1035,7
@@
static void destroy_stream(struct relay_stream *stream)
}
}
}
}
- vstream =
live_find_viewer_stream
_by_id(stream->stream_handle);
+ vstream =
viewer_stream_find
_by_id(stream->stream_handle);
if (vstream) {
/*
* Set the last good value into the viewer stream. This is done
if (vstream) {
/*
* Set the last good value into the viewer stream. This is done
@@
-1160,6
+1161,7
@@
int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
session->sock = cmd->sock;
session->minor = cmd->minor;
session->major = cmd->major;
session->sock = cmd->sock;
session->minor = cmd->minor;
session->major = cmd->major;
+ pthread_mutex_init(&session->viewer_ready_lock, NULL);
cmd->session = session;
reply.session_id = htobe64(session->id);
cmd->session = session;
reply.session_id = htobe64(session->id);
@@
-1207,6
+1209,8
@@
void set_viewer_ready_flag(struct relay_command *cmd)
{
struct relay_stream_recv_handle *node, *tmp_node;
{
struct relay_stream_recv_handle *node, *tmp_node;
+ pthread_mutex_lock(&cmd->session->viewer_ready_lock);
+
cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
struct relay_stream *stream;
cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
struct relay_stream *stream;
@@
-1229,6
+1233,7
@@
void set_viewer_ready_flag(struct relay_command *cmd)
free(node);
}
free(node);
}
+ pthread_mutex_unlock(&cmd->session->viewer_ready_lock);
return;
}
return;
}
@@
-1350,11
+1355,7
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
* stream message is received, this list is emptied and streams are set
* with the viewer ready flag.
*/
* stream message is received, this list is emptied and streams are set
* with the viewer ready flag.
*/
- if (stream->metadata_flag) {
- stream->viewer_ready = 1;
- } else {
- queue_stream_handle(stream->stream_handle, cmd);
- }
+ queue_stream_handle(stream->stream_handle, cmd);
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
@@
-2132,6
+2133,11
@@
int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
*/
set_viewer_ready_flag(cmd);
*/
set_viewer_ready_flag(cmd);
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ uatomic_set(&cmd->session->new_streams, 1);
+
reply.ret_code = htobe32(LTTNG_OK);
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (send_ret < 0) {
reply.ret_code = htobe32(LTTNG_OK);
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (send_ret < 0) {
@@
-2374,7
+2380,7
@@
int relay_process_data(struct relay_command *cmd)
(stream->oldest_tracefile_id + 1) %
stream->tracefile_count;
}
(stream->oldest_tracefile_id + 1) %
stream->tracefile_count;
}
- vstream =
live_find_viewer_stream
_by_id(stream->stream_handle);
+ vstream =
viewer_stream_find
_by_id(stream->stream_handle);
if (vstream) {
/*
* The viewer is reading a file about to be
if (vstream) {
/*
* The viewer is reading a file about to be
This page took
0.024837 seconds
and
4
git commands to generate.