projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
relayd: move relay_session locking outside of make_viewer_streams
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
live.c
diff --git
a/src/bin/lttng-relayd/live.c
b/src/bin/lttng-relayd/live.c
index 84fa1435196057c6a506b019eedf4bcf477da49b..dc8238c9499b876de333a7c313e1e95135e847aa 100644
(file)
--- a/
src/bin/lttng-relayd/live.c
+++ b/
src/bin/lttng-relayd/live.c
@@
-197,7
+197,7
@@
end:
*/
static
ssize_t send_viewer_streams(struct lttcomm_sock *sock,
*/
static
ssize_t send_viewer_streams(struct lttcomm_sock *sock,
-
struct relay_session *session
, unsigned int ignore_sent_flag)
+
uint64_t session_id
, unsigned int ignore_sent_flag)
{
ssize_t ret;
struct lttng_viewer_stream send_stream;
{
ssize_t ret;
struct lttng_viewer_stream send_stream;
@@
-218,7
+218,7
@@
ssize_t send_viewer_streams(struct lttcomm_sock *sock,
pthread_mutex_lock(&vstream->stream->lock);
/* Ignore if not the same session. */
pthread_mutex_lock(&vstream->stream->lock);
/* Ignore if not the same session. */
- if (vstream->stream->trace->session->id != session
->
id ||
+ if (vstream->stream->trace->session->id != session
_
id ||
(!ignore_sent_flag && vstream->sent_flag)) {
pthread_mutex_unlock(&vstream->stream->lock);
viewer_stream_put(vstream);
(!ignore_sent_flag && vstream->sent_flag)) {
pthread_mutex_unlock(&vstream->stream->lock);
viewer_stream_put(vstream);
@@
-271,6
+271,9
@@
end_unlock:
* viewer stream of the session, the number of unsent stream and the number of
* stream created. Those counters can be NULL and thus will be ignored.
*
* viewer stream of the session, the number of unsent stream and the number of
* stream created. Those counters can be NULL and thus will be ignored.
*
+ * session must be locked to ensure that we see either none or all initial
+ * streams for a session, but no intermediate state..
+ *
* Return 0 on success or else a negative value.
*/
static
* Return 0 on success or else a negative value.
*/
static
@@
-283,12
+286,7
@@
int make_viewer_streams(struct relay_session *session,
struct ctf_trace *ctf_trace;
assert(session);
struct ctf_trace *ctf_trace;
assert(session);
-
- /*
- * Hold the session lock to ensure that we see either none or
- * all initial streams for a session, but no intermediate state.
- */
- pthread_mutex_lock(&session->lock);
+ ASSERT_LOCKED(session->lock);
if (session->connection_closed) {
*closed = true;
if (session->connection_closed) {
*closed = true;
@@
-376,7
+374,6
@@
int make_viewer_streams(struct relay_session *session,
error_unlock:
rcu_read_unlock();
error_unlock:
rcu_read_unlock();
- pthread_mutex_unlock(&session->lock);
return ret;
}
return ret;
}
@@
-946,8
+943,10
@@
int viewer_get_new_streams(struct relay_connection *conn)
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
+ pthread_mutex_lock(&session->lock);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
&nb_created, &closed);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
&nb_created, &closed);
+ pthread_mutex_unlock(&session->lock);
if (ret < 0) {
goto end_put_session;
}
if (ret < 0) {
goto end_put_session;
}
@@
-988,7
+987,7
@@
send_reply:
* streams that were not sent from that point will be sent to
* the viewer.
*/
* streams that were not sent from that point will be sent to
* the viewer.
*/
- ret = send_viewer_streams(conn->sock, session, 0);
+ ret = send_viewer_streams(conn->sock, session
_id
, 0);
if (ret < 0) {
goto end_put_session;
}
if (ret < 0) {
goto end_put_session;
}
@@
-1015,6
+1014,7
@@
int viewer_attach_session(struct relay_connection *conn)
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
bool closed = false;
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
bool closed = false;
+ uint64_t session_id;
assert(conn);
assert(conn);
@@
-1026,6
+1026,7
@@
int viewer_attach_session(struct relay_connection *conn)
goto error;
}
goto error;
}
+ session_id = be64toh(request.session_id);
health_code_update();
memset(&response, 0, sizeof(response));
health_code_update();
memset(&response, 0, sizeof(response));
@@
-1036,16
+1037,15
@@
int viewer_attach_session(struct relay_connection *conn)
goto send_reply;
}
goto send_reply;
}
- session = session_get_by_id(
be64toh(request.session_id)
);
+ session = session_get_by_id(
session_id
);
if (!session) {
if (!session) {
- DBG("Relay session %" PRIu64 " not found",
- (uint64_t) be64toh(request.session_id));
+ DBG("Relay session %" PRIu64 " not found", session_id);
response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
- DBG("Attach session ID %" PRIu64 " received",
- (uint64_t) be64toh(request.session_id));
+ DBG("Attach session ID %" PRIu64 " received", session_id);
+ pthread_mutex_lock(&session->lock);
if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
@@
-1078,8
+1078,12
@@
int viewer_attach_session(struct relay_connection *conn)
if (ret < 0) {
goto end_put_session;
}
if (ret < 0) {
goto end_put_session;
}
- response.streams_count = htobe32(nb_streams);
+ pthread_mutex_unlock(&session->lock);
+ session_put(session);
+ session = NULL;
+
+ response.streams_count = htobe32(nb_streams);
/*
* If the session is closed when the viewer is attaching, it
* means some of the streams may have been concurrently removed,
/*
* If the session is closed when the viewer is attaching, it
* means some of the streams may have been concurrently removed,
@@
-1111,13
+1115,14
@@
send_reply:
}
/* Send stream and ignore the sent flag. */
}
/* Send stream and ignore the sent flag. */
- ret = send_viewer_streams(conn->sock, session, 1);
+ ret = send_viewer_streams(conn->sock, session
_id
, 1);
if (ret < 0) {
goto end_put_session;
}
end_put_session:
if (session) {
if (ret < 0) {
goto end_put_session;
}
end_put_session:
if (session) {
+ pthread_mutex_unlock(&session->lock);
session_put(session);
}
error:
session_put(session);
}
error:
This page took
0.025203 seconds
and
4
git commands to generate.