From: David Goulet Date: Mon, 11 Mar 2013 17:00:33 +0000 (-0400) Subject: Merge remote-tracking branch 'cbab-github/tests-cleanup' into cbab X-Git-Tag: v2.2.0-rc1~60 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=5d2e1e66a968d9e555f9b8b00d0589ebfaf3de32;hp=0ea4ac57c12607472d511f7eadf2826056fbc018 Merge remote-tracking branch 'cbab-github/tests-cleanup' into cbab Conflicts: tests/regression/tools/streaming/test_high_throughput_limits tests/unit/test_kernel_data.c tests/unit/test_ust_data.c --- diff --git a/extras/bindings/swig/python/Makefile.am b/extras/bindings/swig/python/Makefile.am index c6a69010e..6e7baa69d 100644 --- a/extras/bindings/swig/python/Makefile.am +++ b/extras/bindings/swig/python/Makefile.am @@ -4,16 +4,14 @@ lttng.i: lttng.i.in AM_CFLAGS = -I$(PYTHON_INCLUDE) -I$(top_srcdir)/lib/lttng-ctl -I../common \ $(BUDDY_CFLAGS) -EXTRA_DIST = lttng.i -python_PYTHON = lttng.py +EXTRA_DIST = lttng.i.in +nodist_python_PYTHON = lttng.py pyexec_LTLIBRARIES = _lttng.la MAINTAINERCLEANFILES = lttng_wrap.c lttng.py -_lttng_la_SOURCES = lttng_wrap.c - +nodist__lttng_la_SOURCES = lttng_wrap.c _lttng_la_LDFLAGS = -module - _lttng_la_LIBADD = $(top_srcdir)/src/lib/lttng-ctl/liblttng-ctl.la \ $(top_srcdir)/src/common/sessiond-comm/libsessiond-comm.la diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index b854aabac..84868077c 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -51,8 +51,8 @@ /* TODO : support UST (all direct kernel-ctl accesses). */ -/* the two threads (receive fd, poll and metadata) */ -static pthread_t data_thread, metadata_thread, sessiond_thread; +/* threads (channel handling, poll, metadata, sessiond) */ +static pthread_t channel_thread, data_thread, metadata_thread, sessiond_thread; /* to count the number of times the user pressed ctrl+c */ static int sigintcount = 0; @@ -363,12 +363,20 @@ int main(int argc, char **argv) } lttng_consumer_set_error_sock(ctx, ret); + /* Create thread to manage channels */ + ret = pthread_create(&channel_thread, NULL, consumer_thread_channel_poll, + (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto error; + } + /* Create thread to manage the polling/writing of trace metadata */ ret = pthread_create(&metadata_thread, NULL, consumer_thread_metadata_poll, (void *) ctx); if (ret != 0) { perror("pthread_create"); - goto error; + goto metadata_error; } /* Create thread to manage the polling/writing of trace data */ @@ -407,6 +415,13 @@ data_error: goto error; } +metadata_error: + ret = pthread_join(channel_thread, &status); + if (ret != 0) { + perror("pthread_join"); + goto error; + } + if (!ret) { ret = EXIT_SUCCESS; lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_EXIT_SUCCESS); diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 0964c9402..26a2d13a0 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -9,7 +9,7 @@ bin_PROGRAMS = lttng-sessiond lttng_sessiond_SOURCES = utils.c utils.h \ trace-kernel.c trace-kernel.h \ kernel.c kernel.h \ - ust-ctl.h ust-app.h trace-ust.h \ + ust-ctl.h ust-app.h trace-ust.h ust-thread.h \ context.c context.h \ channel.c channel.h \ event.c event.h \ @@ -26,7 +26,9 @@ lttng_sessiond_SOURCES = utils.c utils.h \ testpoint.h if HAVE_LIBLTTNG_UST_CTL -lttng_sessiond_SOURCES += trace-ust.c ust-app.c ust-consumer.c ust-consumer.h +lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-registry.h ust-app.c \ + ust-consumer.c ust-consumer.h ust-thread.c \ + ust-metadata.c ust-clock.h endif # Add main.c at the end for compile order diff --git a/src/bin/lttng-sessiond/channel.c b/src/bin/lttng-sessiond/channel.c index 7ebe4b16e..91be4f41c 100644 --- a/src/bin/lttng-sessiond/channel.c +++ b/src/bin/lttng-sessiond/channel.c @@ -49,8 +49,6 @@ struct lttng_channel *channel_new_default_attr(int dom) } chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; - chan->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER; - chan->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER; switch (dom) { case LTTNG_DOMAIN_KERNEL: @@ -58,6 +56,8 @@ struct lttng_channel *channel_new_default_attr(int dom) default_get_kernel_channel_subbuf_size(); chan->attr.num_subbuf = DEFAULT_KERNEL_CHANNEL_SUBBUF_NUM; chan->attr.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; + chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER; + chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER; break; case LTTNG_DOMAIN_UST: #if 0 @@ -68,6 +68,8 @@ struct lttng_channel *channel_new_default_attr(int dom) chan->attr.subbuf_size = default_get_ust_channel_subbuf_size(); chan->attr.num_subbuf = DEFAULT_UST_CHANNEL_SUBBUF_NUM; chan->attr.output = DEFAULT_UST_CHANNEL_OUTPUT; + chan->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER; + chan->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER; break; default: goto error; /* Not implemented */ diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index a47c504c7..1a8fbba1e 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -38,12 +38,11 @@ /* * Used to keep a unique index for each relayd socket created where this value * is associated with streams on the consumer so it can match the right relayd - * to send to. - * - * This value should be incremented atomically for safety purposes and future - * possible concurrent access. + * to send to. It must be accessed with the relayd_net_seq_idx_lock + * held. */ -static unsigned int relayd_net_seq_idx; +static pthread_mutex_t relayd_net_seq_idx_lock = PTHREAD_MUTEX_INITIALIZER; +static uint64_t relayd_net_seq_idx; /* * Create a session path used by list_lttng_sessions for the case that the @@ -566,15 +565,15 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, } /* Set the network sequence index if not set. */ - if (consumer->net_seq_index == -1) { + if (consumer->net_seq_index == (uint64_t) -1ULL) { + pthread_mutex_lock(&relayd_net_seq_idx_lock); /* * Increment net_seq_idx because we are about to transfer the * new relayd socket to the consumer. + * Assign unique key so the consumer can match streams. */ - uatomic_inc(&relayd_net_seq_idx); - /* Assign unique key so the consumer can match streams */ - uatomic_set(&consumer->net_seq_index, - uatomic_read(&relayd_net_seq_idx)); + consumer->net_seq_index = ++relayd_net_seq_idx; + pthread_mutex_unlock(&relayd_net_seq_idx_lock); } /* Send relayd socket to consumer. */ @@ -2136,10 +2135,12 @@ error: void cmd_init(void) { /* - * Set network sequence index to 1 for streams to match a relayd socket on - * the consumer side. + * Set network sequence index to 1 for streams to match a relayd + * socket on the consumer side. */ - uatomic_set(&relayd_net_seq_idx, 1); + pthread_mutex_lock(&relayd_net_seq_idx_lock); + relayd_net_seq_idx = 1; + pthread_mutex_unlock(&relayd_net_seq_idx_lock); DBG("Command subsystem initialized"); } diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index ff5360aae..92abcf21d 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -76,7 +77,7 @@ end: * negative value is sent back and both parameters are untouched. */ int consumer_recv_status_channel(struct consumer_socket *sock, - unsigned long *key, unsigned int *stream_count) + uint64_t *key, unsigned int *stream_count) { int ret; struct lttcomm_consumer_status_channel reply; @@ -359,7 +360,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type) /* By default, consumer output is enabled */ output->enabled = 1; output->type = type; - output->net_seq_index = -1; + output->net_seq_index = (uint64_t) -1ULL; output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); @@ -622,8 +623,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, const char *name, uid_t uid, gid_t gid, - int relayd_id, - unsigned long key, + uint64_t relayd_id, + uint64_t key, unsigned char *uuid) { assert(msg); @@ -660,12 +661,12 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, */ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, - int channel_key, + uint64_t channel_key, uint64_t session_id, const char *pathname, uid_t uid, gid_t gid, - int relayd_id, + uint64_t relayd_id, const char *name, unsigned int nb_init_streams, enum lttng_event_output output, @@ -700,8 +701,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, */ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, - int channel_key, - int stream_key, + uint64_t channel_key, + uint64_t stream_key, int cpu) { assert(msg); @@ -758,7 +759,7 @@ error: */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type, unsigned int session_id) + enum lttng_stream_type type, uint64_t session_id) { int ret; struct lttcomm_consumer_msg msg; @@ -865,7 +866,7 @@ error: * This function has a different behavior with the consumer i.e. that it waits * for a reply from the consumer if yes or no the data is pending. */ -int consumer_is_data_pending(unsigned int id, +int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) { int ret; @@ -878,9 +879,9 @@ int consumer_is_data_pending(unsigned int id, msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; - msg.u.data_pending.session_id = (uint64_t) id; + msg.u.data_pending.session_id = session_id; - DBG3("Consumer data pending for id %u", id); + DBG3("Consumer data pending for id %" PRIu64, session_id); /* Send command for each consumer */ rcu_read_lock(); @@ -924,8 +925,8 @@ int consumer_is_data_pending(unsigned int id, } rcu_read_unlock(); - DBG("Consumer data is %s pending for session id %u", - ret_code == 1 ? "" : "NOT", id); + DBG("Consumer data is %s pending for session id %" PRIu64, + ret_code == 1 ? "" : "NOT", session_id); return ret_code; error_unlock: diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index af337baa3..3616d467c 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -126,7 +126,7 @@ struct consumer_output { * side. It tells the consumer which streams goes to which relayd with this * index. The relayd sockets are index with it on the consumer side. */ - int net_seq_index; + uint64_t net_seq_index; /* * Subdirectory path name used for both local and network consumer. @@ -170,12 +170,12 @@ int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg); int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type, unsigned int session_id); + enum lttng_stream_type type, uint64_t session_id); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); int consumer_recv_status_reply(struct consumer_socket *sock); int consumer_recv_status_channel(struct consumer_socket *sock, - unsigned long *key, unsigned int *stream_count); + uint64_t *key, unsigned int *stream_count); void consumer_output_send_destroy_relayd(struct consumer_output *consumer); int consumer_create_socket(struct consumer_data *data, struct consumer_output *output); @@ -195,27 +195,27 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, const char *name, uid_t uid, gid_t gid, - int relayd_id, - unsigned long key, + uint64_t relayd_id, + uint64_t key, unsigned char *uuid); void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, - int channel_key, - int stream_key, + uint64_t channel_key, + uint64_t stream_key, int cpu); void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, - int channel_key, + uint64_t channel_key, uint64_t session_id, const char *pathname, uid_t uid, gid_t gid, - int relayd_id, + uint64_t relayd_id, const char *name, unsigned int nb_init_streams, enum lttng_event_output output, int type); -int consumer_is_data_pending(unsigned int id, +int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer); #endif /* _CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/lttng-sessiond.h b/src/bin/lttng-sessiond/lttng-sessiond.h index 63e9be998..9258f38a7 100644 --- a/src/bin/lttng-sessiond/lttng-sessiond.h +++ b/src/bin/lttng-sessiond/lttng-sessiond.h @@ -23,6 +23,7 @@ #include #include +#include #include #include "session.h" @@ -64,4 +65,13 @@ struct ust_cmd_queue { struct cds_wfq_queue queue; }; +/* + * This pipe is used to inform the thread managing application notify + * communication that a command is queued and ready to be processed. + */ +extern int apps_cmd_notify_pipe[2]; + +int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size); +int sessiond_check_thread_quit_pipe(int fd, uint32_t events); + #endif /* _LTT_SESSIOND_H */ diff --git a/src/bin/lttng-sessiond/lttng-ust-ctl.h b/src/bin/lttng-sessiond/lttng-ust-ctl.h index cd833b826..7dd36a93c 100644 --- a/src/bin/lttng-sessiond/lttng-ust-ctl.h +++ b/src/bin/lttng-sessiond/lttng-ust-ctl.h @@ -136,13 +136,21 @@ void ustctl_destroy_channel(struct ustctl_consumer_channel *chan); int ustctl_send_channel_to_sessiond(int sock, struct ustctl_consumer_channel *channel); +int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *consumer_chan); +int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *consumer_chan); + /* * Send a NULL stream to finish iteration over all streams of a given * channel. */ int ustctl_send_stream_to_sessiond(int sock, struct ustctl_consumer_stream *stream); +int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream); int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream); +int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream); +int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream); /* Create/destroy stream buffers for read */ struct ustctl_consumer_stream * @@ -150,9 +158,6 @@ struct ustctl_consumer_stream * int cpu); void ustctl_destroy_stream(struct ustctl_consumer_stream *stream); -int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream); -int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream); - /* For mmap mode, readable without "get" operation */ int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream, unsigned long *len); diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index e9529f1bf..f7bb53ef7 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -37,7 +37,6 @@ #include #include -#include #include #include #include @@ -61,14 +60,12 @@ #include "fd-limit.h" #include "health.h" #include "testpoint.h" +#include "ust-thread.h" #define CONSUMERD_FILE "lttng-consumerd" /* Const values */ -const char default_home_dir[] = DEFAULT_HOME_DIR; const char default_tracing_group[] = DEFAULT_TRACING_GROUP; -const char default_ust_sock_dir[] = DEFAULT_UST_SOCK_DIR; -const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE; const char *progname; const char *opt_tracing_group; @@ -149,8 +146,11 @@ static int thread_quit_pipe[2] = { -1, -1 }; */ static int apps_cmd_pipe[2] = { -1, -1 }; +int apps_cmd_notify_pipe[2] = { -1, -1 }; + /* Pthread, Mutexes and Semaphores */ static pthread_t apps_thread; +static pthread_t apps_notify_thread; static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; @@ -279,15 +279,11 @@ void setup_consumerd_path(void) /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ -static int create_thread_poll_set(struct lttng_poll_event *events, - unsigned int size) +int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size) { int ret; - if (events == NULL || size == 0) { - ret = -1; - goto error; - } + assert(events); ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); if (ret < 0) { @@ -295,7 +291,7 @@ static int create_thread_poll_set(struct lttng_poll_event *events, } /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -311,7 +307,7 @@ error: * * Return 1 if it was triggered else 0; */ -static int check_thread_quit_pipe(int fd, uint32_t events) +int sessiond_check_thread_quit_pipe(int fd, uint32_t events) { if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { return 1; @@ -721,7 +717,7 @@ static void *thread_manage_kernel(void *data) /* Clean events object. We are about to populate it again. */ lttng_poll_clean(&events); - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll_create; } @@ -771,7 +767,7 @@ static void *thread_manage_kernel(void *data) health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -870,7 +866,7 @@ static void *thread_manage_consumer(void *data) * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. * Nothing more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll; } @@ -917,7 +913,7 @@ restart: health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1011,7 +1007,7 @@ restart_poll: health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1093,7 +1089,6 @@ static void *thread_manage_apps(void *data) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; - struct ust_command ust_cmd; struct lttng_poll_event events; DBG("[thread] Manage application started"); @@ -1109,7 +1104,7 @@ static void *thread_manage_apps(void *data) health_code_update(); - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll_create; } @@ -1153,7 +1148,7 @@ static void *thread_manage_apps(void *data) health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1165,11 +1160,13 @@ static void *thread_manage_apps(void *data) ERR("Apps command pipe error"); goto error; } else if (revents & LPOLLIN) { + int sock; + /* Empty pipe */ do { - ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); + ret = read(apps_cmd_pipe[0], &sock, sizeof(sock)); } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret < sizeof(ust_cmd)) { + if (ret < 0 || ret < sizeof(sock)) { PERROR("read apps cmd pipe"); goto error; } @@ -1177,70 +1174,23 @@ static void *thread_manage_apps(void *data) health_code_update(); /* - * @session_lock - * Lock the global session list so from the register up to - * the registration done message, no thread can see the - * application and change its state. + * We only monitor the error events of the socket. This + * thread does not handle any incoming data from UST + * (POLLIN). */ - session_lock_list(); - - /* Register applicaton to the session daemon */ - ret = ust_app_register(&ust_cmd.reg_msg, - ust_cmd.sock); - if (ret == -ENOMEM) { - session_unlock_list(); + ret = lttng_poll_add(&events, sock, + LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { goto error; - } else if (ret < 0) { - session_unlock_list(); - break; } - health_code_update(); + /* Set socket timeout for both receiving and ending */ + (void) lttcomm_setsockopt_rcv_timeout(sock, + app_socket_timeout); + (void) lttcomm_setsockopt_snd_timeout(sock, + app_socket_timeout); - /* - * Validate UST version compatibility. - */ - ret = ust_app_validate_version(ust_cmd.sock); - if (ret >= 0) { - /* - * Add channel(s) and event(s) to newly registered apps - * from lttng global UST domain. - */ - update_ust_app(ust_cmd.sock); - } - - health_code_update(); - - ret = ust_app_register_done(ust_cmd.sock); - if (ret < 0) { - /* - * If the registration is not possible, we simply - * unregister the apps and continue - */ - ust_app_unregister(ust_cmd.sock); - } else { - /* - * We only monitor the error events of the socket. This - * thread does not handle any incoming data from UST - * (POLLIN). - */ - ret = lttng_poll_add(&events, ust_cmd.sock, - LPOLLERR & LPOLLHUP & LPOLLRDHUP); - if (ret < 0) { - session_unlock_list(); - goto error; - } - - /* Set socket timeout for both receiving and ending */ - (void) lttcomm_setsockopt_rcv_timeout(ust_cmd.sock, - app_socket_timeout); - (void) lttcomm_setsockopt_snd_timeout(ust_cmd.sock, - app_socket_timeout); - - DBG("Apps with sock %d added to poll set", - ust_cmd.sock); - } - session_unlock_list(); + DBG("Apps with sock %d added to poll set", sock); health_code_update(); @@ -1293,6 +1243,38 @@ error_testpoint: return NULL; } +/* + * Send a socket to a thread This is called from the dispatch UST registration + * thread once all sockets are set for the application. + * + * On success, return 0 else a negative value being the errno message of the + * write(). + */ +static int send_socket_to_thread(int fd, int sock) +{ + int ret; + + /* Sockets MUST be set or else this should not have been called. */ + assert(fd >= 0); + assert(sock >= 0); + + do { + ret = write(fd, &sock, sizeof(sock)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != sizeof(sock)) { + PERROR("write apps pipe %d", fd); + if (ret < 0) { + ret = -errno; + } + goto error; + } + + /* All good. Don't send back the write positive ret value. */ + ret = 0; +error: + return ret; +} + /* * Dispatch request from the registration threads to the application * communication thread. @@ -1302,6 +1284,12 @@ static void *thread_dispatch_ust_registration(void *data) int ret; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; + struct { + struct ust_app *app; + struct cds_list_head head; + } *wait_node = NULL, *tmp_wait_node; + + CDS_LIST_HEAD(wait_queue); DBG("[thread] Dispatch UST command started"); @@ -1310,6 +1298,8 @@ static void *thread_dispatch_ust_registration(void *data) futex_nto1_prepare(&ust_cmd_queue.futex); do { + struct ust_app *app = NULL; + /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); if (node == NULL) { @@ -1326,34 +1316,122 @@ static void *thread_dispatch_ust_registration(void *data) ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, ust_cmd->sock, ust_cmd->reg_msg.name, ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - /* - * Inform apps thread of the new application registration. This - * call is blocking so we can be assured that the data will be read - * at some point in time or wait to the end of the world :) - */ - if (apps_cmd_pipe[1] >= 0) { - do { - ret = write(apps_cmd_pipe[1], ust_cmd, - sizeof(struct ust_command)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != sizeof(struct ust_command)) { - PERROR("write apps cmd pipe"); - if (errno == EBADF) { - /* - * We can't inform the application thread to process - * registration. We will exit or else application - * registration will not occur and tracing will never - * start. - */ - goto error; + + if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { + wait_node = zmalloc(sizeof(*wait_node)); + if (!wait_node) { + PERROR("zmalloc wait_node dispatch"); + goto error; + } + CDS_INIT_LIST_HEAD(&wait_node->head); + + /* Create application object if socket is CMD. */ + wait_node->app = ust_app_create(&ust_cmd->reg_msg, + ust_cmd->sock); + if (!wait_node->app) { + ret = close(ust_cmd->sock); + if (ret < 0) { + PERROR("close ust sock dispatch %d", ust_cmd->sock); } + lttng_fd_put(1, LTTNG_FD_APPS); + free(wait_node); + continue; } + /* + * Add application to the wait queue so we can set the notify + * socket before putting this object in the global ht. + */ + cds_list_add(&wait_node->head, &wait_queue); + + /* + * We have to continue here since we don't have the notify + * socket and the application MUST be added to the hash table + * only at that moment. + */ + continue; } else { - /* Application manager thread is not available. */ + /* + * Look for the application in the local wait queue and set the + * notify socket if found. + */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { + if (wait_node->app->pid == ust_cmd->reg_msg.pid) { + wait_node->app->notify_sock = ust_cmd->sock; + cds_list_del(&wait_node->head); + app = wait_node->app; + free(wait_node); + DBG3("UST app notify socket %d is set", ust_cmd->sock); + break; + } + } + } + + if (app) { + /* + * @session_lock_list + * + * Lock the global session list so from the register up to the + * registration done message, no thread can see the application + * and change its state. + */ + session_lock_list(); + rcu_read_lock(); + + /* + * Add application to the global hash table. This needs to be + * done before the update to the UST registry can locate the + * application. + */ + ust_app_add(app); + + /* Set app version. This call will print an error if needed. */ + (void) ust_app_version(app); + + /* Send notify socket through the notify pipe. */ + ret = send_socket_to_thread(apps_cmd_notify_pipe[1], + app->notify_sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* No notify thread, stop the UST tracing. */ + goto error; + } + + /* + * Update newly registered application with the tracing + * registry info already enabled information. + */ + update_ust_app(app->sock); + + /* + * Don't care about return value. Let the manage apps threads + * handle app unregistration upon socket close. + */ + (void) ust_app_register_done(app->sock); + + /* + * Even if the application socket has been closed, send the app + * to the thread and unregistration will take place at that + * place. + */ + ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* No apps. thread, stop the UST tracing. */ + goto error; + } + + rcu_read_unlock(); + session_unlock_list(); + } else { + /* Application manager threads are not available. */ ret = close(ust_cmd->sock); if (ret < 0) { PERROR("close ust_cmd sock"); } + lttng_fd_put(1, LTTNG_FD_APPS); } free(ust_cmd); } while (node != NULL); @@ -1363,6 +1441,13 @@ static void *thread_dispatch_ust_registration(void *data) } error: + /* Clean up wait queue. */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { + cds_list_del(&wait_node->head); + free(wait_node); + } + DBG("Dispatch thread dying"); return NULL; } @@ -1398,7 +1483,7 @@ static void *thread_registration_apps(void *data) * Pass 2 as size here for the thread quit pipe and apps socket. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_create_poll; } @@ -1445,7 +1530,7 @@ static void *thread_registration_apps(void *data) pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1490,16 +1575,12 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(); - ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, - sizeof(struct ust_register_msg)); - if (ret < 0 || ret < sizeof(struct ust_register_msg)) { - if (ret < 0) { - PERROR("lttcomm_recv_unix_sock register apps"); - } else { - ERR("Wrong size received on apps register"); - } + ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); + if (ret < 0) { free(ust_cmd); + /* Close socket of the application. */ ret = close(sock); if (ret) { PERROR("close"); @@ -2966,7 +3047,7 @@ static void *thread_manage_health(void *data) * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error; } @@ -3001,7 +3082,7 @@ restart: pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -3154,7 +3235,7 @@ static void *thread_manage_clients(void *data) * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_create_poll; } @@ -3206,7 +3287,7 @@ static void *thread_manage_clients(void *data) health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -4006,7 +4087,7 @@ int main(int argc, char **argv) /* Set global SHM for ust */ if (strlen(wait_shm_path) == 0) { snprintf(wait_shm_path, PATH_MAX, - DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid()); + DEFAULT_HOME_APPS_WAIT_SHM_PATH, getuid()); } /* Set health check Unix path */ @@ -4022,6 +4103,7 @@ int main(int argc, char **argv) DBG("Client socket path %s", client_unix_sock_path); DBG("Application socket path %s", apps_unix_sock_path); + DBG("Application wait path %s", wait_shm_path); DBG("LTTng run directory path: %s", rundir); /* 32 bits consumerd path setup */ @@ -4130,6 +4212,11 @@ int main(int argc, char **argv) goto exit; } + /* Setup the thread apps notify communication pipe. */ + if (utils_create_pipe_cloexec(apps_cmd_notify_pipe) < 0) { + goto exit; + } + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue); @@ -4194,6 +4281,14 @@ int main(int argc, char **argv) goto exit_apps; } + /* Create thread to manage application notify socket */ + ret = pthread_create(&apps_notify_thread, NULL, + ust_thread_manage_notify, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create apps"); + goto exit_apps; + } + /* Don't start this thread if kernel tracing is not requested nor root */ if (is_root && !opt_no_kernel) { /* Create kernel thread to manage kernel event */ diff --git a/src/bin/lttng-sessiond/trace-kernel.c b/src/bin/lttng-sessiond/trace-kernel.c index 48be06576..990684be9 100644 --- a/src/bin/lttng-sessiond/trace-kernel.c +++ b/src/bin/lttng-sessiond/trace-kernel.c @@ -272,8 +272,8 @@ struct ltt_kernel_metadata *trace_kernel_create_metadata(void) chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; chan->attr.subbuf_size = default_get_metadata_subbuf_size(); chan->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM; - chan->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER; - chan->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER; + chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER; + chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER; chan->attr.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; /* Init metadata */ diff --git a/src/bin/lttng-sessiond/trace-ust.c b/src/bin/lttng-sessiond/trace-ust.c index 5e06a8452..97a9c77e8 100644 --- a/src/bin/lttng-sessiond/trace-ust.c +++ b/src/bin/lttng-sessiond/trace-ust.c @@ -412,8 +412,8 @@ struct ltt_ust_metadata *trace_ust_create_metadata(char *path) lum->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; lum->attr.subbuf_size = default_get_metadata_subbuf_size(); lum->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM; - lum->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER; - lum->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER; + lum->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER; + lum->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER; lum->attr.output = LTTNG_UST_MMAP; lum->handle = -1; diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index aa188931d..225d3f86a 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -73,6 +73,10 @@ static struct consumer_socket *find_consumer_socket_by_bitness(int bits, } socket = consumer_find_socket(consumer_fd, consumer); + if (!socket) { + ERR("Consumer socket fd %d not found in consumer obj %p", + consumer_fd, consumer); + } end: return socket; @@ -142,16 +146,18 @@ no_match: * Unique add of an ust app event in the given ht. This uses the custom * ht_match_ust_app_event match function and the event name as hash. */ -static void add_unique_ust_app_event(struct lttng_ht *ht, +static void add_unique_ust_app_event(struct ust_app_channel *ua_chan, struct ust_app_event *event) { struct cds_lfht_node *node_ptr; struct ust_app_ht_key key; + struct lttng_ht *ht; - assert(ht); - assert(ht->ht); + assert(ua_chan); + assert(ua_chan->events); assert(event); + ht = ua_chan->events; key.name = event->attr.name; key.filter = event->filter; key.loglevel = event->attr.loglevel; @@ -162,6 +168,28 @@ static void add_unique_ust_app_event(struct lttng_ht *ht, assert(node_ptr == &event->node.node); } +/* + * Close the notify socket from the given RCU head object. This MUST be called + * through a call_rcu(). + */ +static void close_notify_sock_rcu(struct rcu_head *head) +{ + int ret; + struct ust_app_notify_sock_obj *obj = + caa_container_of(head, struct ust_app_notify_sock_obj, head); + + /* Must have a valid fd here. */ + assert(obj->fd >= 0); + + ret = close(obj->fd); + if (ret) { + ERR("close notify sock %d RCU", obj->fd); + } + lttng_fd_put(LTTNG_FD_APPS, 1); + + free(obj); +} + /* * Delete ust context safely. RCU read lock must be held before calling * this function. @@ -175,9 +203,9 @@ void delete_ust_app_ctx(int sock, struct ust_app_ctx *ua_ctx) if (ua_ctx->obj) { ret = ustctl_release_object(sock, ua_ctx->obj); - if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { - ERR("UST app sock %d release context obj failed with ret %d", - sock, ret); + if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app sock %d release ctx obj handle %d failed with ret %d", + sock, ua_ctx->obj->handle, ret); } free(ua_ctx->obj); } @@ -236,7 +264,8 @@ void delete_ust_app_stream(int sock, struct ust_app_stream *stream) * this function. */ static -void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan) +void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, + struct ust_app *app) { int ret; struct lttng_ht_iter iter; @@ -271,7 +300,13 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan) } lttng_ht_destroy(ua_chan->events); + /* Wipe and free registry. */ + ust_registry_channel_destroy(&ua_chan->session->registry, &ua_chan->registry); + if (ua_chan->obj != NULL) { + /* Remove channel from application UST object descriptor. */ + iter.iter.node = &ua_chan->ust_objd_node.node; + lttng_ht_del(app->ust_objd, &iter); ret = ustctl_release_object(sock, ua_chan->obj); if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { ERR("UST app sock %d release channel obj failed with ret %d", @@ -283,29 +318,178 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan) free(ua_chan); } +/* + * For a given application and session, push metadata to consumer. The session + * lock MUST be acquired here before calling this. + * + * Return 0 on success else a negative error. + */ +static int push_metadata(struct ust_app *app, struct ust_app_session *ua_sess) +{ + int ret; + char *metadata_str = NULL; + size_t len, offset; + struct consumer_socket *socket; + + assert(app); + assert(ua_sess); + + if (!ua_sess->consumer || !ua_sess->metadata) { + /* No consumer means no stream associated so just return gracefully. */ + ret = 0; + goto end; + } + + rcu_read_lock(); + + /* Get consumer socket to use to push the metadata.*/ + socket = find_consumer_socket_by_bitness(app->bits_per_long, + ua_sess->consumer); + if (!socket) { + ret = -1; + goto error_rcu_unlock; + } + + /* + * TODO: Currently, we hold the socket lock around sampling of the next + * metadata segment to ensure we send metadata over the consumer socket in + * the correct order. This makes the registry lock nest inside the socket + * lock. + * + * Please note that this is a temporary measure: we should move this lock + * back into ust_consumer_push_metadata() when the consumer gets the + * ability to reorder the metadata it receives. + */ + pthread_mutex_lock(socket->lock); + pthread_mutex_lock(&ua_sess->registry.lock); + + offset = ua_sess->registry.metadata_len_sent; + len = ua_sess->registry.metadata_len - ua_sess->registry.metadata_len_sent; + if (len == 0) { + DBG3("No metadata to push for session id %d", ua_sess->id); + ret = 0; + goto error_reg_unlock; + } + assert(len > 0); + + /* Allocate only what we have to send. */ + metadata_str = zmalloc(len); + if (!metadata_str) { + PERROR("zmalloc ust app metadata string"); + ret = -ENOMEM; + goto error_reg_unlock; + } + /* Copy what we haven't send out. */ + memcpy(metadata_str, ua_sess->registry.metadata + offset, len); + + pthread_mutex_unlock(&ua_sess->registry.lock); + + ret = ust_consumer_push_metadata(socket, ua_sess, metadata_str, len, + offset); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_rcu_unlock; + } + + /* Update len sent of the registry. */ + pthread_mutex_lock(&ua_sess->registry.lock); + ua_sess->registry.metadata_len_sent += len; + pthread_mutex_unlock(&ua_sess->registry.lock); + pthread_mutex_unlock(socket->lock); + + rcu_read_unlock(); + free(metadata_str); + return 0; + +error_reg_unlock: + pthread_mutex_unlock(&ua_sess->registry.lock); + pthread_mutex_unlock(socket->lock); +error_rcu_unlock: + rcu_read_unlock(); + free(metadata_str); +end: + return ret; +} + +/* + * Send to the consumer a close metadata command for the given session. Once + * done, the metadata channel is deleted and the session metadata pointer is + * nullified. The session lock MUST be acquired here unless the application is + * in the destroy path. + * + * Return 0 on success else a negative value. + */ +static int close_metadata(struct ust_app *app, struct ust_app_session *ua_sess) +{ + int ret; + struct consumer_socket *socket; + + assert(app); + assert(ua_sess); + + /* Ignore if no metadata. Valid since it can be called on unregister. */ + if (!ua_sess->metadata) { + ret = 0; + goto error; + } + + rcu_read_lock(); + + /* Get consumer socket to use to push the metadata.*/ + socket = find_consumer_socket_by_bitness(app->bits_per_long, + ua_sess->consumer); + if (!socket) { + ret = -1; + goto error_rcu_unlock; + } + + ret = ust_consumer_close_metadata(socket, ua_sess->metadata); + if (ret < 0) { + goto error_rcu_unlock; + } + +error_rcu_unlock: + /* Destroy metadata on our side since we must not use it anymore. */ + delete_ust_app_channel(-1, ua_sess->metadata, app); + ua_sess->metadata = NULL; + + rcu_read_unlock(); +error: + return ret; +} + /* * Delete ust app session safely. RCU read lock must be held before calling * this function. */ static -void delete_ust_app_session(int sock, struct ust_app_session *ua_sess) +void delete_ust_app_session(int sock, struct ust_app_session *ua_sess, + struct ust_app *app) { int ret; struct lttng_ht_iter iter; struct ust_app_channel *ua_chan; + assert(ua_sess); + if (ua_sess->metadata) { - delete_ust_app_channel(sock, ua_sess->metadata); + /* Push metadata for application before freeing the application. */ + (void) push_metadata(app, ua_sess); + + /* And ask to close it for this session. */ + (void) close_metadata(app, ua_sess); } cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan, node.node) { ret = lttng_ht_del(ua_sess->channels, &iter); assert(!ret); - delete_ust_app_channel(sock, ua_chan); + delete_ust_app_channel(sock, ua_chan, app); } lttng_ht_destroy(ua_sess->channels); + ust_registry_session_destroy(&ua_sess->registry); + if (ua_sess->handle != -1) { ret = ustctl_release_handle(sock, ua_sess->handle); if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { @@ -338,7 +522,7 @@ void delete_ust_app(struct ust_app *app) cds_list_for_each_entry_safe(ua_sess, tmp_ua_sess, &app->teardown_head, teardown_node) { /* Free every object in the session and the session. */ - delete_ust_app_session(sock, ua_sess); + delete_ust_app_session(sock, ua_sess, app); } /* @@ -385,7 +569,7 @@ void delete_ust_app_rcu(struct rcu_head *head) * Delete the session from the application ht and delete the data structure by * freeing every object inside and releasing them. */ -static void destroy_session(struct ust_app *app, +static void destroy_app_session(struct ust_app *app, struct ust_app_session *ua_sess) { int ret; @@ -402,7 +586,7 @@ static void destroy_session(struct ust_app *app, } /* Once deleted, free the data structure. */ - delete_ust_app_session(app->sock, ua_sess); + delete_ust_app_session(app->sock, ua_sess, app); end: return; @@ -412,7 +596,7 @@ end: * Alloc new UST app session. */ static -struct ust_app_session *alloc_ust_app_session(void) +struct ust_app_session *alloc_ust_app_session(struct ust_app *app) { struct ust_app_session *ua_sess; @@ -425,9 +609,15 @@ struct ust_app_session *alloc_ust_app_session(void) ua_sess->handle = -1; ua_sess->channels = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); - - if ((lttng_uuid_generate(ua_sess->uuid))) { - ERR("Failed to generate UST uuid"); + pthread_mutex_init(&ua_sess->lock, NULL); + if (ust_registry_session_init(&ua_sess->registry, app, + app->bits_per_long, + app->uint8_t_alignment, + app->uint16_t_alignment, + app->uint32_t_alignment, + app->uint64_t_alignment, + app->long_alignment, + app->byte_order)) { goto error; } @@ -444,6 +634,7 @@ error_free: */ static struct ust_app_channel *alloc_ust_app_channel(char *name, + struct ust_app_session *ua_sess, struct lttng_ust_channel_attr *attr) { struct ust_app_channel *ua_chan; @@ -468,6 +659,9 @@ struct ust_app_channel *alloc_ust_app_channel(char *name, CDS_INIT_LIST_HEAD(&ua_chan->streams.head); + /* Initialize UST registry. */ + ust_registry_channel_init(&ua_sess->registry, &ua_chan->registry); + /* Copy attributes */ if (attr) { /* Translate from lttng_ust_channel to ustctl_consumer_channel_attr. */ @@ -614,6 +808,29 @@ error: return NULL; } +/* + * Find an ust_app using the notify sock and return it. RCU read side lock must + * be held before calling this helper function. + */ +static struct ust_app *find_app_by_notify_sock(int sock) +{ + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + + lttng_ht_lookup(ust_app_ht_by_notify_sock, (void *)((unsigned long) sock), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG2("UST app find by notify sock %d not found", sock); + goto error; + } + + return caa_container_of(node, struct ust_app, notify_sock_n); + +error: + return NULL; +} + /* * Lookup for an ust app event based on event name, filter bytecode and the * event loglevel. @@ -652,6 +869,8 @@ end: /* * Create the channel context on the tracer. + * + * Called with UST app session lock held. */ static int create_ust_channel_context(struct ust_app_channel *ua_chan, @@ -675,7 +894,8 @@ int create_ust_channel_context(struct ust_app_channel *ua_chan, ua_ctx->handle = ua_ctx->obj->handle; - DBG2("UST app context created successfully for channel %s", ua_chan->name); + DBG2("UST app context handle %d created successfully for channel %s", + ua_ctx->handle, ua_chan->name); error: health_code_update(); @@ -840,7 +1060,8 @@ error: } /* - * Create the specified channel onto the UST tracer for a UST session. + * Create the specified channel onto the UST tracer for a UST session. This + * MUST be called with UST app session lock held. * * Return 0 on success. On error, a negative value is returned. */ @@ -858,6 +1079,7 @@ static int create_ust_channel(struct ust_app *app, assert(ua_chan); assert(consumer); + rcu_read_lock(); health_code_update(); /* Get the right consumer socket for the application. */ @@ -880,7 +1102,7 @@ static int create_ust_channel(struct ust_app *app, /* * Compute the number of fd needed before receiving them. It must be 2 per - * stream. + * stream (2 being the default value here). */ nb_fd = DEFAULT_UST_STREAM_FD_NUM * ua_chan->expected_stream_count; @@ -908,6 +1130,8 @@ static int create_ust_channel(struct ust_app *app, goto error; } + health_code_update(); + /* Send all streams to application. */ cds_list_for_each_entry_safe(stream, stmp, &ua_chan->streams.head, list) { ret = ust_consumer_send_stream_to_ust(app, ua_chan, stream); @@ -921,6 +1145,11 @@ static int create_ust_channel(struct ust_app *app, /* Flag the channel that it is sent to the application. */ ua_chan->is_sent = 1; + /* Assign session to channel. */ + ua_chan->session = ua_sess; + /* Initialize ust objd object using the received handle and add it. */ + lttng_ht_node_init_ulong(&ua_chan->ust_objd_node, ua_chan->handle); + lttng_ht_add_unique_ulong(app->ust_objd, &ua_chan->ust_objd_node); health_code_update(); @@ -932,6 +1161,7 @@ static int create_ust_channel(struct ust_app *app, } } + rcu_read_unlock(); return 0; error_destroy: @@ -946,11 +1176,14 @@ error_fd_get: (void) ust_consumer_destroy_channel(socket, ua_chan); error: health_code_update(); + rcu_read_unlock(); return ret; } /* * Create the specified event onto the UST tracer for a UST session. + * + * Should be called with session mutex held. */ static int create_ust_event(struct ust_app *app, struct ust_app_session *ua_sess, @@ -1091,7 +1324,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan, continue; } shadow_copy_event(ua_event, uevent); - add_unique_ust_app_event(ua_chan->events, ua_event); + add_unique_ust_app_event(ua_chan, ua_event); } } @@ -1148,7 +1381,7 @@ static void shadow_copy_session(struct ust_app_session *ua_sess, DBG2("Channel %s not found on shadow session copy, creating it", uchan->name); - ua_chan = alloc_ust_app_channel(uchan->name, &uchan->attr); + ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr); if (ua_chan == NULL) { /* malloc failed FIXME: Might want to do handle ENOMEM .. */ continue; @@ -1226,7 +1459,7 @@ static int create_ust_app_session(struct ltt_ust_session *usess, if (ua_sess == NULL) { DBG2("UST app pid: %d session id %d not found, creating it", app->pid, usess->id); - ua_sess = alloc_ust_app_session(); + ua_sess = alloc_ust_app_session(app); if (ua_sess == NULL) { /* Only malloc can failed so something is really wrong */ ret = -ENOMEM; @@ -1247,7 +1480,7 @@ static int create_ust_app_session(struct ltt_ust_session *usess, } else { DBG("UST app creating session failed. Application is dead"); } - delete_ust_app_session(-1, ua_sess); + delete_ust_app_session(-1, ua_sess, app); if (ret != -ENOMEM) { /* * Tracer is probably gone or got an internal error so let's @@ -1267,10 +1500,19 @@ static int create_ust_app_session(struct ltt_ust_session *usess, DBG2("UST app session created successfully with handle %d", ret); } + /* + * Assign consumer if not already set. For one application, there is only + * one possible consumer has of now. + */ + if (!ua_sess->consumer) { + ua_sess->consumer = usess->consumer; + } + *ua_sess_ptr = ua_sess; if (is_created) { *is_created = created; } + /* Everything went well. */ ret = 0; @@ -1281,6 +1523,8 @@ error: /* * Create a context for the channel on the tracer. + * + * Called with UST app session lock held. */ static int create_ust_app_channel_context(struct ust_app_session *ua_sess, @@ -1322,6 +1566,8 @@ error: /* * Enable on the tracer side a ust app event for the session and channel. + * + * Called with UST app session lock held. */ static int enable_ust_app_event(struct ust_app_session *ua_sess, @@ -1412,6 +1658,8 @@ error: /* * Create UST app channel and create it on the tracer. Set ua_chanp of the * newly created channel if not NULL. + * + * Called with UST app session lock held. */ static int create_ust_app_channel(struct ust_app_session *ua_sess, struct ltt_ust_channel *uchan, struct ust_app *app, @@ -1431,7 +1679,7 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, goto end; } - ua_chan = alloc_ust_app_channel(uchan->name, &uchan->attr); + ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr); if (ua_chan == NULL) { /* Only malloc can fail here */ ret = -ENOMEM; @@ -1447,12 +1695,12 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, goto error; } - /* Only add the channel if successful on the tracer side. */ - lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); - DBG2("UST app create channel %s for PID %d completed", ua_chan->name, app->pid); + /* Only add the channel if successful on the tracer side. */ + lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); + end: if (ua_chanp) { *ua_chanp = ua_chan; @@ -1462,12 +1710,14 @@ end: return 0; error: - delete_ust_app_channel(ua_chan->is_sent ? app->sock : -1, ua_chan); + delete_ust_app_channel(ua_chan->is_sent ? app->sock : -1, ua_chan, app); return ret; } /* * Create UST app event and create it on the tracer side. + * + * Called with ust app session mutex held. */ static int create_ust_app_event(struct ust_app_session *ua_sess, @@ -1502,7 +1752,7 @@ int create_ust_app_event(struct ust_app_session *ua_sess, goto error; } - add_unique_ust_app_event(ua_chan->events, ua_event); + add_unique_ust_app_event(ua_chan, ua_event); DBG2("UST app create event %s for PID %d completed", ua_event->name, app->pid); @@ -1518,15 +1768,19 @@ error: /* * Create UST metadata and open it on the tracer side. + * + * Called with UST app session lock held. */ static int create_ust_app_metadata(struct ust_app_session *ua_sess, struct ust_app *app, struct consumer_output *consumer) { int ret = 0; struct ust_app_channel *metadata; + struct consumer_socket *socket; assert(ua_sess); assert(app); + assert(consumer); if (ua_sess->metadata) { /* Already exist. Return success. */ @@ -1534,7 +1788,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, } /* Allocate UST metadata */ - metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, NULL); + metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, ua_sess, NULL); if (!metadata) { /* malloc() failed */ ret = -ENOMEM; @@ -1545,24 +1799,48 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, metadata->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE; metadata->attr.subbuf_size = default_get_metadata_subbuf_size(); metadata->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM; - metadata->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER; - metadata->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER; + metadata->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER; + metadata->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER; metadata->attr.output = LTTNG_UST_MMAP; metadata->attr.type = LTTNG_UST_CHAN_METADATA; - ret = create_ust_channel(app, ua_sess, metadata, consumer); + /* Get the right consumer socket for the application. */ + socket = find_consumer_socket_by_bitness(app->bits_per_long, consumer); + if (!socket) { + ret = -EINVAL; + goto error_consumer; + } + + /* + * Ask the metadata channel creation to the consumer. The metadata object + * will be created by the consumer and kept their. However, the stream is + * never added or monitored until we do a first push metadata to the + * consumer. + */ + ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket); + if (ret < 0) { + goto error_consumer; + } + + /* + * The setup command will make the metadata stream be sent to the relayd, + * if applicable, and the thread managing the metadatas. This is important + * because after this point, if an error occurs, the only way the stream + * can be deleted is to be monitored in the consumer. + */ + ret = ust_consumer_setup_metadata(socket, metadata); if (ret < 0) { - goto error_create; + goto error_consumer; } ua_sess->metadata = metadata; - DBG2("UST metadata opened for app pid %d", app->pid); + DBG2("UST metadata created for app pid %d", app->pid); end: return 0; -error_create: - delete_ust_app_channel(metadata->is_sent ? app->sock : -1, metadata); +error_consumer: + delete_ust_app_channel(-1, metadata, app); error: return ret; } @@ -1576,114 +1854,159 @@ struct lttng_ht *ust_app_get_ht(void) } /* - * Return ust app pointer or NULL if not found. + * Return ust app pointer or NULL if not found. RCU read side lock MUST be + * acquired before calling this function. */ struct ust_app *ust_app_find_by_pid(pid_t pid) { + struct ust_app *app = NULL; struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; - rcu_read_lock(); lttng_ht_lookup(ust_app_ht, (void *)((unsigned long) pid), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node == NULL) { DBG2("UST app no found with pid %d", pid); goto error; } - rcu_read_unlock(); DBG2("Found UST app by pid %d", pid); - return caa_container_of(node, struct ust_app, pid_n); + app = caa_container_of(node, struct ust_app, pid_n); error: - rcu_read_unlock(); - return NULL; + return app; } /* - * Using pid and uid (of the app), allocate a new ust_app struct and - * add it to the global traceable app list. + * Allocate and init an UST app object using the registration information and + * the command socket. This is called when the command socket connects to the + * session daemon. * - * On success, return 0, else return malloc -ENOMEM, or -EINVAL if app - * bitness is not supported. + * The object is returned on success or else NULL. */ -int ust_app_register(struct ust_register_msg *msg, int sock) +struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock) { - struct ust_app *lta; - int ret; + struct ust_app *lta = NULL; + + assert(msg); + assert(sock >= 0); + + DBG3("UST app creating application for socket %d", sock); if ((msg->bits_per_long == 64 && (uatomic_read(&ust_consumerd64_fd) == -EINVAL)) || (msg->bits_per_long == 32 && (uatomic_read(&ust_consumerd32_fd) == -EINVAL))) { ERR("Registration failed: application \"%s\" (pid: %d) has " - "%d-bit long, but no consumerd for this long size is available.\n", - msg->name, msg->pid, msg->bits_per_long); - ret = close(sock); - if (ret) { - PERROR("close"); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - return -EINVAL; - } - if (msg->major != LTTNG_UST_COMM_MAJOR) { - ERR("Registration failed: application \"%s\" (pid: %d) has " - "communication protocol version %u.%u, but sessiond supports 2.x.\n", - msg->name, msg->pid, msg->major, msg->minor); - ret = close(sock); - if (ret) { - PERROR("close"); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - return -EINVAL; + "%d-bit long, but no consumerd for this size is available.\n", + msg->name, msg->pid, msg->bits_per_long); + goto error; } + lta = zmalloc(sizeof(struct ust_app)); if (lta == NULL) { PERROR("malloc"); - return -ENOMEM; + goto error; } lta->ppid = msg->ppid; lta->uid = msg->uid; lta->gid = msg->gid; - lta->compatible = 0; /* Not compatible until proven */ + lta->bits_per_long = msg->bits_per_long; + lta->uint8_t_alignment = msg->uint8_t_alignment; + lta->uint16_t_alignment = msg->uint16_t_alignment; + lta->uint32_t_alignment = msg->uint32_t_alignment; + lta->uint64_t_alignment = msg->uint64_t_alignment; + lta->long_alignment = msg->long_alignment; + lta->byte_order = msg->byte_order; + lta->v_major = msg->major; lta->v_minor = msg->minor; - strncpy(lta->name, msg->name, sizeof(lta->name)); - lta->name[16] = '\0'; lta->sessions = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + lta->ust_objd = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + lta->notify_sock = -1; + + /* Copy name and make sure it's NULL terminated. */ + strncpy(lta->name, msg->name, sizeof(lta->name)); + lta->name[UST_APP_PROCNAME_LEN] = '\0'; + + /* + * Before this can be called, when receiving the registration information, + * the application compatibility is checked. So, at this point, the + * application can work with this session daemon. + */ + lta->compatible = 1; lta->pid = msg->pid; - lttng_ht_node_init_ulong(<a->pid_n, (unsigned long)lta->pid); + lttng_ht_node_init_ulong(<a->pid_n, (unsigned long) lta->pid); lta->sock = sock; - lttng_ht_node_init_ulong(<a->sock_n, (unsigned long)lta->sock); + lttng_ht_node_init_ulong(<a->sock_n, (unsigned long) lta->sock); CDS_INIT_LIST_HEAD(<a->teardown_head); +error: + return lta; +} + +/* + * For a given application object, add it to every hash table. + */ +void ust_app_add(struct ust_app *app) +{ + assert(app); + assert(app->notify_sock >= 0); + rcu_read_lock(); /* * On a re-registration, we want to kick out the previous registration of * that pid */ - lttng_ht_add_replace_ulong(ust_app_ht, <a->pid_n); + lttng_ht_add_replace_ulong(ust_app_ht, &app->pid_n); /* * The socket _should_ be unique until _we_ call close. So, a add_unique * for the ust_app_ht_by_sock is used which asserts fail if the entry was * already in the table. */ - lttng_ht_add_unique_ulong(ust_app_ht_by_sock, <a->sock_n); + lttng_ht_add_unique_ulong(ust_app_ht_by_sock, &app->sock_n); + + /* Add application to the notify socket hash table. */ + lttng_ht_node_init_ulong(&app->notify_sock_n, app->notify_sock); + lttng_ht_add_unique_ulong(ust_app_ht_by_notify_sock, &app->notify_sock_n); + + DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s " + "notify_sock:%d (version %d.%d)", app->pid, app->ppid, app->uid, + app->gid, app->sock, app->name, app->notify_sock, app->v_major, + app->v_minor); rcu_read_unlock(); +} - DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s" - " (version %d.%d)", lta->pid, lta->ppid, lta->uid, lta->gid, - lta->sock, lta->name, lta->v_major, lta->v_minor); +/* + * Set the application version into the object. + * + * Return 0 on success else a negative value either an errno code or a + * LTTng-UST error code. + */ +int ust_app_version(struct ust_app *app) +{ + int ret; - return 0; + assert(app); + + ret = ustctl_tracer_version(app->sock, &app->version); + if (ret < 0) { + if (ret != -LTTNG_UST_ERR_EXITING && ret != -EPIPE) { + ERR("UST app %d verson failed with ret %d", app->sock, ret); + } else { + DBG3("UST app %d verion failed. Application is dead", app->sock); + } + } + + return ret; } /* @@ -1705,27 +2028,30 @@ void ust_app_unregister(int sock) /* Get the node reference for a call_rcu */ lttng_ht_lookup(ust_app_ht_by_sock, (void *)((unsigned long) sock), &iter); node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - ERR("Unable to find app by sock %d", sock); - goto error; - } + assert(node); lta = caa_container_of(node, struct ust_app, sock_n); - DBG("PID %d unregistering with sock %d", lta->pid, sock); /* Remove application from PID hash table */ ret = lttng_ht_del(ust_app_ht_by_sock, &iter); assert(!ret); - /* Assign second node for deletion */ - iter.iter.node = <a->pid_n.node; + /* + * Remove application from notify hash table. The thread handling the + * notify socket could have deleted the node so ignore on error because + * either way it's valid. The close of that socket is handled by the other + * thread. + */ + iter.iter.node = <a->notify_sock_n.node; + (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter); /* * Ignore return value since the node might have been removed before by an * add replace during app registration because the PID can be reassigned by * the OS. */ + iter.iter.node = <a->pid_n.node; ret = lttng_ht_del(ust_app_ht, &iter); if (ret) { DBG3("Unregister app by PID %d failed. This can happen on pid reuse", @@ -1745,13 +2071,29 @@ void ust_app_unregister(int sock) * Add session to list for teardown. This is safe since at this point we * are the only one using this list. */ + pthread_mutex_lock(&ua_sess->lock); + + /* + * Normally, this is done in the delete session process which is + * executed in the call rcu below. However, upon registration we can't + * afford to wait for the grace period before pushing data or else the + * data pending feature can race between the unregistration and stop + * command where the data pending command is sent *before* the grace + * period ended. + * + * The close metadata below nullifies the metadata pointer in the + * session so the delete session will NOT push/close a second time. + */ + (void) push_metadata(lta, ua_sess); + (void) close_metadata(lta, ua_sess); + cds_list_add(&ua_sess->teardown_node, <a->teardown_head); + pthread_mutex_unlock(&ua_sess->lock); } /* Free memory */ call_rcu(<a->pid_n.head, delete_ust_app_rcu); -error: rcu_read_unlock(); return; } @@ -1989,9 +2331,17 @@ void ust_app_clean_list(void) assert(!ret); } + /* Cleanup notify socket hash table */ + cds_lfht_for_each_entry(ust_app_ht_by_notify_sock->ht, &iter.iter, app, + notify_sock_n.node) { + ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter); + assert(!ret); + } + /* Destroy is done only when the ht is empty */ lttng_ht_destroy(ust_app_ht); lttng_ht_destroy(ust_app_ht_by_sock); + lttng_ht_destroy(ust_app_ht_by_notify_sock); rcu_read_unlock(); } @@ -2003,6 +2353,7 @@ void ust_app_ht_alloc(void) { ust_app_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); ust_app_ht_by_sock = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + ust_app_ht_by_notify_sock = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } /* @@ -2293,9 +2644,11 @@ int ust_app_create_channel_glb(struct ltt_ust_session *usess, } assert(ua_sess); + pthread_mutex_lock(&ua_sess->lock); /* Create channel onto application. We don't need the chan ref. */ ret = create_ust_app_channel(ua_sess, uchan, app, usess->consumer, LTTNG_UST_CHAN_PER_CPU, NULL); + pthread_mutex_unlock(&ua_sess->lock); if (ret < 0) { if (ret == -ENOMEM) { /* No more memory is a fatal error. Stop right now. */ @@ -2303,7 +2656,7 @@ int ust_app_create_channel_glb(struct ltt_ust_session *usess, } /* Cleanup the created session if it's the case. */ if (created) { - destroy_session(app, ua_sess); + destroy_app_session(app, ua_sess); } } } @@ -2353,6 +2706,8 @@ int ust_app_enable_event_glb(struct ltt_ust_session *usess, continue; } + pthread_mutex_lock(&ua_sess->lock); + /* Lookup channel in the ust app session */ lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter); ua_chan_node = lttng_ht_iter_get_node_str(&uiter); @@ -2367,13 +2722,16 @@ int ust_app_enable_event_glb(struct ltt_ust_session *usess, if (ua_event == NULL) { DBG3("UST app enable event %s not found for app PID %d." "Skipping app", uevent->attr.name, app->pid); - continue; + goto next_app; } ret = enable_ust_app_event(ua_sess, ua_event, app); if (ret < 0) { + pthread_mutex_unlock(&ua_sess->lock); goto error; } + next_app: + pthread_mutex_unlock(&ua_sess->lock); } error: @@ -2415,6 +2773,7 @@ int ust_app_create_event_glb(struct ltt_ust_session *usess, continue; } + pthread_mutex_lock(&ua_sess->lock); /* Lookup channel in the ust app session */ lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter); ua_chan_node = lttng_ht_iter_get_node_str(&uiter); @@ -2424,6 +2783,7 @@ int ust_app_create_event_glb(struct ltt_ust_session *usess, ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node); ret = create_ust_app_event(ua_sess, ua_chan, uevent, app); + pthread_mutex_unlock(&ua_sess->lock); if (ret < 0) { if (ret != -LTTNG_UST_ERR_EXIST) { /* Possible value at this point: -ENOMEM. If so, we stop! */ @@ -2462,6 +2822,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) goto end; } + pthread_mutex_lock(&ua_sess->lock); + /* Upon restart, we skip the setup, already done */ if (ua_sess->started) { goto skip_setup; @@ -2475,7 +2837,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) if (ret < 0) { if (ret != -EEXIST) { ERR("Trace directory creation error"); - goto error_rcu_unlock; + goto error_unlock; } } } @@ -2483,7 +2845,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) /* Create the metadata for the application. */ ret = create_ust_app_metadata(ua_sess, app, usess->consumer); if (ret < 0) { - goto error_rcu_unlock; + goto error_unlock; } health_code_update(); @@ -2498,12 +2860,14 @@ skip_setup: } else { DBG("UST app start session failed. Application is dead."); } - goto error_rcu_unlock; + goto error_unlock; } /* Indicate that the session has been started once */ ua_sess->started = 1; + pthread_mutex_unlock(&ua_sess->lock); + health_code_update(); /* Quiescent wait after starting trace */ @@ -2518,7 +2882,8 @@ end: health_code_update(); return 0; -error_rcu_unlock: +error_unlock: + pthread_mutex_unlock(&ua_sess->lock); rcu_read_unlock(); health_code_update(); return -1; @@ -2539,14 +2904,16 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app) rcu_read_lock(); if (!app->compatible) { - goto end; + goto end_no_session; } ua_sess = lookup_session_by_app(usess, app); if (ua_sess == NULL) { - goto end; + goto end_no_session; } + pthread_mutex_lock(&ua_sess->lock); + /* * If started = 0, it means that stop trace has been called for a session * that was never started. It's possible since we can have a fail start @@ -2596,7 +2963,7 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app) DBG3("UST app failed to flush %s. Application is dead.", ua_chan->name); /* No need to continue. */ - goto end; + break; } /* Continuing flushing all buffers */ continue; @@ -2605,25 +2972,19 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app) health_code_update(); - assert(ua_sess->metadata->is_sent); - /* Flush all buffers before stopping */ - ret = ustctl_sock_flush_buffer(app->sock, ua_sess->metadata->obj); + ret = push_metadata(app, ua_sess); if (ret < 0) { - if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { - ERR("UST app PID %d metadata flush failed with ret %d", app->pid, - ret); - goto error_rcu_unlock; - } else { - DBG3("UST app failed to flush metadata. Application is dead."); - } + goto error_rcu_unlock; } -end: + pthread_mutex_unlock(&ua_sess->lock); +end_no_session: rcu_read_unlock(); health_code_update(); return 0; error_rcu_unlock: + pthread_mutex_unlock(&ua_sess->lock); rcu_read_unlock(); health_code_update(); return -1; @@ -2656,7 +3017,7 @@ static int destroy_trace(struct ltt_ust_session *usess, struct ust_app *app) ua_sess = caa_container_of(node, struct ust_app_session, node); health_code_update(); - destroy_session(app, ua_sess); + destroy_app_session(app, ua_sess); health_code_update(); @@ -2666,7 +3027,6 @@ static int destroy_trace(struct ltt_ust_session *usess, struct ust_app *app) ERR("UST app wait quiescent failed for app pid %d ret %d", app->pid, ret); } - end: rcu_read_unlock(); health_code_update(); @@ -2715,6 +3075,7 @@ int ust_app_stop_trace_all(struct ltt_ust_session *usess) cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { ret = ust_app_stop_trace(usess, app); if (ret < 0) { + ERR("UST app stop trace failed with ret %d", ret); /* Continue to next apps even on error */ continue; } @@ -2774,7 +3135,11 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock) app = find_app_by_sock(sock); if (app == NULL) { - ERR("Failed to find app sock %d", sock); + /* + * Application can be unregistered before so this is possible hence + * simply stopping the update. + */ + DBG3("UST app update failed to find app sock %d", sock); goto error; } @@ -2789,8 +3154,10 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock) } assert(ua_sess); + pthread_mutex_lock(&ua_sess->lock); + /* - * We can iterate safely here over all UST app session sicne the create ust + * We can iterate safely here over all UST app session since the create ust * app session above made a shadow copy of the UST global domain from the * ltt ust session. */ @@ -2803,14 +3170,14 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock) * descriptor are available or ENOMEM so stopping here is the only * thing we can do for now. */ - goto error; + goto error_unlock; } cds_lfht_for_each_entry(ua_chan->ctx->ht, &iter_ctx.iter, ua_ctx, node.node) { ret = create_ust_channel_context(ua_chan, ua_ctx, app); if (ret < 0) { - goto error; + goto error_unlock; } } @@ -2820,11 +3187,13 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock) node.node) { ret = create_ust_event(app, ua_sess, ua_chan, ua_event); if (ret < 0) { - goto error; + goto error_unlock; } } } + pthread_mutex_unlock(&ua_sess->lock); + if (usess->start_trace) { ret = ust_app_start_trace(usess, app); if (ret < 0) { @@ -2838,9 +3207,11 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock) rcu_read_unlock(); return; +error_unlock: + pthread_mutex_unlock(&ua_sess->lock); error: if (ua_sess) { - destroy_session(app, ua_sess); + destroy_app_session(app, ua_sess); } rcu_read_unlock(); return; @@ -2874,19 +3245,21 @@ int ust_app_add_ctx_channel_glb(struct ltt_ust_session *usess, continue; } + pthread_mutex_lock(&ua_sess->lock); /* Lookup channel in the ust app session */ lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter); ua_chan_node = lttng_ht_iter_get_node_str(&uiter); if (ua_chan_node == NULL) { - continue; + goto next_app; } ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node); - ret = create_ust_app_channel_context(ua_sess, ua_chan, &uctx->ctx, app); if (ret < 0) { - continue; + goto next_app; } + next_app: + pthread_mutex_unlock(&ua_sess->lock); } rcu_read_unlock(); @@ -2915,20 +3288,22 @@ int ust_app_enable_event_pid(struct ltt_ust_session *usess, if (app == NULL) { ERR("UST app enable event per PID %d not found", pid); ret = -1; - goto error; + goto end; } if (!app->compatible) { ret = 0; - goto error; + goto end; } ua_sess = lookup_session_by_app(usess, app); if (!ua_sess) { /* The application has problem or is probably dead. */ - goto error; + ret = 0; + goto end; } + pthread_mutex_lock(&ua_sess->lock); /* Lookup channel in the ust app session */ lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &iter); ua_chan_node = lttng_ht_iter_get_node_str(&iter); @@ -2942,16 +3317,18 @@ int ust_app_enable_event_pid(struct ltt_ust_session *usess, if (ua_event == NULL) { ret = create_ust_app_event(ua_sess, ua_chan, uevent, app); if (ret < 0) { - goto error; + goto end_unlock; } } else { ret = enable_ust_app_event(ua_sess, ua_event, app); if (ret < 0) { - goto error; + goto end_unlock; } } -error: +end_unlock: + pthread_mutex_unlock(&ua_sess->lock); +end: rcu_read_unlock(); return ret; } @@ -3019,52 +3396,6 @@ error: return ret; } -/* - * Validate version of UST apps and set the compatible bit. - */ -int ust_app_validate_version(int sock) -{ - int ret; - struct ust_app *app; - - rcu_read_lock(); - - app = find_app_by_sock(sock); - assert(app); - - health_code_update(); - - ret = ustctl_tracer_version(sock, &app->version); - if (ret < 0) { - if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { - ERR("UST app tracer version failed for app pid %d", app->pid); - } - goto error; - } - - /* Validate version */ - if (app->version.major != UST_APP_MAJOR_VERSION) { - goto error; - } - - DBG2("UST app PID %d is compatible with internal major version %d " - "(supporting == %d)", app->pid, app->version.major, - UST_APP_MAJOR_VERSION); - app->compatible = 1; - rcu_read_unlock(); - health_code_update(); - return 0; - -error: - DBG2("UST app PID %d is not compatible with internal major version %d " - "(supporting == %d)", app->pid, app->version.major, - UST_APP_MAJOR_VERSION); - app->compatible = 0; - rcu_read_unlock(); - health_code_update(); - return -1; -} - /* * Calibrate registered applications. */ @@ -3110,3 +3441,409 @@ int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate) return ret; } + +/* + * Receive registration and populate the given msg structure. + * + * On success return 0 else a negative value returned by the ustctl call. + */ +int ust_app_recv_registration(int sock, struct ust_register_msg *msg) +{ + int ret; + uint32_t pid, ppid, uid, gid; + + assert(msg); + + ret = ustctl_recv_reg_msg(sock, &msg->type, &msg->major, &msg->minor, + &pid, &ppid, &uid, &gid, + &msg->bits_per_long, + &msg->uint8_t_alignment, + &msg->uint16_t_alignment, + &msg->uint32_t_alignment, + &msg->uint64_t_alignment, + &msg->long_alignment, + &msg->byte_order, + msg->name); + if (ret < 0) { + switch (-ret) { + case EPIPE: + case ECONNRESET: + case LTTNG_UST_ERR_EXITING: + DBG3("UST app recv reg message failed. Application died"); + break; + case LTTNG_UST_ERR_UNSUP_MAJOR: + ERR("UST app recv reg unsupported version %d.%d. Supporting %d.%d", + msg->major, msg->minor, LTTNG_UST_ABI_MAJOR_VERSION, + LTTNG_UST_ABI_MINOR_VERSION); + break; + default: + ERR("UST app recv reg message failed with ret %d", ret); + break; + } + goto error; + } + msg->pid = (pid_t) pid; + msg->ppid = (pid_t) ppid; + msg->uid = (uid_t) uid; + msg->gid = (gid_t) gid; + +error: + return ret; +} + +/* + * Return a ust app channel object using the application object and the channel + * object descriptor has a key. If not found, NULL is returned. A RCU read side + * lock MUST be acquired before calling this function. + */ +static struct ust_app_channel *find_channel_by_objd(struct ust_app *app, + int objd) +{ + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct ust_app_channel *ua_chan = NULL; + + assert(app); + + lttng_ht_lookup(app->ust_objd, (void *)((unsigned long) objd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG2("UST app channel find by objd %d not found", objd); + goto error; + } + + ua_chan = caa_container_of(node, struct ust_app_channel, ust_objd_node); + +error: + return ua_chan; +} + +/* + * Reply to a register channel notification from an application on the notify + * socket. The channel metadata is also created. + * + * The session UST registry lock is acquired in this function. + * + * On success 0 is returned else a negative value. + */ +static int reply_ust_register_channel(int sock, int sobjd, int cobjd, + size_t nr_fields, struct ustctl_field *fields) +{ + int ret, ret_code = 0; + uint32_t chan_id, reg_count; + enum ustctl_channel_header type; + struct ust_app *app; + struct ust_app_channel *ua_chan; + struct ust_app_session *ua_sess; + + rcu_read_lock(); + + /* Lookup application. If not found, there is a code flow error. */ + app = find_app_by_notify_sock(sock); + if (!app) { + DBG("Application socket %d is being teardown. Abort event notify", + sock); + ret = 0; + goto error_rcu_unlock; + } + + /* Lookup channel by UST object descriptor. Should always be found. */ + ua_chan = find_channel_by_objd(app, cobjd); + assert(ua_chan); + assert(ua_chan->session); + ua_sess = ua_chan->session; + assert(ua_sess); + + pthread_mutex_lock(&ua_sess->registry.lock); + + if (ust_registry_is_max_id(ua_chan->session->registry.used_channel_id)) { + ret_code = -1; + chan_id = -1U; + type = -1; + goto reply; + } + + /* Don't assign ID to metadata. */ + if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) { + chan_id = -1U; + } else { + chan_id = ust_registry_get_next_chan_id(&ua_chan->session->registry); + } + + reg_count = ust_registry_get_event_count(&ua_chan->registry); + if (reg_count < 31) { + type = USTCTL_CHANNEL_HEADER_COMPACT; + } else { + type = USTCTL_CHANNEL_HEADER_LARGE; + } + + ua_chan->registry.nr_ctx_fields = nr_fields; + ua_chan->registry.ctx_fields = fields; + ua_chan->registry.chan_id = chan_id; + ua_chan->registry.header_type = type; + + /* Append to metadata */ + if (!ret_code) { + ret_code = ust_metadata_channel_statedump(&ua_chan->session->registry, + &ua_chan->registry); + if (ret_code) { + ERR("Error appending channel metadata (errno = %d)", ret_code); + goto reply; + } + } + +reply: + DBG3("UST app replying to register channel with id %u, type: %d, ret: %d", + chan_id, type, ret_code); + + ret = ustctl_reply_register_channel(sock, chan_id, type, ret_code); + if (ret < 0) { + if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app reply channel failed with ret %d", ret); + } else { + DBG3("UST app reply channel failed. Application died"); + } + goto error; + } + +error: + pthread_mutex_unlock(&ua_sess->registry.lock); +error_rcu_unlock: + rcu_read_unlock(); + return ret; +} + +/* + * Add event to the UST channel registry. When the event is added to the + * registry, the metadata is also created. Once done, this replies to the + * application with the appropriate error code. + * + * The session UST registry lock is acquired in the function. + * + * On success 0 is returned else a negative value. + */ +static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name, + char *sig, size_t nr_fields, struct ustctl_field *fields, int loglevel, + char *model_emf_uri) +{ + int ret, ret_code; + uint32_t event_id = 0; + struct ust_app *app; + struct ust_app_channel *ua_chan; + struct ust_app_session *ua_sess; + + rcu_read_lock(); + + /* Lookup application. If not found, there is a code flow error. */ + app = find_app_by_notify_sock(sock); + if (!app) { + DBG("Application socket %d is being teardown. Abort event notify", + sock); + ret = 0; + goto error_rcu_unlock; + } + + /* Lookup channel by UST object descriptor. Should always be found. */ + ua_chan = find_channel_by_objd(app, cobjd); + assert(ua_chan); + assert(ua_chan->session); + ua_sess = ua_chan->session; + + pthread_mutex_lock(&ua_sess->registry.lock); + + ret_code = ust_registry_create_event(&ua_sess->registry, + &ua_chan->registry, sobjd, cobjd, name, sig, nr_fields, fields, + loglevel, model_emf_uri, &event_id); + + /* + * The return value is returned to ustctl so in case of an error, the + * application can be notified. In case of an error, it's important not to + * return a negative error or else the application will get closed. + */ + ret = ustctl_reply_register_event(sock, event_id, ret_code); + if (ret < 0) { + if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app reply event failed with ret %d", ret); + } else { + DBG3("UST app reply event failed. Application died"); + } + /* + * No need to wipe the create event since the application socket will + * get close on error hence cleaning up everything by itself. + */ + goto error; + } + + DBG3("UST registry event %s has been added successfully", name); + +error: + pthread_mutex_unlock(&ua_sess->registry.lock); +error_rcu_unlock: + rcu_read_unlock(); + return ret; +} + +/* + * Handle application notification through the given notify socket. + * + * Return 0 on success or else a negative value. + */ +int ust_app_recv_notify(int sock) +{ + int ret; + enum ustctl_notify_cmd cmd; + + DBG3("UST app receiving notify from sock %d", sock); + + ret = ustctl_recv_notify(sock, &cmd); + if (ret < 0) { + if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app recv notify failed with ret %d", ret); + } else { + DBG3("UST app recv notify failed. Application died"); + } + goto error; + } + + switch (cmd) { + case USTCTL_NOTIFY_CMD_EVENT: + { + int sobjd, cobjd, loglevel; + char name[LTTNG_UST_SYM_NAME_LEN], *sig, *model_emf_uri; + size_t nr_fields; + struct ustctl_field *fields; + + DBG2("UST app ustctl register event received"); + + ret = ustctl_recv_register_event(sock, &sobjd, &cobjd, name, &loglevel, + &sig, &nr_fields, &fields, &model_emf_uri); + if (ret < 0) { + if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app recv event failed with ret %d", ret); + } else { + DBG3("UST app recv event failed. Application died"); + } + goto error; + } + + /* Add event to the UST registry coming from the notify socket. */ + ret = add_event_ust_registry(sock, sobjd, cobjd, name, sig, nr_fields, + fields, loglevel, model_emf_uri); + if (ret < 0) { + goto error; + } + + break; + } + case USTCTL_NOTIFY_CMD_CHANNEL: + { + int sobjd, cobjd; + size_t nr_fields; + struct ustctl_field *fields; + + DBG2("UST app ustctl register channel received"); + + ret = ustctl_recv_register_channel(sock, &sobjd, &cobjd, &nr_fields, + &fields); + if (ret < 0) { + if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) { + ERR("UST app recv channel failed with ret %d", ret); + } else { + DBG3("UST app recv channel failed. Application died"); + } + goto error; + } + + ret = reply_ust_register_channel(sock, sobjd, cobjd, nr_fields, + fields); + if (ret < 0) { + goto error; + } + + break; + } + default: + /* Should NEVER happen. */ + assert(0); + } + +error: + return ret; +} + +/* + * Once the notify socket hangs up, this is called. First, it tries to find the + * corresponding application. On failure, the call_rcu to close the socket is + * executed. If an application is found, it tries to delete it from the notify + * socket hash table. Whathever the result, it proceeds to the call_rcu. + * + * Note that an object needs to be allocated here so on ENOMEM failure, the + * call RCU is not done but the rest of the cleanup is. + */ +void ust_app_notify_sock_unregister(int sock) +{ + int err_enomem = 0; + struct lttng_ht_iter iter; + struct ust_app *app; + struct ust_app_notify_sock_obj *obj; + + assert(sock >= 0); + + rcu_read_lock(); + + obj = zmalloc(sizeof(*obj)); + if (!obj) { + /* + * An ENOMEM is kind of uncool. If this strikes we continue the + * procedure but the call_rcu will not be called. In this case, we + * accept the fd leak rather than possibly creating an unsynchronized + * state between threads. + * + * TODO: The notify object should be created once the notify socket is + * registered and stored independantely from the ust app object. The + * tricky part is to synchronize the teardown of the application and + * this notify object. Let's keep that in mind so we can avoid this + * kind of shenanigans with ENOMEM in the teardown path. + */ + err_enomem = 1; + } else { + obj->fd = sock; + } + + DBG("UST app notify socket unregister %d", sock); + + /* + * Lookup application by notify socket. If this fails, this means that the + * hash table delete has already been done by the application + * unregistration process so we can safely close the notify socket in a + * call RCU. + */ + app = find_app_by_notify_sock(sock); + if (!app) { + goto close_socket; + } + + iter.iter.node = &app->notify_sock_n.node; + + /* + * Whatever happens here either we fail or succeed, in both cases we have + * to close the socket after a grace period to continue to the call RCU + * here. If the deletion is successful, the application is not visible + * anymore by other threads and is it fails it means that it was already + * deleted from the hash table so either way we just have to close the + * socket. + */ + (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter); + +close_socket: + rcu_read_unlock(); + + /* + * Close socket after a grace period to avoid for the socket to be reused + * before the application object is freed creating potential race between + * threads trying to add unique in the global hash table. + */ + if (!err_enomem) { + call_rcu(&obj->head, close_notify_sock_rcu); + } +} diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index e8bb9a980..c6294d0a4 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -22,18 +22,28 @@ #include #include "trace-ust.h" - -/* lttng-ust supported version. */ -#define LTTNG_UST_COMM_MAJOR 2 /* comm protocol major version */ -#define UST_APP_MAJOR_VERSION 3 /* Internal UST version supported */ +#include "ust-registry.h" #define UST_APP_EVENT_LIST_SIZE 32 +/* Process name (short). */ +#define UST_APP_PROCNAME_LEN 16 + struct lttng_filter_bytecode; struct lttng_ust_filter_bytecode; extern int ust_consumerd64_fd, ust_consumerd32_fd; +/* + * Object used to close the notify socket in a call_rcu(). Since the + * application might not be found, we need an independant object containing the + * notify socket fd. + */ +struct ust_app_notify_sock_obj { + int fd; + struct rcu_head head; +}; + struct ust_app_ht_key { const char *name; const struct lttng_ust_filter_bytecode *filter; @@ -44,14 +54,23 @@ struct ust_app_ht_key { * Application registration data structure. */ struct ust_register_msg { + enum ustctl_socket_type type; uint32_t major; uint32_t minor; + uint32_t abi_major; + uint32_t abi_minor; pid_t pid; pid_t ppid; uid_t uid; gid_t gid; uint32_t bits_per_long; - char name[16]; + uint32_t uint8_t_alignment; + uint32_t uint16_t_alignment; + uint32_t uint32_t_alignment; + uint32_t uint64_t_alignment; + uint32_t long_alignment; + int byte_order; /* BIG_ENDIAN or LITTLE_ENDIAN */ + char name[LTTNG_UST_ABI_PROCNAME_LEN]; }; /* @@ -66,6 +85,12 @@ struct lttng_ht *ust_app_ht; */ struct lttng_ht *ust_app_ht_by_sock; +/* + * Global applications HT used by the session daemon. This table is indexed by + * socket using the notify_sock_n node and notify_sock value of an ust_app. + */ +struct lttng_ht *ust_app_ht_by_notify_sock; + /* Stream list containing ust_app_stream. */ struct ust_app_stream_list { unsigned int count; @@ -97,7 +122,6 @@ struct ust_app_stream { struct lttng_ust_object_data *obj; /* Using a list of streams to keep order. */ struct cds_list_head list; - struct ustctl_consumer_stream *ustream; }; struct ust_app_channel { @@ -106,26 +130,47 @@ struct ust_app_channel { /* Channel and streams were sent to the UST tracer. */ int is_sent; /* Unique key used to identify the channel on the consumer side. */ - unsigned long key; + uint64_t key; /* Number of stream that this channel is expected to receive. */ unsigned int expected_stream_count; char name[LTTNG_UST_SYM_NAME_LEN]; struct lttng_ust_object_data *obj; struct ustctl_consumer_channel_attr attr; - struct ustctl_consumer_channel *channel; struct ust_app_stream_list streams; + /* Session pointer that owns this object. */ + struct ust_app_session *session; struct lttng_ht *ctx; struct lttng_ht *events; + /* + * UST event registry. The ONLY writer is the application thread. + */ + struct ust_registry_channel registry; + /* + * Node indexed by channel name in the channels' hash table of a session. + */ struct lttng_ht_node_str node; + /* + * Node indexed by UST channel object descriptor (handle). Stored in the + * ust_objd hash table in the ust_app object. + */ + struct lttng_ht_node_ulong ust_objd_node; }; struct ust_app_session { + /* + * Lock protecting this session's ust app interaction. Held + * across command send/recv to/from app. Never nests within the + * session registry lock. + */ + pthread_mutex_t lock; + int enabled; /* started: has the session been in started state at any time ? */ int started; /* allows detection of start vs restart. */ int handle; /* used has unique identifier for app session */ int id; /* session unique identifier */ struct ust_app_channel *metadata; + struct ust_registry_session registry; struct lttng_ht *channels; /* Registered channels */ struct lttng_ht_node_ulong node; char path[PATH_MAX]; @@ -133,8 +178,11 @@ struct ust_app_session { uid_t uid; gid_t gid; struct cds_list_head teardown_node; - /* Universal unique identifier used by the tracer. */ - unsigned char uuid[UUID_STR_LEN]; + /* + * Once at least *one* session is created onto the application, the + * corresponding consumer is set so we can use it on unregistration. + */ + struct consumer_output *consumer; }; /* @@ -143,21 +191,33 @@ struct ust_app_session { */ struct ust_app { int sock; + int notify_sock; pid_t pid; pid_t ppid; uid_t uid; /* User ID that owns the apps */ gid_t gid; /* Group ID that owns the apps */ - int bits_per_long; + + /* App ABI */ + uint32_t bits_per_long; + uint32_t uint8_t_alignment; + uint32_t uint16_t_alignment; + uint32_t uint32_t_alignment; + uint32_t uint64_t_alignment; + uint32_t long_alignment; + int byte_order; /* BIG_ENDIAN or LITTLE_ENDIAN */ + int compatible; /* If the lttng-ust tracer version does not match the supported version of the session daemon, this flag is set to 0 (NOT compatible) else 1. */ struct lttng_ust_tracer_version version; - uint32_t v_major; /* Verion major number */ - uint32_t v_minor; /* Verion minor number */ - char name[17]; /* Process name (short) */ + uint32_t v_major; /* Version major number */ + uint32_t v_minor; /* Version minor number */ + /* Extra for the NULL byte. */ + char name[UST_APP_PROCNAME_LEN + 1]; struct lttng_ht *sessions; struct lttng_ht_node_ulong pid_n; struct lttng_ht_node_ulong sock_n; + struct lttng_ht_node_ulong notify_sock_n; /* * This is a list of ust app session that, once the app is going into * teardown mode, in the RCU call, each node in this list is removed and @@ -168,6 +228,10 @@ struct ust_app { * when a session is destroyed. */ struct cds_list_head teardown_head; + /* + * Hash table containing ust_app_channel indexed by channel objd. + */ + struct lttng_ht *ust_objd; }; #ifdef HAVE_LIBLTTNG_UST_CTL @@ -178,6 +242,7 @@ int ust_app_register_done(int sock) { return ustctl_register_done(sock); } +int ust_app_version(struct ust_app *app); void ust_app_unregister(int sock); unsigned long ust_app_list_count(void); int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app); @@ -217,9 +282,13 @@ void ust_app_clean_list(void); void ust_app_ht_alloc(void); struct lttng_ht *ust_app_get_ht(void); struct ust_app *ust_app_find_by_pid(pid_t pid); -int ust_app_validate_version(int sock); int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate); struct ust_app_stream *ust_app_alloc_stream(void); +int ust_app_recv_registration(int sock, struct ust_register_msg *msg); +int ust_app_recv_notify(int sock); +void ust_app_add(struct ust_app *app); +struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock); +void ust_app_notify_sock_unregister(int sock); #else /* HAVE_LIBLTTNG_UST_CTL */ @@ -264,6 +333,11 @@ int ust_app_register_done(int sock) return -ENOSYS; } static inline +int ust_app_version(struct ust_app *app) +{ + return -ENOSYS; +} +static inline void ust_app_unregister(int sock) { } @@ -374,15 +448,33 @@ int ust_app_disable_event_pid(struct ltt_ust_session *usess, return 0; } static inline -int ust_app_validate_version(int sock) +int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate) { return 0; } static inline -int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate) +int ust_app_recv_registration(int sock, struct ust_register_msg *msg) { return 0; } +static inline +int ust_app_recv_notify(int sock) +{ + return 0; +} +static inline +struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock) +{ + return NULL; +} +static inline +void ust_app_add(struct ust_app *app) +{ +} +static inline +void ust_app_notify_sock_unregister(int sock) +{ +} #endif /* HAVE_LIBLTTNG_UST_CTL */ diff --git a/src/bin/lttng-sessiond/ust-clock.h b/src/bin/lttng-sessiond/ust-clock.h new file mode 100644 index 000000000..7d9c99a66 --- /dev/null +++ b/src/bin/lttng-sessiond/ust-clock.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2010 Pierre-Marc Fournier + * Copyright (C) 2011 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; version 2.1 of + * the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _UST_CLOCK_H +#define _UST_CLOCK_H + +#include +#include +#include +#include +#include +#include + +#include + +/* TRACE CLOCK */ + +/* + * Currently using the kernel MONOTONIC clock, waiting for kernel-side + * LTTng to implement mmap'd trace clock. + */ + +/* Choosing correct trace clock */ + +static __inline__ +uint64_t trace_clock_read64(void) +{ + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((uint64_t) ts.tv_sec * 1000000000ULL) + ts.tv_nsec; +} + +static __inline__ +uint64_t trace_clock_freq(void) +{ + return 1000000000ULL; +} + +static __inline__ +int trace_clock_uuid(char *uuid) +{ + int ret = 0; + size_t len; + FILE *fp; + + /* + * boot_id needs to be read once before being used concurrently + * to deal with a Linux kernel race. A fix is proposed for + * upstream, but the work-around is needed for older kernels. + */ + fp = fopen("/proc/sys/kernel/random/boot_id", "r"); + if (!fp) { + return -ENOENT; + } + len = fread(uuid, 1, UUID_STR_LEN - 1, fp); + if (len < UUID_STR_LEN - 1) { + ret = -EINVAL; + goto end; + } + uuid[UUID_STR_LEN - 1] = '\0'; +end: + fclose(fp); + return ret; +} + +#endif /* _UST_CLOCK_H */ diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 7c1ae402c..33cb6ff37 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -99,7 +100,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, struct consumer_socket *socket) { int ret; - unsigned long key; + uint64_t key; char *pathname = NULL; struct lttcomm_consumer_msg msg; @@ -132,7 +133,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_sess->gid, consumer->net_seq_index, ua_chan->key, - ua_sess->uuid); + ua_sess->registry.uuid); health_code_update(); @@ -151,7 +152,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, /* We need at least one where 1 stream for 1 cpu. */ assert(ua_chan->expected_stream_count > 0); - DBG2("UST ask channel %lu successfully done with %u stream(s)", key, + DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key, ua_chan->expected_stream_count); error: @@ -228,7 +229,6 @@ int ust_consumer_get_channel(struct consumer_socket *socket, } goto error; } - ua_chan->handle = ua_chan->obj->handle; /* Next, get all streams. */ while (1) { @@ -345,6 +345,7 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app, } goto error; } + channel->handle = channel->obj->handle; error: return ret; @@ -383,3 +384,131 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app, error: return ret; } + +/* + * Send metadata string to consumer. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_push_metadata(struct consumer_socket *socket, + struct ust_app_session *ua_sess, char *metadata_str, + size_t len, size_t target_offset) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + assert(socket->fd >= 0); + assert(ua_sess); + assert(ua_sess->metadata); + + DBG2("UST consumer push metadata to consumer socket %d", socket->fd); + + msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA; + msg.u.push_metadata.key = ua_sess->metadata->key; + msg.u.push_metadata.target_offset = target_offset; + msg.u.push_metadata.len = len; + + /* + * TODO: reenable these locks when the consumerd gets the ability to + * reorder the metadata it receives. This fits with locking in + * src/bin/lttng-sessiond/ust-app.c:push_metadata() + * + * pthread_mutex_lock(socket->lock); + */ + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + + DBG3("UST consumer push metadata on sock %d of len %lu", socket->fd, len); + + ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len); + if (ret < 0) { + fprintf(stderr, "send error: %d\n", ret); + goto error; + } + + health_code_update(); + ret = consumer_recv_status_reply(socket); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + /* + * pthread_mutex_unlock(socket->lock); + */ + return ret; +} + +/* + * Send a close metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_close_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(ua_chan); + assert(socket); + assert(socket->fd >= 0); + + DBG2("UST consumer close metadata channel key %lu", ua_chan->key); + + msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA; + msg.u.close_metadata.key = ua_chan->key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send a setup metdata command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int ust_consumer_setup_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(ua_chan); + assert(socket); + assert(socket->fd >= 0); + + DBG2("UST consumer setup metadata channel key %lu", ua_chan->key); + + msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA; + msg.u.setup_metadata.key = ua_chan->key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + +error: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index f48ea42f2..8739af552 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -37,4 +37,14 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app, int ust_consumer_send_channel_to_ust(struct ust_app *app, struct ust_app_session *ua_sess, struct ust_app_channel *channel); +int ust_consumer_push_metadata(struct consumer_socket *socket, + struct ust_app_session *ua_sess, char *metadata_str, + size_t len, size_t target_offset); + +int ust_consumer_close_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan); + +int ust_consumer_setup_metadata(struct consumer_socket *socket, + struct ust_app_channel *ua_chan); + #endif /* _UST_CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/ust-metadata.c b/src/bin/lttng-sessiond/ust-metadata.c new file mode 100644 index 000000000..45512ac7b --- /dev/null +++ b/src/bin/lttng-sessiond/ust-metadata.c @@ -0,0 +1,674 @@ +/* + * ust-metadata.c + * + * LTTng-UST metadata generation + * + * Copyright (C) 2010-2013 Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ust-registry.h" +#include "ust-clock.h" +#include "ust-app.h" + +#ifndef max_t +#define max_t(type, a, b) ((type) ((a) > (b) ? (a) : (b))) +#endif + +static inline +int fls(unsigned int x) +{ + int r = 32; + + if (!x) + return 0; + if (!(x & 0xFFFF0000U)) { + x <<= 16; + r -= 16; + } + if (!(x & 0xFF000000U)) { + x <<= 8; + r -= 8; + } + if (!(x & 0xF0000000U)) { + x <<= 4; + r -= 4; + } + if (!(x & 0xC0000000U)) { + x <<= 2; + r -= 2; + } + if (!(x & 0x80000000U)) { + x <<= 1; + r -= 1; + } + return r; +} + +static inline +int get_count_order(unsigned int count) +{ + int order; + + order = fls(count) - 1; + if (count & (count - 1)) + order++; + return order; +} + +/* + * Returns offset where to write in metadata array, or negative error value on error. + */ +static +ssize_t metadata_reserve(struct ust_registry_session *session, size_t len) +{ + size_t new_len = session->metadata_len + len; + size_t new_alloc_len = new_len; + size_t old_alloc_len = session->metadata_alloc_len; + ssize_t ret; + + if (new_alloc_len > (UINT32_MAX >> 1)) + return -EINVAL; + if ((old_alloc_len << 1) > (UINT32_MAX >> 1)) + return -EINVAL; + + if (new_alloc_len > old_alloc_len) { + char *newptr; + + new_alloc_len = + max_t(size_t, 1U << get_count_order(new_alloc_len), old_alloc_len << 1); + newptr = realloc(session->metadata, new_alloc_len); + if (!newptr) + return -ENOMEM; + session->metadata = newptr; + /* We zero directly the memory from start of allocation. */ + memset(&session->metadata[old_alloc_len], 0, new_alloc_len - old_alloc_len); + session->metadata_alloc_len = new_alloc_len; + } + ret = session->metadata_len; + session->metadata_len += len; + return ret; +} + +/* + * We have exclusive access to our metadata buffer (protected by the + * ust_lock), so we can do racy operations such as looking for + * remaining space left in packet and write, since mutual exclusion + * protects us from concurrent writes. + */ +static +int lttng_metadata_printf(struct ust_registry_session *session, + const char *fmt, ...) +{ + char *str = NULL; + size_t len; + va_list ap; + ssize_t offset; + int ret; + + va_start(ap, fmt); + ret = vasprintf(&str, fmt, ap); + va_end(ap); + if (ret < 0) + return -ENOMEM; + + len = strlen(str); + offset = metadata_reserve(session, len); + if (offset < 0) { + ret = offset; + goto end; + } + memcpy(&session->metadata[offset], str, len); + DBG3("Append to metadata: \"%s\"", str); + ret = 0; + +end: + free(str); + return ret; +} + +static +int _lttng_field_statedump(struct ust_registry_session *session, + const struct ustctl_field *field) +{ + int ret = 0; + const char *bo_be = " byte_order = be;"; + const char *bo_le = " byte_order = le;"; + const char *bo_native = ""; + const char *bo_reverse; + + if (session->byte_order == BIG_ENDIAN) + bo_reverse = bo_le; + else + bo_reverse = bo_be; + + switch (field->type.atype) { + case ustctl_atype_integer: + ret = lttng_metadata_printf(session, + " integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s;\n", + field->type.u.basic.integer.size, + field->type.u.basic.integer.alignment, + field->type.u.basic.integer.signedness, + (field->type.u.basic.integer.encoding == ustctl_encode_none) + ? "none" + : (field->type.u.basic.integer.encoding == ustctl_encode_UTF8) + ? "UTF8" + : "ASCII", + field->type.u.basic.integer.base, + field->type.u.basic.integer.reverse_byte_order ? bo_reverse : bo_native, + field->name); + break; + case ustctl_atype_float: + ret = lttng_metadata_printf(session, + " floating_point { exp_dig = %u; mant_dig = %u; align = %u;%s } _%s;\n", + field->type.u.basic._float.exp_dig, + field->type.u.basic._float.mant_dig, + field->type.u.basic._float.alignment, + field->type.u.basic.integer.reverse_byte_order ? bo_reverse : bo_native, + field->name); + break; + case ustctl_atype_enum: + return -EINVAL; + case ustctl_atype_array: + { + const struct ustctl_basic_type *elem_type; + + elem_type = &field->type.u.array.elem_type; + ret = lttng_metadata_printf(session, + " integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s[%u];\n", + elem_type->u.basic.integer.size, + elem_type->u.basic.integer.alignment, + elem_type->u.basic.integer.signedness, + (elem_type->u.basic.integer.encoding == ustctl_encode_none) + ? "none" + : (elem_type->u.basic.integer.encoding == ustctl_encode_UTF8) + ? "UTF8" + : "ASCII", + elem_type->u.basic.integer.base, + elem_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native, + field->name, field->type.u.array.length); + break; + } + case ustctl_atype_sequence: + { + const struct ustctl_basic_type *elem_type; + const struct ustctl_basic_type *length_type; + + elem_type = &field->type.u.sequence.elem_type; + length_type = &field->type.u.sequence.length_type; + ret = lttng_metadata_printf(session, + " integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } __%s_length;\n", + length_type->u.basic.integer.size, + (unsigned int) length_type->u.basic.integer.alignment, + length_type->u.basic.integer.signedness, + (length_type->u.basic.integer.encoding == ustctl_encode_none) + ? "none" + : ((length_type->u.basic.integer.encoding == ustctl_encode_UTF8) + ? "UTF8" + : "ASCII"), + length_type->u.basic.integer.base, + length_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native, + field->name); + if (ret) + return ret; + + ret = lttng_metadata_printf(session, + " integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s[ __%s_length ];\n", + elem_type->u.basic.integer.size, + (unsigned int) elem_type->u.basic.integer.alignment, + elem_type->u.basic.integer.signedness, + (elem_type->u.basic.integer.encoding == ustctl_encode_none) + ? "none" + : ((elem_type->u.basic.integer.encoding == ustctl_encode_UTF8) + ? "UTF8" + : "ASCII"), + elem_type->u.basic.integer.base, + elem_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native, + field->name, + field->name); + break; + } + + case ustctl_atype_string: + /* Default encoding is UTF8 */ + ret = lttng_metadata_printf(session, + " string%s _%s;\n", + field->type.u.basic.string.encoding == ustctl_encode_ASCII ? + " { encoding = ASCII; }" : "", + field->name); + break; + default: + return -EINVAL; + } + return ret; +} + +static +int _lttng_context_metadata_statedump(struct ust_registry_session *session, + size_t nr_ctx_fields, + struct ustctl_field *ctx) +{ + int ret = 0; + int i; + + if (!ctx) + return 0; + for (i = 0; i < nr_ctx_fields; i++) { + const struct ustctl_field *field = &ctx[i]; + + ret = _lttng_field_statedump(session, field); + if (ret) + return ret; + } + return ret; +} + +static +int _lttng_fields_metadata_statedump(struct ust_registry_session *session, + struct ust_registry_event *event) +{ + int ret = 0; + int i; + + for (i = 0; i < event->nr_fields; i++) { + const struct ustctl_field *field = &event->fields[i]; + + ret = _lttng_field_statedump(session, field); + if (ret) + return ret; + } + return ret; +} + +/* + * Should be called with session registry mutex held. + */ +int ust_metadata_event_statedump(struct ust_registry_session *session, + struct ust_registry_channel *chan, + struct ust_registry_event *event) +{ + int ret = 0; + + /* Don't dump metadata events */ + if (chan->chan_id == -1U) + return 0; + + ret = lttng_metadata_printf(session, + "event {\n" + " name = \"%s\";\n" + " id = %u;\n" + " stream_id = %u;\n", + event->name, + event->id, + chan->chan_id); + if (ret) + goto end; + + ret = lttng_metadata_printf(session, + " loglevel = %d;\n", + event->loglevel); + if (ret) + goto end; + + if (event->model_emf_uri) { + ret = lttng_metadata_printf(session, + " model.emf.uri = \"%s\";\n", + event->model_emf_uri); + if (ret) + goto end; + } + +#if 0 /* context for events not supported */ + if (event->ctx) { + ret = lttng_metadata_printf(session, + " context := struct {\n"); + if (ret) + goto end; + } + ret = _lttng_context_metadata_statedump(session, event->ctx); + if (ret) + goto end; + if (event->ctx) { + ret = lttng_metadata_printf(session, + " };\n"); + if (ret) + goto end; + } +#endif + ret = lttng_metadata_printf(session, + " fields := struct {\n" + ); + if (ret) + goto end; + + ret = _lttng_fields_metadata_statedump(session, event); + if (ret) + goto end; + + ret = lttng_metadata_printf(session, + " };\n" + "};\n\n"); + if (ret) + goto end; + +end: + return ret; +} + +/* + * Should be called with session registry mutex held. + */ +int ust_metadata_channel_statedump(struct ust_registry_session *session, + struct ust_registry_channel *chan) +{ + int ret = 0; + + /* Don't dump metadata events */ + if (chan->chan_id == -1U) + return 0; + + if (!chan->header_type) + return -EINVAL; + + ret = lttng_metadata_printf(session, + "stream {\n" + " id = %u;\n" + " event.header := %s;\n" + " packet.context := struct packet_context;\n", + chan->chan_id, + chan->header_type == USTCTL_CHANNEL_HEADER_COMPACT ? + "struct event_header_compact" : + "struct event_header_large"); + if (ret) + goto end; + + if (chan->ctx_fields) { + ret = lttng_metadata_printf(session, + " event.context := struct {\n"); + if (ret) + goto end; + } + ret = _lttng_context_metadata_statedump(session, + chan->nr_ctx_fields, + chan->ctx_fields); + if (ret) + goto end; + if (chan->ctx_fields) { + ret = lttng_metadata_printf(session, + " };\n"); + if (ret) + goto end; + } + + ret = lttng_metadata_printf(session, + "};\n\n"); + +end: + return ret; +} + +static +int _lttng_stream_packet_context_declare(struct ust_registry_session *session) +{ + return lttng_metadata_printf(session, + "struct packet_context {\n" + " uint64_clock_monotonic_t timestamp_begin;\n" + " uint64_clock_monotonic_t timestamp_end;\n" + " uint64_t content_size;\n" + " uint64_t packet_size;\n" + " unsigned long events_discarded;\n" + " uint32_t cpu_id;\n" + "};\n\n" + ); +} + +/* + * Compact header: + * id: range: 0 - 30. + * id 31 is reserved to indicate an extended header. + * + * Large header: + * id: range: 0 - 65534. + * id 65535 is reserved to indicate an extended header. + */ +static +int _lttng_event_header_declare(struct ust_registry_session *session) +{ + return lttng_metadata_printf(session, + "struct event_header_compact {\n" + " enum : uint5_t { compact = 0 ... 30, extended = 31 } id;\n" + " variant {\n" + " struct {\n" + " uint27_clock_monotonic_t timestamp;\n" + " } compact;\n" + " struct {\n" + " uint32_t id;\n" + " uint64_clock_monotonic_t timestamp;\n" + " } extended;\n" + " } v;\n" + "} align(%u);\n" + "\n" + "struct event_header_large {\n" + " enum : uint16_t { compact = 0 ... 65534, extended = 65535 } id;\n" + " variant {\n" + " struct {\n" + " uint32_clock_monotonic_t timestamp;\n" + " } compact;\n" + " struct {\n" + " uint32_t id;\n" + " uint64_clock_monotonic_t timestamp;\n" + " } extended;\n" + " } v;\n" + "} align(%u);\n\n", + session->uint32_t_alignment, + session->uint16_t_alignment + ); +} + +/* + * Approximation of NTP time of day to clock monotonic correlation, + * taken at start of trace. + * Yes, this is only an approximation. Yes, we can (and will) do better + * in future versions. + */ +static +uint64_t measure_clock_offset(void) +{ + uint64_t offset, monotonic[2], realtime; + struct timespec rts = { 0, 0 }; + int ret; + + monotonic[0] = trace_clock_read64(); + ret = clock_gettime(CLOCK_REALTIME, &rts); + if (ret < 0) + return 0; + monotonic[1] = trace_clock_read64(); + offset = (monotonic[0] + monotonic[1]) >> 1; + realtime = (uint64_t) rts.tv_sec * 1000000000ULL; + realtime += rts.tv_nsec; + offset = realtime - offset; + return offset; +} + + +/* + * Should be called with session registry mutex held. + */ +int ust_metadata_session_statedump(struct ust_registry_session *session, + struct ust_app *app) +{ + unsigned char *uuid_c; + char uuid_s[UUID_STR_LEN], + clock_uuid_s[UUID_STR_LEN]; + int ret = 0; + char hostname[HOST_NAME_MAX]; + + uuid_c = session->uuid; + + snprintf(uuid_s, sizeof(uuid_s), + "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", + uuid_c[0], uuid_c[1], uuid_c[2], uuid_c[3], + uuid_c[4], uuid_c[5], uuid_c[6], uuid_c[7], + uuid_c[8], uuid_c[9], uuid_c[10], uuid_c[11], + uuid_c[12], uuid_c[13], uuid_c[14], uuid_c[15]); + + ret = lttng_metadata_printf(session, + "typealias integer { size = 8; align = %u; signed = false; } := uint8_t;\n" + "typealias integer { size = 16; align = %u; signed = false; } := uint16_t;\n" + "typealias integer { size = 32; align = %u; signed = false; } := uint32_t;\n" + "typealias integer { size = 64; align = %u; signed = false; } := uint64_t;\n" + "typealias integer { size = %u; align = %u; signed = false; } := unsigned long;\n" + "typealias integer { size = 5; align = 1; signed = false; } := uint5_t;\n" + "typealias integer { size = 27; align = 1; signed = false; } := uint27_t;\n" + "\n" + "trace {\n" + " major = %u;\n" + " minor = %u;\n" + " uuid = \"%s\";\n" + " byte_order = %s;\n" + " packet.header := struct {\n" + " uint32_t magic;\n" + " uint8_t uuid[16];\n" + " uint32_t stream_id;\n" + " };\n" + "};\n\n", + session->uint8_t_alignment, + session->uint16_t_alignment, + session->uint32_t_alignment, + session->uint64_t_alignment, + session->bits_per_long, + session->long_alignment, + CTF_SPEC_MAJOR, + CTF_SPEC_MINOR, + uuid_s, + session->byte_order == BIG_ENDIAN ? "be" : "le" + ); + if (ret) + goto end; + + /* ignore error, just use empty string if error. */ + hostname[0] = '\0'; + ret = gethostname(hostname, sizeof(hostname)); + if (ret && errno == ENAMETOOLONG) + hostname[HOST_NAME_MAX - 1] = '\0'; + ret = lttng_metadata_printf(session, + "env {\n" + " hostname = \"%s\";\n" + " domain = \"ust\";\n" + " tracer_name = \"lttng-ust\";\n" + " tracer_major = %u;\n" + " tracer_minor = %u;\n" + " tracer_patchlevel = %u;\n", + hostname, + app->version.major, + app->version.minor, + app->version.patchlevel + ); + if (ret) + goto end; + + /* + * If per-application registry, we can output extra information + * about the application. + */ + if (app) { + ret = lttng_metadata_printf(session, + " vpid = %d;\n" + " procname = \"%s\";\n", + (int) app->pid, + app->name + ); + if (ret) + goto end; + } + + ret = lttng_metadata_printf(session, + "};\n\n" + ); + if (ret) + goto end; + + + ret = lttng_metadata_printf(session, + "clock {\n" + " name = %s;\n", + "monotonic" + ); + if (ret) + goto end; + + if (!trace_clock_uuid(clock_uuid_s)) { + ret = lttng_metadata_printf(session, + " uuid = \"%s\";\n", + clock_uuid_s + ); + if (ret) + goto end; + } + + ret = lttng_metadata_printf(session, + " description = \"Monotonic Clock\";\n" + " freq = %" PRIu64 "; /* Frequency, in Hz */\n" + " /* clock value offset from Epoch is: offset * (1/freq) */\n" + " offset = %" PRIu64 ";\n" + "};\n\n", + trace_clock_freq(), + measure_clock_offset() + ); + if (ret) + goto end; + + ret = lttng_metadata_printf(session, + "typealias integer {\n" + " size = 27; align = 1; signed = false;\n" + " map = clock.monotonic.value;\n" + "} := uint27_clock_monotonic_t;\n" + "\n" + "typealias integer {\n" + " size = 32; align = %u; signed = false;\n" + " map = clock.monotonic.value;\n" + "} := uint32_clock_monotonic_t;\n" + "\n" + "typealias integer {\n" + " size = 64; align = %u; signed = false;\n" + " map = clock.monotonic.value;\n" + "} := uint64_clock_monotonic_t;\n\n", + session->uint32_t_alignment, + session->uint64_t_alignment + ); + if (ret) + goto end; + + ret = _lttng_stream_packet_context_declare(session); + if (ret) + goto end; + + ret = _lttng_event_header_declare(session); + if (ret) + goto end; + +end: + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-registry.c b/src/bin/lttng-sessiond/ust-registry.c new file mode 100644 index 000000000..31d33df4c --- /dev/null +++ b/src/bin/lttng-sessiond/ust-registry.c @@ -0,0 +1,372 @@ +/* + * Copyright (C) 2013 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#define _GNU_SOURCE +#include + +#include +#include "ust-registry.h" + +/* + * Hash table match function for event in the registry. + */ +static int ht_match_event(struct cds_lfht_node *node, const void *_key) +{ + struct ust_registry_event *event; + const struct ust_registry_event *key; + + assert(node); + assert(_key); + + event = caa_container_of(node, struct ust_registry_event, node.node); + assert(event); + key = _key; + + /* It has to be a perfect match. */ + if (strncmp(event->name, key->name, sizeof(event->name)) != 0) { + goto no_match; + } + + /* It has to be a perfect match. */ + if (strncmp(event->signature, key->signature, + strlen(event->signature) != 0)) { + goto no_match; + } + + /* Match */ + return 1; + +no_match: + return 0; +} + +/* + * Allocate event and initialize it. This does NOT set a valid event id from a + * registry. + */ +static struct ust_registry_event *alloc_event(int session_objd, + int channel_objd, char *name, char *sig, size_t nr_fields, + struct ustctl_field *fields, int loglevel, char *model_emf_uri) +{ + struct ust_registry_event *event = NULL; + + event = zmalloc(sizeof(*event)); + if (!event) { + PERROR("zmalloc ust registry event"); + goto error; + } + + event->session_objd = session_objd; + event->channel_objd = channel_objd; + /* Allocated by ustctl. */ + event->signature = sig; + event->nr_fields = nr_fields; + event->fields = fields; + event->loglevel = loglevel; + event->model_emf_uri = model_emf_uri; + if (name) { + /* Copy event name and force NULL byte. */ + strncpy(event->name, name, sizeof(event->name)); + event->name[sizeof(event->name) - 1] = '\0'; + } + lttng_ht_node_init_str(&event->node, event->name); + +error: + return event; +} + +/* + * Free event data structure. This does NOT delete it from any hash table. It's + * safe to pass a NULL pointer. This shoudl be called inside a call RCU if the + * event is previously deleted from a rcu hash table. + */ +static void destroy_event(struct ust_registry_event *event) +{ + if (!event) { + return; + } + + free(event->fields); + free(event->model_emf_uri); + free(event->signature); + free(event); +} + +/* + * Destroy event function call of the call RCU. + */ +static void destroy_event_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_str *node = + caa_container_of(head, struct lttng_ht_node_str, head); + struct ust_registry_event *event = + caa_container_of(node, struct ust_registry_event, node); + + destroy_event(event); +} + +/* + * Find an event using the name and signature in the given registry. RCU read + * side lock MUST be acquired before calling this function and as long as the + * event reference is kept by the caller. + * + * On success, the event pointer is returned else NULL. + */ +struct ust_registry_event *ust_registry_find_event( + struct ust_registry_channel *chan, char *name, char *sig) +{ + struct lttng_ht_node_str *node; + struct lttng_ht_iter iter; + struct ust_registry_event *event = NULL; + struct ust_registry_event key; + + assert(chan); + assert(name); + assert(sig); + + /* Setup key for the match function. */ + strncpy(key.name, name, sizeof(key.name)); + key.name[sizeof(key.name) - 1] = '\0'; + key.signature = sig; + + cds_lfht_lookup(chan->ht->ht, chan->ht->hash_fct(name, lttng_ht_seed), + chan->ht->match_fct, &key, &iter.iter); + node = lttng_ht_iter_get_node_str(&iter); + if (!node) { + goto end; + } + event = caa_container_of(node, struct ust_registry_event, node); + +end: + return event; +} + +/* + * Create a ust_registry_event from the given parameters and add it to the + * registry hash table. If event_id is valid, it is set with the newly created + * event id. + * + * On success, return 0 else a negative value. The created event MUST be unique + * so on duplicate entry -EINVAL is returned. On error, event_id is untouched. + * + * Should be called with session registry mutex held. + */ +int ust_registry_create_event(struct ust_registry_session *session, + struct ust_registry_channel *chan, + int session_objd, int channel_objd, char *name, char *sig, + size_t nr_fields, struct ustctl_field *fields, int loglevel, + char *model_emf_uri, uint32_t *event_id) +{ + int ret; + struct cds_lfht_node *nptr; + struct ust_registry_event *event = NULL; + + assert(session); + assert(chan); + assert(name); + assert(sig); + + /* + * This should not happen but since it comes from the UST tracer, an + * external party, don't assert and simply validate values. + */ + if (session_objd < 0 || channel_objd < 0) { + ret = -EINVAL; + goto error; + } + + /* Check if we've reached the maximum possible id. */ + if (ust_registry_is_max_id(chan->used_event_id)) { + ret = -ENOENT; + goto error; + } + + event = alloc_event(session_objd, channel_objd, name, sig, nr_fields, + fields, loglevel, model_emf_uri); + if (!event) { + ret = -ENOMEM; + goto error; + } + + event->id = ust_registry_get_next_event_id(chan); + + DBG3("UST registry creating event with event: %s, sig: %s, id: %u, " + "chan_objd: %u, sess_objd: %u", event->name, event->signature, + event->id, event->channel_objd, event->session_objd); + + rcu_read_lock(); + /* + * This is an add unique with a custom match function for event. The node + * are matched using the event name and signature. + */ + nptr = cds_lfht_add_unique(chan->ht->ht, chan->ht->hash_fct(event->node.key, + lttng_ht_seed), chan->ht->match_fct, event, &event->node.node); + if (nptr != &event->node.node) { + ERR("UST registry create event add unique failed for event: %s, " + "sig: %s, id: %u, chan_objd: %u, sess_objd: %u", event->name, + event->signature, event->id, event->channel_objd, + event->session_objd); + ret = -EINVAL; + goto error_unlock; + } + + /* Set event id if user wants it. */ + if (event_id) { + *event_id = event->id; + } + rcu_read_unlock(); + + /* Append to metadata */ + ret = ust_metadata_event_statedump(session, chan, event); + if (ret) { + ERR("Error appending event metadata (errno = %d)", ret); + return ret; + } + + return 0; + +error_unlock: + rcu_read_unlock(); +error: + destroy_event(event); + return ret; +} + +/* + * For a given event in a registry, delete the entry and destroy the event. + * This MUST be called within a RCU read side lock section. + */ +void ust_registry_destroy_event(struct ust_registry_channel *chan, + struct ust_registry_event *event) +{ + int ret; + struct lttng_ht_iter iter; + + assert(chan); + assert(event); + + /* Delete the node first. */ + iter.iter.node = &event->node.node; + ret = lttng_ht_del(chan->ht, &iter); + assert(!ret); + + call_rcu(&event->node.head, destroy_event_rcu); + + return; +} + +/* + * Initialize registry with default values. + */ +void ust_registry_channel_init(struct ust_registry_session *session, + struct ust_registry_channel *chan) +{ + assert(chan); + + memset(chan, 0, sizeof(struct ust_registry_channel)); + + chan->ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); + assert(chan->ht); + + /* Set custom match function. */ + chan->ht->match_fct = ht_match_event; +} + +/* + * Destroy every element of the registry and free the memory. This does NOT + * free the registry pointer since it might not have been allocated before so + * it's the caller responsability. + * + * This MUST be called within a RCU read side lock section. + */ +void ust_registry_channel_destroy(struct ust_registry_session *session, + struct ust_registry_channel *chan) +{ + struct lttng_ht_iter iter; + struct ust_registry_event *event; + + assert(chan); + + /* Destroy all event associated with this registry. */ + cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) { + /* Delete the node from the ht and free it. */ + ust_registry_destroy_event(chan, event); + } + lttng_ht_destroy(chan->ht); +} + +/* + * Initialize registry with default values. + */ +int ust_registry_session_init(struct ust_registry_session *session, + struct ust_app *app, + uint32_t bits_per_long, + uint32_t uint8_t_alignment, + uint32_t uint16_t_alignment, + uint32_t uint32_t_alignment, + uint32_t uint64_t_alignment, + uint32_t long_alignment, + int byte_order) +{ + int ret; + + assert(session); + + memset(session, 0, sizeof(struct ust_registry_session)); + + pthread_mutex_init(&session->lock, NULL); + session->bits_per_long = bits_per_long; + session->uint8_t_alignment = uint8_t_alignment; + session->uint16_t_alignment = uint16_t_alignment; + session->uint32_t_alignment = uint32_t_alignment; + session->uint64_t_alignment = uint64_t_alignment; + session->long_alignment = long_alignment; + session->byte_order = byte_order; + + ret = lttng_uuid_generate(session->uuid); + if (ret) { + ERR("Failed to generate UST uuid (errno = %d)", ret); + goto error; + } + + pthread_mutex_lock(&session->lock); + ret = ust_metadata_session_statedump(session, app); + pthread_mutex_unlock(&session->lock); + if (ret) { + ERR("Failed to generate session metadata (errno = %d)", ret); + goto error; + } + + return 0; + +error: + return -1; +} + +/* + * Destroy session registry. This does NOT free the given pointer since it + * might get passed as a reference. The registry lock should NOT be acquired. + */ +void ust_registry_session_destroy(struct ust_registry_session *reg) +{ + int ret; + + /* On error, EBUSY can be returned if lock. Code flow error. */ + ret = pthread_mutex_destroy(®->lock); + assert(!ret); + + free(reg->metadata); +} diff --git a/src/bin/lttng-sessiond/ust-registry.h b/src/bin/lttng-sessiond/ust-registry.h new file mode 100644 index 000000000..5efa0828d --- /dev/null +++ b/src/bin/lttng-sessiond/ust-registry.h @@ -0,0 +1,207 @@ +/* + * Copyright (C) 2013 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef LTTNG_UST_REGISTRY_H +#define LTTNG_UST_REGISTRY_H + +#include +#include +#include + +#include +#include + +#define CTF_SPEC_MAJOR 1 +#define CTF_SPEC_MINOR 8 + +struct ust_app; + +struct ust_registry_session { + /* + * With multiple writers and readers, use this lock to access + * the registry. Use defined macros above to lock it. + * Can nest within the ust app session lock. + */ + pthread_mutex_t lock; + /* Next channel ID available for a newly registered channel. */ + uint32_t next_channel_id; + /* Once this value reaches UINT32_MAX, no more id can be allocated. */ + uint32_t used_channel_id; + /* Universal unique identifier used by the tracer. */ + unsigned char uuid[UUID_LEN]; + + /* session ABI description */ + + /* Size of long, in bits */ + unsigned int bits_per_long; + /* Alignment, in bits */ + unsigned int uint8_t_alignment, + uint16_t_alignment, + uint32_t_alignment, + uint64_t_alignment, + long_alignment; + /* endianness */ + int byte_order; /* BIG_ENDIAN or LITTLE_ENDIAN */ + + /* Generated metadata. */ + char *metadata; /* NOT null-terminated ! Use memcpy. */ + size_t metadata_len, metadata_alloc_len; + /* Length of bytes sent to the consumer. */ + size_t metadata_len_sent; +}; + +struct ust_registry_channel { + /* Id set when replying to a register channel. */ + uint32_t chan_id; + enum ustctl_channel_header header_type; + + /* + * Hash table containing events sent by the UST tracer. MUST be accessed + * with a RCU read side lock acquired. + */ + struct lttng_ht *ht; + /* Next event ID available for a newly registered event. */ + uint32_t next_event_id; + /* Once this value reaches UINT32_MAX, no more id can be allocated. */ + uint32_t used_event_id; + /* + * Context fields of the registry. Context are per channel. Allocated by a + * register channel notification from the UST tracer. + */ + size_t nr_ctx_fields; + struct ustctl_field *ctx_fields; +}; + +/* + * Event registered from a UST tracer sent to the session daemon. This is + * indexed and matched by . + */ +struct ust_registry_event { + int id; + /* Both objd are set by the tracer. */ + int session_objd; + int channel_objd; + /* Name of the event returned by the tracer. */ + char name[LTTNG_UST_SYM_NAME_LEN]; + char *signature; + int loglevel; + size_t nr_fields; + struct ustctl_field *fields; + char *model_emf_uri; + /* + * Node in the ust-registry hash table. The event name is used to + * initialize the node and the event_name/signature for the match function. + */ + struct lttng_ht_node_str node; +}; + +/* + * Validate that the id has reached the maximum allowed or not. + * + * Return 0 if NOT else 1. + */ +static inline int ust_registry_is_max_id(uint32_t id) +{ + return (id == UINT32_MAX) ? 1 : 0; +} + +/* + * Return next available event id and increment the used counter. The + * ust_registry_is_max_id function MUST be called before in order to validate + * if the maximum number of IDs have been reached. If not, it is safe to call + * this function. + * + * Return a unique channel ID. If max is reached, the used_event_id counter is + * returned. + */ +static inline uint32_t ust_registry_get_next_event_id( + struct ust_registry_channel *r) +{ + if (ust_registry_is_max_id(r->used_event_id)) { + return r->used_event_id; + } + + r->used_event_id++; + return r->next_event_id++; +} + +/* + * Return next available channel id and increment the used counter. The + * ust_registry_is_max_id function MUST be called before in order to validate + * if the maximum number of IDs have been reached. If not, it is safe to call + * this function. + * + * Return a unique channel ID. If max is reached, the used_channel_id counter + * is returned. + */ +static inline uint32_t ust_registry_get_next_chan_id( + struct ust_registry_session *r) +{ + if (ust_registry_is_max_id(r->used_channel_id)) { + return r->used_channel_id; + } + + r->used_channel_id++; + return r->next_channel_id++; +} + +/* + * Return registry event count. This is read atomically. + */ +static inline uint32_t ust_registry_get_event_count( + struct ust_registry_channel *r) +{ + return (uint32_t) uatomic_read(&r->used_event_id); +} + +void ust_registry_channel_init(struct ust_registry_session *session, + struct ust_registry_channel *chan); +void ust_registry_channel_destroy(struct ust_registry_session *session, + struct ust_registry_channel *chan); + +int ust_registry_session_init(struct ust_registry_session *session, + struct ust_app *app, + uint32_t bits_per_long, + uint32_t uint8_t_alignment, + uint32_t uint16_t_alignment, + uint32_t uint32_t_alignment, + uint32_t uint64_t_alignment, + uint32_t long_alignment, + int byte_order); + +void ust_registry_session_destroy(struct ust_registry_session *session); + +int ust_registry_create_event(struct ust_registry_session *session, + struct ust_registry_channel *channel, + int session_objd, int channel_objd, char *name, char *sig, + size_t nr_fields, struct ustctl_field *fields, int loglevel, + char *model_emf_uri, uint32_t *event_id); +struct ust_registry_event *ust_registry_find_event( + struct ust_registry_channel *chan, char *name, char *sig); +void ust_registry_destroy_event(struct ust_registry_channel *chan, + struct ust_registry_event *event); + +/* app can be NULL for registry shared across applications. */ +int ust_metadata_session_statedump(struct ust_registry_session *session, + struct ust_app *app); +int ust_metadata_channel_statedump(struct ust_registry_session *session, + struct ust_registry_channel *chan); +int ust_metadata_event_statedump(struct ust_registry_session *session, + struct ust_registry_channel *chan, + struct ust_registry_event *event); + +#endif /* LTTNG_UST_REGISTRY_H */ diff --git a/src/bin/lttng-sessiond/ust-thread.c b/src/bin/lttng-sessiond/ust-thread.c new file mode 100644 index 000000000..552b7ddd8 --- /dev/null +++ b/src/bin/lttng-sessiond/ust-thread.c @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2013 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#define _GNU_SOURCE +#include + +#include +#include + +#include "fd-limit.h" +#include "lttng-sessiond.h" +#include "ust-thread.h" + +/* + * This thread manage application notify communication. + */ +void *ust_thread_manage_notify(void *data) +{ + int i, ret, pollfd; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + + DBG("[ust-thread] Manage application notify command"); + + rcu_register_thread(); + rcu_thread_online(); + + ret = sessiond_set_thread_pollset(&events, 2); + if (ret < 0) { + goto error_poll_create; + } + + /* Add notify pipe to the pollset. */ + ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], LPOLLIN | LPOLLERR); + if (ret < 0) { + goto error; + } + + while (1) { + DBG3("[ust-thread] Manage notify polling on %d fds", + LTTNG_POLL_GETNB(&events)); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = sessiond_check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto exit; + } + + /* Inspect the apps cmd pipe */ + if (pollfd == apps_cmd_notify_pipe[0]) { + int sock; + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps notify command pipe error"); + goto error; + } else if (!(revents & LPOLLIN)) { + /* No POLLIN and not a catched error, stop the thread. */ + ERR("Notify command pipe failed. revent: %u", revents); + goto error; + } + + do { + /* Get socket from dispatch thread. */ + ret = read(apps_cmd_notify_pipe[0], &sock, sizeof(sock)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret < sizeof(sock)) { + PERROR("read apps notify pipe"); + goto error; + } + + ret = lttng_poll_add(&events, sock, + LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + /* + * It's possible we've reached the max poll fd allowed. + * Let's close the socket but continue normal execution. + */ + ret = close(sock); + if (ret) { + PERROR("close notify socket %d", sock); + } + lttng_fd_put(LTTNG_FD_APPS, 1); + continue; + } + DBG3("UST thread notify added sock %d to pollset", sock); + } else { + /* + * At this point, we know that a registered application + * triggered the event. + */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } + + /* The socket is closed after a grace period here. */ + ust_app_notify_sock_unregister(pollfd); + } else if (revents & (LPOLLIN | LPOLLPRI)) { + ret = ust_app_recv_notify(pollfd); + if (ret < 0) { + /* + * If the notification failed either the application is + * dead or an internal error happened. In both cases, + * we can only continue here. If the application is + * dead, an unregistration will follow or else the + * application will notice that we are not responding + * on that socket and will close it. + */ + continue; + } + } else { + ERR("Unknown poll events %u for sock %d", revents, pollfd); + continue; + } + } + } + } + +exit: +error: + lttng_poll_clean(&events); +error_poll_create: + utils_close_pipe(apps_cmd_notify_pipe); + apps_cmd_notify_pipe[0] = apps_cmd_notify_pipe[1] = -1; + DBG("Application notify communication apps thread cleanup complete"); + rcu_thread_offline(); + rcu_unregister_thread(); + return NULL; +} diff --git a/src/bin/lttng-sessiond/ust-thread.h b/src/bin/lttng-sessiond/ust-thread.h new file mode 100644 index 000000000..0292df983 --- /dev/null +++ b/src/bin/lttng-sessiond/ust-thread.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2013 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef UST_THREAD_H +#define UST_THREAD_H + +#ifdef HAVE_LIBLTTNG_UST_CTL + +void *ust_thread_manage_notify(void *data); + +#else /* HAVE_LIBLTTNG_UST_CTL */ + +void *ust_thread_manage_notify(void *data) +{ + return NULL; +} + +#endif /* HAVE_LIBLTTNG_UST_CTL */ + +#endif /* UST_THREAD_H */ diff --git a/src/common/compat/uuid.h b/src/common/compat/uuid.h index 5bf8beb40..35faf5388 100644 --- a/src/common/compat/uuid.h +++ b/src/common/compat/uuid.h @@ -28,10 +28,14 @@ * Includes final \0. */ #define UUID_STR_LEN 37 +#define UUID_LEN 16 #ifdef LTTNG_HAVE_LIBUUID #include +/* + * uuid_out is of len UUID_LEN. + */ static inline int lttng_uuid_generate(unsigned char *uuid_out) { @@ -43,6 +47,9 @@ int lttng_uuid_generate(unsigned char *uuid_out) #include #include +/* + * uuid_out is of len UUID_LEN. + */ static inline int lttng_uuid_generate(unsigned char *uuid_out) { diff --git a/src/common/consumer.c b/src/common/consumer.c index 09b3bee33..300fd2a2f 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -47,6 +47,16 @@ struct lttng_consumer_global_data consumer_data = { .type = LTTNG_CONSUMER_UNKNOWN, }; +enum consumer_channel_action { + CONSUMER_CHANNEL_ADD, + CONSUMER_CHANNEL_QUIT, +}; + +struct consumer_channel_msg { + enum consumer_channel_action action; + struct lttng_consumer_channel *chan; +}; + /* * Flag to inform the polling thread to quit when all fd hung up. Updated by * the consumer_thread_receive_fds when it notices that all fds has hung up. @@ -78,28 +88,59 @@ static void notify_thread_pipe(int wpipe) } while (ret < 0 && errno == EINTR); } +static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *chan, + enum consumer_channel_action action) +{ + struct consumer_channel_msg msg; + int ret; + + msg.action = action; + msg.chan = chan; + do { + ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg)); + } while (ret < 0 && errno == EINTR); +} + +static int read_channel_pipe(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel **chan, + enum consumer_channel_action *action) +{ + struct consumer_channel_msg msg; + int ret; + + do { + ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg)); + } while (ret < 0 && errno == EINTR); + if (ret > 0) { + *action = msg.action; + *chan = msg.chan; + } + return ret; +} + /* * Find a stream. The consumer_data.lock must be locked during this * call. */ -static struct lttng_consumer_stream *find_stream(int key, +static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht) { struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_consumer_stream *stream = NULL; assert(ht); - /* Negative keys are lookup failures */ - if (key < 0) { + /* -1ULL keys are lookup failures */ + if (key == (uint64_t) -1ULL) { return NULL; } rcu_read_lock(); - lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + lttng_ht_lookup(ht, &key, &iter); + node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { stream = caa_container_of(node, struct lttng_consumer_stream, node); } @@ -116,13 +157,13 @@ static void steal_stream_key(int key, struct lttng_ht *ht) rcu_read_lock(); stream = find_stream(key, ht); if (stream) { - stream->key = -1; + stream->key = -1ULL; /* * We don't want the lookup to match, but we still need * to iterate on this stream when iterating over the hash table. Just * change the node key. */ - stream->node.key = -1; + stream->node.key = -1ULL; } rcu_read_unlock(); } @@ -133,19 +174,19 @@ static void steal_stream_key(int key, struct lttng_ht *ht) * RCU read side lock MUST be acquired before calling this function and * protects the channel ptr. */ -struct lttng_consumer_channel *consumer_find_channel(unsigned long key) +struct lttng_consumer_channel *consumer_find_channel(uint64_t key) { struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_consumer_channel *channel = NULL; - /* Negative keys are lookup failures */ - if (key < 0) { + /* -1ULL keys are lookup failures */ + if (key == (uint64_t) -1ULL) { return NULL; } - lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + lttng_ht_lookup(consumer_data.channel_ht, &key, &iter); + node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { channel = caa_container_of(node, struct lttng_consumer_channel, node); } @@ -155,8 +196,8 @@ struct lttng_consumer_channel *consumer_find_channel(unsigned long key) static void free_stream_rcu(struct rcu_head *head) { - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_ht_node_u64 *node = + caa_container_of(head, struct lttng_ht_node_u64, head); struct lttng_consumer_stream *stream = caa_container_of(node, struct lttng_consumer_stream, node); @@ -165,8 +206,8 @@ static void free_stream_rcu(struct rcu_head *head) static void free_channel_rcu(struct rcu_head *head) { - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_ht_node_u64 *node = + caa_container_of(head, struct lttng_ht_node_u64, head); struct lttng_consumer_channel *channel = caa_container_of(node, struct lttng_consumer_channel, node); @@ -178,8 +219,8 @@ static void free_channel_rcu(struct rcu_head *head) */ static void free_relayd_rcu(struct rcu_head *head) { - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_ht_node_u64 *node = + caa_container_of(head, struct lttng_ht_node_u64, head); struct consumer_relayd_sock_pair *relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); @@ -233,7 +274,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) int ret; struct lttng_ht_iter iter; - DBG("Consumer delete channel key %d", channel->key); + DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); @@ -425,7 +466,10 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, ret = lttng_ht_del(ht, &iter); assert(!ret); - /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_channel_id.node; + ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter); + assert(!ret); + iter.iter.node = &stream->node_session_id.node; ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); assert(!ret); @@ -490,8 +534,8 @@ free_stream_rcu: call_rcu(&stream->node.head, free_stream_rcu); } -struct lttng_consumer_stream *consumer_allocate_stream(int channel_key, - int stream_key, +struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, + uint64_t stream_key, enum lttng_consumer_stream_state state, const char *channel_name, uid_t uid, @@ -540,13 +584,16 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key, } /* Key is always the wait_fd for streams. */ - lttng_ht_node_init_ulong(&stream->node, stream->key); + lttng_ht_node_init_u64(&stream->node, stream->key); + + /* Init node per channel id key */ + lttng_ht_node_init_u64(&stream->node_channel_id, channel_key); /* Init session id node with the stream session id */ - lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id); + lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); - DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx, stream->session_id); + DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64, + stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id); rcu_read_unlock(); return stream; @@ -573,7 +620,7 @@ static int add_stream(struct lttng_consumer_stream *stream, assert(stream); assert(ht); - DBG3("Adding consumer stream %d", stream->key); + DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->lock); @@ -582,14 +629,17 @@ static int add_stream(struct lttng_consumer_stream *stream, /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); - lttng_ht_add_unique_ulong(ht, &stream->node); + lttng_ht_add_unique_u64(ht, &stream->node); + + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, + &stream->node_channel_id); /* * Add stream to the stream_list_ht of the consumer data. No need to steal * the key since the HT does not use it and we allow to add redundant keys * into this table. */ - lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); @@ -629,18 +679,18 @@ static int add_stream(struct lttng_consumer_stream *stream, static int add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; assert(relayd); lttng_ht_lookup(consumer_data.relayd_ht, - (void *)((unsigned long) relayd->net_seq_idx), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + &relayd->net_seq_idx, &iter); + node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { goto end; } - lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); + lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node); end: return ret; @@ -668,7 +718,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( obj->net_seq_idx = net_seq_idx; obj->refcount = 0; obj->destroy_flag = 0; - lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); + lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); error: @@ -682,20 +732,20 @@ error: * RCU read-side lock must be held across this call and while using the * returned object. */ -struct consumer_relayd_sock_pair *consumer_find_relayd(int key) +struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) { struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct consumer_relayd_sock_pair *relayd = NULL; /* Negative keys are lookup failures */ - if (key < 0) { + if (key == (uint64_t) -1ULL) { goto error; } - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key), + lttng_ht_lookup(consumer_data.relayd_ht, &key, &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); } @@ -801,10 +851,13 @@ struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key, strncpy(channel->name, name, sizeof(channel->name)); channel->name[sizeof(channel->name) - 1] = '\0'; - lttng_ht_node_init_ulong(&channel->node, channel->key); + lttng_ht_node_init_u64(&channel->node, channel->key); + + channel->wait_fd = -1; + CDS_INIT_LIST_HEAD(&channel->streams.head); - DBG("Allocated channel (key %d)", channel->key) + DBG("Allocated channel (key %" PRIu64 ")", channel->key) end: return channel; @@ -813,31 +866,37 @@ end: /* * Add a channel to the global list protected by a mutex. */ -int consumer_add_channel(struct lttng_consumer_channel *channel) +int consumer_add_channel(struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx) { int ret = 0; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, - (void *)((unsigned long) channel->key), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + &channel->key, &iter); + node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { /* Channel already exist. Ignore the insertion */ - ERR("Consumer add channel key %d already exists!", channel->key); + ERR("Consumer add channel key %" PRIu64 " already exists!", + channel->key); ret = -1; goto end; } - lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); + lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node); end: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); + if (!ret && channel->wait_fd != -1 && + channel->metadata_stream == NULL) { + notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD); + } return ret; } @@ -978,6 +1037,8 @@ void lttng_consumer_cleanup(void) cleanup_relayd_ht(); + lttng_ht_destroy(consumer_data.stream_per_chan_id_ht); + /* * This HT contains streams that are freed by either the metadata thread or * the data thread so we do *nothing* on the hash table and simply destroy @@ -1062,7 +1123,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int stream_key, uint32_t state)) { - int ret, i; + int ret; struct lttng_consumer_local_data *ctx; assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN || @@ -1114,6 +1175,12 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_thread_pipe; } + ret = pipe(ctx->consumer_channel_pipe); + if (ret < 0) { + PERROR("Error creating channel pipe"); + goto error_channel_pipe; + } + ret = utils_create_pipe(ctx->consumer_metadata_pipe); if (ret < 0) { goto error_metadata_pipe; @@ -1129,26 +1196,14 @@ struct lttng_consumer_local_data *lttng_consumer_create( error_splice_pipe: utils_close_pipe(ctx->consumer_metadata_pipe); error_metadata_pipe: + utils_close_pipe(ctx->consumer_channel_pipe); +error_channel_pipe: utils_close_pipe(ctx->consumer_thread_pipe); error_thread_pipe: - for (i = 0; i < 2; i++) { - int err; - - err = close(ctx->consumer_should_quit[i]); - if (err) { - PERROR("close"); - } - } + utils_close_pipe(ctx->consumer_should_quit); error_poll_fcntl: error_quit_pipe: - for (i = 0; i < 2; i++) { - int err; - - err = close(ctx->consumer_data_pipe[i]); - if (err) { - PERROR("close"); - } - } + utils_close_pipe(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); error: @@ -1168,30 +1223,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - ret = close(ctx->consumer_thread_pipe[0]); - if (ret) { - PERROR("close"); - } - ret = close(ctx->consumer_thread_pipe[1]); - if (ret) { - PERROR("close"); - } - ret = close(ctx->consumer_data_pipe[0]); - if (ret) { - PERROR("close"); - } - ret = close(ctx->consumer_data_pipe[1]); - if (ret) { - PERROR("close"); - } - ret = close(ctx->consumer_should_quit[0]); - if (ret) { - PERROR("close"); - } - ret = close(ctx->consumer_should_quit[1]); - if (ret) { - PERROR("close"); - } + utils_close_pipe(ctx->consumer_thread_pipe); + utils_close_pipe(ctx->consumer_channel_pipe); + utils_close_pipe(ctx->consumer_data_pipe); + utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); unlink(ctx->consumer_command_sock_path); @@ -1634,7 +1669,6 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream) assert(0); return -ENOSYS; } - } /* @@ -1728,6 +1762,32 @@ static void destroy_stream_ht(struct lttng_ht *ht) lttng_ht_destroy(ht); } +void lttng_consumer_close_metadata(void) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * The Kernel consumer has a different metadata scheme so we don't + * close anything because the stream will be closed by the session + * daemon. + */ + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Close all metadata streams. The metadata hash table is passed and + * this call iterates over it by closing all wakeup fd. This is safe + * because at this point we are sure that the metadata producer is + * either dead or blocked. + */ + lttng_ustconsumer_close_metadata(metadata_ht); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } +} + /* * Clean up a metadata stream and free its memory. */ @@ -1780,7 +1840,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, ret = lttng_ht_del(ht, &iter); assert(!ret); - /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_channel_id.node; + ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter); + assert(!ret); + iter.iter.node = &stream->node_session_id.node; ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); assert(!ret); @@ -1852,12 +1915,12 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, int ret = 0; struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; assert(stream); assert(ht); - DBG3("Adding metadata stream %d to hash table", stream->key); + DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->lock); @@ -1873,8 +1936,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, * Lookup the stream just to make sure it does not exist in our internal * state. This should NEVER happen. */ - lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + lttng_ht_lookup(ht, &stream->key, &iter); + node = lttng_ht_iter_get_node_u64(&iter); assert(!node); /* Find relayd and, if one is found, increment refcount. */ @@ -1897,14 +1960,17 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_stream_left); } - lttng_ht_add_unique_ulong(ht, &stream->node); + lttng_ht_add_unique_u64(ht, &stream->node); + + lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + &stream->node_channel_id); /* * Add stream to the stream_list_ht of the consumer data. No need to steal * the key since the HT does not use it and we allow to add redundant keys * into this table. */ - lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id); rcu_read_unlock(); @@ -1976,17 +2042,17 @@ void *consumer_thread_metadata_poll(void *data) uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_poll_event events; struct lttng_consumer_local_data *ctx = data; ssize_t len; rcu_register_thread(); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ - goto error; + goto end_ht; } DBG("Thread metadata poll started"); @@ -1995,7 +2061,7 @@ void *consumer_thread_metadata_poll(void *data) ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { ERR("Poll set creation failed"); - goto end; + goto end_poll; } ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN); @@ -2092,9 +2158,12 @@ restart: } rcu_read_lock(); - lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd), - &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + { + uint64_t tmp_id = (uint64_t) pollfd; + + lttng_ht_lookup(metadata_ht, &tmp_id, &iter); + } + node = lttng_ht_iter_get_node_u64(&iter); assert(node); stream = caa_container_of(node, struct lttng_consumer_stream, @@ -2151,10 +2220,11 @@ restart: error: end: DBG("Metadata poll thread exiting"); - lttng_poll_clean(&events); + lttng_poll_clean(&events); +end_poll: destroy_stream_ht(metadata_ht); - +end_ht: rcu_unregister_thread(); return NULL; } @@ -2176,7 +2246,7 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ goto end; @@ -2285,7 +2355,7 @@ void *consumer_thread_data_poll(void *data) ret = add_stream(new_stream, data_ht); if (ret) { - ERR("Consumer add stream %d failed. Continuing", + ERR("Consumer add stream %" PRIu64 " failed. Continuing", new_stream->key); /* * At this point, if the add_stream fails, it is not in the @@ -2417,6 +2487,222 @@ end: return NULL; } +/* + * Close wake-up end of each stream belonging to the channel. This will + * allow the poll() on the stream read-side to detect when the + * write-side (application) finally closes them. + */ +static +void consumer_close_channel_streams(struct lttng_consumer_channel *channel) +{ + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + + ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Note: a mutex is taken internally within + * liblttng-ust-ctl to protect timer wakeup_fd + * use from concurrent close. + */ + lttng_ustconsumer_close_stream_wakeup(stream); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } + } + rcu_read_unlock(); +} + +static void destroy_channel_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_channel *channel; + int ret; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) { + ret = lttng_ht_del(ht, &iter); + assert(ret != 0); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + +/* + * This thread polls the channel fds to detect when they are being + * closed. It closes all related streams if the channel is detected as + * closed. It is currently only used as a shim layer for UST because the + * consumerd needs to keep the per-stream wakeup end of pipes open for + * periodical flush. + */ +void *consumer_thread_channel_poll(void *data) +{ + int ret, i, pollfd; + uint32_t revents, nb_fd; + struct lttng_consumer_channel *chan = NULL; + struct lttng_ht_iter iter; + struct lttng_ht_node_u64 *node; + struct lttng_poll_event events; + struct lttng_consumer_local_data *ctx = data; + struct lttng_ht *channel_ht; + + rcu_register_thread(); + + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!channel_ht) { + /* ENOMEM at this point. Better to bail out. */ + goto end_ht; + } + + DBG("Thread channel poll started"); + + /* Size is set to 1 for the consumer_channel pipe */ + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + if (ret < 0) { + ERR("Poll set creation failed"); + goto end_poll; + } + + ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN); + if (ret < 0) { + goto end; + } + + /* Main loop */ + DBG("Channel main loop started"); + + while (1) { + /* Only the channel pipe is set */ + if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + goto end; + } + +restart: + DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + ret = lttng_poll_wait(&events, -1); + DBG("Channel event catched in thread"); + if (ret < 0) { + if (errno == EINTR) { + ERR("Poll EINTR catched"); + goto restart; + } + goto end; + } + + nb_fd = ret; + + /* From here, the event is a channel wait fd */ + for (i = 0; i < nb_fd; i++) { + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Just don't waste time if no returned events for the fd */ + if (!revents) { + continue; + } + if (pollfd == ctx->consumer_channel_pipe[0]) { + if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Channel thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); + continue; + } else if (revents & LPOLLIN) { + enum consumer_channel_action action; + + ret = read_channel_pipe(ctx, &chan, &action); + if (ret <= 0) { + ERR("Error reading channel pipe"); + continue; + } + + switch (action) { + case CONSUMER_CHANNEL_ADD: + DBG("Adding channel %d to poll set", + chan->wait_fd); + + lttng_ht_node_init_u64(&chan->wait_fd_node, + chan->wait_fd); + lttng_ht_add_unique_u64(channel_ht, + &chan->wait_fd_node); + /* Add channel to the global poll events list */ + lttng_poll_add(&events, chan->wait_fd, + LPOLLIN | LPOLLPRI); + break; + case CONSUMER_CHANNEL_QUIT: + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); + continue; + default: + ERR("Unknown action"); + break; + } + } + + /* Handle other stream */ + continue; + } + + rcu_read_lock(); + { + uint64_t tmp_id = (uint64_t) pollfd; + + lttng_ht_lookup(channel_ht, &tmp_id, &iter); + } + node = lttng_ht_iter_get_node_u64(&iter); + assert(node); + + chan = caa_container_of(node, struct lttng_consumer_channel, + wait_fd_node); + + /* Check for error event */ + if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Channel fd %d is hup|err.", pollfd); + + lttng_poll_del(&events, chan->wait_fd); + ret = lttng_ht_del(channel_ht, &iter); + assert(ret == 0); + consumer_close_channel_streams(chan); + } + + /* Release RCU lock for the channel looked up */ + rcu_read_unlock(); + } + } + +end: + lttng_poll_clean(&events); +end_poll: + destroy_channel_ht(channel_ht); +end_ht: + DBG("Channel poll thread exiting"); + rcu_unregister_thread(); + return NULL; +} + /* * This thread listens on the consumerd socket and receives the file * descriptors from the session daemon. @@ -2521,6 +2807,14 @@ void *consumer_thread_sessiond_poll(void *data) end: DBG("Consumer thread sessiond poll exiting"); + /* + * Close metadata streams since the producer is the session daemon which + * just died. + * + * NOTE: for now, this only applies to the UST tracer. + */ + lttng_consumer_close_metadata(); + /* * when all fds have hung up, the polling thread * can exit cleanly @@ -2533,6 +2827,8 @@ end: */ notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT); + /* Cleaning up possibly open sockets. */ if (sock >= 0) { ret = close(sock); @@ -2597,9 +2893,10 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) */ void lttng_consumer_init(void) { - consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); } /* @@ -2730,7 +3027,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } - DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); @@ -2867,8 +3164,8 @@ int consumer_data_pending(uint64_t id) } cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed), - ht->match_fct, (void *)((unsigned long) id), + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { /* If this call fails, the stream is being used hence data pending. */ ret = stream_try_lock(stream); diff --git a/src/common/consumer.h b/src/common/consumer.h index 92f9e2095..29836e5b0 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -49,6 +49,9 @@ enum lttng_consumer_command { LTTNG_CONSUMER_ASK_CHANNEL_CREATION, LTTNG_CONSUMER_GET_CHANNEL, LTTNG_CONSUMER_DESTROY_CHANNEL, + LTTNG_CONSUMER_PUSH_METADATA, + LTTNG_CONSUMER_CLOSE_METADATA, + LTTNG_CONSUMER_SETUP_METADATA, }; /* State of each fd in consumer */ @@ -77,7 +80,7 @@ enum consumer_channel_output { enum consumer_channel_type { CONSUMER_CHANNEL_TYPE_METADATA = 0, - CONSUMER_CHANNEL_TYPE_DATA = 1, + CONSUMER_CHANNEL_TYPE_DATA = 1, }; struct stream_list { @@ -87,9 +90,9 @@ struct stream_list { struct lttng_consumer_channel { /* HT node used for consumer_data.channel_ht */ - struct lttng_ht_node_ulong node; + struct lttng_ht_node_u64 node; /* Indexed key. Incremented value in the consumer. */ - int key; + uint64_t key; /* Number of streams referencing this channel */ int refcount; /* Tracing session id on the session daemon side. */ @@ -102,7 +105,7 @@ struct lttng_consumer_channel { uid_t uid; gid_t gid; /* Relayd id of the channel. -1 if it does not apply. */ - int relayd_id; + int64_t relayd_id; /* * Number of streams NOT initialized yet. This is used in order to not * delete this channel if streams are getting initialized. @@ -122,6 +125,22 @@ struct lttng_consumer_channel { * LTTNG_CONSUMER_GET_CHANNEL. */ struct stream_list streams; + /* + * Set if the channel is metadata. We keep a reference to the stream + * because we have to flush data once pushed by the session daemon. For a + * regular channel, this is always set to NULL. + */ + struct lttng_consumer_stream *metadata_stream; + /* + * Metadata written so far. Helps keeping track of + * contiguousness and order. + */ + uint64_t contig_metadata_written; + + /* for UST */ + int wait_fd; + /* Node within channel thread ht */ + struct lttng_ht_node_u64 wait_fd_node; }; /* @@ -130,14 +149,16 @@ struct lttng_consumer_channel { */ struct lttng_consumer_stream { /* HT node used by the data_ht and metadata_ht */ - struct lttng_ht_node_ulong node; + struct lttng_ht_node_u64 node; + /* stream indexed per channel key node */ + struct lttng_ht_node_u64 node_channel_id; /* HT node used in consumer_data.stream_list_ht */ - struct lttng_ht_node_ulong node_session_id; + struct lttng_ht_node_u64 node_session_id; /* Pointer to associated channel. */ struct lttng_consumer_channel *chan; /* Key by which the stream is indexed for 'node'. */ - int key; + uint64_t key; /* * File descriptor of the data output file. This can be either a file or a * socket fd for relayd streaming. @@ -167,7 +188,7 @@ struct lttng_consumer_stream { uid_t uid; gid_t gid; /* Network sequence number. Indicating on which relayd socket it goes. */ - int net_seq_idx; + uint64_t net_seq_idx; /* Identify if the stream is the metadata */ unsigned int metadata_flag; /* Used when the stream is set for network streaming */ @@ -214,7 +235,7 @@ struct lttng_consumer_stream { */ struct consumer_relayd_sock_pair { /* Network sequence number. */ - int net_seq_idx; + int64_t net_seq_idx; /* Number of stream associated with this relayd */ unsigned int refcount; @@ -245,7 +266,7 @@ struct consumer_relayd_sock_pair { * this socket is for now only used in a single thread. */ struct lttcomm_sock data_sock; - struct lttng_ht_node_ulong node; + struct lttng_ht_node_u64 node; /* Session id on both sides for the sockets. */ uint64_t relayd_session_id; @@ -306,6 +327,7 @@ struct lttng_consumer_local_data { char *consumer_command_sock_path; /* communication with splice */ int consumer_thread_pipe[2]; + int consumer_channel_pipe[2]; int consumer_splice_metadata_pipe[2]; /* Data stream poll thread pipe. To transfer data stream to the thread */ int consumer_data_pipe[2]; @@ -358,6 +380,11 @@ struct lttng_consumer_global_data { * This HT uses the "node_session_id" of the consumer stream. */ struct lttng_ht *stream_list_ht; + + /* + * This HT uses the "node_channel_id" of the consumer stream. + */ + struct lttng_ht *stream_per_chan_id_ht; }; /* @@ -407,8 +434,8 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, */ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); -struct lttng_consumer_stream *consumer_allocate_stream(int channel_key, - int stream_key, +struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, + uint64_t stream_key, enum lttng_consumer_stream_state state, const char *channel_name, uid_t uid, @@ -418,7 +445,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type); -struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key, +struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, const char *pathname, const char *name, @@ -430,14 +457,15 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); -int consumer_add_channel(struct lttng_consumer_channel *channel); +int consumer_add_channel(struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx); void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( int net_seq_idx); -struct consumer_relayd_sock_pair *consumer_find_relayd(int key); -struct lttng_consumer_channel *consumer_find_channel(unsigned long key); +struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); +struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); void consumer_steal_stream_key(int key, struct lttng_ht *ht); @@ -464,6 +492,7 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, void *consumer_thread_metadata_poll(void *data); void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); +void *consumer_thread_channel_poll(void *data); int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); diff --git a/src/common/defaults.h b/src/common/defaults.h index 93eb176a0..fca645870 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -44,9 +44,6 @@ #define DEFAULT_GLOBAL_APPS_PIPE DEFAULT_UST_SOCK_DIR "/global" #define DEFAULT_TRACE_OUTPUT DEFAULT_HOME_DIR "/lttng" -#define DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH "/lttng-ust-apps-wait" -#define DEFAULT_HOME_APPS_WAIT_SHM_PATH "/lttng-ust-apps-wait-%u" - /* Default directory where the trace are written in per domain */ #define DEFAULT_KERNEL_TRACE_DIR "/kernel" #define DEFAULT_UST_TRACE_DIR "/ust" @@ -83,12 +80,25 @@ /* Default unix socket path */ #define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond" -#define DEFAULT_GLOBAL_APPS_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/apps-lttng-sessiond" -#define DEFAULT_HOME_APPS_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/apps-lttng-sessiond" #define DEFAULT_HOME_CLIENT_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond" #define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/health.sock" #define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/health.sock" +#ifdef HAVE_LIBLTTNG_UST_CTL +#define DEFAULT_GLOBAL_APPS_UNIX_SOCK \ + DEFAULT_LTTNG_RUNDIR "/" LTTNG_UST_SOCK_FILENAME +#define DEFAULT_HOME_APPS_UNIX_SOCK \ + DEFAULT_LTTNG_HOME_RUNDIR "/" LTTNG_UST_SOCK_FILENAME +#define DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH \ + "/" LTTNG_UST_WAIT_FILENAME +#define DEFAULT_HOME_APPS_WAIT_SHM_PATH \ + DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH "-%d" + +#else +#define DEFAULT_GLOBAL_APPS_UNIX_SOCK +#define DEFAULT_HOME_APPS_UNIX_SOCK +#endif /* HAVE_LIBLTTNG_UST_CTL */ + /* * Value taken from the hard limit allowed by the kernel when using setrlimit * with RLIMIT_NOFILE on an Intel i7 CPU and Linux 3.0.3. @@ -111,7 +121,7 @@ /* DEFAULT_CHANNEL_SUBBUF_NUM must always be a power of 2 */ #define DEFAULT_CHANNEL_SUBBUF_NUM 4 #define DEFAULT_CHANNEL_SWITCH_TIMER 0 /* usec */ -#define DEFAULT_CHANNEL_READ_TIMER 200 /* usec */ +#define DEFAULT_CHANNEL_READ_TIMER 200000 /* usec */ #define DEFAULT_CHANNEL_OUTPUT LTTNG_EVENT_MMAP #define DEFAULT_METADATA_SUBBUF_SIZE 4096 @@ -129,6 +139,10 @@ /* See lttng-kernel.h enum lttng_kernel_output for channel output */ #define DEFAULT_KERNEL_CHANNEL_OUTPUT LTTNG_EVENT_SPLICE +#define DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER \ + DEFAULT_CHANNEL_SWITCH_TIMER +#define DEFAULT_KERNEL_CHANNEL_READ_TIMER 200000 /* usec */ + /* User space defaults */ /* Must be a power of 2 */ @@ -138,6 +152,10 @@ /* See lttng-ust.h enum lttng_ust_output */ #define DEFAULT_UST_CHANNEL_OUTPUT LTTNG_EVENT_MMAP +#define DEFAULT_UST_CHANNEL_SWITCH_TIMER \ + DEFAULT_CHANNEL_SWITCH_TIMER +#define DEFAULT_UST_CHANNEL_READ_TIMER 0 /* usec */ + /* * Default timeout value for the sem_timedwait() call. Blocking forever is not * wanted so a timeout is used to control the data flow and not freeze the diff --git a/src/common/hashtable/hashtable.c b/src/common/hashtable/hashtable.c index 24d579773..263df4683 100644 --- a/src/common/hashtable/hashtable.c +++ b/src/common/hashtable/hashtable.c @@ -54,6 +54,17 @@ static int match_ulong(struct cds_lfht_node *node, const void *key) return hash_match_key_ulong((void *) match_node->key, (void *) key); } +/* + * Match function for u64 node. + */ +static int match_u64(struct cds_lfht_node *node, const void *key) +{ + struct lttng_ht_node_u64 *match_node = + caa_container_of(node, struct lttng_ht_node_u64, node); + + return hash_match_key_u64(&match_node->key, (void *) key); +} + /* * Return an allocated lttng hashtable. */ @@ -88,6 +99,10 @@ struct lttng_ht *lttng_ht_new(unsigned long size, int type) ht->match_fct = match_ulong; ht->hash_fct = hash_key_ulong; break; + case LTTNG_HT_TYPE_U64: + ht->match_fct = match_u64; + ht->hash_fct = hash_key_u64; + break; default: ERR("Unknown lttng hashtable type %d", type); goto error; @@ -136,6 +151,18 @@ void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node, cds_lfht_node_init(&node->node); } +/* + * Init lttng ht node uint64_t. + */ +void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node, + uint64_t key) +{ + assert(node); + + node->key = key; + cds_lfht_node_init(&node->node); +} + /* * Free lttng ht node string. */ @@ -154,6 +181,15 @@ void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node) free(node); } +/* + * Free lttng ht node uint64_t. + */ +void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node) +{ + assert(node); + free(node); +} + /* * Lookup function in hashtable. */ @@ -196,6 +232,20 @@ void lttng_ht_add_ulong(struct lttng_ht *ht, struct lttng_ht_node_ulong *node) &node->node); } +/* + * Add uint64_t node to hashtable. + + */ +void lttng_ht_add_u64(struct lttng_ht *ht, struct lttng_ht_node_u64 *node) +{ + assert(ht); + assert(ht->ht); + assert(node); + + cds_lfht_add(ht->ht, ht->hash_fct(&node->key, lttng_ht_seed), + &node->node); +} + /* * Add unique unsigned long node to hashtable. */ @@ -213,6 +263,23 @@ void lttng_ht_add_unique_ulong(struct lttng_ht *ht, assert(node_ptr == &node->node); } +/* + * Add unique uint64_t node to hashtable. + */ +void lttng_ht_add_unique_u64(struct lttng_ht *ht, + struct lttng_ht_node_u64 *node) +{ + struct cds_lfht_node *node_ptr; + assert(ht); + assert(ht->ht); + assert(node); + + node_ptr = cds_lfht_add_unique(ht->ht, + ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct, + &node->key, &node->node); + assert(node_ptr == &node->node); +} + /* * Add replace unsigned long node to hashtable. */ @@ -235,6 +302,28 @@ struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(struct lttng_ht *ht, assert(node_ptr == &node->node); } +/* + * Add replace unsigned long node to hashtable. + */ +struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(struct lttng_ht *ht, + struct lttng_ht_node_u64 *node) +{ + struct cds_lfht_node *node_ptr; + assert(ht); + assert(ht->ht); + assert(node); + + node_ptr = cds_lfht_add_replace(ht->ht, + ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct, + &node->key, &node->node); + if (!node_ptr) { + return NULL; + } else { + return caa_container_of(node_ptr, struct lttng_ht_node_u64, node); + } + assert(node_ptr == &node->node); +} + /* * Delete node from hashtable. */ @@ -319,10 +408,26 @@ struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong( return caa_container_of(node, struct lttng_ht_node_ulong, node); } +/* + * Return lttng ht unsigned long node from iterator. + */ +struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64( + struct lttng_ht_iter *iter) +{ + struct cds_lfht_node *node; + + assert(iter); + node = cds_lfht_iter_get_node(&iter->iter); + if (!node) { + return NULL; + } + return caa_container_of(node, struct lttng_ht_node_u64, node); +} + /* * lib constructor */ -static void __attribute__((constructor)) _init() +static void __attribute__((constructor)) _init(void) { /* Init hash table seed */ lttng_ht_seed = (unsigned long) time(NULL); diff --git a/src/common/hashtable/hashtable.h b/src/common/hashtable/hashtable.h index 4007a9c6c..b4c1909b7 100644 --- a/src/common/hashtable/hashtable.h +++ b/src/common/hashtable/hashtable.h @@ -19,6 +19,7 @@ #define _LTT_HT_H #include +#include #include "rculfhash.h" #include "rculfhash-internal.h" @@ -31,6 +32,7 @@ typedef cds_lfht_match_fct hash_match_fct; enum lttng_ht_type { LTTNG_HT_TYPE_STRING, LTTNG_HT_TYPE_ULONG, + LTTNG_HT_TYPE_U64, }; struct lttng_ht { @@ -55,6 +57,12 @@ struct lttng_ht_node_ulong { struct rcu_head head; }; +struct lttng_ht_node_u64 { + uint64_t key; + struct cds_lfht_node node; + struct rcu_head head; +}; + /* Hashtable new and destroy */ extern struct lttng_ht *lttng_ht_new(unsigned long size, int type); extern void lttng_ht_destroy(struct lttng_ht *ht); @@ -63,8 +71,11 @@ extern void lttng_ht_destroy(struct lttng_ht *ht); extern void lttng_ht_node_init_str(struct lttng_ht_node_str *node, char *key); extern void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node, unsigned long key); +extern void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node, + uint64_t key); extern void lttng_ht_node_free_str(struct lttng_ht_node_str *node); extern void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node); +extern void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node); extern void lttng_ht_lookup(struct lttng_ht *ht, void *key, struct lttng_ht_iter *iter); @@ -74,10 +85,16 @@ extern void lttng_ht_add_unique_str(struct lttng_ht *ht, struct lttng_ht_node_str *node); extern void lttng_ht_add_unique_ulong(struct lttng_ht *ht, struct lttng_ht_node_ulong *node); +extern void lttng_ht_add_unique_u64(struct lttng_ht *ht, + struct lttng_ht_node_u64 *node); extern struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong( struct lttng_ht *ht, struct lttng_ht_node_ulong *node); +extern struct lttng_ht_node_u64 *lttng_ht_add_replace_u64( + struct lttng_ht *ht, struct lttng_ht_node_u64 *node); extern void lttng_ht_add_ulong(struct lttng_ht *ht, struct lttng_ht_node_ulong *node); +extern void lttng_ht_add_u64(struct lttng_ht *ht, + struct lttng_ht_node_u64 *node); extern int lttng_ht_del(struct lttng_ht *ht, struct lttng_ht_iter *iter); @@ -91,5 +108,7 @@ extern struct lttng_ht_node_str *lttng_ht_iter_get_node_str( struct lttng_ht_iter *iter); extern struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong( struct lttng_ht_iter *iter); +extern struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64( + struct lttng_ht_iter *iter); #endif /* _LTT_HT_H */ diff --git a/src/common/hashtable/utils.c b/src/common/hashtable/utils.c index 850f9e5db..8d0e515ae 100644 --- a/src/common/hashtable/utils.c +++ b/src/common/hashtable/utils.c @@ -446,12 +446,8 @@ static uint32_t __attribute__((unused)) hashlittle(const void *key, return c; } -#if (CAA_BITS_PER_LONG == 64) -/* - * Hash function for number value. - */ LTTNG_HIDDEN -unsigned long hash_key_ulong(void *_key, unsigned long seed) +unsigned long hash_key_u64(void *_key, unsigned long seed) { union { uint64_t v64; @@ -463,10 +459,21 @@ unsigned long hash_key_ulong(void *_key, unsigned long seed) } key; v.v64 = (uint64_t) seed; - key.v64 = (uint64_t) _key; + key.v64 = *(uint64_t *) _key; hashword2(key.v32, 2, &v.v32[0], &v.v32[1]); return v.v64; } + +#if (CAA_BITS_PER_LONG == 64) +/* + * Hash function for number value. + */ +LTTNG_HIDDEN +unsigned long hash_key_ulong(void *_key, unsigned long seed) +{ + uint64_t __key = (uint64_t) _key; + return (unsigned long) hash_key_u64(&__key, seed); +} #else /* * Hash function for number value. @@ -502,6 +509,19 @@ int hash_match_key_ulong(void *key1, void *key2) return 0; } +/* + * Hash function compare for number value. + */ +LTTNG_HIDDEN +int hash_match_key_u64(void *key1, void *key2) +{ + if (*(uint64_t *) key1 == *(uint64_t *) key2) { + return 1; + } + + return 0; +} + /* * Hash compare function for string. */ diff --git a/src/common/hashtable/utils.h b/src/common/hashtable/utils.h index 4f0890892..38d6121e4 100644 --- a/src/common/hashtable/utils.h +++ b/src/common/hashtable/utils.h @@ -21,8 +21,10 @@ #include unsigned long hash_key_ulong(void *_key, unsigned long seed); +unsigned long hash_key_u64(void *_key, unsigned long seed); unsigned long hash_key_str(void *key, unsigned long seed); int hash_match_key_ulong(void *key1, void *key2); +int hash_match_key_u64(void *key1, void *key2); int hash_match_key_str(void *key1, void *key2); #endif /* _LTT_HT_UTILS_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 9d75d3f09..0de73443e 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -128,7 +128,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - DBG("consumer_add_channel %d", msg.u.channel.channel_key); + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, @@ -153,12 +153,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ctx->on_recv_channel != NULL) { ret = ctx->on_recv_channel(new_channel); if (ret == 0) { - consumer_add_channel(new_channel); + consumer_add_channel(new_channel, ctx); } else if (ret < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel); + consumer_add_channel(new_channel, ctx); } goto end_nosignal; } @@ -180,7 +180,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * We could not find the channel. Can happen if cpu hotplug * happens while tearing down. */ - ERR("Unable to find channel key %d", msg.u.stream.channel_key); + ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key); ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; } @@ -265,8 +265,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_del_stream(new_stream, NULL); goto end_nosignal; } - } else if (new_stream->net_seq_idx != -1) { - ERR("Network sequence index %d unknown. Not adding stream.", + } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", new_stream->net_seq_idx); consumer_del_stream(new_stream, NULL); goto end_nosignal; @@ -464,8 +464,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * network streaming or the full padding (len) size when we are _not_ * streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != -1) || - (ret != len && stream->net_seq_idx == -1)) { + if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || + (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer @@ -513,7 +513,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } /* Opening the tracefile in write mode */ - if (stream->net_seq_idx == -1) { + if (stream->net_seq_idx == (uint64_t) -1ULL) { ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid); if (ret < 0) { diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index c390a9f52..60a3eade6 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -117,6 +117,8 @@ enum lttcomm_return_code { LTTCOMM_CONSUMERD_SPLICE_ENOMEM, /* ENOMEM from splice(2) */ LTTCOMM_CONSUMERD_SPLICE_ESPIPE, /* ESPIPE from splice(2) */ LTTCOMM_CONSUMERD_ENOMEM, /* Consumer is out of memory */ + LTTCOMM_CONSUMERD_ERROR_METADATA, /* Error with metadata. */ + LTTCOMM_CONSUMERD_FATAL, /* Fatal error. */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ @@ -145,7 +147,7 @@ struct lttcomm_sockaddr { } LTTNG_PACKED; struct lttcomm_sock { - int fd; + int32_t fd; enum lttcomm_sock_proto proto; struct lttcomm_sockaddr sockaddr; const struct lttcomm_proto_ops *ops; @@ -172,7 +174,7 @@ struct lttcomm_proto_ops { * Data structure received from lttng client to session daemon. */ struct lttcomm_session_msg { - uint32_t cmd_type; /* enum lttcomm_sessiond_command */ + uint32_t cmd_type; /* enum lttcomm_sessiond_command */ struct lttng_session session; struct lttng_domain domain; union { @@ -233,9 +235,9 @@ struct lttng_filter_bytecode { * Data structure for the response from sessiond to the lttng client. */ struct lttcomm_lttng_msg { - uint32_t cmd_type; /* enum lttcomm_sessiond_command */ - uint32_t ret_code; /* enum lttcomm_return_code */ - uint32_t pid; /* pid_t */ + uint32_t cmd_type; /* enum lttcomm_sessiond_command */ + uint32_t ret_code; /* enum lttcomm_return_code */ + uint32_t pid; /* pid_t */ uint32_t data_size; /* Contains: trace_name + data */ char payload[]; @@ -259,26 +261,26 @@ struct lttcomm_consumer_msg { uint32_t cmd_type; /* enum consumerd_command */ union { struct { - int channel_key; + uint64_t channel_key; uint64_t session_id; char pathname[PATH_MAX]; - uid_t uid; - gid_t gid; - int relayd_id; + uint32_t uid; + uint32_t gid; + uint64_t relayd_id; /* nb_init_streams is the number of streams open initially. */ - unsigned int nb_init_streams; + uint32_t nb_init_streams; char name[LTTNG_SYMBOL_NAME_LEN]; /* Use splice or mmap to consume this fd */ enum lttng_event_output output; int type; /* Per cpu or metadata. */ } LTTNG_PACKED channel; /* Only used by Kernel. */ struct { - int stream_key; - int channel_key; - int cpu; /* On which CPU this stream is assigned. */ + uint64_t stream_key; + uint64_t channel_key; + int32_t cpu; /* On which CPU this stream is assigned. */ } LTTNG_PACKED stream; /* Only used by Kernel. */ struct { - int net_index; + uint64_t net_index; enum lttng_stream_type type; /* Open socket to the relayd */ struct lttcomm_sock sock; @@ -292,28 +294,39 @@ struct lttcomm_consumer_msg { uint64_t session_id; } LTTNG_PACKED data_pending; struct { - uint64_t subbuf_size; /* bytes */ - uint64_t num_subbuf; /* power of 2 */ - int overwrite; /* 1: overwrite, 0: discard */ - unsigned int switch_timer_interval; /* usec */ - unsigned int read_timer_interval; /* usec */ - int output; /* splice, mmap */ - int type; /* metadata or per_cpu */ - uint64_t session_id; /* Tracing session id */ - char pathname[PATH_MAX]; /* Channel file path. */ + uint64_t subbuf_size; /* bytes */ + uint64_t num_subbuf; /* power of 2 */ + int32_t overwrite; /* 1: overwrite, 0: discard */ + uint32_t switch_timer_interval; /* usec */ + uint32_t read_timer_interval; /* usec */ + int32_t output; /* splice, mmap */ + int32_t type; /* metadata or per_cpu */ + uint64_t session_id; /* Tracing session id */ + char pathname[PATH_MAX]; /* Channel file path. */ char name[LTTNG_SYMBOL_NAME_LEN]; /* Channel name. */ - uid_t uid; /* User ID of the session */ - gid_t gid; /* Group ID ot the session */ - int relayd_id; /* Relayd id if apply. */ - unsigned long key; /* Unique channel key. */ + uint32_t uid; /* User ID of the session */ + uint32_t gid; /* Group ID ot the session */ + uint64_t relayd_id; /* Relayd id if apply. */ + uint64_t key; /* Unique channel key. */ unsigned char uuid[UUID_STR_LEN]; /* uuid for ust tracer. */ } LTTNG_PACKED ask_channel; struct { - unsigned long key; + uint64_t key; } LTTNG_PACKED get_channel; struct { - unsigned long key; + uint64_t key; } LTTNG_PACKED destroy_channel; + struct { + uint64_t key; /* Metadata channel key. */ + uint64_t target_offset; /* Offset in the consumer */ + uint64_t len; /* Length of metadata to be received. */ + } LTTNG_PACKED push_metadata; + struct { + uint64_t key; /* Metadata channel key. */ + } LTTNG_PACKED close_metadata; + struct { + uint64_t key; /* Metadata channel key. */ + } LTTNG_PACKED setup_metadata; } u; } LTTNG_PACKED; @@ -326,7 +339,7 @@ struct lttcomm_consumer_status_msg { struct lttcomm_consumer_status_channel { enum lttng_error_code ret_code; - unsigned long key; + uint64_t key; unsigned int stream_count; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 442925754..5a09ff51f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -88,17 +88,17 @@ static int add_channel(struct lttng_consumer_channel *channel, if (ctx->on_recv_channel != NULL) { ret = ctx->on_recv_channel(channel); if (ret == 0) { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } else if (ret < 0) { /* Most likely an ENOMEM. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } } else { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } - DBG("UST consumer channel added (key: %u)", channel->key); + DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key); error: return ret; @@ -109,7 +109,7 @@ error: */ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, const char *pathname, const char *name, uid_t uid, gid_t gid, - int relayd_id, unsigned long key, enum lttng_event_output output) + int relayd_id, uint64_t key, enum lttng_event_output output) { assert(pathname); assert(name); @@ -223,8 +223,8 @@ static int send_stream_to_relayd(struct lttng_consumer_stream *stream) if (ret < 0) { goto error; } - } else if (stream->net_seq_idx != -1) { - ERR("Network sequence index %d unknown. Not adding stream.", + } else if (stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", stream->net_seq_idx); ret = -1; goto error; @@ -234,6 +234,11 @@ error: return ret; } +/* + * Create streams for the given channel using liblttng-ust-ctl. + * + * Return 0 on success else a negative value. + */ static int create_ust_streams(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { @@ -251,7 +256,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, while ((ustream = ustctl_create_stream(channel->uchan, cpu))) { int wait_fd; - wait_fd = ustctl_get_wait_fd(ustream); + wait_fd = ustctl_stream_get_wait_fd(ustream); /* Allocate consumer stream object. */ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); @@ -288,11 +293,16 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } } - DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64, + DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64, stream->name, stream->key, stream->relayd_stream_id); /* Set next CPU stream. */ channel->streams.count = ++cpu; + + /* Keep stream reference when creating metadata. */ + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { + channel->metadata_stream = stream; + } } return 0; @@ -338,6 +348,11 @@ error_create: return ret; } +/* + * Send a single given stream to the session daemon using the sock. + * + * Return 0 on success else a negative value. + */ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) { int ret; @@ -345,7 +360,7 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) assert(stream); assert(sock >= 0); - DBG2("UST consumer sending stream %d to sessiond", stream->key); + DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key); /* Send stream to session daemon. */ ret = ustctl_send_stream_to_sessiond(sock, stream->ustream); @@ -353,11 +368,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) goto error; } - ret = ustctl_stream_close_wakeup_fd(stream->ustream); - if (ret < 0) { - goto error; - } - error: return ret; } @@ -365,8 +375,7 @@ error: /* * Send channel to sessiond. * - * Return 0 on success or else a negative value. On error, the channel is - * destroy using ustctl. + * Return 0 on success or else a negative value. */ static int send_sessiond_channel(int sock, struct lttng_consumer_channel *channel, @@ -387,6 +396,11 @@ static int send_sessiond_channel(int sock, goto error; } + ret = ustctl_channel_close_wakeup_fd(channel->uchan); + if (ret < 0) { + goto error; + } + /* The channel was sent successfully to the sessiond at this point. */ cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* Try to send the stream to the relayd if one is available. */ @@ -462,6 +476,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, goto error; } + channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan); + + if (ret < 0) { + goto error; + } + /* Open all streams for this channel. */ ret = create_ust_streams(channel, ctx); if (ret < 0) { @@ -472,6 +492,155 @@ error: return ret; } +/* + * Send all stream of a channel to the right thread handling it. + * + * On error, return a negative value else 0 on success. + */ +static int send_streams_to_thread(struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + struct lttng_consumer_stream *stream, *stmp; + + assert(channel); + assert(ctx); + + /* Send streams to the corresponding thread. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + /* Sending the stream to the thread. */ + ret = send_stream_to_thread(stream, ctx); + if (ret < 0) { + /* + * If we are unable to send the stream to the thread, there is + * a big problem so just stop everything. + */ + goto error; + } + + /* Remove node from the channel stream list. */ + cds_list_del(&stream->send_node); + } + +error: + return ret; +} + +/* + * Write metadata to the given channel using ustctl to convert the string to + * the ringbuffer. + * + * Return 0 on success else a negative value. + */ +static int push_metadata(struct lttng_consumer_channel *metadata, + const char *metadata_str, uint64_t target_offset, uint64_t len) +{ + int ret; + + assert(metadata); + assert(metadata_str); + + DBG("UST consumer writing metadata to channel %s", metadata->name); + + assert(target_offset == metadata->contig_metadata_written); + ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len); + if (ret < 0) { + ERR("ustctl write metadata fail with ret %d, len %ld", ret, len); + goto error; + } + metadata->contig_metadata_written += len; + + ustctl_flush_buffer(metadata->metadata_stream->ustream, 1); + +error: + return ret; +} + +/* + * Close metadata stream wakeup_fd using the given key to retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int close_metadata(uint64_t chan_key) +{ + int ret; + struct lttng_consumer_channel *channel; + + DBG("UST consumer close metadata key %lu", chan_key); + + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer close metadata %lu not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream); + if (ret < 0) { + ERR("UST consumer unable to close fd of metadata (ret: %d)", ret); + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error; + } + +error: + return ret; +} + +/* + * RCU read side lock MUST be acquired before calling this function. + * + * Return 0 on success else an LTTng error code. + */ +static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) +{ + int ret; + struct lttng_consumer_channel *metadata; + + DBG("UST consumer setup metadata key %lu", key); + + metadata = consumer_find_channel(key); + if (!metadata) { + ERR("UST consumer push metadata %" PRIu64 " not found", key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + /* + * Send metadata stream to relayd if one available. Availability is + * known if the stream is still in the list of the channel. + */ + if (cds_list_empty(&metadata->streams.head)) { + ERR("Metadata channel key %" PRIu64 ", no stream available.", key); + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error; + } + + /* Send metadata stream to relayd if needed. */ + ret = send_stream_to_relayd(metadata->metadata_stream); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error; + } + + ret = send_streams_to_thread(metadata, ctx); + if (ret < 0) { + /* + * If we are unable to send the stream to the thread, there is + * a big problem so just stop everything. + */ + ret = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + /* List MUST be empty after or else it could be reused. */ + assert(cds_list_empty(&metadata->streams.head)); + + ret = 0; + +error: + return ret; +} + /* * Receive command from session daemon and process it. * @@ -548,13 +717,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_flag_relayd_for_destroy(relayd); } - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - - goto end_nosignal; + goto end_msg_sessiond; } case LTTNG_CONSUMER_UPDATE_STREAM: { @@ -665,9 +828,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_GET_CHANNEL: { int ret, relayd_err = 0; - unsigned long key = msg.u.get_channel.key; + uint64_t key = msg.u.get_channel.key; struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream, *stmp; channel = consumer_find_channel(key); if (!channel) { @@ -702,58 +864,108 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto error_fatal; } - /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - /* Sending the stream to the thread. */ - ret = send_stream_to_thread(stream, ctx); - if (ret < 0) { - /* - * If we are unable to send the stream to the thread, there is - * a big problem so just stop everything. - */ - goto error_fatal; - } - - /* Remove node from the channel stream list. */ - cds_list_del(&stream->send_node); + ret = send_streams_to_thread(channel, ctx); + if (ret < 0) { + /* + * If we are unable to send the stream to the thread, there is + * a big problem so just stop everything. + */ + goto error_fatal; } - /* List MUST be empty after or else it could be reused. */ assert(cds_list_empty(&channel->streams.head)); - /* Inform sessiond that everything is done and OK on our side. */ - ret = consumer_send_status_msg(sock, LTTNG_OK); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("UST consumer get channel key %lu not found", key); + ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto end_msg_sessiond; } - break; + destroy_channel(channel); + + goto end_msg_sessiond; } - case LTTNG_CONSUMER_DESTROY_CHANNEL: + case LTTNG_CONSUMER_CLOSE_METADATA: { int ret; - unsigned long key = msg.u.destroy_channel.key; + + ret = close_metadata(msg.u.close_metadata.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_PUSH_METADATA: + { + int ret; + uint64_t len = msg.u.push_metadata.len; + uint64_t target_offset = msg.u.push_metadata.target_offset; + uint64_t key = msg.u.push_metadata.key; struct lttng_consumer_channel *channel; + char *metadata_str; - DBG("UST consumer destroy channel key %lu", key); + DBG("UST consumer push metadata key %lu of len %lu", key, len); channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer destroy channel %lu not found", key); + ERR("UST consumer push metadata %lu not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; - } else { - /* Protocol error if the stream list is NOT empty. */ - assert(!cds_list_empty(&channel->streams.head)); - consumer_del_channel(channel); } + metadata_str = zmalloc(len * sizeof(char)); + if (!metadata_str) { + PERROR("zmalloc metadata string"); + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto end_msg_sessiond; + } + + /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTNG_OK); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ + goto error_fatal; + } + + /* Wait for more data. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { goto end_nosignal; } + + /* Receive metadata string. */ + ret = lttcomm_recv_unix_sock(sock, metadata_str, len); + if (ret < 0) { + /* Session daemon is dead so return gracefully. */ + goto end_nosignal; + } + + ret = push_metadata(channel, metadata_str, target_offset, len); + free(metadata_str); + if (ret < 0) { + /* Unable to handle metadata. Notify session daemon. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto end_msg_sessiond; + } + + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_SETUP_METADATA: + { + int ret; + + ret = setup_metadata(ctx, msg.u.setup_metadata.key); + if (ret) { + ret_code = ret; + } + goto end_msg_sessiond; } default: break; @@ -945,8 +1157,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * The mmap operation should write subbuf_size amount of data when network * streaming or the full padding (len) size when we are _not_ streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != -1) || - (ret != len && stream->net_seq_idx == -1)) { + if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || + (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer. This is a DBG statement since any unexpected kill or @@ -974,7 +1186,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) char full_path[PATH_MAX]; /* Opening the tracefile in write mode */ - if (stream->net_seq_idx != -1) { + if (stream->net_seq_idx != (uint64_t) -1ULL) { goto end; } @@ -1033,3 +1245,51 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) end: return ret; } + +/* + * Close every metadata stream wait fd of the metadata hash table. This + * function MUST be used very carefully so not to run into a race between the + * metadata thread handling streams and this function closing their wait fd. + * + * For UST, this is used when the session daemon hangs up. Its the metadata + * producer so calling this is safe because we are assured that no state change + * can occur in the metadata thread for the streams in the hash table. + */ +void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + assert(metadata_ht); + assert(metadata_ht->ht); + + DBG("UST consumer closing all metadata streams"); + + rcu_read_lock(); + cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, + node.node) { + int fd = stream->wait_fd; + + /* + * Whatever happens here we have to continue to try to close every + * streams. Let's report at least the error on failure. + */ + ret = ustctl_stream_close_wakeup_fd(stream->ustream); + if (ret) { + ERR("Unable to close metadata stream fd %d ret %d", fd, ret); + } + DBG("Metadata wait fd %d closed", fd); + } + rcu_read_unlock(); +} + +void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) +{ + int ret; + + ret = ustctl_stream_close_wakeup_fd(stream->ustream); + if (ret < 0) { + ERR("Unable to close wakeup fd"); + } +} diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index 009fa5e59..bbaff6cbf 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -49,6 +49,8 @@ int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream, unsigned long *off); void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream); int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream); +void lttng_ustconsumer_close_metadata(struct lttng_ht *ht); +void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream); #else /* HAVE_LIBLTTNG_UST_CTL */ @@ -146,6 +148,14 @@ void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) { return NULL; } +static inline +void lttng_ustconsumer_close_metadata(struct lttng_ht *ht) +{ +} +static inline +void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) +{ +} #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTTNG_USTCONSUMER_H */ diff --git a/src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c b/src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c index 39f74591f..898072227 100644 --- a/src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c +++ b/src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c @@ -107,11 +107,14 @@ int32_t bytecode_reserve(struct lttng_filter_bytecode_alloc **fb, uint32_t align return -EINVAL; if (new_alloc_len > old_alloc_len) { + struct lttng_filter_bytecode_alloc *newptr; + new_alloc_len = max_t(uint32_t, 1U << get_count_order(new_alloc_len), old_alloc_len << 1); - *fb = realloc(*fb, new_alloc_len); - if (!*fb) + newptr = realloc(*fb, new_alloc_len); + if (!newptr) return -ENOMEM; + *fb = newptr; /* We zero directly the memory from start of allocation. */ memset(&((char *) *fb)[old_alloc_len], 0, new_alloc_len - old_alloc_len); (*fb)->alloc_len = new_alloc_len; diff --git a/src/lib/lttng-ctl/lttng-ctl.c b/src/lib/lttng-ctl/lttng-ctl.c index 205744ef7..8f63bf3e8 100644 --- a/src/lib/lttng-ctl/lttng-ctl.c +++ b/src/lib/lttng-ctl/lttng-ctl.c @@ -1589,7 +1589,7 @@ int _lttng_create_session_ext(const char *name, const char *url, struct lttcomm_session_msg lsm; struct lttng_uri *uris = NULL; - if (name == NULL || datetime == NULL) { + if (name == NULL || datetime == NULL || url == NULL) { return -LTTNG_ERR_INVALID; } diff --git a/tests/regression/tools/streaming/test_high_throughput_limits b/tests/regression/tools/streaming/test_high_throughput_limits index 7c253ba84..2412374bb 100755 --- a/tests/regression/tools/streaming/test_high_throughput_limits +++ b/tests/regression/tools/streaming/test_high_throughput_limits @@ -84,14 +84,6 @@ function create_lttng_session_with_uri ok $? "Create session with uri $uri" } -function enable_lttng_consumer -{ - uri=$1 - # Create session with custom URI - $TESTDIR/../src/bin/lttng/$LTTNG_BIN enable-consumer -u $uri >/dev/null 2>&1 - ok $? "Enable consumer with uri $uri" -} - function run_apps { for i in `seq 1 $NR_APP_ITER`; do @@ -115,7 +107,6 @@ function test_high_throughput { NETWORK_URI="net://localhost" create_lttng_session_with_uri $SESSION_NAME $NETWORK_URI - enable_lttng_consumer $NETWORK_URI enable_ust_lttng_event $SESSION_NAME $EVENT_NAME start_lttng_tracing $SESSION_NAME run_apps diff --git a/tests/unit/test_kernel_data.c b/tests/unit/test_kernel_data.c index 1359b7fa8..13f48f9b5 100644 --- a/tests/unit/test_kernel_data.c +++ b/tests/unit/test_kernel_data.c @@ -100,9 +100,9 @@ static void test_create_kernel_metadata(void) kern->metadata->conf->attr.num_subbuf == DEFAULT_METADATA_SUBBUF_NUM && kern->metadata->conf->attr.switch_timer_interval - == DEFAULT_CHANNEL_SWITCH_TIMER && + == DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER && kern->metadata->conf->attr.read_timer_interval - == DEFAULT_CHANNEL_READ_TIMER && + == DEFAULT_KERNEL_CHANNEL_READ_TIMER && kern->metadata->conf->attr.output == DEFAULT_KERNEL_CHANNEL_OUTPUT, "Validate kernel session metadata"); diff --git a/tests/unit/test_ust_data.c b/tests/unit/test_ust_data.c index ae59c07a0..0f1c99b95 100644 --- a/tests/unit/test_ust_data.c +++ b/tests/unit/test_ust_data.c @@ -109,9 +109,9 @@ static void test_create_ust_metadata(void) metadata->attr.num_subbuf == DEFAULT_METADATA_SUBBUF_NUM && metadata->attr.switch_timer_interval - == DEFAULT_CHANNEL_SWITCH_TIMER && + == DEFAULT_UST_CHANNEL_SWITCH_TIMER && metadata->attr.read_timer_interval - == DEFAULT_CHANNEL_READ_TIMER && + == DEFAULT_UST_CHANNEL_READ_TIMER && metadata->attr.output == LTTNG_UST_MMAP, "Validate UST session metadata");