X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=7c01bc772a008aa38a05a399c7acadf4849fd0a2;hb=02d02e31d47c091a38154c9c188c08387902d97b;hp=a5dcc663d68c67f4b465ecabe70004f682842f3f;hpb=66ab32be7aabac934946764a656e617a054622ac;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index a5dcc663d..7c01bc772 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -60,7 +60,11 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) int infd = stream->wait_fd; ret = kernctl_snapshot(infd); - if (ret != 0) { + /* + * -EAGAIN is not an error, it just means that there is no data to + * be read. + */ + if (ret != 0 && ret != -EAGAIN) { PERROR("Getting sub-buffer snapshot."); } @@ -187,14 +191,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } - if (relayd_id != -1ULL) { - ret = consumer_send_relayd_streams_sent(relayd_id); - if (ret < 0) { - ERR("sending streams sent to relayd"); - goto end_unlock; - } - channel->streams_sent_to_relayd = true; - } ret = kernctl_buffer_flush_empty(stream->wait_fd); if (ret < 0) { @@ -389,7 +385,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -662,6 +658,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->chan = channel; new_stream->wait_fd = fd; + consumer_stream_update_channel_attributes(new_stream, + channel); switch (channel->output) { case CONSUMER_CHANNEL_SPLICE: new_stream->output = LTTNG_EVENT_SPLICE; @@ -748,26 +746,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - ret = consumer_add_metadata_stream(new_stream); - if (ret) { - ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", - new_stream->key); - consumer_stream_free(new_stream); - goto end_nosignal; - } + consumer_add_metadata_stream(new_stream); stream_pipe = ctx->consumer_metadata_pipe; } else { - ret = consumer_add_data_stream(new_stream); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - consumer_stream_free(new_stream); - goto end_nosignal; - } + consumer_add_data_stream(new_stream); stream_pipe = ctx->consumer_data_pipe; } - /* Vitible to other threads */ + /* Visible to other threads */ new_stream->globally_visible = 1; health_code_update(); @@ -785,8 +771,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, - new_stream->name, fd, new_stream->relayd_stream_id); + DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, + new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_STREAMS_SENT: @@ -1092,6 +1078,95 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE: + { + int channel_rotate_pipe; + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1); + if (ret != (ssize_t) sizeof(channel_rotate_pipe)) { + ERR("Failed to receive channel rotate pipe"); + goto error_fatal; + } + + DBG("Received channel rotate pipe (%d)", channel_rotate_pipe); + ctx->channel_rotate_pipe = channel_rotate_pipe; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_rotate_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel rotate pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe"); + goto error_fatal; + } + DBG("Channel rotate pipe set as non-blocking"); + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_RENAME: + { + DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"", + msg.u.rotate_rename.session_id, + msg.u.rotate_rename.old_path, + msg.u.rotate_rename.new_path); + ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path, + msg.u.rotate_rename.new_path, + msg.u.rotate_rename.uid, + msg.u.rotate_rename.gid, + msg.u.rotate_rename.relayd_id); + if (ret < 0) { + ERR("Rotate rename failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_MKDIR: + { + DBG("Consumer mkdir %s in session %" PRIu64, + msg.u.mkdir.path, + msg.u.mkdir.session_id); + ret = lttng_consumer_mkdir(msg.u.mkdir.path, + msg.u.mkdir.uid, + msg.u.mkdir.gid, + msg.u.mkdir.relayd_id); + if (ret < 0) { + ERR("consumer mkdir failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } default: goto end_nosignal; } @@ -1331,16 +1406,30 @@ end: * Consume data on a file descriptor and write it on a trace file. */ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } + /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -1353,7 +1442,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); ret = err; - goto end; + goto error; } /* Get the full subbuffer size including padding */ @@ -1369,10 +1458,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } if (!stream->metadata_flag) { @@ -1387,9 +1476,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } ret = update_stream_stats(stream); if (ret < 0) { @@ -1402,9 +1491,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } else { write_index = 0; @@ -1419,9 +1508,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } @@ -1466,10 +1555,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } /* Make sure the tracer is not gone mad on us! */ @@ -1512,12 +1601,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1541,16 +1630,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_timer_lock); } if (err < 0) { - goto end; + goto error; } } err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } + +error: return ret; }