consumer: introduce channel lock
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 3133835be78091700eaa64e16e114626ca8327f2..abfe613a710d9733dbff984f49728d0ec00b54f0 100644 (file)
@@ -373,18 +373,20 @@ static int send_sessiond_channel(int sock,
 
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
-       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               /* Try to send the stream to the relayd if one is available. */
-               ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
-               if (ret < 0) {
-                       /*
-                        * Flag that the relayd was the problem here probably due to a
-                        * communicaton error on the socket.
-                        */
-                       if (relayd_error) {
-                               *relayd_error = 1;
+       if (channel->relayd_id != (uint64_t) -1ULL) {
+               cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+                       /* Try to send the stream to the relayd if one is available. */
+                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
+                       if (ret < 0) {
+                               /*
+                                * Flag that the relayd was the problem here probably due to a
+                                * communicaton error on the socket.
+                                */
+                               if (relayd_error) {
+                                       *relayd_error = 1;
+                               }
+                               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                        }
-                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                }
        }
 
@@ -636,6 +638,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        if (cds_lfht_is_node_deleted(&channel->node.node)) {
                goto error_unlock;
@@ -656,6 +659,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
 error_unlock:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 error:
        return ret;
@@ -701,11 +705,13 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       ret = consumer_send_relayd_stream(metadata->metadata_stream,
-                       metadata->pathname);
-       if (ret < 0) {
-               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-               goto error;
+       if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = consumer_send_relayd_stream(metadata->metadata_stream,
+                               metadata->pathname);
+               if (ret < 0) {
+                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto error;
+               }
        }
 
        ret = send_streams_to_thread(metadata, ctx);
@@ -855,7 +861,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -926,6 +932,15 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        goto error_unlock;
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -1034,7 +1049,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
         * and ultimately try to get rid of this global consumer data lock.
         */
        pthread_mutex_lock(&consumer_data.lock);
-
+       pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);
        ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
        if (ret < 0) {
@@ -1046,10 +1061,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                pthread_mutex_unlock(&channel->metadata_cache->lock);
+               pthread_mutex_unlock(&channel->lock);
                pthread_mutex_unlock(&consumer_data.lock);
                goto end_free;
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        while (consumer_metadata_cache_flushed(channel, offset + len)) {
@@ -1421,6 +1438,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
This page took 0.025319 seconds and 4 git commands to generate.