Implement the relayd live features
[lttng-tools.git] / src / common / consumer.c
index 1f96fa9d460c5a97eac61a9c79acadbf239a7e91..b8695698da7301b69b114fe373680b248294b486 100644 (file)
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
 
 #include "consumer.h"
 #include "consumer-stream.h"
@@ -291,19 +293,28 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
+
+       /* Delete streams that might have been left in the stream list. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               cds_list_del(&stream->send_node);
+               /*
+                * Once a stream is added to this list, the buffers were created so
+                * we have a guarantee that this call will succeed.
+                */
+               consumer_stream_destroy(stream, NULL);
+       }
+
+       if (channel->live_timer_enabled == 1) {
+               consumer_timer_live_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               /* Delete streams that might have been left in the stream list. */
-               cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                               send_node) {
-                       cds_list_del(&stream->send_node);
-                       lttng_ustconsumer_del_stream(stream);
-                       free(stream);
-               }
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -312,25 +323,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
-       /* Empty no monitor streams list. */
-       if (!channel->monitor) {
-               struct lttng_consumer_stream *stream, *stmp;
-
-               /*
-                * So, these streams are not visible to any data thread. This is why we
-                * close and free them because they were never added to any data
-                * structure apart from this one.
-                */
-               cds_list_for_each_entry_safe(stream, stmp,
-                               &channel->stream_no_monitor_list.head, no_monitor_node) {
-                       cds_list_del(&stream->no_monitor_node);
-                       /* Close everything in that stream. */
-                       consumer_stream_close(stream);
-                       /* Free the ressource. */
-                       consumer_stream_free(stream);
-               }
-       }
-
        rcu_read_lock();
        iter.iter.node = &channel->node.node;
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
@@ -339,6 +331,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -470,6 +463,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        consumer_stream_destroy(stream, ht);
 }
 
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, metadata_ht);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -480,7 +486,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
-               enum consumer_channel_type type)
+               enum consumer_channel_type type,
+               unsigned int monitor)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -497,11 +504,15 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->key = stream_key;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
+       stream->output_written = 0;
        stream->state = state;
        stream->uid = uid;
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
+       stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -549,11 +560,10 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int add_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = data_ht;
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        assert(ht);
@@ -561,6 +571,8 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -579,12 +591,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
         */
        lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
-       /* Check and cleanup relayd */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -604,11 +610,18 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
 }
 
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+       consumer_del_stream(stream, data_ht);
+}
+
 /*
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
@@ -693,6 +706,68 @@ error:
        return relayd;
 }
 
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
+       assert(path);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+
+               uatomic_inc(&relayd->refcount);
+               stream->sent_to_relayd = 1;
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+       struct consumer_relayd_sock_pair *relayd;
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
 /*
  * Handle stream for relayd transmission if the stream applies for network
  * streaming where the net sequence index is set.
@@ -770,7 +845,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -791,6 +867,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
+       pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
 
        /*
         * In monitor mode, the streams associated with the channel will be put in
@@ -816,7 +895,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->wait_fd = -1;
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
-       CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
 
        DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
@@ -837,6 +915,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
+       pthread_mutex_lock(&channel->timer_lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -853,10 +933,12 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->timer_lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
-                       channel->metadata_stream == NULL) {
+                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
@@ -1088,7 +1170,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
-               int (*update_stream)(int stream_key, uint32_t state))
+               int (*update_stream)(uint64_t stream_key, uint32_t state))
 {
        int ret;
        struct lttng_consumer_local_data *ctx;
@@ -1105,6 +1187,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ctx->consumer_error_socket = -1;
        ctx->consumer_metadata_socket = -1;
+       pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
@@ -1244,7 +1327,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1262,6 +1346,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1271,28 +1356,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = -errno;
+                       goto end;
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -1;
+                       written = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = ret;
+                       goto end;
+               }
                break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
-       if (ret != 0) {
-               errno = -ret;
-               PERROR("tracer ctl get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
 
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
@@ -1347,16 +1435,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               if (index) {
+                       index->offset = htobe64(stream->out_fd_offset);
+               }
        }
 
        while (len > 0) {
@@ -1373,7 +1479,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                         */
                        DBG("Error in file write mmap");
                        if (written == 0) {
-                               written = ret;
+                               written = -errno;
                        }
                        /* Socket operation failed. We consider the relayd dead */
                        if (errno == EPIPE || errno == EINVAL) {
@@ -1397,6 +1503,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret;
                }
+               stream->output_written += ret;
                written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1430,7 +1537,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1461,6 +1569,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1529,16 +1638,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1607,6 +1732,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret_splice;
                }
+               stream->output_written += ret_splice;
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1812,6 +1938,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1831,6 +1958,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               if (stream->monitor) {
+                       /* close the write-side in close_metadata */
+                       ret = close(stream->ust_metadata_poll_pipe[0]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata read-side poll pipe");
+                       }
+               }
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -1899,12 +2033,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 end:
        /*
         * Nullify the stream reference so it is not used after deletion. The
-        * consumer data lock MUST be acquired before being able to check for a
-        * NULL pointer value.
+        * channel lock MUST be acquired before being able to check for
+        * NULL pointer value.
         */
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -1919,11 +2054,10 @@ free_stream_rcu:
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = metadata_ht;
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
 
@@ -1933,6 +2067,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -1950,12 +2086,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
-       /* Find relayd and, if one is found, increment refcount. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -1984,6 +2114,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2107,11 +2239,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
-                       if (!revents) {
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2129,7 +2256,7 @@ restart:
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
                                        if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %ld", pipe_len);
+                                               ERR("read metadata stream, ret: %zd", pipe_len);
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2146,14 +2273,6 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = add_metadata_stream(stream, metadata_ht);
-                                       if (ret) {
-                                               ERR("Unable to add metadata stream");
-                                               /* Stream was not setup properly. Continuing. */
-                                               consumer_del_metadata_stream(stream, NULL);
-                                               continue;
-                                       }
-
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
@@ -2207,14 +2326,21 @@ restart:
                                DBG("Metadata available on fd %d", pollfd);
                                assert(stream->wait_fd == pollfd);
 
-                               len = ctx->on_buffer_ready(stream, ctx);
+                               do {
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean up stream from consumer and free it. */
                                        lttng_poll_del(&events, stream->wait_fd);
                                        consumer_del_metadata_stream(stream, metadata_ht);
-                               } else if (len > 0) {
-                                       stream->data_read = 1;
                                }
                        }
 
@@ -2258,7 +2384,11 @@ void *consumer_thread_data_poll(void *data)
                goto end;
        }
 
-       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+       if (local_stream == NULL) {
+               PERROR("local_stream malloc");
+               goto end;
+       }
 
        while (1) {
                high_prio = 0;
@@ -2341,7 +2471,7 @@ void *consumer_thread_data_poll(void *data)
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
                        if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %ld", pipe_readlen);
+                               ERR("Consumer data pipe ret %zd", pipe_readlen);
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2356,17 +2486,6 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = add_stream(new_stream, data_ht);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
-                               consumer_del_stream(new_stream, NULL);
-                       }
-
                        /* Continue to update the local streams and handle prio ones */
                        continue;
                }
@@ -2744,7 +2863,6 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
-                               assert(cds_list_empty(&chan->streams.head));
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2999,7 +3117,8 @@ void lttng_consumer_init(void)
 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+               uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
@@ -3021,7 +3140,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                        ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                } else {
-                       relayd->sessiond_session_id = (uint64_t) sessiond_id;
+                       relayd->sessiond_session_id = sessiond_id;
                        relayd_created = 1;
                }
 
@@ -3099,29 +3218,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
 
-               /*
-                * Create a session on the relayd and store the returned id. Lock the
-                * control socket mutex if the relayd was NOT created before.
-                */
-               if (!relayd_created) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               }
-               ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->relayd_session_id);
-               if (!relayd_created) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               }
-               if (ret < 0) {
-                       /*
-                        * Close all sockets of a relayd object. It will be freed if it was
-                        * created at the error code path or else it will be garbage
-                        * collect.
-                        */
-                       (void) relayd_close(&relayd->control_sock);
-                       (void) relayd_close(&relayd->data_sock);
-                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       goto error;
-               }
+               relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
@@ -3322,6 +3419,15 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
+                       /*
+                        * An empty output file is not valid. We need at least one packet
+                        * generated per stream, even if it contains no event, so it
+                        * contains at least one packet header.
+                        */
+                       if (stream->output_written == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_pending;
+                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
@@ -3421,3 +3527,23 @@ int consumer_send_status_channel(int sock,
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size)
+{
+       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+               /* Offset from the produced position to get the latest buffers. */
+               return produced_pos - max_stream_size;
+       }
+
+       return consumed_pos;
+}
This page took 0.032938 seconds and 4 git commands to generate.