*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <limits.h>
#include <common/common.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
+#include <common/compat/endian.h>
#include <common/defaults.h>
#include <common/daemonize.h>
#include <common/futex.h>
break;
case 'g':
tracing_group_name = strdup(arg);
+ if (tracing_group_name == NULL) {
+ ret = -errno;
+ PERROR("strdup");
+ goto end;
+ }
tracing_group_name_override = 1;
break;
case 'h':
if (arg) {
lttng_opt_verbose = config_parse_value(arg);
} else {
- lttng_opt_verbose += 1;
+ /* Only 3 level of verbosity (-vvv). */
+ if (lttng_opt_verbose < 3) {
+ lttng_opt_verbose += 1;
+ }
}
break;
default:
static
int set_options(int argc, char **argv)
{
- int c, ret = 0, option_index = 0;
+ int c, ret = 0, option_index = 0, retval = 0;
int orig_optopt = optopt, orig_optind = optind;
char *default_address, *optstring;
const char *config_path = NULL;
optstring = utils_generate_optstring(long_options,
sizeof(long_options) / sizeof(struct option));
if (!optstring) {
- ret = -ENOMEM;
+ retval = -ENOMEM;
goto exit;
}
while ((c = getopt_long(argc, argv, optstring, long_options,
&option_index)) != -1) {
if (c == '?') {
- ret = -EINVAL;
+ retval = -EINVAL;
goto exit;
} else if (c != 'f') {
continue;
if (ret) {
if (ret > 0) {
ERR("Invalid configuration option at line %i", ret);
- ret = -1;
}
+ retval = -1;
goto exit;
}
ret = set_option(c, optarg, long_options[option_index].name);
if (ret < 0) {
+ retval = -1;
goto exit;
}
}
/* assign default values */
if (control_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_CONTROL_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_CONTROL_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
+ retval = -1;
goto exit;
}
free(default_address);
if (ret < 0) {
ERR("Invalid control URI specified");
+ retval = -1;
goto exit;
}
}
if (data_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_DATA_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_DATA_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
+ retval = -1;
goto exit;
}
free(default_address);
if (ret < 0) {
ERR("Invalid data URI specified");
+ retval = -1;
goto exit;
}
}
if (live_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_VIEWER_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_VIEWER_PORT);
if (ret < 0) {
PERROR("asprintf default viewer control address");
+ retval = -1;
goto exit;
}
free(default_address);
if (ret < 0) {
ERR("Invalid viewer control URI specified");
+ retval = -1;
goto exit;
}
}
exit:
free(optstring);
- return ret;
+ return retval;
}
/*
* Cleanup the daemon
*/
static
-void cleanup(void)
+void relayd_cleanup(struct relay_local_data *relay_ctx)
{
DBG("Cleaning up");
+ if (viewer_streams_ht)
+ lttng_ht_destroy(viewer_streams_ht);
+ if (relay_streams_ht)
+ lttng_ht_destroy(relay_streams_ht);
+ if (relay_ctx && relay_ctx->sessions_ht)
+ lttng_ht_destroy(relay_ctx->sessions_ht);
+ free(relay_ctx);
+
/* free the dynamically allocated opt_output_path */
free(opt_output_path);
ret = lttng_write(wpipe, "!", 1);
if (ret < 1) {
PERROR("write poll pipe");
+ goto end;
}
-
+ ret = 0;
+end:
return ret;
}
-static void notify_health_quit_pipe(int *pipe)
+static
+int notify_health_quit_pipe(int *pipe)
{
ssize_t ret;
ret = lttng_write(pipe[1], "4", 1);
if (ret < 1) {
PERROR("write relay health quit");
+ goto end;
}
+ ret = 0;
+end:
+ return ret;
}
/*
- * Stop all threads by closing the thread quit pipe.
+ * Stop all relayd and relayd-live threads.
*/
-static
-void stop_threads(void)
+int lttng_relay_stop_threads(void)
{
- int ret;
+ int retval = 0;
/* Stopping all threads */
DBG("Terminating all threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
- if (ret < 0) {
+ if (notify_thread_pipe(thread_quit_pipe[1])) {
ERR("write error on thread quit pipe");
+ retval = -1;
}
- notify_health_quit_pipe(health_quit_pipe);
+ if (notify_health_quit_pipe(health_quit_pipe)) {
+ ERR("write error on health quit pipe");
+ }
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_conn_queue.futex);
+
+ if (relayd_live_stop()) {
+ ERR("Error stopping live threads");
+ retval = -1;
+ }
+ return retval;
}
/*
return;
case SIGINT:
DBG("SIGINT caught");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
break;
case SIGTERM:
DBG("SIGTERM caught");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
break;
case SIGUSR1:
CMM_STORE_SHARED(recv_child_signal, 1);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
- cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+ cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+ &new_conn->qnode);
/*
* Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfq_enqueue.
+ * the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
}
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
{
int err = -1;
ssize_t ret;
- struct cds_wfq_node *node;
+ struct cds_wfcq_node *node;
struct relay_connection *new_conn = NULL;
DBG("[thread] Relay dispatcher started");
health_code_update();
/* Dequeue commands */
- node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+ node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+ &relay_conn_queue.tail);
if (node == NULL) {
DBG("Woken up but nothing in the relay command queue");
/* Continue thread execution */
}
health_unregister(health_relayd);
DBG("Dispatch thread dying");
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
stream->session_id = session->id;
stream->index_fd = -1;
stream->read_index_fd = -1;
+ stream->ctf_stream_id = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
session->stream_count--;
- assert(session->stream_count >= 0);
/* Check if we can close it or else the data will do it. */
try_close_stream(session, stream);
DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
/*
- * Only flag a stream inactive when it has already received data.
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
*/
- if (stream->total_index_received > 0) {
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
stream->beacon_ts_end = be64toh(index_info.timestamp_end);
}
ret = 0;
goto end_rcu_unlock;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
copy_index_control_data(index, &index_info);
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = be64toh(index_info.stream_id);
+ }
if (index_created) {
/*
goto end_rcu_unlock;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
end_rcu_unlock:
goto error;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
if (rotate_index || stream->index_fd < 0) {
goto error;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
error:
stream->tracefile_size, stream->tracefile_count,
relayd_uid, relayd_gid, stream->fd,
&(stream->tracefile_count_current), &stream->fd);
- stream->total_index_received = 0;
pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
if (ret < 0) {
ERR("Rotating stream output file");
connection_delete(relay_connections_ht, conn);
/* For the control socket, we try to destroy the session. */
- if (conn->type == RELAY_CONTROL) {
+ if (conn->type == RELAY_CONTROL && conn->session) {
destroy_session(conn->session, conn->sessions_ht);
}
struct lttcomm_relayd_hdr recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+ struct relay_index *index;
DBG("[thread] Relay worker started");
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_conn_pipe[0]) {
continue;
}
- if (revents) {
- rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- if (!conn) {
- /* Skip it. Might be removed before. */
+ rcu_read_lock();
+ conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ if (!conn) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+
+ if (revents & LPOLLIN) {
+ if (conn->type != RELAY_DATA) {
rcu_read_unlock();
continue;
}
- if (revents & LPOLLIN) {
- if (conn->type != RELAY_DATA) {
- continue;
- }
-
- ret = relay_process_data(conn);
- /* Connection closed */
- if (ret < 0) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
- DBG("Data connection closed with %d", pollfd);
- /*
- * Every goto restart call sets the last seen fd where
- * here we don't really care since we gracefully
- * continue the loop after the connection is deleted.
- */
- } else {
- /* Keep last seen port. */
- last_seen_data_fd = pollfd;
- rcu_read_unlock();
- goto restart;
- }
+ ret = relay_process_data(conn);
+ /* Connection closed */
+ if (ret < 0) {
+ cleanup_connection_pollfd(&events, pollfd);
+ destroy_connection(relay_connections_ht, conn);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
}
- rcu_read_unlock();
}
+ rcu_read_unlock();
}
last_seen_data_fd = -1;
}
}
rcu_read_unlock();
error_poll_create:
+ rcu_read_lock();
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+ index_n.node) {
+ health_code_update();
+ relay_index_delete(index);
+ relay_index_free_safe(index);
+ }
+ rcu_read_unlock();
lttng_ht_destroy(indexes_ht);
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
}
health_unregister(health_relayd);
rcu_unregister_thread();
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
*/
int main(int argc, char **argv)
{
- int ret = 0;
+ int ret = 0, retval = 0;
void *status;
- struct relay_local_data *relay_ctx;
+ struct relay_local_data *relay_ctx = NULL;
/* Parse arguments */
progname = argv[0];
- if ((ret = set_options(argc, argv)) < 0) {
- goto exit;
+ if (set_options(argc, argv)) {
+ retval = -1;
+ goto exit_options;
}
- if ((ret = set_signal_handler()) < 0) {
- goto exit;
+ if (set_signal_handler()) {
+ retval = -1;
+ goto exit_options;
}
/* Try to create directory if -o, --output is specified. */
if (opt_output_path) {
if (*opt_output_path != '/') {
ERR("Please specify an absolute path for -o, --output PATH");
- goto exit;
+ retval = -1;
+ goto exit_options;
}
ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
if (ret < 0) {
ERR("Unable to create %s", opt_output_path);
- goto exit;
+ retval = -1;
+ goto exit_options;
}
}
ret = lttng_daemonize(&child_ppid, &recv_child_signal,
!opt_background);
if (ret < 0) {
- goto exit;
+ retval = -1;
+ goto exit_options;
}
/*
}
}
+
+ /* Initialize thread health monitoring */
+ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+ if (!health_relayd) {
+ PERROR("health_app_create error");
+ retval = -1;
+ goto exit_health_app_create;
+ }
+
/* Create thread quit pipe */
- if ((ret = init_thread_quit_pipe()) < 0) {
- goto error;
+ if (init_thread_quit_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* We need those values for the file/dir creation. */
if (relayd_uid == 0) {
if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
- ret = -1;
- goto exit;
+ retval = -1;
+ goto exit_init_data;
}
}
/* Setup the thread apps communication pipe. */
- if ((ret = create_relay_conn_pipe()) < 0) {
- goto exit;
+ if (create_relay_conn_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* Init relay command queue. */
- cds_wfq_init(&relay_conn_queue.queue);
+ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
/* Set up max poll set size */
lttng_poll_set_max_size();
relay_ctx = zmalloc(sizeof(struct relay_local_data));
if (!relay_ctx) {
PERROR("relay_ctx");
- goto exit;
+ retval = -1;
+ goto exit_init_data;
}
/* tables of sessions indexed by session ID */
relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_ctx->sessions_ht) {
- goto exit_relay_ctx_sessions;
+ retval = -1;
+ goto exit_init_data;
}
/* tables of streams indexed by stream ID */
relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_streams_ht) {
- goto exit_relay_ctx_streams;
+ retval = -1;
+ goto exit_init_data;
}
/* tables of streams indexed by stream ID */
viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!viewer_streams_ht) {
- goto exit_relay_ctx_viewer_streams;
- }
-
- /* Initialize thread health monitoring */
- health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
- if (!health_relayd) {
- PERROR("health_app_create error");
- goto exit_health_app_create;
+ retval = -1;
+ goto exit_init_data;
}
ret = utils_create_pipe(health_quit_pipe);
- if (ret < 0) {
- goto error_health_pipe;
+ if (ret) {
+ retval = -1;
+ goto exit_health_quit_pipe;
}
/* Create thread to manage the client socket */
ret = pthread_create(&health_thread, NULL,
thread_manage_health, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create health");
- goto health_error;
+ retval = -1;
+ goto exit_health_thread;
}
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create dispatcher");
- goto exit_dispatcher;
+ retval = -1;
+ goto exit_dispatcher_thread;
}
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
relay_thread_worker, (void *) relay_ctx);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create worker");
- goto exit_worker;
+ retval = -1;
+ goto exit_worker_thread;
}
/* Setup the listener thread */
ret = pthread_create(&listener_thread, NULL,
relay_thread_listener, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create listener");
- goto exit_listener;
+ retval = -1;
+ goto exit_listener_thread;
}
- ret = live_start_threads(live_uri, relay_ctx);
- if (ret != 0) {
+ ret = relayd_live_create(live_uri, relay_ctx);
+ if (ret) {
ERR("Starting live viewer threads");
+ retval = -1;
goto exit_live;
}
+ /*
+ * This is where we start awaiting program completion (e.g. through
+ * signal that asks threads to teardown).
+ */
+
+ ret = relayd_live_join();
+ if (ret) {
+ retval = -1;
+ }
exit_live:
+
ret = pthread_join(listener_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join listener_thread");
+ retval = -1;
}
-exit_listener:
+exit_listener_thread:
ret = pthread_join(worker_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join worker_thread");
+ retval = -1;
}
-exit_worker:
+exit_worker_thread:
ret = pthread_join(dispatcher_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join dispatcher_thread");
+ retval = -1;
}
+exit_dispatcher_thread:
-exit_dispatcher:
ret = pthread_join(health_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join health thread");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join health_thread");
+ retval = -1;
}
+exit_health_thread:
- /*
- * Stop live threads only after joining other threads.
- */
- live_stop_threads();
-
-health_error:
utils_close_pipe(health_quit_pipe);
+exit_health_quit_pipe:
-error_health_pipe:
+exit_init_data:
health_app_destroy(health_relayd);
-
exit_health_app_create:
- lttng_ht_destroy(viewer_streams_ht);
-
-exit_relay_ctx_viewer_streams:
- lttng_ht_destroy(relay_streams_ht);
-
-exit_relay_ctx_streams:
- lttng_ht_destroy(relay_ctx->sessions_ht);
+exit_options:
+ relayd_cleanup(relay_ctx);
-exit_relay_ctx_sessions:
- free(relay_ctx);
-
-exit:
- cleanup();
- if (!ret) {
+ if (!retval) {
exit(EXIT_SUCCESS);
+ } else {
+ exit(EXIT_FAILURE);
}
-
-error:
- exit(EXIT_FAILURE);
}