+/*
+ * thread_manage_kconsumerd
+ *
+ * This thread manage the kconsumerd error sent
+ * back to the session daemon.
+ */
+static void *thread_manage_kconsumerd(void *data)
+{
+ int sock, ret;
+ enum lttcomm_return_code code;
+
+ DBG("[thread] Manage kconsumerd started");
+
+ ret = lttcomm_listen_unix_sock(kconsumerd_err_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ sock = lttcomm_accept_unix_sock(kconsumerd_err_sock);
+ if (sock < 0) {
+ goto error;
+ }
+
+ /* Getting status code from kconsumerd */
+ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ goto error;
+ }
+
+ if (code == KCONSUMERD_COMMAND_SOCK_READY) {
+ kconsumerd_cmd_sock = lttcomm_connect_unix_sock(kconsumerd_cmd_unix_sock_path);
+ if (kconsumerd_cmd_sock < 0) {
+ sem_post(&kconsumerd_sem);
+ perror("kconsumerd connect");
+ goto error;
+ }
+ /* Signal condition to tell that the kconsumerd is ready */
+ sem_post(&kconsumerd_sem);
+ DBG("Kconsumerd command socket ready");
+ } else {
+ DBG("Kconsumerd error when waiting for SOCK_READY : %s",
+ lttcomm_get_readable_code(-code));
+ goto error;
+ }
+
+ /* Wait for any kconsumerd error */
+ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ ERR("Kconsumerd closed the command socket");
+ goto error;
+ }
+
+ ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code));
+
+error:
+ kconsumerd_pid = 0;
+ DBG("Kconsumerd thread dying");
+ return NULL;
+}
+
+/*
+ * thread_manage_apps
+ *
+ * This thread manage the application socket communication
+ */
+static void *thread_manage_apps(void *data)
+{
+ int sock, ret;
+
+ /* TODO: Something more elegant is needed but fine for now */
+ /* FIXME: change all types to either uint8_t, uint32_t, uint64_t
+ * for 32-bit vs 64-bit compat processes. */
+ /* replicate in ust with version number */
+ struct {
+ int reg; /* 1:register, 0:unregister */
+ pid_t pid;
+ uid_t uid;
+ } reg_msg;
+
+ DBG("[thread] Manage apps started");
+
+ ret = lttcomm_listen_unix_sock(apps_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Notify all applications to register */
+ notify_apps(default_global_apps_pipe);
+
+ while (1) {
+ DBG("Accepting application registration");
+ /* Blocking call, waiting for transmission */
+ sock = lttcomm_accept_unix_sock(apps_sock);
+ if (sock < 0) {
+ goto error;
+ }
+
+ /* Basic recv here to handle the very simple data
+ * that the libust send to register (reg_msg).
+ */
+ ret = recv(sock, ®_msg, sizeof(reg_msg), 0);
+ if (ret < 0) {
+ perror("recv");
+ continue;
+ }
+
+ /* Add application to the global traceable list */
+ if (reg_msg.reg == 1) {
+ /* Registering */
+ ret = register_traceable_app(reg_msg.pid, reg_msg.uid);
+ if (ret < 0) {
+ /* register_traceable_app only return an error with
+ * ENOMEM. At this point, we better stop everything.
+ */
+ goto error;
+ }
+ } else {
+ /* Unregistering */
+ unregister_traceable_app(reg_msg.pid);
+ }
+ }
+
+error:
+
+ return NULL;
+}
+
+/*
+ * spawn_kconsumerd_thread
+ *
+ * Start the thread_manage_kconsumerd. This must be done after a kconsumerd
+ * exec or it will fails.
+ */
+static int spawn_kconsumerd_thread(void)
+{
+ int ret;
+
+ /* Setup semaphore */
+ sem_init(&kconsumerd_sem, 0, 0);
+
+ ret = pthread_create(&kconsumerd_thread, NULL, thread_manage_kconsumerd, (void *) NULL);
+ if (ret != 0) {
+ perror("pthread_create kconsumerd");
+ goto error;
+ }
+
+ /* Wait for the kconsumerd thread to be ready */
+ sem_wait(&kconsumerd_sem);
+
+ if (kconsumerd_pid == 0) {
+ ERR("Kconsumerd did not start");
+ goto error;
+ }
+
+ return 0;
+
+error:
+ ret = LTTCOMM_KERN_CONSUMER_FAIL;
+ return ret;
+}
+
+/*
+ * spawn_kconsumerd
+ *
+ * Fork and exec a kernel consumer daemon (kconsumerd).
+ *
+ * NOTE: It is very important to fork a kconsumerd BEFORE opening any kernel
+ * file descriptor using the libkernelctl or kernel-ctl functions. So, a
+ * kernel consumer MUST only be spawned before creating a kernel session.
+ *
+ * Return pid if successful else -1.
+ */
+static pid_t spawn_kconsumerd(void)
+{
+ int ret;
+ pid_t pid;
+
+ DBG("Spawning kconsumerd");
+
+ pid = fork();
+ if (pid == 0) {
+ /*
+ * Exec kconsumerd.
+ */
+ execlp("kconsumerd", "kconsumerd", "--quiet", NULL);
+ if (errno != 0) {
+ perror("kernel start consumer exec");
+ }
+ exit(EXIT_FAILURE);
+ } else if (pid > 0) {
+ ret = pid;
+ goto error;
+ } else {
+ perror("kernel start consumer fork");
+ ret = -errno;
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * start_kconsumerd
+ *
+ * Spawn the kconsumerd daemon and session daemon thread.
+ */
+static int start_kconsumerd(void)
+{
+ int ret;
+
+ pthread_mutex_lock(&kconsumerd_pid_mutex);
+ if (kconsumerd_pid != 0) {
+ goto end;
+ }
+
+ ret = spawn_kconsumerd();
+ if (ret < 0) {
+ ERR("Spawning kconsumerd failed");
+ ret = LTTCOMM_KERN_CONSUMER_FAIL;
+ pthread_mutex_unlock(&kconsumerd_pid_mutex);
+ goto error;
+ }
+
+ /* Setting up the global kconsumerd_pid */
+ kconsumerd_pid = ret;
+ pthread_mutex_unlock(&kconsumerd_pid_mutex);
+
+ DBG("Kconsumerd pid %d", ret);
+
+ DBG("Spawning kconsumerd thread");
+ ret = spawn_kconsumerd_thread();
+ if (ret < 0) {
+ ERR("Fatal error spawning kconsumerd thread");
+ goto error;
+ }
+
+end:
+ pthread_mutex_unlock(&kconsumerd_pid_mutex);
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * send_kconsumerd_fds
+ *
+ * Send all stream fds of the kernel session to the consumer.
+ */
+static int send_kconsumerd_fds(int sock, struct ltt_kernel_session *session)
+{
+ int ret;
+ size_t nb_fd;
+ struct ltt_kernel_stream *stream;
+ struct ltt_kernel_channel *chan;
+ struct lttcomm_kconsumerd_header lkh;
+ struct lttcomm_kconsumerd_msg lkm;
+
+ nb_fd = session->stream_count_global;
+
+ /* Setup header */
+ lkh.payload_size = (nb_fd + 1) * sizeof(struct lttcomm_kconsumerd_msg);
+ lkh.cmd_type = ADD_STREAM;
+
+ DBG("Sending kconsumerd header");
+
+ ret = lttcomm_send_unix_sock(sock, &lkh, sizeof(struct lttcomm_kconsumerd_header));
+ if (ret < 0) {
+ perror("send kconsumerd header");
+ goto error;
+ }
+
+ DBG("Sending metadata stream fd");
+
+ /* Send metadata stream fd first */
+ lkm.fd = session->metadata_stream_fd;
+ lkm.state = ACTIVE_FD;
+ lkm.max_sb_size = session->metadata->conf->subbuf_size;
+ strncpy(lkm.path_name, session->metadata->pathname, PATH_MAX);
+
+ ret = lttcomm_send_fds_unix_sock(sock, &lkm, &lkm.fd, 1, sizeof(lkm));
+ if (ret < 0) {
+ perror("send kconsumerd fd");
+ goto error;
+ }
+
+ cds_list_for_each_entry(chan, &session->channel_list.head, list) {
+ cds_list_for_each_entry(stream, &chan->stream_list.head, list) {
+ lkm.fd = stream->fd;
+ lkm.state = stream->state;
+ lkm.max_sb_size = chan->channel->subbuf_size;
+ strncpy(lkm.path_name, stream->pathname, PATH_MAX);
+
+ DBG("Sending fd %d to kconsumerd", lkm.fd);
+
+ ret = lttcomm_send_fds_unix_sock(sock, &lkm, &lkm.fd, 1, sizeof(lkm));
+ if (ret < 0) {
+ perror("send kconsumerd fd");
+ goto error;
+ }
+ }
+ }
+
+ DBG("Kconsumerd fds sent");
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * create_trace_dir
+ *
+ * Create the trace output directory.
+ */
+static int create_trace_dir(struct ltt_kernel_session *session)
+{
+ int ret;
+ struct ltt_kernel_channel *chan;
+
+ /* Create all channel directories */
+ cds_list_for_each_entry(chan, &session->channel_list.head, list) {
+ DBG("Creating trace directory at %s", chan->pathname);
+ ret = mkdir(chan->pathname, S_IRWXU | S_IRWXG );
+ if (ret < 0) {
+ perror("mkdir trace path");
+ ret = -errno;
+ goto error;
+ }
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+