Fix: free metadata cache after grace period in consumer
[lttng-tools.git] / src / common / consumer.c
index 821a04e3d73d80025e76ad99f1cb0558725c7188..f2ccf9536b147b72574638046135da3e425b771a 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
 #include <pthread.h>
@@ -34,6 +35,7 @@
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/compat/endian.h>
 #include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
@@ -46,6 +48,7 @@
 #include "consumer.h"
 #include "consumer-stream.h"
 #include "consumer-testpoint.h"
+#include "align.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -283,6 +286,17 @@ static void free_channel_rcu(struct rcu_head *head)
        struct lttng_consumer_channel *channel =
                caa_container_of(node, struct lttng_consumer_channel, node);
 
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_free_channel(channel);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
        free(channel);
 }
 
@@ -560,6 +574,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
+       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
@@ -933,7 +948,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
                unsigned int monitor,
-               unsigned int live_timer_interval)
+               unsigned int live_timer_interval,
+               const char *root_shm_path,
+               const char *shm_path)
 {
        struct lttng_consumer_channel *channel;
 
@@ -990,6 +1007,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        strncpy(channel->name, name, sizeof(channel->name));
        channel->name[sizeof(channel->name) - 1] = '\0';
 
+       if (root_shm_path) {
+               strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
+               channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
+       }
+       if (shm_path) {
+               strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
+               channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
+       }
+
        lttng_ht_node_init_u64(&channel->node, channel->key);
 
        channel->wait_fd = -1;
@@ -1091,12 +1117,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+       (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
 }
 
 /*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket return -1 on
+ * error, 1 if should exit, 0 if data is available on the command socket
  */
 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
 {
@@ -1112,16 +1141,13 @@ restart:
                        goto restart;
                }
                PERROR("Poll error");
-               goto exit;
+               return -1;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
                DBG("consumer_should_quit wake up");
-               goto exit;
+               return 1;
        }
        return 0;
-
-exit:
-       return -1;
 }
 
 /*
@@ -1290,18 +1316,17 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_wakeup_pipe) {
+               goto error_wakeup_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
                goto error_quit_pipe;
        }
 
-       ret = pipe(ctx->consumer_thread_pipe);
-       if (ret < 0) {
-               PERROR("Error creating thread pipe");
-               goto error_thread_pipe;
-       }
-
        ret = pipe(ctx->consumer_channel_pipe);
        if (ret < 0) {
                PERROR("Error creating channel pipe");
@@ -1313,22 +1338,15 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
-       if (ret < 0) {
-               goto error_splice_pipe;
-       }
-
        return ctx;
 
-error_splice_pipe:
-       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
-       utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
@@ -1396,6 +1414,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 
        DBG("Consumer destroying it. Closing everything.");
 
+       if (!ctx) {
+               return;
+       }
+
        destroy_data_stream_ht(data_ht);
        destroy_metadata_stream_ht(metadata_ht);
 
@@ -1407,12 +1429,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
-       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
        unlink(ctx->consumer_command_sock_path);
        free(ctx);
@@ -1433,7 +1454,7 @@ static int write_relayd_metadata_id(int fd,
        ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
        if (ret < sizeof(hdr)) {
                /*
-                * This error means that the fd's end is closed so ignore the perror
+                * This error means that the fd's end is closed so ignore the PERROR
                 * not to clubber the error output since this can happen in a normal
                 * code path.
                 */
@@ -1705,17 +1726,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        goto end;
                }
        }
-
-       /*
-        * Choose right pipe for splice. Metadata and trace data are handled by
-        * different threads hence the use of two pipes in order not to race or
-        * corrupt the written data.
-        */
-       if (stream->metadata_flag) {
-               splice_pipe = ctx->consumer_splice_metadata_pipe;
-       } else {
-               splice_pipe = ctx->consumer_thread_pipe;
-       }
+       splice_pipe = stream->splice_pipe;
 
        /* Write metadata stream id before payload */
        if (relayd) {
@@ -1821,7 +1832,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                /* Splice data out */
                ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+               DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+                               outfd, ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
                        written = -ret;
@@ -2193,18 +2205,13 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the metadata pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
                health_poll_entry();
+               DBG("Metadata poll wait");
                ret = lttng_poll_wait(&events, -1);
+               DBG("Metadata poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
@@ -2212,7 +2219,10 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
-                       goto error;
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
+                       goto end;
                }
 
                nb_fd = ret;
@@ -2224,6 +2234,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2340,7 +2355,6 @@ restart:
 
        /* All is OK */
        err = 0;
-error:
 end:
        DBG("Metadata poll thread exiting");
 
@@ -2405,16 +2419,18 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       /*
+                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+                        * wake up pipe.
+                        */
+                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       local_stream = zmalloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 2) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2441,9 +2457,9 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 1);
+               DBG("polling on %d fd", nb_fd + 2);
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 1, -1);
+               num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2492,6 +2508,20 @@ void *consumer_thread_data_poll(void *data)
                        continue;
                }
 
+               /* Handle wakeup pipe. */
+               if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+                       char dummy;
+                       ssize_t pipe_readlen;
+
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+                                       sizeof(dummy));
+                       if (pipe_readlen < 0) {
+                               PERROR("Consumer data wakeup pipe");
+                       }
+                       /* We've been awakened to handle stream(s). */
+                       ctx->has_wakeup = 0;
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2530,7 +2560,8 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) ||
-                                       local_stream[i]->hangup_flush_done) {
+                                       local_stream[i]->hangup_flush_done ||
+                                       local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
@@ -2746,18 +2777,13 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the channel pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
+               DBG("Channel poll wait");
                health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               DBG("Channel poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
@@ -2765,6 +2791,9 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
                        goto end;
                }
 
@@ -2777,10 +2806,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
                        if (!revents) {
+                               /* No activity for this FD (poll implementation). */
                                continue;
                        }
+
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP)) {
                                        DBG("Channel thread pipe hung up");
@@ -2944,8 +2974,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(sockpoll);
 
-       if (lttng_consumer_poll_socket(sockpoll) < 0) {
-               ret = -1;
+       ret = lttng_consumer_poll_socket(sockpoll);
+       if (ret) {
                goto error;
        }
        DBG("Metadata connection on client_socket");
@@ -3014,7 +3044,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].fd = client_socket;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
        DBG("Connection on client_socket");
@@ -3031,7 +3066,11 @@ void *consumer_thread_sessiond_poll(void *data)
         * command unix socket.
         */
        ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
-       if (ret < 0) {
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
 
@@ -3052,15 +3091,15 @@ void *consumer_thread_sessiond_poll(void *data)
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
+                       if (ret > 0) {
+                               /* should exit */
+                               err = 0;
+                       }
                        goto end;
                }
                DBG("Incoming command on sock");
                ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
-               if (ret == -ENOENT) {
-                       DBG("Received STOP command");
-                       goto end;
-               }
                if (ret <= 0) {
                        /*
                         * This could simply be a session daemon quitting. Don't output
@@ -3278,7 +3317,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* Poll on consumer socket. */
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               /* Needing to exit in the middle of a command: error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error_nosignal;
@@ -3532,15 +3573,6 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
-                       /*
-                        * An empty output file is not valid. We need at least one packet
-                        * generated per stream, even if it contains no event, so it
-                        * contains at least one packet header.
-                        */
-                       if (stream->output_written == 0) {
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
-                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
@@ -3613,6 +3645,7 @@ int consumer_send_status_msg(int sock, int ret_code)
 {
        struct lttcomm_consumer_status_msg msg;
 
+       memset(&msg, 0, sizeof(msg));
        msg.ret_code = ret_code;
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
@@ -3630,6 +3663,7 @@ int consumer_send_status_channel(int sock,
 
        assert(sock >= 0);
 
+       memset(&msg, 0, sizeof(msg));
        if (!channel) {
                msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
@@ -3641,22 +3675,19 @@ int consumer_send_status_channel(int sock,
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
 
-/*
- * Using a maximum stream size with the produced and consumed position of a
- * stream, computes the new consumed position to be as close as possible to the
- * maximum possible stream size.
- *
- * If maximum stream size is lower than the possible buffer size (produced -
- * consumed), the consumed_pos given is returned untouched else the new value
- * is returned.
- */
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size)
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size)
 {
-       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
-               /* Offset from the produced position to get the latest buffers. */
-               return produced_pos - max_stream_size;
-       }
+       unsigned long start_pos;
 
-       return consumed_pos;
+       if (!nb_packets_per_stream) {
+               return consumed_pos;    /* Grab everything */
+       }
+       start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
+       start_pos -= max_sb_size * nb_packets_per_stream;
+       if ((long) (start_pos - consumed_pos) < 0) {
+               return consumed_pos;    /* Grab everything */
+       }
+       return start_pos;
 }
This page took 0.029701 seconds and 4 git commands to generate.