#include "utils.h"
#include "lttng-relayd.h"
#include "live.h"
+#include "health-relayd.h"
/* command line options */
char *opt_output_path;
const char *progname;
+const char *tracing_group_name = DEFAULT_TRACING_GROUP;
+
/*
* Quit pipe for all threads. This permits a single cancellation point
* for all threads when receiving an event on the pipe.
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;
+static pthread_t health_thread;
static uint64_t last_relay_stream_id;
static uint64_t last_relay_session_id;
static char *data_buffer;
static unsigned int data_buffer_size;
-/* Global hash table that stores relay index object. */
-static struct lttng_ht *indexes_ht;
-
/* We need those values for the file/dir creation. */
static uid_t relayd_uid;
static gid_t relayd_gid;
/* Global relay viewer stream hash table. */
struct lttng_ht *viewer_streams_ht;
+/* Global hash table that stores relay index object. */
+struct lttng_ht *indexes_ht;
+
+/* Relayd health monitoring */
+struct health_app *health_relayd;
+
/*
* usage function on stderr
*/
fprintf(stderr, " -D, --data-port URL Data port listening.\n");
fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
+ fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n");
}
static
{ "control-port", 1, 0, 'C', },
{ "data-port", 1, 0, 'D', },
{ "daemonize", 0, 0, 'd', },
+ { "group", 1, 0, 'g', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
while (1) {
int option_index = 0;
- c = getopt_long(argc, argv, "dhv" "C:D:o:",
+ c = getopt_long(argc, argv, "dhv" "C:D:o:g:",
long_options, &option_index);
if (c == -1) {
break;
case 'd':
opt_daemon = 1;
break;
+ case 'g':
+ tracing_group_name = optarg;
+ break;
case 'h':
usage();
exit(EXIT_FAILURE);
return ret;
}
+/*
+ * Notify the reader of a quit pipe by writing a single byte to its write end.
+ *
+ * pipe: two-element fd array; only pipe[1] (the write end) is used here.
+ *
+ * A failed or short write is only logged; nothing is returned to the caller.
+ */
+static void notify_health_quit_pipe(int *pipe)
+{
+	/* write() returns ssize_t; keep the full width instead of truncating. */
+	ssize_t ret;
+
+	/* Retry for as long as the write is interrupted by a signal. */
+	do {
+		ret = write(pipe[1], "4", 1);
+	} while (ret < 0 && errno == EINTR);
+	/* Anything other than exactly one byte written is a failure. */
+	if (ret != 1) {
+		PERROR("write relay health quit");
+	}
+}
+
/*
* Stop all threads by closing the thread quit pipe.
*/
ERR("write error on thread quit pipe");
}
+ notify_health_quit_pipe(health_quit_pipe);
+
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
DBG("[thread] Relay listener started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+
+ health_code_update();
+
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
}
while (1) {
+ health_code_update();
+
DBG("Listener accepting connections");
restart:
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
- DBG("Thread exited with error");
+ health_error();
+ ERR("Health error occurred in %s", __func__);
}
+ health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
stop_threads();
return NULL;
static
void *relay_thread_dispatcher(void *data)
{
- int ret;
+ int ret, err = -1;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Relay dispatcher started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+
+ health_code_update();
+
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
+ health_code_update();
+
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
+ health_poll_entry();
futex_nto1_wait(&relay_cmd_queue.futex);
+ health_poll_exit();
}
+ /* Normal exit, no error */
+ err = 0;
+
error:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
return NULL;
free(session);
}
-static void close_stream(struct relay_stream *stream,
- struct lttng_ht *ctf_traces_ht)
+/*
+ * Close a given stream. The stream memory is released through call_rcu().
+ *
+ * The RCU read side lock MUST be held by the caller. If close_stream_check()
+ * was NOT called beforehand, the stream lock MUST also be acquired before
+ * calling this function.
+ */
+static void destroy_stream(struct relay_stream *stream)
{
int delret;
struct relay_viewer_stream *vstream;
vstream->total_index_received = stream->total_index_received;
}
+ /* Cleanup index of that stream. */
+ relay_index_destroy_by_stream_id(stream->stream_handle);
+
iter.iter.node = &stream->stream_n.node;
delret = lttng_ht_del(relay_streams_ht, &iter);
assert(!delret);
iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(ctf_traces_ht, &iter);
+ delret = lttng_ht_del(stream->ctf_traces_ht, &iter);
assert(!delret);
call_rcu(&stream->rcu_node, deferred_free_stream);
DBG("Closed tracefile %d from close stream", stream->fd);
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
- if (node) {
- stream = caa_container_of(node,
- struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- ret = close(stream->fd);
- if (ret < 0) {
- PERROR("close stream fd on delete session");
- }
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- }
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle,
- indexes_ht);
+ if (!node) {
+ continue;
+ }
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (stream->session == cmd->session) {
+ destroy_stream(stream);
}
}
+
+ /* Make this session not visible anymore. */
iter.iter.node = &cmd->session->session_n.node;
ret = lttng_ht_del(sessions_ht, &iter);
assert(!ret);
- call_rcu(&cmd->session->rcu_node,
- deferred_free_session);
+ call_rcu(&cmd->session->rcu_node, deferred_free_session);
rcu_read_unlock();
}
session->id = ++last_relay_session_id;
session->sock = cmd->sock;
+ session->minor = cmd->minor;
+ session->major = cmd->major;
cmd->session = session;
reply.session_id = htobe64(session->id);
stream->ctf_trace->metadata_stream = stream;
}
ctf_trace_assign(cmd->ctf_traces_ht, stream);
+ stream->ctf_traces_ht = cmd->ctf_traces_ht;
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
stream->close_flag = 1;
if (close_stream_check(stream)) {
- close_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
}
end_unlock:
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *indexes_ht)
+ struct relay_command *cmd)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(indexes_ht);
DBG("Relay receiving index");
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+ index = relay_index_find(stream->stream_handle, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* already exist, destroy back the index created, set the data in this
* object and write it on disk.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
copy_index_control_data(wr_index, &index_info);
free(index);
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
return ret;
}
+/*
+ * Handle index for a data stream.
+ *
+ * Finds the relay index object matching the stream handle and net_seq_num, or
+ * creates one if none exists, then records the stream's current index fd and
+ * data offset in it. If a matching index was already present in the hash
+ * table, the index is written on disk and the stream's received index count
+ * is incremented.
+ *
+ * stream:       stream the data belongs to; must not be NULL.
+ * net_seq_num:  sequence number handed as-is to the index lookup/creation.
+ * rotate_index: when non-zero, a new index file is created even if the stream
+ *               already has a valid index fd.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value. When an index is written, the
+ * result of relay_index_write() is returned as-is.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+		int rotate_index)
+{
+	int ret = 0, index_created = 0;
+	uint64_t stream_id, data_offset;
+	struct relay_index *index, *wr_index = NULL;
+
+	assert(stream);
+
+	stream_id = stream->stream_handle;
+	/* Get data offset because we are about to update the index. */
+	data_offset = htobe64(stream->tracefile_size_current);
+
+	/*
+	 * Look up an existing index for that stream id/sequence number. If one
+	 * exists, the control thread already received the data for it thus we
+	 * need to write it on disk.
+	 */
+	index = relay_index_find(stream_id, net_seq_num);
+	if (!index) {
+		/* A successful creation will add the object to the HT. */
+		index = relay_index_create(stream_id, net_seq_num);
+		if (!index) {
+			ret = -1;
+			goto error;
+		}
+		index_created = 1;
+	}
+
+	if (rotate_index || stream->index_fd < 0) {
+		/*
+		 * Keep the new descriptor in its own variable so that a valid
+		 * (positive) fd never leaks out as this function's return value.
+		 */
+		int fd;
+
+		/* Remember the previous index fd so it can be closed later. */
+		index->to_close_fd = stream->index_fd;
+		fd = index_create_file(stream->path_name, stream->channel_name,
+				relayd_uid, relayd_gid, stream->tracefile_size,
+				stream->tracefile_count_current);
+		if (fd < 0) {
+			ret = fd;
+			/*
+			 * This will close the stream's index fd if one.
+			 *
+			 * NOTE(review): when the index was found in the hash table
+			 * rather than created above, confirm that
+			 * relay_index_free_safe() copes with an object that may
+			 * still be reachable from the table.
+			 */
+			relay_index_free_safe(index);
+			goto error;
+		}
+		stream->index_fd = fd;
+	}
+	index->fd = stream->index_fd;
+	index->index_data.offset = data_offset;
+
+	if (index_created) {
+		/*
+		 * Try to add the relay index object to the hash table. If an object
+		 * already exists, destroy the index created here and set the data
+		 * on the existing one.
+		 */
+		relay_index_add(index, &wr_index);
+		if (wr_index) {
+			/* Copy back data from the created index. */
+			wr_index->fd = index->fd;
+			wr_index->to_close_fd = index->to_close_fd;
+			wr_index->index_data.offset = data_offset;
+			free(index);
+		}
+	} else {
+		/* The index already exists so write it on disk. */
+		wr_index = index;
+	}
+
+	/* Do we have a writable ready index to write on disk. */
+	if (wr_index) {
+		ret = relay_index_write(wr_index->fd, wr_index);
+		if (ret < 0) {
+			goto error;
+		}
+		stream->total_index_received++;
+	}
+
+error:
+	return ret;
+}
+
+
/*
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+int relay_process_data(struct relay_command *cmd)
{
- int ret = 0, rotate_index = 0, index_created = 0;
+ int ret = 0, rotate_index = 0;
struct relay_stream *stream;
- struct relay_index *index, *wr_index = NULL;
struct lttcomm_relayd_data_hdr data_hdr;
- uint64_t stream_id, data_offset;
+ uint64_t stream_id;
uint64_t net_seq_num;
uint32_t data_size;
rotate_index = 1;
}
- /* Get data offset because we are about to update the index. */
- data_offset = htobe64(stream->tracefile_size_current);
-
/*
- * Lookup for an existing index for that stream id/sequence number. If on
- * exists, the control thread already received the data for it thus we need
- * to write it on disk.
+	 * Indexes are handled in protocol version 2.4 and above. Also, indexes
+	 * are NOT supported for snapshot sessions.
*/
- index = relay_index_find(stream_id, net_seq_num, indexes_ht);
- if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream->stream_handle, net_seq_num);
- if (!index) {
- goto end_rcu_unlock;
- }
- index_created = 1;
- }
-
- if (rotate_index || stream->index_fd < 0) {
- index->to_close_fd = stream->index_fd;
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
+ if (stream->session->minor >= 4 && !stream->session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
- /* This will close the stream's index fd if one. */
- relay_index_free_safe(index);
goto end_rcu_unlock;
}
- stream->index_fd = ret;
- }
- index->fd = stream->index_fd;
- index->index_data.offset = data_offset;
-
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created and set the data.
- */
- relay_index_add(index, indexes_ht, &wr_index);
- if (wr_index) {
- /* Copy back data from the created index. */
- wr_index->fd = index->fd;
- wr_index->to_close_fd = index->to_close_fd;
- wr_index->index_data.offset = data_offset;
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
- }
-
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- /* Starting at 2.4, create the index file if none available. */
- if (cmd->minor >= 4 && stream->index_fd < 0) {
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->index_fd = ret;
- }
-
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->total_index_received++;
}
+ /* Write data to stream output fd. */
do {
ret = write(stream->fd, data_buffer, data_size);
} while (ret < 0 && errno == EINTR);
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- close_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
}
end_rcu_unlock:
goto error_read;
}
- relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
- if (!relay_connection->ctf_traces_ht) {
- goto error_read;
+	/*
+	 * The ctf_traces hash table is only used by the control side, and its
+	 * reference is copied inside each stream from that connection. Thus the
+	 * hash table must only be destroyed after every stream has been
+	 * destroyed.
+	 */
+ if (relay_connection->type == RELAY_CONTROL) {
+ relay_connection->ctf_traces_ht = lttng_ht_new(0,
+ LTTNG_HT_TYPE_STRING);
+ if (!relay_connection->ctf_traces_ht) {
+ goto error_read;
+ }
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
- lttng_ht_destroy(relay_connection->ctf_traces_ht);
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
+
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, sessions_ht);
+ lttng_ht_destroy(relay_connection->ctf_traces_ht);
}
- call_rcu(&relay_connection->rcu_node,
- deferred_free_connection);
+ call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
/*
rcu_register_thread();
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+
+ health_code_update();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ health_code_update();
+
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
if (last_seen_data_fd == pollfd) {
idx = i;
break;
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
last_seen_data_fd = -1;
}
+ /* Normal exit, no error */
+ ret = 0;
+
exit:
error:
lttng_poll_clean(&events);
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+ health_code_update();
+
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
&iter, relay_connection, sessions_ht);
}
}
-error_poll_create:
- {
- struct relay_index *index;
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- relay_index_delete(index, indexes_ht);
- }
- lttng_ht_destroy(indexes_ht);
- }
rcu_read_unlock();
+error_poll_create:
+ lttng_ht_destroy(indexes_ht);
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
}
DBG("Worker thread cleanup complete");
free(data_buffer);
- stop_threads();
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
rcu_unregister_thread();
+ stop_threads();
return NULL;
}
goto exit_relay_ctx_viewer_streams;
}
+ /* Initialize thread health monitoring */
+ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+ if (!health_relayd) {
+ PERROR("health_app_create error");
+ goto exit_health_app_create;
+ }
+
+ ret = utils_create_pipe(health_quit_pipe);
+ if (ret < 0) {
+ goto error_health_pipe;
+ }
+
+ /* Create thread to manage the client socket */
+ ret = pthread_create(&health_thread, NULL,
+ thread_manage_health, (void *) NULL);
+ if (ret != 0) {
+ PERROR("pthread_create health");
+ goto health_error;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
goto exit_listener;
}
- ret = live_start_threads(live_uri, relay_ctx);
+ ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
if (ret != 0) {
ERR("Starting live viewer threads");
+ goto exit_live;
}
-exit_listener:
+exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_worker:
+exit_listener:
ret = pthread_join(worker_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_dispatcher:
+exit_worker:
ret = pthread_join(dispatcher_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
+
+exit_dispatcher:
+ ret = pthread_join(health_thread, &status);
+ if (ret != 0) {
+ PERROR("pthread_join health thread");
+ goto error; /* join error, exit without cleanup */
+ }
+
+ /*
+ * Stop live threads only after joining other threads.
+ */
+ live_stop_threads();
+
+health_error:
+ utils_close_pipe(health_quit_pipe);
+
+error_health_pipe:
+ health_app_destroy(health_relayd);
+
+exit_health_app_create:
lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
free(relay_ctx);
exit:
- live_stop_threads();
cleanup();
if (!ret) {
exit(EXIT_SUCCESS);