projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Comment why we cannot rmdir the lttng and relayd rundir
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index bf7be3e25c77b299e139ce8c46d49e6b8b9bb667..1bdef652eb64d7ad60ffb3b582ca540f24a62361 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-68,6
+68,8
@@
static struct lttng_uri *live_uri;
const char *progname;
const char *progname;
+const char *tracing_group_name = DEFAULT_TRACING_GROUP;
+
/*
* Quit pipe for all threads. This permits a single cancellation point
* for all threads when receiving an event on the pipe.
/*
* Quit pipe for all threads. This permits a single cancellation point
* for all threads when receiving an event on the pipe.
@@
-86,6
+88,7
@@
static int dispatch_thread_exit;
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;
+static pthread_t health_thread;
static uint64_t last_relay_stream_id;
static uint64_t last_relay_session_id;
static uint64_t last_relay_stream_id;
static uint64_t last_relay_session_id;
@@
-116,7
+119,7
@@
struct lttng_ht *viewer_streams_ht;
struct lttng_ht *indexes_ht;
/* Relayd health monitoring */
struct lttng_ht *indexes_ht;
/* Relayd health monitoring */
-st
atic st
ruct health_app *health_relayd;
+struct health_app *health_relayd;
/*
* usage function on stderr
/*
* usage function on stderr
@@
-131,6
+134,7
@@
void usage(void)
fprintf(stderr, " -D, --data-port URL Data port listening.\n");
fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
fprintf(stderr, " -D, --data-port URL Data port listening.\n");
fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
+ fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n");
}
static
}
static
@@
-144,6
+148,7
@@
int parse_args(int argc, char **argv)
{ "control-port", 1, 0, 'C', },
{ "data-port", 1, 0, 'D', },
{ "daemonize", 0, 0, 'd', },
{ "control-port", 1, 0, 'C', },
{ "data-port", 1, 0, 'D', },
{ "daemonize", 0, 0, 'd', },
+ { "group", 1, 0, 'g', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
@@
-152,7
+157,7
@@
int parse_args(int argc, char **argv)
while (1) {
int option_index = 0;
while (1) {
int option_index = 0;
- c = getopt_long(argc, argv, "dhv" "C:D:o:",
+ c = getopt_long(argc, argv, "dhv" "C:D:o:
g:
",
long_options, &option_index);
if (c == -1) {
break;
long_options, &option_index);
if (c == -1) {
break;
@@
-188,6
+193,9
@@
int parse_args(int argc, char **argv)
case 'd':
opt_daemon = 1;
break;
case 'd':
opt_daemon = 1;
break;
+ case 'g':
+ tracing_group_name = optarg;
+ break;
case 'h':
usage();
exit(EXIT_FAILURE);
case 'h':
usage();
exit(EXIT_FAILURE);
@@
-297,6
+305,18
@@
int notify_thread_pipe(int wpipe)
return ret;
}
return ret;
}
+static void notify_health_quit_pipe(int *pipe)
+{
+ int ret;
+
+ do {
+ ret = write(pipe[1], "4", 1);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != 1) {
+ PERROR("write relay health quit");
+ }
+}
+
/*
* Stop all threads by closing the thread quit pipe.
*/
/*
* Stop all threads by closing the thread quit pipe.
*/
@@
-312,6
+332,8
@@
void stop_threads(void)
ERR("write error on thread quit pipe");
}
ERR("write error on thread quit pipe");
}
+ notify_health_quit_pipe(health_quit_pipe);
+
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
@@
-519,6
+541,8
@@
void *relay_thread_listener(void *data)
health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+ health_code_update();
+
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
@@
-550,10
+574,14
@@
void *relay_thread_listener(void *data)
}
while (1) {
}
while (1) {
+ health_code_update();
+
DBG("Listener accepting connections");
restart:
DBG("Listener accepting connections");
restart:
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
@@
-568,6
+596,8
@@
restart:
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
@@
-662,7
+692,8
@@
error_sock_relay:
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
- DBG("Thread exited with error");
+ health_error();
+ ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
@@
-676,7
+707,7
@@
error_sock_control:
static
void *relay_thread_dispatcher(void *data)
{
static
void *relay_thread_dispatcher(void *data)
{
- int ret;
+ int ret
, err = -1
;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
@@
-684,11
+715,17
@@
void *relay_thread_dispatcher(void *data)
health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+ health_code_update();
+
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
+ health_code_update();
+
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
@@
-717,10
+754,19
@@
void *relay_thread_dispatcher(void *data)
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
+ health_poll_entry();
futex_nto1_wait(&relay_cmd_queue.futex);
futex_nto1_wait(&relay_cmd_queue.futex);
+ health_poll_exit();
}
}
+ /* Normal exit, no error */
+ err = 0;
+
error:
error:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
@@
-780,8
+826,7
@@
void deferred_free_session(struct rcu_head *head)
* RCU read side lock MUST be acquired. If NO close_stream_check() was called
* BEFORE the stream lock MUST be acquired.
*/
* RCU read side lock MUST be acquired. If NO close_stream_check() was called
* BEFORE the stream lock MUST be acquired.
*/
-static void destroy_stream(struct relay_stream *stream,
- struct lttng_ht *ctf_traces_ht)
+static void destroy_stream(struct relay_stream *stream)
{
int delret;
struct relay_viewer_stream *vstream;
{
int delret;
struct relay_viewer_stream *vstream;
@@
-819,7
+864,7
@@
static void destroy_stream(struct relay_stream *stream,
delret = lttng_ht_del(relay_streams_ht, &iter);
assert(!delret);
iter.iter.node = &stream->ctf_trace_node.node;
delret = lttng_ht_del(relay_streams_ht, &iter);
assert(!delret);
iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(ctf_traces_ht, &iter);
+ delret = lttng_ht_del(
stream->
ctf_traces_ht, &iter);
assert(!delret);
call_rcu(&stream->rcu_node, deferred_free_stream);
DBG("Closed tracefile %d from close stream", stream->fd);
assert(!delret);
call_rcu(&stream->rcu_node, deferred_free_stream);
DBG("Closed tracefile %d from close stream", stream->fd);
@@
-852,7
+897,9
@@
void relay_delete_session(struct relay_command *cmd,
}
stream = caa_container_of(node, struct relay_stream, stream_n);
if (stream->session == cmd->session) {
}
stream = caa_container_of(node, struct relay_stream, stream_n);
if (stream->session == cmd->session) {
- destroy_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
+ cmd->session->stream_count--;
+ assert(cmd->session->stream_count >= 0);
}
}
}
}
@@
-1040,6
+1087,7
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
stream->ctf_trace->metadata_stream = stream;
}
ctf_trace_assign(cmd->ctf_traces_ht, stream);
stream->ctf_trace->metadata_stream = stream;
}
ctf_trace_assign(cmd->ctf_traces_ht, stream);
+ stream->ctf_traces_ht = cmd->ctf_traces_ht;
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
@@
-1048,6
+1096,7
@@
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name);
lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node);
lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name);
lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node);
+ session->stream_count++;
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
@@
-1124,9
+1173,11
@@
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
+ session->stream_count--;
+ assert(session->stream_count >= 0);
if (close_stream_check(stream)) {
if (close_stream_check(stream)) {
- destroy_stream(stream
, cmd->ctf_traces_ht
);
+ destroy_stream(stream);
}
end_unlock:
}
end_unlock:
@@
-2042,7
+2093,7
@@
int relay_process_data(struct relay_command *cmd)
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- destroy_stream(stream
, cmd->ctf_traces_ht
);
+ destroy_stream(stream);
}
end_rcu_unlock:
}
end_rcu_unlock:
@@
-2084,9
+2135,17
@@
int relay_add_connection(int fd, struct lttng_poll_event *events,
goto error_read;
}
goto error_read;
}
- relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
- if (!relay_connection->ctf_traces_ht) {
- goto error_read;
+ /*
+ * Only used by the control side and the reference is copied inside each
+ * stream from that connection. Thus a destroy HT must be done after every
+ * stream has been destroyed.
+ */
+ if (relay_connection->type == RELAY_CONTROL) {
+ relay_connection->ctf_traces_ht = lttng_ht_new(0,
+ LTTNG_HT_TYPE_STRING);
+ if (!relay_connection->ctf_traces_ht) {
+ goto error_read;
+ }
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
@@
-2111,7
+2170,6
@@
void deferred_free_connection(struct rcu_head *head)
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
- lttng_ht_destroy(relay_connection->ctf_traces_ht);
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
@@
-2125,12
+2183,13
@@
void relay_del_connection(struct lttng_ht *relay_connections_ht,
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
+
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, sessions_ht);
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, sessions_ht);
+ lttng_ht_destroy(relay_connection->ctf_traces_ht);
}
}
- call_rcu(&relay_connection->rcu_node,
- deferred_free_connection);
+ call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
/*
}
/*
@@
-2156,6
+2215,8
@@
void *relay_thread_worker(void *data)
health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+ health_code_update();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
@@
-2182,9
+2243,13
@@
restart:
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ health_code_update();
+
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
@@
-2207,6
+2272,8
@@
restart:
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
@@
-2309,6
+2376,9
@@
restart:
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
if (last_seen_data_fd == pollfd) {
idx = i;
break;
if (last_seen_data_fd == pollfd) {
idx = i;
break;
@@
-2322,6
+2392,8
@@
restart:
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
@@
-2371,6
+2443,9
@@
restart:
last_seen_data_fd = -1;
}
last_seen_data_fd = -1;
}
+ /* Normal exit, no error */
+ ret = 0;
+
exit:
error:
lttng_poll_clean(&events);
exit:
error:
lttng_poll_clean(&events);
@@
-2378,6
+2453,8
@@
error:
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+ health_code_update();
+
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
@@
-2397,11
+2474,15
@@
relay_connections_ht_error:
if (err) {
DBG("Thread exited with error");
}
if (err) {
DBG("Thread exited with error");
}
- health_unregister(health_relayd);
DBG("Worker thread cleanup complete");
free(data_buffer);
DBG("Worker thread cleanup complete");
free(data_buffer);
- stop_threads();
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
rcu_unregister_thread();
rcu_unregister_thread();
+ stop_threads();
return NULL;
}
return NULL;
}
@@
-2523,6
+2604,19
@@
int main(int argc, char **argv)
goto exit_health_app_create;
}
goto exit_health_app_create;
}
+ ret = utils_create_pipe(health_quit_pipe);
+ if (ret < 0) {
+ goto error_health_pipe;
+ }
+
+ /* Create thread to manage the client socket */
+ ret = pthread_create(&health_thread, NULL,
+ thread_manage_health, (void *) NULL);
+ if (ret != 0) {
+ PERROR("pthread_create health");
+ goto health_error;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
@@
-2553,8
+2647,6
@@
int main(int argc, char **argv)
goto exit_live;
}
goto exit_live;
}
- live_stop_threads();
-
exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
@@
-2577,6
+2669,21
@@
exit_worker:
}
exit_dispatcher:
}
exit_dispatcher:
+ ret = pthread_join(health_thread, &status);
+ if (ret != 0) {
+ PERROR("pthread_join health thread");
+ goto error; /* join error, exit without cleanup */
+ }
+
+ /*
+ * Stop live threads only after joining other threads.
+ */
+ live_stop_threads();
+
+health_error:
+ utils_close_pipe(health_quit_pipe);
+
+error_health_pipe:
health_app_destroy(health_relayd);
exit_health_app_create:
health_app_destroy(health_relayd);
exit_health_app_create:
This page took
0.030377 seconds
and
4
git commands to generate.