Move stream file rotation functions to utils
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 5a09ff51fb751069a3ad2fdbed17e698a0630dd0..bc0f585385814f1ba25be632f2a21fbd04232de9 100644 (file)
 #include <inttypes.h>
 #include <unistd.h>
 #include <urcu/list.h>
+#include <signal.h>
 
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/consumer-metadata-cache.h>
+#include <common/consumer-timer.h>
+#include <common/utils.h>
 
 #include "ust-consumer.h"
 
@@ -109,13 +113,14 @@ error:
  */
 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
                const char *pathname, const char *name, uid_t uid, gid_t gid,
-               int relayd_id, uint64_t key, enum lttng_event_output output)
+               int relayd_id, uint64_t key, enum lttng_event_output output,
+               uint64_t tracefile_size, uint64_t tracefile_count)
 {
        assert(pathname);
        assert(name);
 
        return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
-                       relayd_id, output);
+                       relayd_id, output, tracefile_size, tracefile_count);
 }
 
 /*
@@ -530,10 +535,12 @@ error:
 /*
  * Write metadata to the given channel using ustctl to convert the string to
  * the ringbuffer.
+ * Called only from consumer_metadata_cache_write.
+ * The metadata cache lock MUST be acquired to write in the cache.
  *
  * Return 0 on success else a negative value.
  */
-static int push_metadata(struct lttng_consumer_channel *metadata,
+int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata,
                const char *metadata_str, uint64_t target_offset, uint64_t len)
 {
        int ret;
@@ -543,13 +550,13 @@ static int push_metadata(struct lttng_consumer_channel *metadata,
 
        DBG("UST consumer writing metadata to channel %s", metadata->name);
 
-       assert(target_offset == metadata->contig_metadata_written);
-       ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+       assert(target_offset <= metadata->metadata_cache->max_offset);
+       ret = ustctl_write_metadata_to_channel(metadata->uchan,
+                       metadata_str + target_offset, len);
        if (ret < 0) {
                ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
                goto error;
        }
-       metadata->contig_metadata_written += len;
 
        ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
 
@@ -557,6 +564,43 @@ error:
        return ret;
 }
 
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer flush channel key %lu", chan_key);
+
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer flush channel %lu not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, flush it. */
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+                       ustctl_flush_buffer(stream->ustream, 1);
+       }
+       rcu_read_unlock();
+
+error:
+       return ret;
+}
+
 /*
  * Close metadata stream wakeup_fd using the given key to retrieve the channel.
  *
@@ -582,6 +626,11 @@ static int close_metadata(uint64_t chan_key)
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                goto error;
        }
+       if (channel->switch_timer_enabled == 1) {
+               DBG("Deleting timer on metadata channel");
+               consumer_timer_switch_stop(channel);
+       }
+       consumer_metadata_cache_destroy(channel);
 
 error:
        return ret;
@@ -641,6 +690,51 @@ error:
        return ret;
 }
 
+/*
+ * Receive the metadata updates from the sessiond.
+ */
+int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
+               uint64_t len, struct lttng_consumer_channel *channel)
+{
+       int ret, ret_code = LTTNG_OK;
+       char *metadata_str;
+
+       DBG("UST consumer push metadata key %lu of len %lu", key, len);
+
+       metadata_str = zmalloc(len * sizeof(char));
+       if (!metadata_str) {
+               PERROR("zmalloc metadata string");
+               ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+               goto end;
+       }
+
+       /* Receive metadata string. */
+       ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+       if (ret < 0) {
+               /* Session daemon is dead so return gracefully. */
+               ret_code = ret;
+               goto end_free;
+       }
+
+       pthread_mutex_lock(&channel->metadata_cache->lock);
+       ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+       if (ret < 0) {
+               /* Unable to handle metadata. Notify session daemon. */
+               ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+       }
+       pthread_mutex_unlock(&channel->metadata_cache->lock);
+
+       while (consumer_metadata_cache_flushed(channel, offset + len)) {
+               DBG("Waiting for metadata to be flushed");
+               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
+       }
+
+end_free:
+       free(metadata_str);
+end:
+       return ret_code;
+}
+
 /*
  * Receive command from session daemon and process it.
  *
@@ -756,7 +850,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.ask_channel.pathname, msg.u.ask_channel.name,
                                msg.u.ask_channel.uid, msg.u.ask_channel.gid,
                                msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
-                               (enum lttng_event_output) msg.u.ask_channel.output);
+                               (enum lttng_event_output) msg.u.ask_channel.output,
+                               msg.u.ask_channel.tracefile_size,
+                               msg.u.ask_channel.tracefile_count);
                if (!channel) {
                        goto end_channel_error;
                }
@@ -767,6 +863,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.overwrite = msg.u.ask_channel.overwrite;
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+               attr.chan_id = msg.u.ask_channel.chan_id;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
                /* Translate the event output type to UST. */
@@ -809,6 +906,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_channel_error;
                }
 
+
                /*
                 * Channel and streams are now created. Inform the session daemon that
                 * everything went well and should wait to receive the channel and
@@ -823,6 +921,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+                       ret = consumer_metadata_cache_allocate(channel);
+                       if (ret < 0) {
+                               ERR("Allocating metadata cache");
+                               goto end_channel_error;
+                       }
+                       consumer_timer_switch_start(channel, attr.switch_timer_interval);
+                       attr.switch_timer_interval = 0;
+               }
+
                break;
        }
        case LTTNG_CONSUMER_GET_CHANNEL:
@@ -904,14 +1012,24 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_FLUSH_CHANNEL:
+       {
+               int ret;
+
+               ret = flush_channel(msg.u.flush_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
                uint64_t len = msg.u.push_metadata.len;
-               uint64_t target_offset = msg.u.push_metadata.target_offset;
                uint64_t key = msg.u.push_metadata.key;
+               uint64_t offset = msg.u.push_metadata.target_offset;
                struct lttng_consumer_channel *channel;
-               char *metadata_str;
 
                DBG("UST consumer push metadata key %lu of len %lu", key, len);
 
@@ -921,13 +1039,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
                }
 
-               metadata_str = zmalloc(len * sizeof(char));
-               if (!metadata_str) {
-                       PERROR("zmalloc metadata string");
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto end_msg_sessiond;
-               }
-
                /* Tell session daemon we are ready to receive the metadata. */
                ret = consumer_send_status_msg(sock, LTTNG_OK);
                if (ret < 0) {
@@ -940,22 +1051,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               /* Receive metadata string. */
-               ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+               ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
+                               len, channel);
                if (ret < 0) {
-                       /* Session daemon is dead so return gracefully. */
+                       /* error receiving from sessiond */
                        goto end_nosignal;
-               }
-
-               ret = push_metadata(channel, metadata_str, target_offset, len);
-               free(metadata_str);
-               if (ret < 0) {
-                       /* Unable to handle metadata. Notify session daemon. */
-                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               } else {
+                       ret_code = ret;
                        goto end_msg_sessiond;
                }
-
-               goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_SETUP_METADATA:
        {
@@ -1173,41 +1277,32 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
+
 end:
        return ret;
 }
 
 /*
  * Called when a stream is created.
+ *
+ * Return 0 on success or else a negative value.
  */
 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
-       char full_path[PATH_MAX];
 
-       /* Opening the tracefile in write mode */
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               goto end;
-       }
-
-       ret = snprintf(full_path, sizeof(full_path), "%s/%s",
-                       stream->chan->pathname, stream->name);
-       if (ret < 0) {
-               PERROR("snprintf on_recv_stream");
-               goto error;
-       }
-
-       ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
-                       S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
-       if (ret < 0) {
-               PERROR("open stream path %s", full_path);
-               goto error;
+       /* Don't create anything if this is set for streaming. */
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+               ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                               stream->chan->tracefile_size, stream->tracefile_count_current,
+                               stream->uid, stream->gid);
+               if (ret < 0) {
+                       goto error;
+               }
+               stream->out_fd = ret;
+               stream->tracefile_size_current = 0;
        }
-       stream->out_fd = ret;
-
-end:
-       /* we return 0 to let the library handle the FD internally */
-       return 0;
+       ret = 0;
 
 error:
        return ret;
@@ -1293,3 +1388,96 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
                ERR("Unable to close wakeup fd");
        }
 }
+
+int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *channel)
+{
+       struct lttcomm_metadata_request_msg request;
+       struct lttcomm_consumer_msg msg;
+       enum lttng_error_code ret_code = LTTNG_OK;
+       uint64_t len, key, offset;
+       int ret;
+
+       assert(channel);
+       assert(channel->metadata_cache);
+
+       /* send the metadata request to sessiond */
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER64_UST:
+               request.bits_per_long = 64;
+               break;
+       case LTTNG_CONSUMER32_UST:
+               request.bits_per_long = 32;
+               break;
+       default:
+               request.bits_per_long = 0;
+               break;
+       }
+
+       request.session_id = channel->session_id;
+       request.uid = channel->uid;
+       request.key = channel->key;
+       DBG("Sending metadata request to sessiond, session %" PRIu64,
+                       channel->session_id);
+
+       ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
+                       sizeof(request));
+       if (ret < 0) {
+               ERR("Asking metadata to sessiond");
+               goto end;
+       }
+
+       /* Receive the metadata from sessiond */
+       ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
+                       sizeof(msg));
+       if (ret != sizeof(msg)) {
+               DBG("Consumer received unexpected message size %d (expects %lu)",
+                       ret, sizeof(msg));
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               /*
+                * The ret value might 0 meaning an orderly shutdown but this is ok
+                * since the caller handles this.
+                */
+               goto end;
+       }
+
+       if (msg.cmd_type == LTTNG_ERR_UND) {
+               /* No registry found */
+               (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
+                               ret_code);
+               ret = 0;
+               goto end;
+       } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
+               ERR("Unexpected cmd_type received %d", msg.cmd_type);
+               ret = -1;
+               goto end;
+       }
+
+       len = msg.u.push_metadata.len;
+       key = msg.u.push_metadata.key;
+       offset = msg.u.push_metadata.target_offset;
+
+       assert(key == channel->key);
+       if (len == 0) {
+               DBG("No new metadata to receive for key %" PRIu64, key);
+       }
+
+       /* Tell session daemon we are ready to receive the metadata. */
+       ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
+                       LTTNG_OK);
+       if (ret < 0 || len == 0) {
+               /*
+                * Somehow, the session daemon is not responding anymore or there is
+                * nothing to receive.
+                */
+               goto end;
+       }
+
+       ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                       key, offset, len, channel);
+       (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+       ret = 0;
+
+end:
+       return ret;
+}
This page took 0.033709 seconds and 4 git commands to generate.