X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=c2ad0fdd8cef613f170f4c43673d5d356fde8f93;hp=f6add2c2c88327d05f1dd1ce90b49241df74c1f8;hb=c8f59ee5fc11492ef472dc5cfd2fd2c4926b1787;hpb=00e2e675d54dc726a7c8f8887c889cc8ef022003 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index f6add2c2c..c2ad0fdd8 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -18,6 +18,7 @@ #define _GNU_SOURCE #include +#include #include #include #include @@ -26,8 +27,8 @@ #include #include #include +#include #include -#include #include #include @@ -41,121 +42,14 @@ extern int consumer_poll_timeout; extern volatile int consumer_quit; /* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written, else negative value on error. - */ -ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - unsigned long mmap_offset; - long ret = 0, written = 0; - off_t orig_offset = stream->out_fd_offset; - int outfd = stream->out_fd; - uint64_t metadata_id; - struct consumer_relayd_sock_pair *relayd = NULL; - - /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - goto end; - } - } - - /* get the offset inside the fd to mmap */ - ret = ustctl_get_mmap_read_offset(stream->chan->handle, - stream->buf, &mmap_offset); - if (ret != 0) { - errno = -ret; - PERROR("ustctl_get_mmap_read_offset"); - written = ret; - goto end; - } - - /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - /* Only lock if metadata since we use the control socket. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - } - - ret = consumer_handle_stream_before_relayd(stream, len); - if (ret >= 0) { - outfd = ret; - - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(outfd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - if (ret < 0) { - PERROR("write metadata stream id"); - written = ret; - goto end; - } - } while (errno == EINTR); - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); - } - } - /* Else, use the default set before which is the filesystem. */ - } - - while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret < 0) { - if (errno == EINTR) { - /* restart the interrupted system call */ - continue; - } else { - PERROR("Error in file write"); - if (written == 0) { - written = ret; - } - goto end; - } - } else if (ret > len) { - PERROR("ret %ld > len %lu", ret, len); - written += ret; - goto end; - } else { - len -= ret; - mmap_offset += ret; - } - DBG("UST mmap write() ret %ld (len %lu)", ret, len); - - /* This call is useless on a socket so better save a syscall. */ - if (!relayd) { - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - written += ret; - } - lttng_consumer_sync_trace_file(stream, orig_offset); - -end: - if (relayd && stream->metadata_flag) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - return written; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. + * Wrapper over the mmap() read offset from ust-ctl library. Since this can be + * compiled out, we isolate it in this library. */ -ssize_t lttng_ustconsumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) +int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle, + struct lttng_ust_lib_ring_buffer *buf, unsigned long *off) { - return -ENOSYS; -} + return ustctl_get_mmap_read_offset(handle, buf, off); +}; /* * Take a snapshot for a specific fd @@ -198,6 +92,11 @@ int lttng_ustconsumer_get_produced_snapshot( return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -206,90 +105,24 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + DBG("Consumer received unexpected message size %zd (expects %zu)", + ret, sizeof(msg)); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { return -ENOENT; } + /* relayd needs RCU read-side lock */ + rcu_read_lock(); + switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { - int fd; - struct consumer_relayd_sock_pair *relayd; - - DBG("UST Consumer adding relayd socket"); - - /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); - if (relayd == NULL) { - /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair( - msg.u.relayd_sock.net_index); - if (relayd == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end_nosignal; - } - } - - /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { - return -EINTR; - } - - /* Get relayd socket from session daemon */ - ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); - if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - goto end_nosignal; - } - - /* Copy socket information and received FD */ - switch (msg.u.relayd_sock.type) { - case LTTNG_STREAM_CONTROL: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->control_sock.fd); - - /* Assign new file descriptor */ - relayd->control_sock.fd = fd; - break; - case LTTNG_STREAM_DATA: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->data_sock.fd); - - /* Assign new file descriptor */ - relayd->data_sock.fd = fd; - break; - default: - ERR("Unknown relayd socket type"); - goto end_nosignal; - } - - DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", - msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); - - /* - * Add relayd socket pair to consumer data hashtable. If object already - * exists or on error, the function gracefully returns. - */ - consumer_add_relayd(relayd); - + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -298,13 +131,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[1]; size_t nb_fd = 1; + DBG("UST Consumer adding channel"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -313,9 +150,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, fds[0], -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -333,22 +171,30 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_STREAM: { struct lttng_consumer_stream *new_stream; - int fds[2]; + int fds[2], stream_pipe; size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + int alloc_ret = 0; + + DBG("UST Consumer adding stream"); /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } + DBG("Consumer command ADD_STREAM chan %d stream %d", + msg.u.stream.channel_key, msg.u.stream.stream_key); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.channel.channel_key, + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, @@ -358,10 +204,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.uid, msg.u.stream.gid, msg.u.stream.net_index, - msg.u.stream.metadata_flag); + msg.u.stream.metadata_flag, + msg.u.stream.session_id, + &alloc_ret); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + case -ENOENT: + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + DBG3("Could not find channel"); + break; + } + goto end_nosignal; } /* The stream is not metadata. Get relayd reference if exists. */ @@ -374,69 +235,109 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); - free(new_stream); - goto end; + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { + /* Do actions once stream has been received. */ + if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { - goto end; + if (ret < 0) { + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } + } + + /* Get the right pipe where the stream will be sent. */ + if (new_stream->metadata_flag) { + stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - consumer_add_stream(new_stream); + stream_pipe = ctx->consumer_data_pipe[1]; + } + + do { + ret = write(stream_pipe, &new_stream, sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + stream_pipe); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu", + DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64, msg.u.stream.path_name, fds[0], fds[1], new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; + struct consumer_relayd_sock_pair *relayd; + + DBG("UST consumer destroying relayd %" PRIu64, index); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(index); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, index); + goto end_nosignal; + } + + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_UPDATE_STREAM: { + rcu_read_unlock(); return -ENOSYS; -#if 0 - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + } + case LTTNG_CONSUMER_DATA_AVAILABLE: + { + int32_t ret; + uint64_t id = msg.u.data_available.session_id; + + DBG("UST consumer data available command for id %" PRIu64, id); + + ret = consumer_data_available(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data available ret code"); } -#endif break; } default: break; } -end: + +end_nosignal: + rcu_read_unlock(); + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret == -1UL && errno == EINTR); -end_nosignal: - return 0; + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) @@ -468,7 +369,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ustctl_unmap_channel(chan->handle); } -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream) { struct lttng_ust_object_data obj; int ret; @@ -478,21 +379,35 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) obj.wait_fd = stream->wait_fd; obj.memory_map_size = stream->mmap_len; ret = ustctl_add_stream(stream->chan->handle, &obj); - if (ret) - return ret; + if (ret) { + ERR("UST ctl add_stream failed with ret %d", ret); + goto error; + } + stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); - if (!stream->buf) - return -EBUSY; + if (!stream->buf) { + ERR("UST ctl open_stream_read failed"); + ret = -EBUSY; + goto error; + } + /* ustctl_open_stream_read has closed the shm fd. */ stream->wait_fd_is_copy = 1; stream->shm_fd = -1; stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { - return -EINVAL; + ERR("UST ctl get_mmap_base failed"); + ret = -EINVAL; + goto mmap_error; } return 0; + +mmap_error: + ustctl_close_stream_read(stream->chan->handle, stream->buf); +error: + return ret; } void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) @@ -504,7 +419,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; long ret = 0; struct lttng_ust_shm_handle *handle; @@ -531,7 +446,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -543,17 +458,33 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } assert(stream->output == LTTNG_EVENT_MMAP); - /* read the used subbuffer size */ + /* Get the full padded subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); assert(err == 0); + + /* Get subbuffer data size (without padding) */ + err = ustctl_get_subbuf_size(handle, buf, &subbuf_size); + assert(err == 0); + + /* Make sure we don't get a subbuffer size bigger than the padded */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding); + /* + * The mmap operation should write subbuf_size amount of data when network + * streaming or the full padding (len) size when we are _not_ streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { /* - * display the error but continue processing to try - * to release the subbuffer + * Display the error but continue processing to try to release the + * subbuffer */ - ERR("Error writing to tracefile"); + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0); @@ -579,9 +510,58 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->out_fd = ret; } + ret = lttng_ustconsumer_add_stream(stream); + if (ret) { + consumer_del_stream(stream, NULL); + ret = -1; + goto error; + } + /* we return 0 to let the library handle the FD internally */ return 0; error: return ret; } + +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function. + * + * Return 0 if the traced data are still getting read else 1 meaning that the + * data is available for trace viewer reading. + */ +int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + DBG("UST consumer checking data availability"); + + /* + * Try to lock the stream mutex. On failure, we know that the stream is + * being used else where hence there is data still being extracted. + */ + ret = pthread_mutex_trylock(&stream->lock); + if (ret == EBUSY) { + goto data_not_available; + } + /* The stream is now locked so we can do our ustctl calls */ + + ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = ustctl_put_subbuf(stream->chan->handle, stream->buf); + assert(ret == 0); + pthread_mutex_unlock(&stream->lock); + goto data_not_available; + } + + /* Data is available to be read for this stream. */ + pthread_mutex_unlock(&stream->lock); + return 1; + +data_not_available: + return 0; +}