Accept uid and gid parameters in utils_mkdir()/utils_mkdir_recursive()
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index c843aa5ad355c930f49fc63d51692a3973f73c8c..92d466df7bc34ff2ba73161f37944dd9147b0cb5 100644 (file)
@@ -1248,7 +1248,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_session *session = conn->session;
        struct relay_stream *stream = NULL;
        struct lttcomm_relayd_status_stream reply;
-       struct ctf_trace *trace;
+       struct ctf_trace *trace = NULL;
 
        if (!session || conn->version_check_done == 0) {
                ERR("Trying to add a stream before version check");
@@ -1276,7 +1276,6 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                goto err_free_stream;
        }
 
-       rcu_read_lock();
        stream->stream_handle = ++last_relay_stream_id;
        stream->prev_seq = -1ULL;
        stream->session_id = session->id;
@@ -1286,10 +1285,11 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
-       ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
+       ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+                       -1, -1);
        if (ret < 0) {
                ERR("relay creating output directory");
-               goto end;
+               goto err_free_stream;
        }
 
        /*
@@ -1300,7 +1300,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
        if (ret < 0) {
                ERR("Create output file");
-               goto end;
+               goto err_free_stream;
        }
        stream->fd = ret;
        if (stream->tracefile_size) {
@@ -1309,6 +1309,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
        }
 
+       /* Protect access to "trace" */
+       rcu_read_lock();
        trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
        if (!trace) {
                trace = ctf_trace_create(stream->path_name);
@@ -1336,6 +1338,9 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        /*
         * Both in the ctf_trace object and the global stream ht since the data
         * side of the relayd does not have the concept of session.
+        *
+        * rcu_read_lock() is kept to protect the stream which is now part of
+        * the relay_streams_ht.
         */
        lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
        cds_list_add_tail(&stream->trace_list, &trace->stream_list);
@@ -1352,7 +1357,7 @@ end:
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
                /* stream was not properly added to the ht, so free it */
-               free(stream);
+               stream_destroy(stream);
        } else {
                reply.ret_code = htobe32(LTTNG_OK);
        }
@@ -1363,15 +1368,19 @@ end:
                ERR("Relay sending stream id");
                ret = send_ret;
        }
+       /*
+        * rcu_read_lock() was held to protect either "trace" OR the "stream" at
+        * this point.
+        */
        rcu_read_unlock();
+       trace = NULL;
+       stream = NULL;
 
 end_no_session:
        return ret;
 
 err_free_stream:
-       free(stream->path_name);
-       free(stream->channel_name);
-       free(stream);
+       stream_destroy(stream);
        return ret;
 }
 
@@ -2494,7 +2503,6 @@ void *relay_thread_worker(void *data)
 {
        int ret, err = -1, last_seen_data_fd = -1;
        uint32_t nb_fd;
-       struct relay_connection *conn;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
        struct lttng_ht_iter iter;
@@ -2502,6 +2510,7 @@ void *relay_thread_worker(void *data)
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
        struct relay_index *index;
+       struct relay_connection *destroy_conn = NULL;
 
        DBG("[thread] Relay worker started");
 
@@ -2561,8 +2570,8 @@ restart:
                nb_fd = ret;
 
                /*
-                * Process control. The control connection is prioritised so we don't
-                * starve it with high throughout put tracing data on the data
+                * Process control. The control connection is prioritised so we
+                * don't starve it with high throughput tracing data on the data
                 * connection.
                 */
                for (i = 0; i < nb_fd; i++) {
@@ -2590,6 +2599,8 @@ restart:
                                        ERR("Relay connection pipe error");
                                        goto error;
                                } else if (revents & LPOLLIN) {
+                                       struct relay_connection *conn;
+
                                        ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
                                        if (ret < 0) {
                                                goto error;
@@ -2605,32 +2616,34 @@ restart:
                                        DBG("Connection socket %d added", conn->sock->fd);
                                }
                        } else {
+                               struct relay_connection *ctrl_conn;
+
                                rcu_read_lock();
-                               conn = connection_find_by_sock(relay_connections_ht, pollfd);
+                               ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
                                /* If not found, there is a synchronization issue. */
-                               assert(conn);
+                               assert(ctrl_conn);
 
                                if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        cleanup_connection_pollfd(&events, pollfd);
-                                       destroy_connection(relay_connections_ht, conn);
+                                       destroy_connection(relay_connections_ht, ctrl_conn);
                                        if (last_seen_data_fd == pollfd) {
                                                last_seen_data_fd = last_notdel_data_fd;
                                        }
                                } else if (revents & LPOLLIN) {
-                                       if (conn->type == RELAY_CONTROL) {
-                                               ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
+                                       if (ctrl_conn->type == RELAY_CONTROL) {
+                                               ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
                                                                sizeof(recv_hdr), 0);
                                                if (ret <= 0) {
                                                        /* Connection closed */
                                                        cleanup_connection_pollfd(&events, pollfd);
-                                                       destroy_connection(relay_connections_ht, conn);
+                                                       destroy_connection(relay_connections_ht, ctrl_conn);
                                                        DBG("Control connection closed with %d", pollfd);
                                                } else {
-                                                       ret = relay_process_control(&recv_hdr, conn);
+                                                       ret = relay_process_control(&recv_hdr, ctrl_conn);
                                                        if (ret < 0) {
                                                                /* Clear the session on error. */
                                                                cleanup_connection_pollfd(&events, pollfd);
-                                                               destroy_connection(relay_connections_ht, conn);
+                                                               destroy_connection(relay_connections_ht, ctrl_conn);
                                                                DBG("Connection closed with %d", pollfd);
                                                        }
                                                        seen_control = 1;
@@ -2676,6 +2689,7 @@ restart:
                        /* Fetch the poll data. */
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
                        int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       struct relay_connection *data_conn;
 
                        health_code_update();
 
@@ -2690,24 +2704,24 @@ restart:
                        }
 
                        rcu_read_lock();
-                       conn = connection_find_by_sock(relay_connections_ht, pollfd);
-                       if (!conn) {
+                       data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+                       if (!data_conn) {
                                /* Skip it. Might be removed before. */
                                rcu_read_unlock();
                                continue;
                        }
 
                        if (revents & LPOLLIN) {
-                               if (conn->type != RELAY_DATA) {
+                               if (data_conn->type != RELAY_DATA) {
                                        rcu_read_unlock();
                                        continue;
                                }
 
-                               ret = relay_process_data(conn);
+                               ret = relay_process_data(data_conn);
                                /* Connection closed */
                                if (ret < 0) {
                                        cleanup_connection_pollfd(&events, pollfd);
-                                       destroy_connection(relay_connections_ht, conn);
+                                       destroy_connection(relay_connections_ht, data_conn);
                                        DBG("Data connection closed with %d", pollfd);
                                        /*
                                         * Every goto restart call sets the last seen fd where
@@ -2735,10 +2749,11 @@ error:
 
        /* Cleanup reamaining connection object. */
        rcu_read_lock();
-       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+                       destroy_conn,
                        sock_n.node) {
                health_code_update();
-               destroy_connection(relay_connections_ht, conn);
+               destroy_connection(relay_connections_ht, destroy_conn);
        }
        rcu_read_unlock();
 error_poll_create:
@@ -2814,7 +2829,8 @@ int main(int argc, char **argv)
                        goto exit_options;
                }
 
-               ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
+               ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG,
+                               -1, -1);
                if (ret < 0) {
                        ERR("Unable to create %s", opt_output_path);
                        retval = -1;
This page took 0.026932 seconds and 4 git commands to generate.