projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Accept uid and gid parameters in utils_mkdir()/utils_mkdir_recursive()
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index c843aa5ad355c930f49fc63d51692a3973f73c8c..92d466df7bc34ff2ba73161f37944dd9147b0cb5 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-1248,7
+1248,7
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_session *session = conn->session;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
struct relay_session *session = conn->session;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
- struct ctf_trace *trace;
+ struct ctf_trace *trace
= NULL
;
if (!session || conn->version_check_done == 0) {
ERR("Trying to add a stream before version check");
if (!session || conn->version_check_done == 0) {
ERR("Trying to add a stream before version check");
@@
-1276,7
+1276,6
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
goto err_free_stream;
}
goto err_free_stream;
}
- rcu_read_lock();
stream->stream_handle = ++last_relay_stream_id;
stream->prev_seq = -1ULL;
stream->session_id = session->id;
stream->stream_handle = ++last_relay_stream_id;
stream->prev_seq = -1ULL;
stream->session_id = session->id;
@@
-1286,10
+1285,11
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+ -1, -1);
if (ret < 0) {
ERR("relay creating output directory");
if (ret < 0) {
ERR("relay creating output directory");
- goto e
nd
;
+ goto e
rr_free_stream
;
}
/*
}
/*
@@
-1300,7
+1300,7
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
if (ret < 0) {
ERR("Create output file");
stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
if (ret < 0) {
ERR("Create output file");
- goto e
nd
;
+ goto e
rr_free_stream
;
}
stream->fd = ret;
if (stream->tracefile_size) {
}
stream->fd = ret;
if (stream->tracefile_size) {
@@
-1309,6
+1309,8
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
+ /* Protect access to "trace" */
+ rcu_read_lock();
trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
if (!trace) {
trace = ctf_trace_create(stream->path_name);
trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
if (!trace) {
trace = ctf_trace_create(stream->path_name);
@@
-1336,6
+1338,9
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
/*
* Both in the ctf_trace object and the global stream ht since the data
* side of the relayd does not have the concept of session.
/*
* Both in the ctf_trace object and the global stream ht since the data
* side of the relayd does not have the concept of session.
+ *
+ * rcu_read_lock() is kept to protect the stream which is now part of
+ * the relay_streams_ht.
*/
lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
cds_list_add_tail(&stream->trace_list, &trace->stream_list);
*/
lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
cds_list_add_tail(&stream->trace_list, &trace->stream_list);
@@
-1352,7
+1357,7
@@
end:
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
/* stream was not properly added to the ht, so free it */
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
/* stream was not properly added to the ht, so free it */
-
free
(stream);
+
stream_destroy
(stream);
} else {
reply.ret_code = htobe32(LTTNG_OK);
}
} else {
reply.ret_code = htobe32(LTTNG_OK);
}
@@
-1363,15
+1368,19
@@
end:
ERR("Relay sending stream id");
ret = send_ret;
}
ERR("Relay sending stream id");
ret = send_ret;
}
+ /*
+ * rcu_read_lock() was held to protect either "trace" OR the "stream" at
+ * this point.
+ */
rcu_read_unlock();
rcu_read_unlock();
+ trace = NULL;
+ stream = NULL;
end_no_session:
return ret;
err_free_stream:
end_no_session:
return ret;
err_free_stream:
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
+ stream_destroy(stream);
return ret;
}
return ret;
}
@@
-2494,7
+2503,6
@@
void *relay_thread_worker(void *data)
{
int ret, err = -1, last_seen_data_fd = -1;
uint32_t nb_fd;
{
int ret, err = -1, last_seen_data_fd = -1;
uint32_t nb_fd;
- struct relay_connection *conn;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
@@
-2502,6
+2510,7
@@
void *relay_thread_worker(void *data)
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
struct relay_index *index;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
struct relay_index *index;
+ struct relay_connection *destroy_conn = NULL;
DBG("[thread] Relay worker started");
DBG("[thread] Relay worker started");
@@
-2561,8
+2570,8
@@
restart:
nb_fd = ret;
/*
nb_fd = ret;
/*
- * Process control. The control connection is prioritised so we
don't
- *
starve it with high throughout
put tracing data on the data
+ * Process control. The control connection is prioritised so we
+ *
don't starve it with high through
put tracing data on the data
* connection.
*/
for (i = 0; i < nb_fd; i++) {
* connection.
*/
for (i = 0; i < nb_fd; i++) {
@@
-2590,6
+2599,8
@@
restart:
ERR("Relay connection pipe error");
goto error;
} else if (revents & LPOLLIN) {
ERR("Relay connection pipe error");
goto error;
} else if (revents & LPOLLIN) {
+ struct relay_connection *conn;
+
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
@@
-2605,32
+2616,34
@@
restart:
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
+ struct relay_connection *ctrl_conn;
+
rcu_read_lock();
rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ c
trl_c
onn = connection_find_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
/* If not found, there is a synchronization issue. */
- assert(conn);
+ assert(c
trl_c
onn);
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
cleanup_connection_pollfd(&events, pollfd);
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, c
trl_c
onn);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
} else if (revents & LPOLLIN) {
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
} else if (revents & LPOLLIN) {
- if (conn->type == RELAY_CONTROL) {
- ret = c
onn->sock->ops->recvmsg(
conn->sock, &recv_hdr,
+ if (c
trl_c
onn->type == RELAY_CONTROL) {
+ ret = c
trl_conn->sock->ops->recvmsg(ctrl_
conn->sock, &recv_hdr,
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
cleanup_connection_pollfd(&events, pollfd);
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, c
trl_c
onn);
DBG("Control connection closed with %d", pollfd);
} else {
DBG("Control connection closed with %d", pollfd);
} else {
- ret = relay_process_control(&recv_hdr, conn);
+ ret = relay_process_control(&recv_hdr, c
trl_c
onn);
if (ret < 0) {
/* Clear the session on error. */
cleanup_connection_pollfd(&events, pollfd);
if (ret < 0) {
/* Clear the session on error. */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, c
trl_c
onn);
DBG("Connection closed with %d", pollfd);
}
seen_control = 1;
DBG("Connection closed with %d", pollfd);
}
seen_control = 1;
@@
-2676,6
+2689,7
@@
restart:
/* Fetch the poll data. */
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch the poll data. */
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ struct relay_connection *data_conn;
health_code_update();
health_code_update();
@@
-2690,24
+2704,24
@@
restart:
}
rcu_read_lock();
}
rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- if (!conn) {
+
data_
conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ if (!
data_
conn) {
/* Skip it. Might be removed before. */
rcu_read_unlock();
continue;
}
if (revents & LPOLLIN) {
/* Skip it. Might be removed before. */
rcu_read_unlock();
continue;
}
if (revents & LPOLLIN) {
- if (conn->type != RELAY_DATA) {
+ if (
data_
conn->type != RELAY_DATA) {
rcu_read_unlock();
continue;
}
rcu_read_unlock();
continue;
}
- ret = relay_process_data(conn);
+ ret = relay_process_data(
data_
conn);
/* Connection closed */
if (ret < 0) {
cleanup_connection_pollfd(&events, pollfd);
/* Connection closed */
if (ret < 0) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht,
data_
conn);
DBG("Data connection closed with %d", pollfd);
/*
* Every goto restart call sets the last seen fd where
DBG("Data connection closed with %d", pollfd);
/*
* Every goto restart call sets the last seen fd where
@@
-2735,10
+2749,11
@@
error:
/* Cleanup reamaining connection object. */
rcu_read_lock();
/* Cleanup reamaining connection object. */
rcu_read_lock();
- cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+ cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+ destroy_conn,
sock_n.node) {
health_code_update();
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht,
destroy_
conn);
}
rcu_read_unlock();
error_poll_create:
}
rcu_read_unlock();
error_poll_create:
@@
-2814,7
+2829,8
@@
int main(int argc, char **argv)
goto exit_options;
}
goto exit_options;
}
- ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
+ ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG,
+ -1, -1);
if (ret < 0) {
ERR("Unable to create %s", opt_output_path);
retval = -1;
if (ret < 0) {
ERR("Unable to create %s", opt_output_path);
retval = -1;
This page took
0.029112 seconds
and
4
git commands to generate.