X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=5a219fc0b6c543d5c0f5270c3febd4ccfb4adb3a;hp=8c2bee33363614cbf58a4e5082aad84d9b8140ad;hb=c30aaa51f34105a7f20b9ceb39866001843db6e6;hpb=f02e1e8a5820da2eda835add020f92ca8d32c973 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 8c2bee333..5a219fc0b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -91,7 +92,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -104,81 +105,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { - int fd; - struct consumer_relayd_sock_pair *relayd; - - DBG("Consumer adding relayd socket"); - - /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); - if (relayd == NULL) { - /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair( - msg.u.relayd_sock.net_index); - if (relayd == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end_nosignal; - } - } - - /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { - return -EINTR; - } - - /* Get relayd socket from session daemon */ - ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); - if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - goto end_nosignal; - } - - /* Copy socket information and received FD */ - switch (msg.u.relayd_sock.type) { - case LTTNG_STREAM_CONTROL: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); - - ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->control_sock.fd); - - /* Assign new file descriptor */ - relayd->control_sock.fd = fd; - break; - case LTTNG_STREAM_DATA: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->data_sock.fd); - - /* Assign new file descriptor */ - relayd->data_sock.fd = fd; - break; - default: - ERR("Unknown relayd socket type"); - goto end_nosignal; - } - - DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", - msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); - - /* - * Add relayd socket pair to consumer data hashtable. If object already - * exists or on error, the function gracefully returns. - */ - consumer_add_relayd(relayd); - + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -189,9 +118,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -214,13 +144,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -236,10 +168,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.net_index, msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + goto end_nosignal; } + /* + * The buffer flush is done on the session daemon side for the kernel + * so no need for the stream "hangup_flush_done" variable to be + * tracked. This is important for a kernel stream since we don't rely + * on the flush state of the stream to read data. It's not the case for + * user space tracing. + */ + new_stream->hangup_flush_done = 0; + /* The stream is not metadata. Get relayd reference if exists. */ relayd = consumer_find_relayd(msg.u.stream.net_index); if (relayd != NULL) { @@ -250,23 +191,38 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); free(new_stream); - goto end; + goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { - ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { - goto end; + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } + + do { + ret = write(ctx->consumer_metadata_pipe[1], new_stream, + sizeof(struct lttng_consumer_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata pipe"); } } else { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } consumer_add_stream(new_stream); } @@ -275,30 +231,48 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + rcu_read_unlock(); + return -ENOSYS; + } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; + struct consumer_relayd_sock_pair *relayd; + + DBG("Kernel consumer destroying relayd %" PRIu64, index); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(index); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, index); + goto end_nosignal; } - break; + + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); + + goto end_nosignal; } default: - break; + goto end_nosignal; } -end: + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * * 1) empty the pipe (reads). * 2) perform update operation. * 3) wait on the pipe (poll). @@ -308,7 +282,12 @@ end: } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } /* @@ -317,7 +296,7 @@ end_nosignal: ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; ssize_t ret = 0; int infd = stream->wait_fd; @@ -326,6 +305,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { + ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -337,60 +317,92 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + /* Get the full subbuffer size including padding */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } + switch (stream->output) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } + case LTTNG_EVENT_SPLICE: - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %ld != len: %ld)", - ret, len); - } + /* + * XXX: The lttng-modules splice "actor" does not handle copying + * partial pages hence only using the subbuffer size without the + * padding makes the splice fail. + */ + subbuf_size = len; + padding = 0; - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; + /* splice the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, + padding); + /* + * XXX: Splice does not support network streaming so the return value + * is simply checked against subbuf_size and not like the mmap() op. + */ + if (ret != subbuf_size) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile (ret: %zd != len: %lu)", + ret, subbuf_size); + } + break; + case LTTNG_EVENT_MMAP: + /* Get subbuffer size without padding */ + err = kernctl_get_subbuf_size(infd, &subbuf_size); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } + + /* Make sure the tracer is not gone mad on us! */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; + + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, + padding); + /* + * The mmap operation should write subbuf_size amount of data when + * network streaming or the full padding (len) size when we are _not_ + * streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { + /* + * Display the error but continue processing to try to release the + * subbuffer + */ + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); + } + break; + default: + ERR("Unknown output method"); + ret = -1; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -ret; + errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } + + ret = -err; goto end; }