X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=4a7efad3ec28dcbea09a6a070b9f9aef5a16dbef;hp=cbad61144b1d1ec391cf1649a85672751d1311be;hb=f75c5439a7fe3d47edf68098f249421b701fceaf;hpb=a3a86f35682d8dd69a617ded2777092de51fcf8c diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index cbad61144..4a7efad3e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -73,6 +73,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel) cds_list_del(&stream->send_node); ustctl_destroy_stream(stream->ustream); + lttng_trace_chunk_put(stream->trace_chunk); free(stream); } @@ -123,7 +124,7 @@ error: * Allocate and return a consumer channel object. */ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, - const char *pathname, const char *name, uid_t uid, gid_t gid, + const uint64_t *chunk_id, const char *pathname, const char *name, uint64_t relayd_id, uint64_t key, enum lttng_event_output output, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, @@ -133,8 +134,8 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, assert(pathname); assert(name); - return consumer_allocate_channel(key, session_id, pathname, name, uid, - gid, relayd_id, output, tracefile_size, + return consumer_allocate_channel(key, session_id, chunk_id, pathname, + name, relayd_id, output, tracefile_size, tracefile_count, session_id_per_pid, monitor, live_timer_interval, root_shm_path, shm_path); } @@ -157,12 +158,10 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, stream = consumer_allocate_stream(channel->key, key, - LTTNG_CONSUMER_ACTIVE_STREAM, channel->name, - channel->uid, - channel->gid, channel->relayd_id, channel->session_id, + channel->trace_chunk, cpu, &alloc_ret, channel->type, @@ -263,6 +262,7 @@ end: /* * Create streams for the given channel using liblttng-ust-ctl. + * The channel lock must be acquired by the caller. * * Return 0 on success else a negative value. */ @@ -272,6 +272,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, int ret, cpu = 0; struct ustctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; + pthread_mutex_t *current_stream_lock = NULL; assert(channel); assert(ctx); @@ -318,6 +319,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, uatomic_inc(&stream->chan->refcount); } + pthread_mutex_lock(&stream->lock); + current_stream_lock = &stream->lock; /* * Order is important this is why a list is used. On error, the caller * should clean this list. @@ -356,12 +359,17 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, sizeof(ust_metadata_pipe)); } } + pthread_mutex_unlock(&stream->lock); + current_stream_lock = NULL; } return 0; error: error_alloc: + if (current_stream_lock) { + pthread_mutex_unlock(current_stream_lock); + } return ret; } @@ -407,9 +415,8 @@ error_shm_open: return -1; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, - struct ustctl_consumer_channel_attr *attr, - int cpu) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, + const struct lttng_credentials *session_credentials) { char shm_path[PATH_MAX]; int ret; @@ -423,7 +430,7 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, } return run_as_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - channel->uid, channel->gid); + session_credentials->uid, session_credentials->gid); error_shm_path: return -1; @@ -446,6 +453,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, assert(channel); assert(attr); assert(ust_chanp); + assert(channel->buffer_credentials.is_set); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " @@ -464,7 +472,8 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, attr, i); + stream_fds[i] = open_ust_stream_fd(channel, i, + &channel->buffer_credentials.value); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -499,7 +508,8 @@ error_open: ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - channel->uid, channel->gid); + channel->buffer_credentials.value.uid, + channel->buffer_credentials.value.gid); if (closeret) { PERROR("unlink %s", shm_path); } @@ -508,7 +518,9 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - channel->uid, channel->gid); + channel->buffer_credentials.value.uid, + channel->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -641,7 +653,7 @@ error: * Return 0 on success or else, a negative value is returned and the channel * MUST be destroyed by consumer_del_channel(). */ -static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, +static int ask_channel(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, struct ustctl_consumer_channel_attr *attr) { @@ -684,7 +696,9 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, } /* Open all streams for this channel. */ + pthread_mutex_lock(&channel->lock); ret = create_ust_streams(channel, ctx); + pthread_mutex_unlock(&channel->lock); if (ret < 0) { goto end; } @@ -816,7 +830,6 @@ error: /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. - * RCU read side lock MUST be acquired before calling this function. * * Return 0 on success else an LTTng error code. */ @@ -984,14 +997,15 @@ end: /* * Snapshot the whole metadata. + * RCU read-side lock must be held by the caller. * * Returns 0 on success, < 0 on error */ -static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, +static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, + uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { int ret = 0; - struct lttng_consumer_channel *metadata_channel; struct lttng_consumer_stream *metadata_stream; assert(path); @@ -1002,13 +1016,6 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, rcu_read_lock(); - metadata_channel = consumer_find_channel(key); - if (!metadata_channel) { - ERR("UST snapshot metadata channel not found for key %" PRIu64, - key); - ret = -1; - goto error; - } assert(!metadata_channel->monitor); health_code_update(); @@ -1036,22 +1043,17 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); + pthread_mutex_lock(&metadata_stream->lock); if (relayd_id != (uint64_t) -1ULL) { metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); - if (ret < 0) { - goto error_stream; - } } else { - ret = utils_create_stream_file(path, metadata_stream->name, - metadata_stream->chan->tracefile_size, - metadata_stream->tracefile_count_current, - metadata_stream->uid, metadata_stream->gid, NULL); - if (ret < 0) { - goto error_stream; - } - metadata_stream->out_fd = ret; - metadata_stream->tracefile_size_current = 0; + ret = consumer_stream_create_output_files(metadata_stream, + false); + } + pthread_mutex_unlock(&metadata_stream->lock); + if (ret < 0) { + goto error_stream; } do { @@ -1068,6 +1070,7 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. */ + pthread_mutex_lock(&metadata_stream->lock); consumer_stream_destroy(metadata_stream, NULL); cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; @@ -1079,16 +1082,18 @@ error: /* * Take a snapshot of all the stream of a channel. + * RCU read-side lock and the channel lock must be held by the caller. * * Returns 0 on success, < 0 on error */ -static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx) +static int snapshot_channel(struct lttng_consumer_channel *channel, + uint64_t key, char *path, uint64_t relayd_id, + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; unsigned long consumed_pos, produced_pos; - struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; assert(path); @@ -1100,12 +1105,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, use_relayd = 1; } - channel = consumer_find_channel(key); - if (!channel) { - ERR("UST snapshot channel not found for key %" PRIu64, key); - ret = -1; - goto error; - } assert(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); @@ -1114,6 +1113,19 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, /* Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); + assert(channel->trace_chunk); + if (!lttng_trace_chunk_get(channel->trace_chunk)) { + /* + * Can't happen barring an internal error as the channel + * holds a reference to the trace chunk. + */ + ERR("Failed to acquire reference to channel's trace chunk"); + ret = -1; + goto error_unlock; + } + assert(!stream->trace_chunk); + stream->trace_chunk = channel->trace_chunk; + stream->net_seq_idx = relayd_id; if (use_relayd) { @@ -1122,18 +1134,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, goto error_unlock; } } else { - ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, - stream->tracefile_count_current, - stream->uid, stream->gid, NULL); + ret = consumer_stream_create_output_files(stream, + false); if (ret < 0) { goto error_unlock; } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path, - stream->name, stream->key); + DBG("UST consumer snapshot stream (%" PRIu64 ")", + stream->key); } /* @@ -1172,7 +1179,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, produced_pos, nb_packets_per_stream, stream->max_sb_size); - while (consumed_pos < produced_pos) { + while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; @@ -1242,7 +1249,6 @@ error_close_stream: consumer_stream_close(stream); error_unlock: pthread_mutex_unlock(&stream->lock); -error: rcu_read_unlock(); return ret; } @@ -1428,12 +1434,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret; struct ustctl_consumer_channel_attr attr; + const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value; + const struct lttng_credentials buffer_credentials = { + .uid = msg.u.ask_channel.buffer_credentials.uid, + .gid = msg.u.ask_channel.buffer_credentials.gid, + }; /* Create a plain object and reserve a channel key. */ channel = allocate_channel(msg.u.ask_channel.session_id, - msg.u.ask_channel.pathname, msg.u.ask_channel.name, - msg.u.ask_channel.uid, msg.u.ask_channel.gid, - msg.u.ask_channel.relayd_id, msg.u.ask_channel.key, + msg.u.ask_channel.chunk_id.is_set ? + &chunk_id : NULL, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.relayd_id, + msg.u.ask_channel.key, (enum lttng_event_output) msg.u.ask_channel.output, msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, @@ -1446,6 +1460,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } + LTTNG_OPTIONAL_SET(&channel->buffer_credentials, + buffer_credentials); + /* * Assign UST application UID to the channel. This value is ignored for * per PID buffers. This is specific to UST thus setting this after the @@ -1494,7 +1511,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = ask_channel(ctx, sock, channel, &attr); + ret = ask_channel(ctx, channel, &attr); if (ret < 0) { goto end_channel_error; } @@ -1745,27 +1762,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: { - if (msg.u.snapshot_channel.metadata) { - ret = snapshot_metadata(msg.u.snapshot_channel.key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - ctx); - if (ret < 0) { - ERR("Snapshot metadata failed"); - ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; - } + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.snapshot_channel.key; + + channel = consumer_find_channel(key); + if (!channel) { + DBG("UST snapshot channel not found for key %" PRIu64, key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } else { - ret = snapshot_channel(msg.u.snapshot_channel.key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel.nb_packets_per_stream, - ctx); - if (ret < 0) { - ERR("Snapshot channel failed"); - ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; + if (msg.u.snapshot_channel.metadata) { + ret = snapshot_metadata(channel, key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); + if (ret < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; + } + } else { + ret = snapshot_channel(channel, key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream, + ctx); + if (ret < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; + } } } - health_code_update(); ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { @@ -1922,65 +1947,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_msg_sessiond; } - case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE: - { - int channel_rotate_pipe; - int flags; - - ret_code = LTTCOMM_CONSUMERD_SUCCESS; - /* Successfully received the command's type. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - goto error_fatal; - } - - ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1); - if (ret != sizeof(channel_rotate_pipe)) { - ERR("Failed to receive channel rotate pipe"); - goto error_fatal; - } - - DBG("Received channel rotate pipe (%d)", channel_rotate_pipe); - ctx->channel_rotate_pipe = channel_rotate_pipe; - /* Set the pipe as non-blocking. */ - ret = fcntl(channel_rotate_pipe, F_GETFL, 0); - if (ret == -1) { - PERROR("fcntl get flags of the channel rotate pipe"); - goto error_fatal; - } - flags = ret; - - ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK); - if (ret == -1) { - PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe"); - goto error_fatal; - } - DBG("Channel rotate pipe set as non-blocking"); - ret_code = LTTCOMM_CONSUMERD_SUCCESS; - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - goto error_fatal; - } - break; - } case LTTNG_CONSUMER_ROTATE_CHANNEL: { - /* - * Sample the rotate position of all the streams in this channel. - */ - ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, - msg.u.rotate_channel.pathname, - msg.u.rotate_channel.relayd_id, - msg.u.rotate_channel.metadata, - msg.u.rotate_channel.new_chunk_id, - ctx); - if (ret < 0) { - ERR("Rotate channel failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; - } + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.rotate_channel.key; - health_code_update(); + channel = consumer_find_channel(key); + if (!channel) { + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + /* + * Sample the rotate position of all the streams in + * this channel. + */ + ret = lttng_consumer_rotate_channel(channel, key, + msg.u.rotate_channel.relayd_id, + msg.u.rotate_channel.metadata, + ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; + } + health_code_update(); + } ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -1994,30 +1985,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * handle this, but it needs to be after the * consumer_send_status_msg() call. */ - ret = lttng_consumer_rotate_ready_streams( - msg.u.rotate_channel.key, ctx); - if (ret < 0) { - ERR("Rotate channel failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + if (channel) { + ret = lttng_consumer_rotate_ready_streams( + channel, key, ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + } } break; } - case LTTNG_CONSUMER_ROTATE_RENAME: + case LTTNG_CONSUMER_INIT: { - DBG("Consumer rename session %" PRIu64 " after rotation", - msg.u.rotate_rename.session_id); - ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path, - msg.u.rotate_rename.new_path, - msg.u.rotate_rename.uid, - msg.u.rotate_rename.gid, - msg.u.rotate_rename.relayd_id); - if (ret < 0) { - ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - } - + ret_code = lttng_consumer_init_command(ctx, + msg.u.init.sessiond_uuid); health_code_update(); - ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ @@ -2025,59 +2006,109 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } - case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { - uint32_t pending; - - DBG("Consumer rotate pending on relay for session %" PRIu64, - msg.u.rotate_pending_relay.session_id); - pending = lttng_consumer_rotate_pending_relay( - msg.u.rotate_pending_relay.session_id, - msg.u.rotate_pending_relay.relayd_id, - msg.u.rotate_pending_relay.chunk_id); - if (pending < 0) { - ERR("Rotate pending relay failed"); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - } + const struct lttng_credentials credentials = { + .uid = msg.u.create_trace_chunk.credentials.value.uid, + .gid = msg.u.create_trace_chunk.credentials.value.gid, + }; + const bool is_local_trace = + !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = + msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = + *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + NULL; + LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle = + LTTNG_OPTIONAL_INIT; - health_code_update(); + /* + * The session daemon will only provide a chunk directory file + * descriptor for local traces. + */ + if (is_local_trace) { + int chunk_dirfd; - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } + /* Acnowledge the reception of the command. */ + ret = consumer_send_status_msg(sock, + LTTCOMM_CONSUMERD_SUCCESS); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } - /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); - if (ret < 0) { - PERROR("send data pending ret code"); - goto error_fatal; + ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); + if (ret != sizeof(chunk_dirfd)) { + ERR("Failed to receive trace chunk directory file descriptor"); + goto error_fatal; + } + + DBG("Received trace chunk directory fd (%d)", + chunk_dirfd); + ret = lttng_directory_handle_init_from_dirfd( + &chunk_directory_handle.value, + chunk_dirfd); + if (ret) { + ERR("Failed to initialize chunk directory handle from directory file descriptor"); + if (close(chunk_dirfd)) { + PERROR("Failed to close chunk directory file descriptor"); + } + goto error_fatal; + } + chunk_directory_handle.is_set = true; + } + + ret_code = lttng_consumer_create_trace_chunk( + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk + .creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? + &credentials : + NULL, + chunk_directory_handle.is_set ? + &chunk_directory_handle.value : + NULL); + + if (chunk_directory_handle.is_set) { + lttng_directory_handle_fini( + &chunk_directory_handle.value); } - break; + goto end_msg_sessiond; } - case LTTNG_CONSUMER_MKDIR: + case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { - DBG("Consumer mkdir %s in session %" PRIu64, - msg.u.mkdir.path, - msg.u.mkdir.session_id); - ret = lttng_consumer_mkdir(msg.u.mkdir.path, - msg.u.mkdir.uid, - msg.u.mkdir.gid, - msg.u.mkdir.relayd_id); - if (ret < 0) { - ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - } - - health_code_update(); - - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { - /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; - } - break; + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = + msg.u.close_trace_chunk.relayd_id.value; + + ret_code = lttng_consumer_close_trace_chunk( + msg.u.close_trace_chunk.relayd_id.is_set ? + &relayd_id : + NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL); + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: + { + const uint64_t relayd_id = + msg.u.trace_chunk_exists.relayd_id.value; + + ret_code = lttng_consumer_trace_chunk_exists( + msg.u.trace_chunk_exists.relayd_id.is_set ? + &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); + goto end_msg_sessiond; } default: break; @@ -2111,6 +2142,7 @@ end_msg_sessiond: return 1; end_channel_error: if (channel) { + pthread_mutex_unlock(&channel->lock); /* * Free channel here since no one has a reference to it. We don't * free after that because a stream can store this pointer. @@ -2277,6 +2309,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) assert(chan); assert(chan->uchan); + assert(chan->buffer_credentials.is_set); if (chan->switch_timer_enabled == 1) { consumer_timer_switch_stop(chan); @@ -2295,7 +2328,9 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) if (ret) { ERR("Cannot get stream shm path"); } - ret = run_as_unlink(shm_path, chan->uid, chan->gid); + ret = run_as_unlink(shm_path, + chan->buffer_credentials.value.uid, + chan->buffer_credentials.value.gid); if (ret) { PERROR("unlink %s", shm_path); } @@ -2307,13 +2342,16 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) { assert(chan); assert(chan->uchan); + assert(chan->buffer_credentials.is_set); consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, - chan->uid, chan->gid); + chan->buffer_credentials.value.uid, + chan->buffer_credentials.value.gid, + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -2486,6 +2524,13 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) stream->ust_metadata_pushed); ret = write_len; + /* + * Switch packet (but don't open the next one) on every commit of + * a metadata packet. Since the subbuffer is fully filled (with padding, + * if needed), the stream is "quiescent" after this commit. + */ + ustctl_flush_buffer(stream->ustream, 1); + stream->quiescent = true; end: pthread_mutex_unlock(&stream->chan->metadata_cache->lock); return ret; @@ -2530,7 +2575,6 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, retry = 1; } - ustctl_flush_buffer(metadata->ustream, 1); ret = ustctl_snapshot(metadata->ustream); if (ret < 0) { if (errno != EAGAIN) { @@ -2663,12 +2707,12 @@ end: /* * Read subbuffer from the given stream. * - * Stream lock MUST be acquired. + * Stream and channel locks MUST be acquired by the caller. * * Return 0 on success else a negative value. */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, bool *rotated) + struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; int err, write_index = 1, rotation_ret; @@ -2713,7 +2757,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -2734,7 +2778,6 @@ retry: if (ret <= 0) { goto error; } - ustctl_flush_buffer(stream->ustream, 1); goto retry; } @@ -2864,7 +2907,7 @@ rotate: */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { ERR("Stream rotation error"); ret = -1; @@ -2890,31 +2933,16 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); - /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { + /* + * Don't create anything if this is set for streaming or if there is + * no current trace chunk on the parent channel. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && + stream->chan->trace_chunk) { + ret = consumer_stream_create_output_files(stream, true); + if (ret) { goto error; } - stream->out_fd = ret; - stream->tracefile_size_current = 0; - - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; - - index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - goto error; - } - assert(!stream->index_file); - stream->index_file = index_file; - } } ret = 0; @@ -3120,7 +3148,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.key = channel->key; DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64, + ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, request.session_id, request.session_id_per_pid, request.uid, request.key);