Fix: relayd refcount updates for stream
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index f318af153ca3aca1dcdd1573590d7447de61490c..3133835be78091700eaa64e16e114626ca8327f2 100644 (file)
@@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                        channel->session_id,
                        cpu,
                        &alloc_ret,
-                       channel->type);
+                       channel->type,
+                       channel->monitor);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -208,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Search for a relayd object related to the stream. If found, send the stream
- * to the relayd.
- *
- * On success, returns 0 else a negative value.
- */
-static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
-{
-       int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               /* Add stream on the relayd */
-               ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               stream->chan->pathname, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto error;
-               }
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto error;
-       }
-
-error:
-       return ret;
-}
-
 /*
  * Create streams for the given channel using liblttng-ust-ctl.
  *
@@ -410,7 +375,7 @@ static int send_sessiond_channel(int sock,
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
-               ret = send_stream_to_relayd(stream);
+               ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
                if (ret < 0) {
                        /*
                         * Flag that the relayd was the problem here probably due to a
@@ -559,6 +524,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
 
                /* Remove node from the channel stream list. */
                cds_list_del(&stream->send_node);
+
+               /*
+                * From this point on, the stream's ownership has been moved away from
+                * the channel and becomes globally visible.
+                */
+               stream->globally_visible = 1;
        }
 
 error:
@@ -726,11 +697,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        if (cds_list_empty(&metadata->streams.head)) {
                ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-               goto error;
+               goto error_no_stream;
        }
 
        /* Send metadata stream to relayd if needed. */
-       ret = send_stream_to_relayd(metadata->metadata_stream);
+       ret = consumer_send_relayd_stream(metadata->metadata_stream,
+                       metadata->pathname);
        if (ret < 0) {
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                goto error;
@@ -758,7 +730,9 @@ error:
         * the stream is still in the local stream list of the channel. This call
         * will make sure to clean that list.
         */
-       consumer_del_channel(metadata);
+       cds_list_del(&metadata->metadata_stream->send_node);
+       consumer_stream_destroy(metadata->metadata_stream, NULL);
+error_no_stream:
 end:
        return ret;
 }
@@ -1106,12 +1080,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        if (ret != sizeof(msg)) {
                DBG("Consumer received unexpected message size %zd (expects %zu)",
                        ret, sizeof(msg));
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                /*
                 * The ret value might 0 meaning an orderly shutdown but this is ok
                 * since the caller handles this.
                 */
                if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                        ret = -1;
                }
                return ret;
This page took 0.02444 seconds and 4 git commands to generate.