#include <common/hashtable/hashtable.h>
#include <common/sessiond-comm/relayd.h>
#include <common/uri.h>
+#include <common/utils.h>
#include "lttng-relayd.h"
*/
static int relay_cmd_pipe[2] = { -1, -1 };
+/* Shared between threads */
static int dispatch_thread_exit;
static pthread_t listener_thread;
fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
fprintf(stderr, " -C, --control-port Control port listening (URI)\n");
fprintf(stderr, " -D, --data-port Data port listening (URI)\n");
+ fprintf(stderr, " -o, --output Output path for traces (PATH)\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
}
char *default_address;
static struct option long_options[] = {
- { "control-port", 1, 0, 'C' },
- { "data-port", 1, 0, 'D' },
- { "daemonize", 0, 0, 'd' },
- { "output", 1, 0, 'o' },
- { "verbose", 0, 0, 'v' },
- { NULL, 0, 0, 0 }
+ { "control-port", 1, 0, 'C', },
+ { "data-port", 1, 0, 'D', },
+ { "daemonize", 0, 0, 'd', },
+ { "help", 0, 0, 'h', },
+ { "output", 1, 0, 'o', },
+ { "verbose", 0, 0, 'v', },
+ { NULL, 0, 0, 0 },
};
while (1) {
static
void cleanup(void)
{
- int i, ret;
-
DBG("Cleaning up");
- for (i = 0; i < 2; i++) {
- if (thread_quit_pipe[i] >= 0) {
- ret = close(thread_quit_pipe[i]);
- if (ret) {
- PERROR("close");
- }
- }
- }
+ /* Close thread quit pipes */
+ utils_close_pipe(thread_quit_pipe);
+
+ /* Close relay cmd pipes */
+ utils_close_pipe(relay_cmd_pipe);
}
/*
{
int ret;
- ret = write(wpipe, "!", 1);
+ do {
+ ret = write(wpipe, "!", 1);
+ } while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write poll pipe");
}
}
/* Dispatch thread */
- dispatch_thread_exit = 1;
+ CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
}
static
int init_thread_quit_pipe(void)
{
- int ret, i;
-
- ret = pipe(thread_quit_pipe);
- if (ret < 0) {
- PERROR("thread quit pipe");
- goto error;
- }
+ int ret;
- for (i = 0; i < 2; i++) {
- ret = fcntl(thread_quit_pipe[i], F_SETFD, FD_CLOEXEC);
- if (ret < 0) {
- PERROR("fcntl");
- goto error;
- }
- }
+ ret = utils_create_pipe_cloexec(thread_quit_pipe);
-error:
return ret;
}
DBG("[thread] Relay dispatcher started");
- while (!dispatch_thread_exit) {
+ while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
* call is blocking so we can be assured that the data will be read
* at some point in time or wait to the end of the world :)
*/
- ret = write(relay_cmd_pipe[1], relay_cmd,
- sizeof(struct relay_command));
+ do {
+ ret = write(relay_cmd_pipe[1], relay_cmd,
+ sizeof(struct relay_command));
+ } while (ret < 0 && errno == EINTR);
free(relay_cmd);
if (ret < 0) {
PERROR("write cmd pipe");
goto end;
}
- ret = write(metadata_stream->fd, metadata_struct->payload,
- payload_size);
+ do {
+ ret = write(metadata_stream->fd, metadata_struct->payload,
+ payload_size);
+ } while (ret < 0 && errno == EINTR);
if (ret < (payload_size)) {
ERR("Relay error writing metadata on file");
ret = -1;
goto end;
}
- ret = write(stream->fd, data_buffer, data_size);
+ do {
+ ret = write(stream->fd, data_buffer, data_size);
+ } while (ret < 0 && errno == EINTR);
if (ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
*/
static int create_relay_cmd_pipe(void)
{
- int ret, i;
-
- ret = pipe(relay_cmd_pipe);
- if (ret < 0) {
- PERROR("relay cmd pipe");
- goto error;
- }
+ int ret;
- for (i = 0; i < 2; i++) {
- ret = fcntl(relay_cmd_pipe[i], F_SETFD, FD_CLOEXEC);
- if (ret < 0) {
- PERROR("fcntl relay_cmd_pipe");
- goto error;
- }
- }
+ ret = utils_create_pipe_cloexec(relay_cmd_pipe);
-error:
return ret;
}
/* Parse arguments */
progname = argv[0];
if ((ret = parse_args(argc, argv) < 0)) {
- goto error;
+ goto exit;
}
if ((ret = set_signal_handler()) < 0) {
ret = daemon(0, 0);
if (ret < 0) {
PERROR("daemon");
- goto error;
+ goto exit;
}
}
if (control_uri->port < 1024 || data_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
ret = -1;
- goto error;
+ goto exit;
}
}
if (!ret) {
exit(EXIT_SUCCESS);
}
+
error:
exit(EXIT_FAILURE);
}