Fix: lttng-relayd allow binding of privileged ports for non-root users
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index dcaaaa8dc0ec2c284eb211ebde21ab7234745b31..1ea2d2c08e5d96b0a894b43bbf33c7a48c3e4b41 100644 (file)
@@ -83,6 +83,10 @@ static int opt_daemon, opt_background;
  */
 #define NR_LTTNG_RELAY_READY   3
 static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE          65536
+
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
 
@@ -2084,8 +2088,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
 
-       DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
-               stream->stream_handle, stream->tracefile_size_current);
+       DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+                       stream->stream_handle, net_seq_num, stream->tracefile_size_current);
 
        /*
         * Lookup for an existing index for that stream id/sequence
@@ -2166,6 +2170,9 @@ static int relay_process_data(struct relay_connection *conn)
        uint32_t data_size;
        struct relay_session *session;
        bool new_stream = false, close_requested = false;
+       size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+       size_t recv_off = 0;
+       char data_buffer[chunk_size];
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2183,39 +2190,17 @@ static int relay_process_data(struct relay_connection *conn)
        stream_id = be64toh(data_hdr.stream_id);
        stream = stream_get_by_id(stream_id);
        if (!stream) {
+               ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
                ret = -1;
                goto end;
        }
        session = stream->trace->session;
        data_size = be32toh(data_hdr.data_size);
-       if (data_buffer_size < data_size) {
-               char *tmp_data_ptr;
-
-               tmp_data_ptr = realloc(data_buffer, data_size);
-               if (!tmp_data_ptr) {
-                       ERR("Allocating data buffer");
-                       free(data_buffer);
-                       ret = -1;
-                       goto end_stream_put;
-               }
-               data_buffer = tmp_data_ptr;
-               data_buffer_size = data_size;
-       }
-       memset(data_buffer, 0, data_size);
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
        DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
-       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-       if (ret <= 0) {
-               if (ret == 0) {
-                       /* Orderly shutdown. Not necessary to print an error. */
-                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
-               }
-               ret = -1;
-               goto end_stream_put;
-       }
 
        pthread_mutex_lock(&stream->lock);
 
@@ -2255,24 +2240,45 @@ static int relay_process_data(struct relay_connection *conn)
        if (session->minor >= 4 && !session->snapshot) {
                ret = handle_index_data(stream, net_seq_num, rotate_index);
                if (ret < 0) {
+                       ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                                       stream->stream_handle, net_seq_num, ret);
                        goto end_stream_unlock;
                }
        }
 
-       /* Write data to stream output fd. */
-       size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
-       if (size_ret < data_size) {
-               ERR("Relay error writing data to file");
-               ret = -1;
-               goto end_stream_unlock;
-       }
+       for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+               size_t recv_size = min(data_size - recv_off, chunk_size);
 
-       DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
-                       size_ret, stream->stream_handle);
+               ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+               if (ret <= 0) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. Not necessary to print an error. */
+                               DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+                       } else {
+                               ERR("Socket %d error %d", conn->sock->fd, ret);
+                       }
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
+
+               /* Write data to stream output fd. */
+               size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+                               recv_size);
+               if (size_ret < recv_size) {
+                       ERR("Relay error writing data to file");
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
+
+               DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+                               size_ret, stream->stream_handle);
+       }
 
        ret = write_padding_to_file(stream->stream_fd->fd,
                        be32toh(data_hdr.padding_size));
        if (ret < 0) {
+               ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                               stream->stream_handle, net_seq_num, ret);
                goto end_stream_unlock;
        }
        stream->tracefile_size_current +=
@@ -2295,7 +2301,6 @@ end_stream_unlock:
                uatomic_set(&session->new_streams, 1);
                pthread_mutex_unlock(&session->lock);
        }
-end_stream_put:
        stream_put(stream);
 end:
        return ret;
@@ -2613,7 +2618,6 @@ relay_connections_ht_error:
                DBG("Thread exited with error");
        }
        DBG("Worker thread cleanup complete");
-       free(data_buffer);
 error_testpoint:
        if (err) {
                health_error();
@@ -2711,15 +2715,6 @@ int main(int argc, char **argv)
                goto exit_init_data;
        }
 
-       /* Check if daemon is UID = 0 */
-       if (!getuid()) {
-               if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
-                       ERR("Need to be root to use ports < 1024");
-                       retval = -1;
-                       goto exit_init_data;
-               }
-       }
-
        /* Setup the thread apps communication pipe. */
        if (create_relay_conn_pipe()) {
                retval = -1;
This page took 0.026404 seconds and 4 git commands to generate.