Fix: format string type mismatch
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 5b05d9f6b6d4e9d9b0c588700def21dc5fa06cfe..8416bc5a3716fb5400cbeb49ae373858a2137144 100644 (file)
@@ -104,88 +104,27 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
-{
-       struct consumer_relayd_sock_pair *relayd;
-       int ret = 0;
-       char *stream_path;
-
-       if (path != NULL) {
-               stream_path = path;
-       } else {
-               stream_path = stream->chan->pathname;
-       }
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               /* Add stream on the relayd */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock,
-                               stream->name, stream_path,
-                               &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto end;
-               }
-               uatomic_inc(&relayd->refcount);
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto end;
-       }
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
-       struct consumer_relayd_sock_pair *relayd;
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               consumer_stream_relayd_close(stream, relayd);
-       }
-       rcu_read_unlock();
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  *
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, uint64_t max_stream_size,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned long consumed_pos, produced_pos;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
-       DBG("Kernel consumer snapshot channel %lu", key);
+       DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        rcu_read_lock();
 
        channel = consumer_find_channel(key);
        if (!channel) {
-               ERR("No channel found for key %lu", key);
+               ERR("No channel found for key %" PRIu64, key);
                ret = -1;
                goto end;
        }
@@ -197,25 +136,28 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                goto end;
        }
 
-       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
-                       no_monitor_node) {
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /*
                 * Lock stream because we are about to change its state.
                 */
                pthread_mutex_lock(&stream->lock);
 
+               /*
+                * Assign the received relayd ID so we can use it for streaming. The streams
+                * are not visible to anyone so this is OK to change it.
+                */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
-                       ret = send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
-                       DBG("Stream %s sent to the relayd", stream->name);
                } else {
                        ret = utils_create_stream_file(path, stream->name,
-                                       stream->chan->tracefile_size, stream->tracefile_count_current,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
                                        stream->uid, stream->gid);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
@@ -231,7 +173,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel metadata stream");
+                       ERR("Failed to flush kernel stream");
                        goto end_unlock;
                }
 
@@ -262,6 +204,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        }
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -282,22 +233,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
                                        padded_len - len);
                        /*
-                        * We write the padded len in local tracefiles but the
-                        * data len when using a relay.
-                        * Display the error but continue processing to try to
-                        * release the subbuffer.
+                        * We write the padded len in local tracefiles but the data len
+                        * when using a relay. Display the error but continue processing
+                        * to try to release the subbuffer.
                         */
                        if (relayd_id != (uint64_t) -1ULL) {
                                if (read_len != len) {
@@ -320,12 +270,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                }
 
                if (relayd_id == (uint64_t) -1ULL) {
-                       ret = close(stream->out_fd);
-                       if (ret < 0) {
-                               PERROR("Kernel consumer snapshot close out_fd");
-                               goto end_unlock;
+                       if (stream->out_fd >= 0) {
+                               ret = close(stream->out_fd);
+                               if (ret < 0) {
+                                       PERROR("Kernel consumer snapshot close out_fd");
+                                       goto end_unlock;
+                               }
+                               stream->out_fd = -1;
                        }
-                       stream->out_fd = -1;
                } else {
                        close_relayd_stream(stream);
                        stream->net_seq_idx = (uint64_t) -1ULL;
@@ -337,6 +289,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        ret = 0;
        goto end;
 
+error_put_subbuf:
+       ret = kernctl_put_subbuf(stream->wait_fd);
+       if (ret < 0) {
+               ERR("Snapshot kernctl_put_subbuf error path");
+       }
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
@@ -352,9 +309,12 @@ end:
 int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
                uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
 {
+       int ret, use_relayd = 0;
+       ssize_t ret_read;
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
-       int ret;
+
+       assert(ctx);
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
                        key, path);
@@ -363,58 +323,71 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        metadata_channel = consumer_find_channel(key);
        if (!metadata_channel) {
-               ERR("Snapshot kernel metadata channel not found for key %lu", key);
+               ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
                ret = -1;
-               goto end;
+               goto error;
        }
 
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
+       /* Flag once that we have a valid relayd for the stream. */
        if (relayd_id != (uint64_t) -1ULL) {
-               ret = send_relayd_stream(metadata_stream, path);
+               use_relayd = 1;
+       }
+
+       if (use_relayd) {
+               ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
-                       ERR("sending stream to relayd");
+                       goto error;
                }
-               DBG("Stream %s sent to the relayd", metadata_stream->name);
        } else {
                ret = utils_create_stream_file(path, metadata_stream->name,
                                metadata_stream->chan->tracefile_size,
                                metadata_stream->tracefile_count_current,
                                metadata_stream->uid, metadata_stream->gid);
                if (ret < 0) {
-                       goto end;
+                       goto error;
                }
                metadata_stream->out_fd = ret;
        }
 
-       ret = 0;
-       while (ret >= 0) {
-               ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
-               if (ret < 0) {
-                       if (ret != -EPERM) {
-                               ERR("Kernel snapshot reading subbuffer");
-                               goto end;
+       do {
+               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               if (ret_read < 0) {
+                       if (ret_read != -EPERM) {
+                               ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+                                               ret_read);
+                               goto error;
                        }
-                       /* "ret" is negative at this point so we will exit the loop. */
+                       /* ret_read is negative at this point so we will exit the loop. */
                        continue;
                }
-       }
+       } while (ret_read >= 0);
 
-       if (relayd_id == (uint64_t) -1ULL) {
-               ret = close(metadata_stream->out_fd);
-               if (ret < 0) {
-                       PERROR("Kernel consumer snapshot close out_fd");
-                       goto end;
-               }
-               metadata_stream->out_fd = -1;
-       } else {
+       if (use_relayd) {
                close_relayd_stream(metadata_stream);
                metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+       } else {
+               if (metadata_stream->out_fd >= 0) {
+                       ret = close(metadata_stream->out_fd);
+                       if (ret < 0) {
+                               PERROR("Kernel consumer snapshot metadata close out_fd");
+                               /*
+                                * Don't go on error here since the snapshot was successful at this
+                                * point but somehow the close failed.
+                                */
+                       }
+                       metadata_stream->out_fd = -1;
+               }
        }
 
        ret = 0;
-end:
+
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
+error:
        rcu_read_unlock();
        return ret;
 }
@@ -433,8 +406,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                        ret = -1;
                }
                return ret;
@@ -557,20 +530,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
-                       /*
-                        * Somehow, the session daemon is not responding
-                        * anymore.
-                        */
+                       /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
                if (ret_code != LTTNG_OK) {
-                       /*
-                        * Channel was not found.
-                        */
+                       /* Channel was not found. */
                        goto end_nosignal;
                }
 
-               /* block */
+               /* Blocking call */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
                        return -EINTR;
@@ -605,7 +573,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
-                               channel->type);
+                               channel->type,
+                               channel->monitor);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
@@ -616,6 +585,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        goto end_nosignal;
                }
+
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
@@ -653,7 +623,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
-                               consumer_del_stream(new_stream, NULL);
+                               consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
                }
@@ -664,33 +634,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Do not monitor this stream. */
                if (!channel->monitor) {
-                       DBG("Kernel consumer add stream %s in no monitor mode with"
+                       DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
-                                       new_stream->relayd_stream_id);
-                       cds_list_add(&new_stream->no_monitor_node,
-                                       &channel->stream_no_monitor_list.head);
+                                       new_stream->net_seq_idx);
+                       cds_list_add(&new_stream->send_node, &channel->streams.head);
                        break;
                }
 
-               ret = send_relayd_stream(new_stream, NULL);
-               if (ret < 0) {
-                       consumer_del_stream(new_stream, NULL);
-                       goto end_nosignal;
+               /* Send stream to relayd if the stream has an ID. */
+               if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_stream(new_stream,
+                                       new_stream->chan->pathname);
+                       if (ret < 0) {
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                }
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
+               /* Vitible to other threads */
+               new_stream->globally_visible = 1;
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_del_stream(new_stream, NULL);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
 
@@ -774,7 +768,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
-                                       msg.u.snapshot_channel.relayd_id, ctx);
+                                       msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
+                                       ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
                                ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
@@ -1005,11 +1001,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return 0;
 
 error_close_fd:
-       {
+       if (stream->out_fd >= 0) {
                int err;
 
                err = close(stream->out_fd);
                assert(!err);
+               stream->out_fd = -1;
        }
 error:
        return ret;
@@ -1029,6 +1026,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = kernctl_get_next_subbuf(stream->wait_fd);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.029562 seconds and 4 git commands to generate.