X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=a2980e77d29c33b57c4a12889cb82db019956fa0;hp=f093f0cf5caab19bdf0108cc940021711447b43d;hb=d09e1200ec761aef77c721bd648a299eefcc8565;hpb=fb3a43a9284f3300e9b66edc2f2c2d2767895423 diff --git a/src/common/consumer.c b/src/common/consumer.c index f093f0cf5..a2980e77d 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -175,7 +175,7 @@ static void consumer_rcu_free_relayd(struct rcu_head *head) * * This function MUST be called with the consumer_data lock acquired. */ -void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) +static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) { int ret; struct lttng_ht_iter iter; @@ -218,7 +218,7 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd) /* Destroy the relayd if refcount is 0 */ if (uatomic_read(&relayd->refcount) == 0) { - consumer_destroy_relayd(relayd); + destroy_relayd(relayd); } } @@ -314,7 +314,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) /* Both conditions are met, we destroy the relayd. */ if (uatomic_read(&relayd->refcount) == 0 && uatomic_read(&relayd->destroy_flag)) { - consumer_destroy_relayd(relayd); + destroy_relayd(relayd); } } rcu_read_unlock(); @@ -452,8 +452,7 @@ end: * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. */ - -int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) +static int add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; struct lttng_ht_node_ulong *node; @@ -542,7 +541,8 @@ error: * Return destination file descriptor or negative value on error. */ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, - size_t data_size, struct consumer_relayd_sock_pair *relayd) + size_t data_size, unsigned long padding, + struct consumer_relayd_sock_pair *relayd) { int outfd = -1, ret; struct lttcomm_relayd_data_hdr data_hdr; @@ -567,6 +567,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); + data_hdr.padding_size = htobe32(padding); data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1094,22 +1095,23 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd) + struct consumer_relayd_sock_pair *relayd, + unsigned long padding) { int ret; - uint64_t metadata_id; + struct lttcomm_relayd_metadata_payload hdr; - metadata_id = htobe64(stream->relayd_stream_id); + hdr.stream_id = htobe64(stream->relayd_stream_id); + hdr.padding_size = htobe32(padding); do { - ret = write(fd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); + ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); if (ret < 0) { PERROR("write metadata stream id"); goto end; } - DBG("Metadata stream id %" PRIu64 " written before data", - stream->relayd_stream_id); + DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", + stream->relayd_stream_id, padding); end: return ret; @@ -1126,7 +1128,8 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) + struct lttng_consumer_stream *stream, unsigned long len, + unsigned long padding) { unsigned long mmap_offset; ssize_t ret = 0, written = 0; @@ -1178,17 +1181,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - netlen += sizeof(stream->relayd_stream_id); + netlen += sizeof(struct lttcomm_relayd_metadata_payload); } - ret = write_relayd_stream_header(stream, netlen, relayd); + ret = write_relayd_stream_header(stream, netlen, padding, relayd); if (ret >= 0) { /* Use the returned socket. */ outfd = ret; /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd); + ret = write_relayd_metadata_id(outfd, stream, relayd, padding); if (ret < 0) { written = ret; goto end; @@ -1196,12 +1199,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } } /* Else, use the default set before which is the filesystem. */ + } else { + /* No streaming, we have to set the len with the full padding */ + len += padding; } while (len > 0) { do { ret = write(outfd, stream->mmap_base + mmap_offset, len); } while (ret < 0 && errno == EINTR); + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { PERROR("Error in file write"); if (written == 0) { @@ -1216,7 +1223,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( len -= ret; mmap_offset += ret; } - DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1246,7 +1252,8 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) + struct lttng_consumer_stream *stream, unsigned long len, + unsigned long padding) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1292,23 +1299,42 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } /* Write metadata stream id before payload */ - if (stream->metadata_flag && relayd) { - /* - * Lock the control socket for the complete duration of the function - * since from this point on we will use the socket. - */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (relayd) { + int total_len = len; - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd); - if (ret < 0) { - written = ret; + if (stream->metadata_flag) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + padding); + if (ret < 0) { + written = ret; + goto end; + } + + total_len += sizeof(struct lttcomm_relayd_metadata_payload); + } + + ret = write_relayd_stream_header(stream, total_len, padding, relayd); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + ERR("Remote relayd disconnected. Stopping"); goto end; } + } else { + /* No streaming, we have to set the len with the full padding */ + len += padding; } while (len > 0) { - DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", - (unsigned long)offset, len, fd); + DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)", + (unsigned long)offset, len, fd, splice_pipe[1]); ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice chan to pipe, ret %zd", ret_splice); @@ -1324,30 +1350,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Handle stream on the relayd if the output is on the network */ if (relayd) { if (stream->metadata_flag) { + size_t metadata_payload_size = + sizeof(struct lttcomm_relayd_metadata_payload); + /* Update counter to fit the spliced data */ - ret_splice += sizeof(stream->relayd_stream_id); - len += sizeof(stream->relayd_stream_id); + ret_splice += metadata_payload_size; + len += metadata_payload_size; /* * We do this so the return value can match the len passed as * argument to this function. */ - written -= sizeof(stream->relayd_stream_id); - } - - ret = write_relayd_stream_header(stream, ret_splice, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - ERR("Remote relayd disconnected. Stopping"); - goto end; + written -= metadata_payload_size; } } /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice); + DBG("Consumer splice pipe to file, ret %zd", ret_splice); if (ret_splice < 0) { PERROR("Error in file splice"); if (written == 0) { @@ -1482,12 +1502,14 @@ static void destroy_stream_ht(struct lttng_ht *ht) return; } + rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { ret = lttng_ht_del(ht, &iter); assert(!ret); free(stream); } + rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1573,7 +1595,7 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream) /* Both conditions are met, we destroy the relayd. */ if (uatomic_read(&relayd->refcount) == 0 && uatomic_read(&relayd->destroy_flag)) { - consumer_destroy_relayd(relayd); + destroy_relayd(relayd); } } rcu_read_unlock(); @@ -1675,7 +1697,7 @@ restart: /* Check the metadata pipe for incoming metadata. */ if (pollfd == ctx->consumer_metadata_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) { + if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); /* * Remove the pipe from the poll set and continue the loop @@ -1709,9 +1731,11 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); + rcu_read_lock(); /* The node should be init at this point */ lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node); + rcu_read_unlock(); /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, @@ -1726,11 +1750,13 @@ restart: /* From here, the event is a metadata wait fd */ + rcu_read_lock(); lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node == NULL) { /* FD not found, continue loop */ + rcu_read_unlock(); continue; } @@ -1745,6 +1771,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN) { + rcu_read_unlock(); goto end; } else if (len > 0) { stream->data_read = 1; @@ -1755,7 +1782,7 @@ restart: * Remove the stream from the hash table since there is no data * left on the fd because we previously did a read on the buffer. */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) { + if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata fd %d is hup|err|nval.", pollfd); if (!stream->hangup_flush_done && (consumer_data.type == LTTNG_CONSUMER32_UST @@ -1767,15 +1794,18 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN) { + rcu_read_unlock(); goto end; } } /* Removing it from hash table, poll set and free memory */ lttng_ht_del(metadata_ht, &iter); + lttng_poll_del(&events, stream->wait_fd); consumer_del_metadata_stream(stream); } + rcu_read_unlock(); } } @@ -2269,7 +2299,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ - consumer_add_relayd(relayd); + add_relayd(relayd); /* All good! */ ret = 0;