X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=008bf8e54904d71158400216393f0fc6c5786e63;hp=831592a1e96ef4a8a22b64fdc29ffeaa75f62786;hb=f73fabfda365d22e7dd180fb1614e37c446fbd9e;hpb=b0b335c8c5a963f24dbedb4e597f23ed66c5b915 diff --git a/src/common/consumer.c b/src/common/consumer.c index 831592a1e..008bf8e54 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -177,11 +178,18 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) int ret; struct lttng_ht_iter iter; + if (relayd == NULL) { + return; + } + DBG("Consumer destroy and close relayd socket pair"); iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); - assert(!ret); + if (ret != 0) { + /* We assume the relayd was already destroyed */ + return; + } /* Close all sockets */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -193,6 +201,25 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Flag a relayd socket pair for destruction. Destroy it if the refcount + * reaches zero. + * + * RCU read side lock MUST be aquired before calling this function. + */ +void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd) +{ + assert(relayd); + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -266,8 +293,25 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (relayd != NULL) { uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); - if (uatomic_read(&relayd->refcount) == 0) { - /* Refcount of the relayd struct is 0, destroy it */ + + /* Closing streams requires to lock the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_close_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + DBG("Unable to close stream on the relayd. Continuing"); + /* + * Continue here. There is nothing we can do for the relayd. + * Chances are that the relayd has closed the socket so we just + * continue cleaning up. + */ + } + + /* Both conditions are met, we destroy the relayd. */ + if (uatomic_read(&relayd->refcount) == 0 && + uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } } @@ -417,8 +461,10 @@ end: } /* - * Add relayd socket to global consumer data hashtable. + * Add relayd socket to global consumer data hashtable. RCU read side lock MUST + * be acquired before calling this. */ + int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; @@ -430,20 +476,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) goto end; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) relayd->net_seq_idx), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { - rcu_read_unlock(); /* Relayd already exist. Ignore the insertion */ goto end; } lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); - rcu_read_unlock(); - end: return ret; } @@ -469,6 +510,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( obj->net_seq_idx = net_seq_idx; obj->refcount = 0; + obj->destroy_flag = 0; lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); @@ -511,28 +553,19 @@ error: * * Return destination file descriptor or negative value on error. */ -int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, - size_t data_size) +static int write_relayd_stream_header(struct lttng_consumer_stream *stream, + size_t data_size, struct consumer_relayd_sock_pair *relayd) { int outfd = -1, ret; - struct consumer_relayd_sock_pair *relayd; struct lttcomm_relayd_data_hdr data_hdr; /* Safety net */ assert(stream); + assert(relayd); /* Reset data header */ memset(&data_hdr, 0, sizeof(data_hdr)); - rcu_read_lock(); - /* Get relayd reference of the stream. */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - /* Stream is either local or corrupted */ - goto error; - } - - DBG("Consumer found relayd socks with index %d", stream->net_seq_idx); if (stream->metadata_flag) { /* Caller MUST acquire the relayd control socket lock */ ret = relayd_send_metadata(&relayd->control_sock, data_size); @@ -546,6 +579,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); + data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, @@ -559,7 +593,6 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, } error: - rcu_read_unlock(); return outfd; } @@ -1060,7 +1093,37 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) } /* - * Mmap the ring buffer, read it and write the data to the tracefile. + * Write the metadata stream id on the specified file descriptor. + */ +static int write_relayd_metadata_id(int fd, + struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd) +{ + int ret; + uint64_t metadata_id; + + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(fd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + goto end; + } + DBG("Metadata stream id %" PRIu64 " written before data", + stream->relayd_stream_id); + +end: + return ret; +} + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. This is a + * core function for writing trace buffers to either the local filesystem or + * the network. + * + * Careful review MUST be put if any changes occur! * * Returns the number of bytes written */ @@ -1068,18 +1131,115 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { + unsigned long mmap_offset; + ssize_t ret = 0, written = 0; + off_t orig_offset = stream->out_fd_offset; + /* Default is on the disk */ + int outfd = stream->out_fd; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* get the offset inside the fd to mmap */ switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len); + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len); + ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle, + stream->buf, &mmap_offset); + break; default: ERR("Unknown consumer_data type"); assert(0); } + if (ret != 0) { + errno = -ret; + PERROR("tracer ctl get_mmap_read_offset"); + written = ret; + goto end; + } - return 0; + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + unsigned long netlen = len; + + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + if (stream->metadata_flag) { + /* Metadata requires the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + netlen += sizeof(stream->relayd_stream_id); + } + + ret = write_relayd_stream_header(stream, netlen, relayd); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + ret = write_relayd_metadata_id(outfd, stream, relayd); + if (ret < 0) { + written = ret; + goto end; + } + } + } + /* Else, use the default set before which is the filesystem. */ + } + + while (len > 0) { + do { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Error in file write"); + if (written == 0) { + written = ret; + } + goto end; + } else if (ret > len) { + PERROR("Error in file write (ret %zd > len %lu)", ret, len); + written += ret; + goto end; + } else { + len -= ret; + mmap_offset += ret; + } + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + written += ret; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + +end: + /* Unlock only if ctrl socket used */ + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } /* @@ -1091,18 +1251,151 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { + ssize_t ret = 0, written = 0, ret_splice = 0; + loff_t offset = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + /* Default is on the disk */ + int outfd = stream->out_fd; + struct consumer_relayd_sock_pair *relayd = NULL; + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len); + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + /* Not supported for user space tracing */ return -ENOSYS; default: ERR("Unknown consumer_data type"); assert(0); - return -ENOSYS; } + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* Write metadata stream id before payload */ + if (stream->metadata_flag && relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1], + stream, relayd); + if (ret < 0) { + written = ret; + goto end; + } + } + + while (len > 0) { + DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", + (unsigned long)offset, len, fd); + ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe, ret %zd", ret_splice); + if (ret_splice < 0) { + PERROR("Error in relay splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; + goto splice_error; + } + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Update counter to fit the spliced data */ + ret_splice += sizeof(stream->relayd_stream_id); + len += sizeof(stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + + ret = write_relayd_stream_header(stream, ret_splice, relayd); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + ERR("Remote relayd disconnected. Stopping"); + goto end; + } + } + + /* Splice data out */ + ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, + ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice); + if (ret_splice < 0) { + PERROR("Error in file splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; + goto splice_error; + } else if (ret_splice > len) { + errno = EINVAL; + PERROR("Wrote more data than requested %zd (len: %lu)", + ret_splice, len); + written += ret_splice; + ret = errno; + goto splice_error; + } + len -= ret_splice; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret_splice; + } + written += ret_splice; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + + ret = ret_splice; + + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch (ret) { + case EBADF: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } /* @@ -1230,7 +1523,7 @@ void *lttng_consumer_thread_poll_fds(void *data) metadata_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); - lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); pthread_mutex_unlock(&consumer_data.lock); goto end; } @@ -1256,7 +1549,7 @@ void *lttng_consumer_thread_poll_fds(void *data) goto restart; } perror("Poll error"); - lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); goto end; } else if (num_rdy == 0) { DBG("Polling thread timed out"); @@ -1433,7 +1726,7 @@ void *lttng_consumer_thread_receive_fds(void *data) } DBG("Sending ready command to lttng-sessiond"); - ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY); + ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY); /* return < 0 on error, but == 0 is not fatal */ if (ret < 0) { ERR("Error sending ready command to lttng-sessiond"); @@ -1567,3 +1860,95 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } + +/* + * Process the ADD_RELAYD command receive by a consumer. + * + * This will create a relayd socket pair and add it to the relayd hash table. + * The caller MUST acquire a RCU read side lock before calling it. + */ +int consumer_add_relayd_socket(int net_seq_idx, int sock_type, + struct lttng_consumer_local_data *ctx, int sock, + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) +{ + int fd, ret = -1; + struct consumer_relayd_sock_pair *relayd; + + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(net_seq_idx); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + goto error; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = -EINTR; + goto error; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + ret = -1; + goto error; + } + + /* Copy socket information and received FD */ + switch (sock_type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, relayd_sock); + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto error; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, relayd_sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto error; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type (%d)", sock_type); + goto error; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + /* All good! */ + ret = 0; + +error: + return ret; +}