X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=52a4ed171d6b0dde5d1759c781f54ef45994cab0;hb=6bf5e7c9c58c736978108e197e1b529471ff3ac7;hp=5f87f4b5018fbec1a2de542797f46b4f6d17475f;hpb=331744e34f56a5aec69b05d356d6901e67926acc;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 5f87f4b50..52a4ed171 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -783,7 +783,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, } /* Metadata are always sent on the control socket. */ - outfd = relayd->control_sock.fd; + outfd = relayd->control_sock.sock.fd; } else { /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); @@ -808,7 +808,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, ++stream->next_net_seq_num; /* Set to go on data socket */ - outfd = relayd->data_sock.fd; + outfd = relayd->data_sock.sock.fd; } error: @@ -828,7 +828,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uid_t uid, gid_t gid, int relayd_id, - enum lttng_event_output output) + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count) { struct lttng_consumer_channel *channel; @@ -845,6 +847,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->gid = gid; channel->relayd_id = relayd_id; channel->output = output; + channel->tracefile_size = tracefile_size; + channel->tracefile_count = tracefile_count; strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -1390,6 +1394,24 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = utils_rotate_stream_file(stream->chan->pathname, + stream->name, stream->chan->tracefile_size, + stream->chan->tracefile_count, stream->uid, stream->gid, + stream->out_fd, &(stream->tracefile_count_current)); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd = ret; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -1552,6 +1574,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = utils_rotate_stream_file(stream->chan->pathname, + stream->name, stream->chan->tracefile_size, + stream->chan->tracefile_count, stream->uid, stream->gid, + stream->out_fd, &(stream->tracefile_count_current)); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd = ret; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -2954,13 +2994,16 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, - unsigned int sessiond_id) + struct pollfd *consumer_sockpoll, + struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + assert(ctx); + assert(relayd_sock); + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); /* First send a status message before receiving the fds. */ @@ -3010,11 +3053,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->control_sock); + lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->control_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.fd >= 0) { - if (close(relayd->control_sock.fd)) { + if (relayd->control_sock.sock.fd >= 0) { + if (close(relayd->control_sock.sock.fd)) { PERROR("close relayd control socket"); } } @@ -3024,7 +3067,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->control_sock.fd = fd; + relayd->control_sock.sock.fd = fd; + /* Assign version values. */ + relayd->control_sock.major = relayd_sock->major; + relayd->control_sock.minor = relayd_sock->minor; /* * Create a session on the relayd and store the returned id. Lock the @@ -3052,11 +3098,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->data_sock); + lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->data_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.fd >= 0) { - if (close(relayd->data_sock.fd)) { + if (relayd->data_sock.sock.fd >= 0) { + if (close(relayd->data_sock.sock.fd)) { PERROR("close relayd data socket"); } } @@ -3066,7 +3112,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->data_sock.fd = fd; + relayd->data_sock.sock.fd = fd; + /* Assign version values. */ + relayd->data_sock.major = relayd_sock->major; + relayd->data_sock.minor = relayd_sock->minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type);