UST periodical metadata flush
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 06b59c58442e4a833b3b0947aa3d34244b28a305..431b94626736d042d92db89db0774e7e941cc3e7 100644 (file)
 #include <inttypes.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <signal.h>
 
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/consumer-metadata-cache.h>
+#include <common/consumer-timer.h>
 
 #include "ust-consumer.h"
 
@@ -530,10 +533,12 @@ error:
 /*
  * Write metadata to the given channel using ustctl to convert the string to
  * the ringbuffer.
+ * Called only from consumer_metadata_cache_write.
+ * The metadata cache lock MUST be acquired to write in the cache.
  *
  * Return 0 on success else a negative value.
  */
-static int push_metadata(struct lttng_consumer_channel *metadata,
+int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata,
                const char *metadata_str, uint64_t target_offset, uint64_t len)
 {
        int ret;
@@ -543,13 +548,13 @@ static int push_metadata(struct lttng_consumer_channel *metadata,
 
        DBG("UST consumer writing metadata to channel %s", metadata->name);
 
-       assert(target_offset == metadata->contig_metadata_written);
-       ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+       assert(target_offset <= metadata->metadata_cache->max_offset);
+       ret = ustctl_write_metadata_to_channel(metadata->uchan,
+                       metadata_str + target_offset, len);
        if (ret < 0) {
                ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
                goto error;
        }
-       metadata->contig_metadata_written += len;
 
        ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
 
@@ -619,6 +624,11 @@ static int close_metadata(uint64_t chan_key)
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                goto error;
        }
+       if (channel->switch_timer_enabled == 1) {
+               DBG("Deleting timer on metadata channel");
+               consumer_timer_switch_stop(channel);
+       }
+       consumer_metadata_cache_destroy(channel);
 
 error:
        return ret;
@@ -678,6 +688,51 @@ error:
        return ret;
 }
 
+/*
+ * Receive the metadata updates from the sessiond.
+ */
+int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
+               uint64_t len, struct lttng_consumer_channel *channel)
+{
+       int ret, ret_code = LTTNG_OK;
+       char *metadata_str;
+
+       DBG("UST consumer push metadata key %lu of len %lu", key, len);
+
+       metadata_str = zmalloc(len * sizeof(char));
+       if (!metadata_str) {
+               PERROR("zmalloc metadata string");
+               ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+               goto end;
+       }
+
+       /* Receive metadata string. */
+       ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+       if (ret < 0) {
+               /* Session daemon is dead so return gracefully. */
+               ret_code = ret;
+               goto end_free;
+       }
+
+       pthread_mutex_lock(&channel->metadata_cache->lock);
+       ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+       if (ret < 0) {
+               /* Unable to handle metadata. Notify session daemon. */
+               ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+       }
+       pthread_mutex_unlock(&channel->metadata_cache->lock);
+
+       while (consumer_metadata_cache_flushed(channel, offset + len)) {
+               DBG("Waiting for metadata to be flushed");
+               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
+       }
+
+end_free:
+       free(metadata_str);
+end:
+       return ret_code;
+}
+
 /*
  * Receive command from session daemon and process it.
  *
@@ -847,6 +902,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_channel_error;
                }
 
+
                /*
                 * Channel and streams are now created. Inform the session daemon that
                 * everything went well and should wait to receive the channel and
@@ -861,6 +917,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+                       ret = consumer_metadata_cache_allocate(channel);
+                       if (ret < 0) {
+                               ERR("Allocating metadata cache");
+                               goto end_channel_error;
+                       }
+                       consumer_timer_switch_start(channel, attr.switch_timer_interval);
+                       attr.switch_timer_interval = 0;
+               }
+
                break;
        }
        case LTTNG_CONSUMER_GET_CHANNEL:
@@ -957,10 +1023,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int ret;
                uint64_t len = msg.u.push_metadata.len;
-               uint64_t target_offset = msg.u.push_metadata.target_offset;
                uint64_t key = msg.u.push_metadata.key;
+               uint64_t offset = msg.u.push_metadata.target_offset;
                struct lttng_consumer_channel *channel;
-               char *metadata_str;
 
                DBG("UST consumer push metadata key %lu of len %lu", key, len);
 
@@ -968,14 +1033,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (!channel) {
                        ERR("UST consumer push metadata %lu not found", key);
                        ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-                       goto end_msg_sessiond;
-               }
-
-               metadata_str = zmalloc(len * sizeof(char));
-               if (!metadata_str) {
-                       PERROR("zmalloc metadata string");
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto end_msg_sessiond;
                }
 
                /* Tell session daemon we are ready to receive the metadata. */
@@ -990,22 +1047,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               /* Receive metadata string. */
-               ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+               ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
+                               len, channel);
                if (ret < 0) {
-                       /* Session daemon is dead so return gracefully. */
+                       /* error receiving from sessiond */
                        goto end_nosignal;
-               }
-
-               ret = push_metadata(channel, metadata_str, target_offset, len);
-               free(metadata_str);
-               if (ret < 0) {
-                       /* Unable to handle metadata. Notify session daemon. */
-                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               } else {
+                       ret_code = ret;
                        goto end_msg_sessiond;
                }
-
-               goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_SETUP_METADATA:
        {
@@ -1223,6 +1273,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
+
 end:
        return ret;
 }
@@ -1343,3 +1394,96 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
                ERR("Unable to close wakeup fd");
        }
 }
+
+int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *channel)
+{
+       struct lttcomm_metadata_request_msg request;
+       struct lttcomm_consumer_msg msg;
+       enum lttng_error_code ret_code = LTTNG_OK;
+       uint64_t len, key, offset;
+       int ret;
+
+       assert(channel);
+       assert(channel->metadata_cache);
+
+       /* send the metadata request to sessiond */
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER64_UST:
+               request.bits_per_long = 64;
+               break;
+       case LTTNG_CONSUMER32_UST:
+               request.bits_per_long = 32;
+               break;
+       default:
+               request.bits_per_long = 0;
+               break;
+       }
+
+       request.session_id = channel->session_id;
+       request.uid = channel->uid;
+       request.key = channel->key;
+       DBG("Sending metadata request to sessiond, session %" PRIu64,
+                       channel->session_id);
+
+       ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
+                       sizeof(request));
+       if (ret < 0) {
+               ERR("Asking metadata to sessiond");
+               goto end;
+       }
+
+       /* Receive the metadata from sessiond */
+       ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
+                       sizeof(msg));
+       if (ret != sizeof(msg)) {
+               DBG("Consumer received unexpected message size %d (expects %lu)",
+                       ret, sizeof(msg));
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               /*
+                * The ret value might 0 meaning an orderly shutdown but this is ok
+                * since the caller handles this.
+                */
+               goto end;
+       }
+
+       if (msg.cmd_type == LTTNG_ERR_UND) {
+               /* No registry found */
+               (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
+                               ret_code);
+               ret = 0;
+               goto end;
+       } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
+               ERR("Unexpected cmd_type received %d", msg.cmd_type);
+               ret = -1;
+               goto end;
+       }
+
+       len = msg.u.push_metadata.len;
+       key = msg.u.push_metadata.key;
+       offset = msg.u.push_metadata.target_offset;
+
+       assert(key == channel->key);
+       if (len == 0) {
+               DBG("No new metadata to receive for key %" PRIu64, key);
+       }
+
+       /* Tell session daemon we are ready to receive the metadata. */
+       ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
+                       LTTNG_OK);
+       if (ret < 0 || len == 0) {
+               /*
+                * Somehow, the session daemon is not responding anymore or there is
+                * nothing to receive.
+                */
+               goto end;
+       }
+
+       ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                       key, offset, len, channel);
+       (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+       ret = 0;
+
+end:
+       return ret;
+}
This page took 0.026427 seconds and 4 git commands to generate.