Fix: coding style and debug statement
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 29ef3a4ce426d1db8755796fdafffceed43525b6..ada5594b791e84e166138a42ab1678a1540dcc82 100644 (file)
@@ -109,41 +109,46 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
  *
  * Returns 0 on success, < 0 on error
  */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
+static int send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
 {
-       struct consumer_relayd_sock_pair *relayd;
        int ret = 0;
-       char *stream_path;
+       const char *stream_path;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
 
        if (path != NULL) {
                stream_path = path;
        } else {
                stream_path = stream->chan->pathname;
        }
+
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock,
-                               stream->name, stream_path,
-                               &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               stream_path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        goto end;
                }
                uatomic_inc(&relayd->refcount);
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
                ret = -1;
                goto end;
        }
 
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
 end:
        rcu_read_unlock();
        return ret;
@@ -152,15 +157,14 @@ end:
 /*
  * Find a relayd and close the stream
  */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
+static void close_relayd_stream(struct lttng_consumer_stream *stream)
 {
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
+       if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
        rcu_read_unlock();
@@ -204,6 +208,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                 */
                pthread_mutex_lock(&stream->lock);
 
+               /*
+                * Assign the received relayd ID so we can use it for streaming. The streams
+                * are not visible to anyone so this is OK to change it.
+                */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
@@ -212,7 +220,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
-                       DBG("Stream %s sent to the relayd", stream->name);
                } else {
                        ret = utils_create_stream_file(path, stream->name,
                                        stream->chan->tracefile_size, stream->tracefile_count_current,
@@ -282,22 +289,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
                                        padded_len - len);
                        /*
-                        * We write the padded len in local tracefiles but the
-                        * data len when using a relay.
-                        * Display the error but continue processing to try to
-                        * release the subbuffer.
+                        * We write the padded len in local tracefiles but the data len
+                        * when using a relay. Display the error but continue processing
+                        * to try to release the subbuffer.
                         */
                        if (relayd_id != (uint64_t) -1ULL) {
                                if (read_len != len) {
@@ -337,6 +343,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        ret = 0;
        goto end;
 
+error_put_subbuf:
+       ret = kernctl_put_subbuf(stream->wait_fd);
+       if (ret < 0) {
+               ERR("Snapshot kernctl_put_subbuf error path");
+       }
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
@@ -350,7 +361,7 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
 {
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
@@ -371,14 +382,22 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
-       ret = utils_create_stream_file(path, metadata_stream->name,
-                       metadata_stream->chan->tracefile_size,
-                       metadata_stream->tracefile_count_current,
-                       metadata_stream->uid, metadata_stream->gid);
-       if (ret < 0) {
-               goto end;
+       if (relayd_id != (uint64_t) -1ULL) {
+               ret = send_relayd_stream(metadata_stream, path);
+               if (ret < 0) {
+                       ERR("sending stream to relayd");
+               }
+               DBG("Stream %s sent to the relayd", metadata_stream->name);
+       } else {
+               ret = utils_create_stream_file(path, metadata_stream->name,
+                               metadata_stream->chan->tracefile_size,
+                               metadata_stream->tracefile_count_current,
+                               metadata_stream->uid, metadata_stream->gid);
+               if (ret < 0) {
+                       goto end;
+               }
+               metadata_stream->out_fd = ret;
        }
-       metadata_stream->out_fd = ret;
 
        ret = 0;
        while (ret >= 0) {
@@ -393,6 +412,18 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
                }
        }
 
+       if (relayd_id == (uint64_t) -1ULL) {
+               ret = close(metadata_stream->out_fd);
+               if (ret < 0) {
+                       PERROR("Kernel consumer snapshot close out_fd");
+                       goto end;
+               }
+               metadata_stream->out_fd = -1;
+       } else {
+               close_relayd_stream(metadata_stream);
+               metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+       }
+
        ret = 0;
 end:
        rcu_read_unlock();
@@ -460,13 +491,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
-                               msg.u.channel.tracefile_count,
+                               msg.u.channel.tracefile_count, 0,
                                msg.u.channel.monitor);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
                }
                new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+               switch (msg.u.channel.output) {
+               case LTTNG_EVENT_SPLICE:
+                       new_channel->output = CONSUMER_CHANNEL_SPLICE;
+                       break;
+               case LTTNG_EVENT_MMAP:
+                       new_channel->output = CONSUMER_CHANNEL_MMAP;
+                       break;
+               default:
+                       ERR("Channel output unknown %d", msg.u.channel.output);
+                       goto end_nosignal;
+               }
 
                /* Translate and save channel type. */
                switch (msg.u.channel.type) {
@@ -619,12 +661,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
-               ret = send_relayd_stream(new_stream, NULL);
-               if (ret < 0) {
-                       consumer_del_stream(new_stream, NULL);
-                       goto end_nosignal;
-               }
-
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
@@ -639,14 +675,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Do not monitor this stream. */
                if (!channel->monitor) {
-                       DBG("Kernel consumer add stream %s in no monitor mode with"
+                       DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
-                                       new_stream->relayd_stream_id);
+                                       new_stream->net_seq_idx);
                        cds_list_add(&new_stream->no_monitor_node,
                                        &channel->stream_no_monitor_list.head);
                        break;
                }
 
+               ret = send_relayd_stream(new_stream, NULL);
+               if (ret < 0) {
+                       consumer_del_stream(new_stream, NULL);
+                       goto end_nosignal;
+               }
+
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
                        stream_pipe = ctx->consumer_metadata_pipe;
@@ -734,7 +776,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                if (msg.u.snapshot_channel.metadata == 1) {
                        ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key,
-                                       msg.u.snapshot_channel.pathname, ctx);
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id, ctx);
                        if (ret < 0) {
                                ERR("Snapshot metadata failed");
                                ret_code = LTTNG_ERR_KERN_META_FAIL;
This page took 0.027297 seconds and 4 git commands to generate.