X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=72c820d90a11ec11d61e2e9024cb8f44f1ad009e;hp=f093f0cf5caab19bdf0108cc940021711447b43d;hb=4adabd6161a6decfcd21108b45df6959c34de74c;hpb=fb3a43a9284f3300e9b66edc2f2c2d2767895423 diff --git a/src/common/consumer.c b/src/common/consumer.c index f093f0cf5..72c820d90 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -542,7 +542,8 @@ error: * Return destination file descriptor or negative value on error. */ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, - size_t data_size, struct consumer_relayd_sock_pair *relayd) + size_t data_size, unsigned long padding, + struct consumer_relayd_sock_pair *relayd) { int outfd = -1, ret; struct lttcomm_relayd_data_hdr data_hdr; @@ -567,6 +568,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); + data_hdr.padding_size = htobe32(padding); data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1094,22 +1096,23 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd) + struct consumer_relayd_sock_pair *relayd, + unsigned long padding) { int ret; - uint64_t metadata_id; + struct lttcomm_relayd_metadata_payload hdr; - metadata_id = htobe64(stream->relayd_stream_id); + hdr.stream_id = htobe64(stream->relayd_stream_id); + hdr.padding_size = htobe32(padding); do { - ret = write(fd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); + ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); if (ret < 0) { PERROR("write metadata stream id"); goto end; } - DBG("Metadata stream id %" PRIu64 " written before data", - stream->relayd_stream_id); + DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", + stream->relayd_stream_id, padding); end: return ret; @@ -1126,7 +1129,8 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) + struct lttng_consumer_stream *stream, unsigned long len, + unsigned long padding) { unsigned long mmap_offset; ssize_t ret = 0, written = 0; @@ -1178,17 +1182,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - netlen += sizeof(stream->relayd_stream_id); + netlen += sizeof(struct lttcomm_relayd_metadata_payload); } - ret = write_relayd_stream_header(stream, netlen, relayd); + ret = write_relayd_stream_header(stream, netlen, padding, relayd); if (ret >= 0) { /* Use the returned socket. */ outfd = ret; /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd); + ret = write_relayd_metadata_id(outfd, stream, relayd, padding); if (ret < 0) { written = ret; goto end; @@ -1196,12 +1200,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } } /* Else, use the default set before which is the filesystem. */ + } else { + /* No streaming, we have to set the len with the full padding */ + len += padding; } while (len > 0) { do { ret = write(outfd, stream->mmap_base + mmap_offset, len); } while (ret < 0 && errno == EINTR); + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { PERROR("Error in file write"); if (written == 0) { @@ -1216,7 +1224,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( len -= ret; mmap_offset += ret; } - DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1246,7 +1253,8 @@ end: */ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) + struct lttng_consumer_stream *stream, unsigned long len, + unsigned long padding) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1292,23 +1300,42 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } /* Write metadata stream id before payload */ - if (stream->metadata_flag && relayd) { - /* - * Lock the control socket for the complete duration of the function - * since from this point on we will use the socket. - */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (relayd) { + int total_len = len; - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd); - if (ret < 0) { - written = ret; + if (stream->metadata_flag) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + padding); + if (ret < 0) { + written = ret; + goto end; + } + + total_len += sizeof(struct lttcomm_relayd_metadata_payload); + } + + ret = write_relayd_stream_header(stream, total_len, padding, relayd); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + ERR("Remote relayd disconnected. Stopping"); goto end; } + } else { + /* No streaming, we have to set the len with the full padding */ + len += padding; } while (len > 0) { - DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", - (unsigned long)offset, len, fd); + DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)", + (unsigned long)offset, len, fd, splice_pipe[1]); ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice chan to pipe, ret %zd", ret_splice); @@ -1324,30 +1351,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Handle stream on the relayd if the output is on the network */ if (relayd) { if (stream->metadata_flag) { + size_t metadata_payload_size = + sizeof(struct lttcomm_relayd_metadata_payload); + /* Update counter to fit the spliced data */ - ret_splice += sizeof(stream->relayd_stream_id); - len += sizeof(stream->relayd_stream_id); + ret_splice += metadata_payload_size; + len += metadata_payload_size; /* * We do this so the return value can match the len passed as * argument to this function. */ - written -= sizeof(stream->relayd_stream_id); - } - - ret = write_relayd_stream_header(stream, ret_splice, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - ERR("Remote relayd disconnected. Stopping"); - goto end; + written -= metadata_payload_size; } } /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice); + DBG("Consumer splice pipe to file, ret %zd", ret_splice); if (ret_splice < 0) { PERROR("Error in file splice"); if (written == 0) { @@ -1675,7 +1696,7 @@ restart: /* Check the metadata pipe for incoming metadata. */ if (pollfd == ctx->consumer_metadata_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) { + if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); /* * Remove the pipe from the poll set and continue the loop @@ -1755,7 +1776,7 @@ restart: * Remove the stream from the hash table since there is no data * left on the fd because we previously did a read on the buffer. */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) { + if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata fd %d is hup|err|nval.", pollfd); if (!stream->hangup_flush_done && (consumer_data.type == LTTNG_CONSUMER32_UST