X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=21ac08f1b4d278c3a4fd35226508e274903673ce;hb=5c786dedd0156b93984f89ba47ec841277ed7dae;hp=02198bd1b9e854f26a8614c5f76fbf76a36835f1;hpb=d771f8323f5f8964145e149502d6dc8d8cac8745;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 02198bd1b..21ac08f1b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -104,79 +104,14 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } -/* - * Find a relayd and send the stream - * - * Returns 0 on success, < 0 on error - */ -static int send_relayd_stream(struct lttng_consumer_stream *stream, - char *path) -{ - int ret = 0; - const char *stream_path; - struct consumer_relayd_sock_pair *relayd; - - assert(stream); - assert(stream->net_seq_idx != -1ULL); - - if (path != NULL) { - stream_path = path; - } else { - stream_path = stream->chan->pathname; - } - - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream_path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto end; - } - uatomic_inc(&relayd->refcount); - } else { - ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", - stream->key, stream->net_seq_idx); - ret = -1; - goto end; - } - - DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx); - -end: - rcu_read_unlock(); - return ret; -} - -/* - * Find a relayd and close the stream - */ -static void close_relayd_stream(struct lttng_consumer_stream *stream) -{ - struct consumer_relayd_sock_pair *relayd; - - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - consumer_stream_relayd_close(stream, relayd); - } - rcu_read_unlock(); -} - /* * Take a snapshot of all the stream of a channel * * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, struct lttng_consumer_local_data *ctx) + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) { int ret; unsigned long consumed_pos, produced_pos; @@ -201,8 +136,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, goto end; } - cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head, - no_monitor_node) { + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* * Lock stream because we are about to change its state. */ @@ -215,14 +149,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, stream->net_seq_idx = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { - ret = send_relayd_stream(stream, path); + ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); goto end_unlock; } } else { ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, + stream->chan->tracefile_size, + stream->tracefile_count_current, stream->uid, stream->gid); if (ret < 0) { ERR("utils_create_stream_file"); @@ -238,7 +173,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel metadata stream"); + ERR("Failed to flush kernel stream"); goto end_unlock; } @@ -269,6 +204,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -326,12 +270,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } if (relayd_id == (uint64_t) -1ULL) { - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end_unlock; + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; } - stream->out_fd = -1; } else { close_relayd_stream(stream); stream->net_seq_idx = (uint64_t) -1ULL; @@ -391,7 +337,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, } if (use_relayd) { - ret = send_relayd_stream(metadata_stream, path); + ret = consumer_send_relayd_stream(metadata_stream, path); if (ret < 0) { goto error; } @@ -423,19 +369,24 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, close_relayd_stream(metadata_stream); metadata_stream->net_seq_idx = (uint64_t) -1ULL; } else { - ret = close(metadata_stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot metadata close out_fd"); - /* - * Don't go on error here since the snapshot was successful at this - * point but somehow the close failed. - */ + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; } - metadata_stream->out_fd = -1; } ret = 0; + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; error: rcu_read_unlock(); return ret; @@ -455,8 +406,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret = -1; } return ret; @@ -622,7 +573,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -685,15 +637,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, new_stream->net_seq_idx); - cds_list_add(&new_stream->no_monitor_node, - &channel->stream_no_monitor_list.head); + cds_list_add(&new_stream->send_node, &channel->streams.head); break; } - ret = send_relayd_stream(new_stream, NULL); - if (ret < 0) { - consumer_stream_free(new_stream); - goto end_nosignal; + /* Send stream to relayd if the stream has an ID. */ + if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(new_stream, + new_stream->chan->pathname); + if (ret < 0) { + consumer_stream_free(new_stream); + goto end_nosignal; + } } /* Get the right pipe where the stream will be sent. */ @@ -711,6 +666,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_stream_free(new_stream); goto end_nosignal; } + /* Successfully sent to the right thread. */ + new_stream->globally_visible = 1; DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, new_stream->name, fd, new_stream->relayd_stream_id); @@ -792,7 +749,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, ctx); + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); if (ret < 0) { ERR("Snapshot channel failed"); ret_code = LTTNG_ERR_KERN_CHAN_FAIL; @@ -1023,11 +982,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) return 0; error_close_fd: - { + if (stream->out_fd >= 0) { int err; err = close(stream->out_fd); assert(!err); + stream->out_fd = -1; } error: return ret;