*/
#define NR_LTTNG_RELAY_READY 3
static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE 65536
+
static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
static pid_t child_ppid; /* Internal parent PID use with daemonize. */
static void sighandler(int sig)
{
switch (sig) {
- case SIGPIPE:
- DBG("SIGPIPE caught");
- return;
case SIGINT:
DBG("SIGINT caught");
if (lttng_relay_stop_threads()) {
return ret;
}
- sa.sa_handler = sighandler;
sa.sa_mask = sigset;
sa.sa_flags = 0;
+
+ sa.sa_handler = sighandler;
if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
return ret;
}
- if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
+ if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
- if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
+ sa.sa_handler = SIG_IGN;
+ if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
uint32_t data_size;
struct relay_session *session;
bool new_stream = false, close_requested = false;
+ size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+ size_t recv_off = 0;
+ char data_buffer[chunk_size];
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
}
session = stream->trace->session;
data_size = be32toh(data_hdr.data_size);
- if (data_buffer_size < data_size) {
- char *tmp_data_ptr;
-
- tmp_data_ptr = realloc(data_buffer, data_size);
- if (!tmp_data_ptr) {
- ERR("Allocating data buffer");
- free(data_buffer);
- ret = -1;
- goto end_stream_put;
- }
- data_buffer = tmp_data_ptr;
- data_buffer_size = data_size;
- }
- memset(data_buffer, 0, data_size);
net_seq_num = be64toh(data_hdr.net_seq_num);
DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
data_size, stream_id, net_seq_num);
- ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (ret <= 0) {
- if (ret == 0) {
- /* Orderly shutdown. Not necessary to print an error. */
- DBG("Socket %d did an orderly shutdown", conn->sock->fd);
- } else {
- ERR("Socket %d error %d", conn->sock->fd, ret);
- }
- ret = -1;
- goto end_stream_put;
- }
pthread_mutex_lock(&stream->lock);
}
}
- /* Write data to stream output fd. */
- size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
- if (size_ret < data_size) {
- ERR("Relay error writing data to file");
- ret = -1;
- goto end_stream_unlock;
- }
+ for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+ size_t recv_size = min(data_size - recv_off, chunk_size);
- DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
- size_ret, stream->stream_handle);
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Socket %d error %d", conn->sock->fd, ret);
+ }
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ /* Write data to stream output fd. */
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ recv_size);
+ if (size_ret < recv_size) {
+ ERR("Relay error writing data to file");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ size_ret, stream->stream_handle);
+ }
ret = write_padding_to_file(stream->stream_fd->fd,
be32toh(data_hdr.padding_size));
uatomic_set(&session->new_streams, 1);
pthread_mutex_unlock(&session->lock);
}
-end_stream_put:
stream_put(stream);
end:
return ret;
DBG("Thread exited with error");
}
DBG("Worker thread cleanup complete");
- free(data_buffer);
error_testpoint:
if (err) {
health_error();
}
}
-
/* Initialize thread health monitoring */
health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
if (!health_relayd) {
goto exit_init_data;
}
- /* Check if daemon is UID = 0 */
- if (!getuid()) {
- if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
- ERR("Need to be root to use ports < 1024");
- retval = -1;
- goto exit_init_data;
- }
- }
-
/* Setup the thread apps communication pipe. */
if (create_relay_conn_pipe()) {
retval = -1;
/* Init relay command queue. */
cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
- /* Set up max poll set size */
- if (lttng_poll_set_max_size()) {
- retval = -1;
- goto exit_init_data;
- }
-
/* Initialize communication library */
lttcomm_init();
lttcomm_inet_init();
}
/* Create thread to manage the client socket */
- ret = pthread_create(&health_thread, NULL,
+ ret = pthread_create(&health_thread, default_pthread_attr(),
thread_manage_health, (void *) NULL);
if (ret) {
errno = ret;
}
/* Setup the dispatcher thread */
- ret = pthread_create(&dispatcher_thread, NULL,
+ ret = pthread_create(&dispatcher_thread, default_pthread_attr(),
relay_thread_dispatcher, (void *) NULL);
if (ret) {
errno = ret;
}
/* Setup the worker thread */
- ret = pthread_create(&worker_thread, NULL,
+ ret = pthread_create(&worker_thread, default_pthread_attr(),
relay_thread_worker, NULL);
if (ret) {
errno = ret;
}
/* Setup the listener thread */
- ret = pthread_create(&listener_thread, NULL,
+ ret = pthread_create(&listener_thread, default_pthread_attr(),
relay_thread_listener, (void *) NULL);
if (ret) {
errno = ret;