ust consumer: data_pending check is endpoint active
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index ff4ba25b9730f0a2896ef798e8869ca7c09aa1cc..4c93aa7c2849d7771a8d9ab29069ac2fcf42684c 100644 (file)
@@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                        channel->session_id,
                        cpu,
                        &alloc_ret,
-                       channel->type);
+                       channel->type,
+                       channel->monitor);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -208,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Search for a relayd object related to the stream. If found, send the stream
- * to the relayd.
- *
- * On success, returns 0 else a negative value.
- */
-static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
-{
-       int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               /* Add stream on the relayd */
-               ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               stream->chan->pathname, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto error;
-               }
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto error;
-       }
-
-error:
-       return ret;
-}
-
 /*
  * Create streams for the given channel using liblttng-ust-ctl.
  *
@@ -408,18 +373,20 @@ static int send_sessiond_channel(int sock,
 
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
-       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               /* Try to send the stream to the relayd if one is available. */
-               ret = send_stream_to_relayd(stream);
-               if (ret < 0) {
-                       /*
-                        * Flag that the relayd was the problem here probably due to a
-                        * communicaton error on the socket.
-                        */
-                       if (relayd_error) {
-                               *relayd_error = 1;
+       if (channel->relayd_id != (uint64_t) -1ULL) {
+               cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+                       /* Try to send the stream to the relayd if one is available. */
+                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
+                       if (ret < 0) {
+                               /*
+                                * Flag that the relayd was the problem here probably due to a
+                                * communicaton error on the socket.
+                                */
+                               if (relayd_error) {
+                                       *relayd_error = 1;
+                               }
+                               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                        }
-                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                }
        }
 
@@ -559,6 +526,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
 
                /* Remove node from the channel stream list. */
                cds_list_del(&stream->send_node);
+
+               /*
+                * From this point on, the stream's ownership has been moved away from
+                * the channel and becomes globally visible.
+                */
+               stream->globally_visible = 1;
        }
 
 error:
@@ -665,6 +638,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        if (cds_lfht_is_node_deleted(&channel->node.node)) {
                goto error_unlock;
@@ -685,6 +659,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
 error_unlock:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 error:
        return ret;
@@ -730,10 +705,13 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       ret = send_stream_to_relayd(metadata->metadata_stream);
-       if (ret < 0) {
-               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-               goto error;
+       if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = consumer_send_relayd_stream(metadata->metadata_stream,
+                               metadata->pathname);
+               if (ret < 0) {
+                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto error;
+               }
        }
 
        ret = send_streams_to_thread(metadata, ctx);
@@ -883,7 +861,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -954,6 +932,15 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        goto error_unlock;
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -1062,7 +1049,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
         * and ultimately try to get rid of this global consumer data lock.
         */
        pthread_mutex_lock(&consumer_data.lock);
-
+       pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);
        ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
        if (ret < 0) {
@@ -1074,10 +1061,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                pthread_mutex_unlock(&channel->metadata_cache->lock);
+               pthread_mutex_unlock(&channel->lock);
                pthread_mutex_unlock(&consumer_data.lock);
                goto end_free;
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        while (consumer_metadata_cache_flushed(channel, offset + len)) {
@@ -1108,12 +1097,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        if (ret != sizeof(msg)) {
                DBG("Consumer received unexpected message size %zd (expects %zu)",
                        ret, sizeof(msg));
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                /*
                 * The ret value might 0 meaning an orderly shutdown but this is ok
                 * since the caller handles this.
                 */
                if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                        ret = -1;
                }
                return ret;
@@ -1449,6 +1438,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
@@ -1744,6 +1734,11 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        DBG("UST consumer checking data pending");
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = ustctl_get_next_subbuf(stream->ustream);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.02627 seconds and 4 git commands to generate.