X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=afc346333c5dc780a57c2bb86a548a9b0c60d82f;hp=7911a5b4e8d4d0701d40e88d3ebdd6841c4061af;hb=0b50e4b3fb9859af7072adcca784684834e5f8d1;hpb=fb83fe64f250bec7416f18891a8264450c61ead3 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 7911a5b4e..afc346333 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -47,6 +47,7 @@ #include #include #include +#include struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -66,13 +67,16 @@ struct consumer_channel_msg { uint64_t key; /* del */ }; +/* Flag used to temporarily pause data consumption from testpoints. */ +int data_consumption_paused; + /* * Flag to inform the polling thread to quit when all fd hung up. Updated by * the consumer_thread_receive_fds when it notices that all fds has hung up. * Also updated by the signal handler (consumer_should_exit()). Read by the * polling threads. */ -volatile int consumer_quit; +int consumer_quit; /* * Global hash table containing respectively metadata and data streams. The @@ -367,6 +371,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); } + if (channel->monitor_timer_enabled == 1) { + consumer_timer_monitor_stop(channel); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -534,6 +541,16 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } +void consumer_stream_update_channel_attributes( + struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel) +{ + stream->channel_read_only_attributes.tracefile_size = + channel->tracefile_size; + memcpy(stream->channel_read_only_attributes.path, channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -545,7 +562,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -570,8 +588,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; - stream->index_fd = -1; + stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -623,10 +642,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_data_stream(struct lttng_consumer_stream *stream) +void consumer_add_data_stream(struct lttng_consumer_stream *stream) { struct lttng_ht *ht = data_ht; - int ret = 0; assert(stream); assert(ht); @@ -676,8 +694,6 @@ int consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); - - return ret; } void consumer_del_data_stream(struct lttng_consumer_stream *stream) @@ -792,7 +808,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); + stream->chan->tracefile_size, stream->chan->tracefile_count, + stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto end; @@ -1021,7 +1038,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, CDS_INIT_LIST_HEAD(&channel->streams.head); - DBG("Allocated channel (key %" PRIu64 ")", channel->key) + DBG("Allocated channel (key %" PRIu64 ")", channel->key); end: return channel; @@ -1070,7 +1087,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1082,6 +1099,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1092,9 +1110,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -1220,7 +1243,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) { ssize_t ret; - consumer_quit = 1; + CMM_STORE_SHARED(consumer_quit, 1); ret = lttng_write(ctx->consumer_should_quit[1], "4", 1); if (ret < 1) { PERROR("write consumer quit"); @@ -1229,9 +1252,15 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) DBG("Consumer flag that it should quit"); } + +/* + * Flush pending writes to trace output disk file. + */ +static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { + int ret; int outfd = stream->out_fd; /* @@ -1262,8 +1291,12 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, * defined. So it can be expected to lead to lower throughput in * streaming. */ - posix_fadvise(outfd, orig_offset - stream->max_sb_size, + ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); + if (ret && ret != -ENOSYS) { + errno = ret; + PERROR("posix_fadvise on fd %i", outfd); + } } /* @@ -1337,6 +1370,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } + ctx->channel_monitor_pipe = -1; + return ctx; error_metadata_pipe: @@ -1443,7 +1478,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd, unsigned long padding) + unsigned long padding) { ssize_t ret; struct lttcomm_relayd_metadata_payload hdr; @@ -1520,7 +1555,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); if (ret < 0) { - ret = -errno; PERROR("tracer ctl get_mmap_read_offset"); goto end; } @@ -1556,6 +1590,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } netlen += sizeof(struct lttcomm_relayd_metadata_payload); } @@ -1569,7 +1613,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + ret = write_relayd_metadata_id(outfd, stream, padding); if (ret < 0) { relayd_hang_up = 1; goto write_error; @@ -1579,6 +1623,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } + /* * Check if we need to change the tracefile before writing the packet. */ @@ -1596,21 +1649,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } outfd = stream->out_fd; - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret < 0) { - PERROR("Closing index"); - goto end; - } - stream->index_fd = -1; - ret = index_create_file(stream->chan->pathname, + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { goto end; } - stream->index_fd = ret; } /* Reset current size because we just perform a rotation. */ @@ -1663,8 +1711,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( lttng_sync_file_range(outfd, stream->out_fd_offset, len, SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += len; + lttng_consumer_sync_trace_file(stream, orig_offset); } - lttng_consumer_sync_trace_file(stream, orig_offset); write_error: /* @@ -1744,7 +1792,17 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } + ret = write_relayd_metadata_id(splice_pipe[1], stream, padding); if (ret < 0) { written = ret; @@ -1767,6 +1825,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } /* * Check if we need to change the tracefile before writing the packet. */ @@ -1785,22 +1851,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } outfd = stream->out_fd; - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret < 0) { - PERROR("Closing index"); - goto end; - } - stream->index_fd = -1; - ret = index_create_file(stream->chan->pathname, + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - written = ret; + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { goto end; } - stream->index_fd = ret; } /* Reset current size because we just perform a rotation. */ @@ -1875,7 +1935,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->output_written += ret_splice; written += ret_splice; } - lttng_consumer_sync_trace_file(stream, orig_offset); + if (!relayd) { + lttng_consumer_sync_trace_file(stream, orig_offset); + } goto end; write_error: @@ -1912,6 +1974,25 @@ end: return written; } +/* + * Sample the snapshot positions for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_sample_snapshot_positions(stream); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_sample_snapshot_positions(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} /* * Take a snapshot for a specific fd * @@ -1953,6 +2034,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, } } +/* + * Get the consumed position (free-running counter position in bytes). + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_get_consumed_snapshot(stream, pos); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_get_consumed_snapshot(stream, pos); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -2015,6 +2117,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); + if (stream->chan->metadata_cache) { + /* Only applicable to userspace consumers. */ + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + } /* Remove any reference to that stream. */ consumer_stream_delete(stream, ht); @@ -2038,6 +2144,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, */ stream->chan->metadata_stream = NULL; + if (stream->chan->metadata_cache) { + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); + } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -2053,10 +2162,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) +void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { struct lttng_ht *ht = metadata_ht; - int ret = 0; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -2100,7 +2208,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_u64(ht, &stream->node); - lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, &stream->node_channel_id); /* @@ -2116,7 +2224,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&consumer_data.lock); - return ret; } /* @@ -2172,6 +2279,73 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + ssize_t ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("Failed to write to the channel rotation pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + ret = 0; + } + + return (int) ret; +} + +/* + * Perform operations that need to be done after a stream has + * rotated and released the stream lock. + * + * Multiple rotations cannot occur simultaneously, so we know the state of the + * "rotated" stream flag cannot change. + * + * This MUST be called WITHOUT the stream lock held. + */ +static +int consumer_post_rotation(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + pthread_mutex_lock(&stream->chan->lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * The ust_metadata_pushed counter has been reset to 0, so now + * we can wakeup the metadata thread so it dumps the metadata + * cache to the new file. + */ + if (stream->metadata_flag) { + consumer_metadata_wakeup_pipe(stream->chan); + } + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + + if (--stream->chan->nr_stream_rotate_pending == 0) { + DBG("Rotation of channel \"%s\" completed, notifying the session daemon", + stream->chan->name); + ret = rotate_notify_sessiond(ctx, stream->chan->key); + } + assert(stream->chan->nr_stream_rotate_pending >= 0); + pthread_mutex_unlock(&stream->chan->lock); + + return ret; +} + /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -2224,10 +2398,10 @@ restart: DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Metadata event catched in thread"); + DBG("Metadata event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) { @@ -2325,7 +2499,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get - * a negative len, it means an error occured thus we + * a negative len, it means an error occurred thus we * simply remove it from the poll set and free the * stream. */ @@ -2352,7 +2526,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get - * a negative len, it means an error occured thus we + * a negative len, it means an error occurred thus we * simply remove it from the poll set and free the * stream. */ @@ -2403,7 +2577,9 @@ void *consumer_thread_data_poll(void *data) /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ - int nb_fd = 0; + int nb_fd = 0, nb_pipes_fd; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2442,17 +2618,19 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; /* - * Allocate for all fds +1 for the consumer_data_pipe and +1 for - * wake up pipe. + * Allocate for all fds + 2: + * +1 for the consumer_data_pipe + * +1 for wake up pipe */ - pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); + nb_pipes_fd = 2; + pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - local_stream = zmalloc((consumer_data.stream_count + 2) * + local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2460,7 +2638,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2473,15 +2651,19 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && consumer_quit == 1) { + if (nb_fd == 0 && nb_inactive_fd == 0 && + CMM_LOAD_SHARED(consumer_quit) == 1) { err = 0; /* All is OK */ goto end; } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 2); + DBG("polling on %d fd", nb_fd + nb_pipes_fd); + if (testpoint(consumerd_thread_data_poll)) { + goto end; + } health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 2, -1); + num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -2499,6 +2681,12 @@ void *consumer_thread_data_poll(void *data) goto end; } + if (caa_unlikely(data_consumption_paused)) { + DBG("Data consumption paused, sleeping..."); + sleep(1); + goto restart; + } + /* * If the consumer_data_pipe triggered poll go directly to the * beginning of the loop to update the array. We want to prioritize @@ -2807,10 +2995,10 @@ restart: DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Channel event catched in thread"); + DBG("Channel event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) { @@ -3141,7 +3329,7 @@ void *consumer_thread_sessiond_poll(void *data) err = 0; goto end; } - if (consumer_quit) { + if (CMM_LOAD_SHARED(consumer_quit)) { DBG("consumer_thread_receive_fds received quit from signal"); err = 0; /* All is OK */ goto end; @@ -3166,7 +3354,7 @@ end: * when all fds have hung up, the polling thread * can exit cleanly */ - consumer_quit = 1; + CMM_STORE_SHARED(consumer_quit, 1); /* * Notify the data poll thread to poll back again and test the @@ -3207,6 +3395,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; + bool rotated = false; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3215,11 +3405,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); break; default: ERR("Unknown consumer_data type"); @@ -3233,6 +3423,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + if (rotated) { + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + } + return ret; } @@ -3298,7 +3496,7 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ -int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3320,7 +3518,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, /* Not found. Allocate one. */ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { - ret = -ENOMEM; ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } else { @@ -3353,14 +3550,12 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, if (ret) { /* Needing to exit in the middle of a command: error. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); - ret = -EINTR; goto error_nosignal; } /* Get relayd socket from session daemon */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - ret = -1; fd = -1; /* Just in case it gets set with an invalid value. */ /* @@ -3434,7 +3629,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, break; default: ERR("Unknown relayd socket type (%d)", sock_type); - ret = -1; ret_code = LTTCOMM_CONSUMERD_FATAL; goto error; } @@ -3458,7 +3652,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - return 0; + return; error: if (consumer_send_status_msg(sock, ret_code) < 0) { @@ -3476,8 +3670,6 @@ error_nosignal: if (relayd_created) { free(relayd); } - - return ret; } /* @@ -3723,3 +3915,548 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, } return start_pos; } + +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustctl_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +/* + * Sample the rotate position for all the streams of a channel. If a stream + * is already at the rotate position (produced == consumed), we flag it as + * ready for rotation. The rotation of ready streams occurs after we have + * replied to the session daemon that we have finished sampling the positions. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_channel(uint64_t key, const char *path, + uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + DBG("Consumer sample rotate position for channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + pthread_mutex_lock(&channel->lock); + channel->current_chunk_id = new_chunk_id; + + ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname)); + if (ret) { + ERR("Failed to copy new path to channel during channel rotation"); + ret = -1; + goto end_unlock_channel; + } + + if (relayd_id == -1ULL) { + /* + * The domain path (/ust or /kernel) has been created before, we + * now need to create the last part of the path: the application/user + * specific section (uid/1000/64-bit). + */ + ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, + channel->uid, channel->gid); + if (ret < 0) { + ERR("Failed to create trace directory at %s during rotation", + channel->pathname); + ret = -1; + goto end_unlock_channel; + } + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + unsigned long consumed_pos; + + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + ret = lttng_strncpy(stream->channel_read_only_attributes.path, + channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); + if (ret) { + ERR("Failed to sample channel path name during channel rotation"); + goto end_unlock_stream; + } + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to sample snapshot position during channel rotation"); + goto end_unlock_stream; + } + + ret = lttng_consumer_get_produced_snapshot(stream, + &stream->rotate_position); + if (ret < 0) { + ERR("Failed to sample produced position during channel rotation"); + goto end_unlock_stream; + } + + lttng_consumer_get_consumed_snapshot(stream, + &consumed_pos); + if (consumed_pos == stream->rotate_position) { + stream->rotate_ready = true; + } + channel->nr_stream_rotate_pending++; + + ret = consumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream %" PRIu64 " during channel rotation", + stream->key); + goto end_unlock_stream; + } + + pthread_mutex_unlock(&stream->lock); + } + pthread_mutex_unlock(&channel->lock); + + ret = 0; + goto end; + +end_unlock_stream: + pthread_mutex_unlock(&stream->lock); +end_unlock_channel: + pthread_mutex_unlock(&channel->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Check if a stream is ready to be rotated after extracting it. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. Stream lock must be held. + */ +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + if (stream->rotate_ready) { + ret = 1; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumed snapshot position"); + goto end; + } + + /* Rotate position not reached yet (with check for overflow). */ + if ((long) (consumed_pos - stream->rotate_position) < 0) { + ret = 0; + goto end; + } + ret = 1; + +end: + return ret; +} + +/* + * Reset the state for a stream after a rotation occurred. + */ +void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) +{ + stream->rotate_position = 0; + stream->rotate_ready = false; +} + +/* + * Perform the rotation a local stream file. + */ +int rotate_local_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + stream->key, + stream->chan->key, + stream->channel_read_only_attributes.path); + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing trace file (fd %d), stream %" PRIu64, + stream->out_fd, stream->key); + assert(0); + goto error; + } + + ret = utils_create_stream_file( + stream->channel_read_only_attributes.path, + stream->name, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + ERR("Rotate create stream file"); + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + lttng_index_file_put(stream->index_file); + + index_file = lttng_index_file_create( + stream->channel_read_only_attributes.path, + stream->name, stream->uid, stream->gid, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + ERR("Create index file during rotation"); + goto error; + } + stream->index_file = index_file; + stream->out_fd_offset = 0; + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; + +} + +/* + * Perform the rotation a stream file on the relay. + */ +int rotate_relay_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + DBG("Rotate relay stream"); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->channel_read_only_attributes.path, + stream->chan->current_chunk_id, + stream->last_sequence_number); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret) { + ERR("Rotate relay stream"); + } + +end: + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream lock held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, bool *rotated) +{ + int ret; + + DBG("Consumer rotate stream %" PRIu64, stream->key); + + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = rotate_relay_stream(ctx, stream); + } else { + ret = rotate_local_stream(ctx, stream); + } + if (ret < 0) { + ERR("Rotate stream"); + goto error; + } + + if (stream->metadata_flag) { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the metadata + * cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + if (ret < 0) { + ERR("Failed to dump the kernel metadata cache after rotation"); + goto error; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + } + lttng_consumer_reset_stream_rotate_state(stream); + + if (rotated) { + *rotated = true; + } + + ret = 0; + +error: + return ret; +} + +/* + * Rotate all the ready streams now. + * + * This is especially important for low throughput streams that have already + * been consumed, we cannot wait for their next packet to perform the + * rotation. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_ready_streams(uint64_t key, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + DBG("Consumer rotate ready streams in channel %" PRIu64, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + health_code_update(); + + pthread_mutex_lock(&stream->lock); + + if (!stream->rotate_ready) { + pthread_mutex_unlock(&stream->lock); + continue; + } + DBG("Consumer rotate ready stream %" PRIu64, stream->key); + + ret = lttng_consumer_rotate_stream(ctx, stream, NULL); + pthread_mutex_unlock(&stream->lock); + if (ret) { + goto end; + } + + ret = consumer_post_rotation(stream, ctx); + if (ret) { + goto end; + } + } + + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} + +static +int rotate_rename_local(const char *old_path, const char *new_path, + uid_t uid, gid_t gid) +{ + int ret; + + assert(old_path); + assert(new_path); + + ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid); + if (ret < 0) { + ERR("Create directory on rotate"); + goto end; + } + + ret = rename(old_path, new_path); + if (ret < 0 && errno != ENOENT) { + PERROR("Rename completed rotation chunk"); + goto end; + } + + ret = 0; +end: + return ret; +} + +static +int rotate_rename_relay(const char *old_path, const char *new_path, + uint64_t relayd_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd while running rotate_rename_relay command"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); +end: + return ret; +} + +int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, + uid_t uid, gid_t gid, uint64_t relayd_id) +{ + if (relayd_id != -1ULL) { + return rotate_rename_relay(old_path, new_path, relayd_id); + } else { + return rotate_rename_local(old_path, new_path, uid, gid); + } +} + +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; +} + +static +int mkdir_local(const char *path, uid_t uid, gid_t gid) +{ + int ret; + + ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid); + if (ret < 0) { + /* utils_mkdir_recursive logs an error. */ + goto end; + } + + ret = 0; +end: + return ret; +} + +static +int mkdir_relay(const char *path, uint64_t relayd_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_mkdir(&relayd->control_sock, path); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; + +} + +int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, + uint64_t relayd_id) +{ + if (relayd_id != -1ULL) { + return mkdir_relay(path, relayd_id); + } else { + return mkdir_local(path, uid, gid); + } +}