Cleanup: Move `create_posix_shm()` to common/shm.c
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index ce2e17f742eae728649ebdfd8632565eae160ac6..aede53658736e2037d8f90bb53b050c4bb2c2997 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/utils.h>
 #include <common/index/index.h>
 #include <common/consumer/consumer.h>
+#include <common/shm.h>
 #include <common/optional.h>
 
 #include "ust-consumer.h"
@@ -352,48 +353,6 @@ error_alloc:
        return ret;
 }
 
-/*
- * create_posix_shm is never called concurrently within a process.
- */
-static
-int create_posix_shm(void)
-{
-       char tmp_name[NAME_MAX];
-       int shmfd, ret;
-
-       ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid());
-       if (ret < 0) {
-               PERROR("snprintf");
-               return -1;
-       }
-       /*
-        * Allocate shm, and immediately unlink its shm oject, keeping
-        * only the file descriptor as a reference to the object.
-        * We specifically do _not_ use the / at the beginning of the
-        * pathname so that some OS implementations can keep it local to
-        * the process (POSIX leaves this implementation-defined).
-        */
-       shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700);
-       if (shmfd < 0) {
-               PERROR("shm_open");
-               goto error_shm_open;
-       }
-       ret = shm_unlink(tmp_name);
-       if (ret < 0 && errno != ENOENT) {
-               PERROR("shm_unlink");
-               goto error_shm_release;
-       }
-       return shmfd;
-
-error_shm_release:
-       ret = close(shmfd);
-       if (ret) {
-               PERROR("close");
-       }
-error_shm_open:
-       return -1;
-}
-
 static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
                const struct lttng_credentials *session_credentials)
 {
@@ -401,7 +360,7 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
        int ret;
 
        if (!channel->shm_path[0]) {
-               return create_posix_shm();
+               return shm_create_anonymous("ust-consumer");
        }
        ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
        if (ret) {
@@ -409,7 +368,8 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
        }
        return run_as_open(shm_path,
                O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
-               session_credentials->uid, session_credentials->gid);
+               lttng_credentials_get_uid(session_credentials),
+               lttng_credentials_get_gid(session_credentials));
 
 error_shm_path:
        return -1;
@@ -487,8 +447,10 @@ error_open:
                                ERR("Cannot get stream shm path");
                        }
                        closeret = run_as_unlink(shm_path,
-                                       channel->buffer_credentials.value.uid,
-                                       channel->buffer_credentials.value.gid);
+                                       lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                                       channel->buffer_credentials)),
+                                       lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                                       channel->buffer_credentials)));
                        if (closeret) {
                                PERROR("unlink %s", shm_path);
                        }
@@ -497,8 +459,10 @@ error_open:
        /* Try to rmdir all directories under shm_path root. */
        if (channel->root_shm_path[0]) {
                (void) run_as_rmdir_recursive(channel->root_shm_path,
-                               channel->buffer_credentials.value.uid,
-                               channel->buffer_credentials.value.gid,
+                               lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                               channel->buffer_credentials)),
+                               lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                               channel->buffer_credentials)),
                                LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
        }
        free(stream_fds);
@@ -1278,6 +1242,17 @@ error_unlock:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
+       stream->ust_metadata_pushed = 0;
+}
+
 /*
  * Receive the metadata updates from the sessiond. Supports receiving
  * overlapping metadata, but is needs to always belong to a contiguous
@@ -1292,6 +1267,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
+       enum consumer_metadata_cache_write_status cache_write_status;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
@@ -1315,9 +1291,41 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, version,
+       cache_write_status = consumer_metadata_cache_write(
+                       channel->metadata_cache, offset, len, version,
                        metadata_str);
-       if (ret < 0) {
+       pthread_mutex_unlock(&channel->metadata_cache->lock);
+       switch (cache_write_status) {
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+               /*
+                * The write entirely overlapped with existing contents of the
+                * same metadata version (same content); there is nothing to do.
+                */
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+               /*
+                * The metadata cache was invalidated (previously pushed
+                * content has been overwritten). Reset the stream's consumed
+                * metadata position to ensure the metadata poll thread consumes
+                * the whole cache.
+                */
+               pthread_mutex_lock(&channel->metadata_stream->lock);
+               metadata_stream_reset_cache_consumed_position(
+                               channel->metadata_stream);
+               pthread_mutex_unlock(&channel->metadata_stream->lock);
+               /* Fall-through. */
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+               /*
+                * In both cases, the metadata poll thread has new data to
+                * consume.
+                */
+               ret = consumer_metadata_wakeup_pipe(channel);
+               if (ret) {
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_free;
+               }
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                /*
@@ -1325,10 +1333,10 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * not have been updated which could create an infinite loop below when
                 * waiting for the metadata cache to be flushed.
                 */
-               pthread_mutex_unlock(&channel->metadata_cache->lock);
                goto end_free;
+       default:
+               abort();
        }
-       pthread_mutex_unlock(&channel->metadata_cache->lock);
 
        if (!wait) {
                goto end_free;
@@ -1461,8 +1469,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct ustctl_consumer_channel_attr attr;
                const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value;
                const struct lttng_credentials buffer_credentials = {
-                       .uid = msg.u.ask_channel.buffer_credentials.uid,
-                       .gid = msg.u.ask_channel.buffer_credentials.gid,
+                       .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.uid),
+                       .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.gid),
                };
 
                /* Create a plain object and reserve a channel key. */
@@ -1511,15 +1519,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                switch (msg.u.ask_channel.output) {
                case LTTNG_EVENT_MMAP:
                default:
-                       attr.output = LTTNG_UST_MMAP;
+                       attr.output = LTTNG_UST_ABI_MMAP;
                        break;
                }
 
                /* Translate and save channel type. */
                switch (msg.u.ask_channel.type) {
-               case LTTNG_UST_CHAN_PER_CPU:
+               case LTTNG_UST_ABI_CHAN_PER_CPU:
                        channel->type = CONSUMER_CHANNEL_TYPE_DATA;
-                       attr.type = LTTNG_UST_CHAN_PER_CPU;
+                       attr.type = LTTNG_UST_ABI_CHAN_PER_CPU;
                        /*
                         * Set refcount to 1 for owner. Below, we will
                         * pass ownership to the
@@ -1527,9 +1535,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                         */
                        channel->refcount = 1;
                        break;
-               case LTTNG_UST_CHAN_METADATA:
+               case LTTNG_UST_ABI_CHAN_METADATA:
                        channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
-                       attr.type = LTTNG_UST_CHAN_METADATA;
+                       attr.type = LTTNG_UST_ABI_CHAN_METADATA;
                        break;
                default:
                        assert(0);
@@ -1543,7 +1551,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_channel_error;
                }
 
-               if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+               if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
                        ret = consumer_metadata_cache_allocate(channel);
                        if (ret < 0) {
                                ERR("Allocating metadata cache");
@@ -1576,7 +1584,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                ret = add_channel(channel, ctx);
                if (ret < 0) {
-                       if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+                       if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
                                if (channel->switch_timer_enabled == 1) {
                                        consumer_timer_switch_stop(channel);
                                }
@@ -2072,8 +2080,8 @@ end_rotate_channel_nosignal:
        case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
        {
                const struct lttng_credentials credentials = {
-                       .uid = msg.u.create_trace_chunk.credentials.value.uid,
-                       .gid = msg.u.create_trace_chunk.credentials.value.gid,
+                       .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
+                       .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
                };
                const bool is_local_trace =
                                !msg.u.create_trace_chunk.relayd_id.is_set;
@@ -2181,6 +2189,28 @@ end_rotate_channel_nosignal:
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+       {
+               const uint64_t key = msg.u.open_channel_packets.key;
+               struct lttng_consumer_channel *channel =
+                               consumer_find_channel(key);
+
+               if (channel) {
+                       pthread_mutex_lock(&channel->lock);
+                       ret_code = lttng_consumer_open_channel_packets(channel);
+                       pthread_mutex_unlock(&channel->lock);
+               } else {
+                       /*
+                        * The channel could have disappeared in per-pid
+                        * buffering mode.
+                        */
+                       DBG("Channel %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+               goto end_msg_sessiond;
+       }
        default:
                break;
        }
@@ -2380,8 +2410,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
                                ERR("Cannot get stream shm path");
                        }
                        ret = run_as_unlink(shm_path,
-                                       chan->buffer_credentials.value.uid,
-                                       chan->buffer_credentials.value.gid);
+                                       lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                                       chan->buffer_credentials)),
+                                       lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                                       chan->buffer_credentials)));
                        if (ret) {
                                PERROR("unlink %s", shm_path);
                        }
@@ -2400,8 +2432,10 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
        /* Try to rmdir all directories under shm_path root. */
        if (chan->root_shm_path[0]) {
                (void) run_as_rmdir_recursive(chan->root_shm_path,
-                               chan->buffer_credentials.value.uid,
-                               chan->buffer_credentials.value.gid,
+                               lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                               chan->buffer_credentials)),
+                               lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                               chan->buffer_credentials)),
                                LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
        }
        free(chan->stream_fds);
@@ -2434,15 +2468,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
-{
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
-       stream->ust_metadata_pushed = 0;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -2456,8 +2481,8 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       if (stream->chan->metadata_cache->max_offset ==
-           stream->ust_metadata_pushed) {
+       if (stream->chan->metadata_cache->contents.size ==
+                       stream->ust_metadata_pushed) {
                /*
                 * In the context of a user space metadata channel, a
                 * change in version can be detected in two ways:
@@ -2494,9 +2519,9 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        }
 
        write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
-                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
-                       stream->chan->metadata_cache->max_offset
-                       - stream->ust_metadata_pushed);
+                       &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
+                       stream->chan->metadata_cache->contents.size -
+                                       stream->ust_metadata_pushed);
        assert(write_len != 0);
        if (write_len < 0) {
                ERR("Writing one metadata packet");
@@ -2505,7 +2530,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        }
        stream->ust_metadata_pushed += write_len;
 
-       assert(stream->chan->metadata_cache->max_offset >=
+       assert(stream->chan->metadata_cache->contents.size >=
                        stream->ust_metadata_pushed);
        ret = write_len;
 
@@ -2873,8 +2898,8 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
                        }
                } else {
                        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-                       cache_empty = stream->chan->metadata_cache->max_offset ==
-                                     stream->ust_metadata_pushed;
+                       cache_empty = stream->chan->metadata_cache->contents.size ==
+                                       stream->ust_metadata_pushed;
                        pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                }
        } while (!got_subbuffer);
@@ -2935,6 +2960,7 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream,
 static int signal_metadata(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
+       ASSERT_LOCKED(stream->metadata_rdv_lock);
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
 }
 
@@ -3020,6 +3046,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
        assert(stream->ustream);
+       ASSERT_LOCKED(stream->lock);
 
        DBG("UST consumer checking data pending");
 
@@ -3032,7 +3059,9 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                uint64_t contiguous, pushed;
 
                /* Ease our life a bit. */
-               contiguous = stream->chan->metadata_cache->max_offset;
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+               contiguous = stream->chan->metadata_cache->contents.size;
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                pushed = stream->ust_metadata_pushed;
 
                /*
This page took 0.028687 seconds and 4 git commands to generate.