X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=481bbeba04b2f91eef880335cbd3ac8ef5cb9302;hp=b8c1a3cd9a74559e3cf7a79ba8e4610ae8038097;hb=4586339794f1139978f2b982d8124e3f5c6bb28e;hpb=f263b7fd113e51d0737554e8232b8669e142a260 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b8c1a3cd9..481bbeba0 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -46,6 +47,8 @@ #include "ust-consumer.h" +#define INT_MAX_STR_LEN 12 /* includes \0 */ + extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; extern volatile int consumer_quit; @@ -79,6 +82,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel) */ if (channel->uchan) { lttng_ustconsumer_del_channel(channel); + lttng_ustconsumer_free_channel(channel); } free(channel); } @@ -123,14 +127,16 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, uint64_t relayd_id, uint64_t key, enum lttng_event_output output, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + const char *root_shm_path, const char *shm_path) { assert(pathname); assert(name); return consumer_allocate_channel(key, session_id, pathname, name, uid, gid, relayd_id, output, tracefile_size, - tracefile_count, session_id_per_pid, monitor, live_timer_interval); + tracefile_count, session_id_per_pid, monitor, + live_timer_interval, root_shm_path, shm_path); } /* @@ -239,6 +245,26 @@ error: return ret; } +static +int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) +{ + char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ + int ret; + + strncpy(stream_shm_path, shm_path, PATH_MAX); + stream_shm_path[PATH_MAX - 1] = '\0'; + ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu); + if (ret < 0) { + PERROR("snprintf"); + goto end; + } + strncat(stream_shm_path, cpu_nr, + PATH_MAX - strlen(stream_shm_path) - 1); + ret = 0; +end: + return ret; +} + /* * Create streams for the given channel using liblttng-ust-ctl. * @@ -327,8 +353,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, /* Keep stream reference when creating metadata. */ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { channel->metadata_stream = stream; - stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0]; - stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1]; + if (channel->monitor) { + /* Set metadata poll pipe if we created one */ + memcpy(stream->ust_metadata_poll_pipe, + ust_metadata_pipe, + sizeof(ust_metadata_pipe)); + } } } @@ -339,20 +369,87 @@ error_alloc: return ret; } +/* + * create_posix_shm is never called concurrently within a process. + */ +static +int create_posix_shm(void) +{ + char tmp_name[NAME_MAX]; + int shmfd, ret; + + ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid()); + if (ret < 0) { + PERROR("snprintf"); + return -1; + } + /* + * Allocate shm, and immediately unlink its shm oject, keeping + * only the file descriptor as a reference to the object. + * We specifically do _not_ use the / at the beginning of the + * pathname so that some OS implementations can keep it local to + * the process (POSIX leaves this implementation-defined). + */ + shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700); + if (shmfd < 0) { + PERROR("shm_open"); + goto error_shm_open; + } + ret = shm_unlink(tmp_name); + if (ret < 0 && errno != ENOENT) { + PERROR("shm_unlink"); + goto error_shm_release; + } + return shmfd; + +error_shm_release: + ret = close(shmfd); + if (ret) { + PERROR("close"); + } +error_shm_open: + return -1; +} + +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, + struct ustctl_consumer_channel_attr *attr, + int cpu) +{ + char shm_path[PATH_MAX]; + int ret; + + if (!channel->shm_path[0]) { + return create_posix_shm(); + } + ret = get_stream_shm_path(shm_path, channel->shm_path, cpu); + if (ret) { + goto error_shm_path; + } + return run_as_open(shm_path, + O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, + channel->uid, channel->gid); + +error_shm_path: + return -1; +} + /* * Create an UST channel with the given attributes and send it to the session * daemon using the ust ctl API. * * Return 0 on success or else a negative value. */ -static int create_ust_channel(struct ustctl_consumer_channel_attr *attr, - struct ustctl_consumer_channel **chanp) +static int create_ust_channel(struct lttng_consumer_channel *channel, + struct ustctl_consumer_channel_attr *attr, + struct ustctl_consumer_channel **ust_chanp) { - int ret; - struct ustctl_consumer_channel *channel; + int ret, nr_stream_fds, i, j; + int *stream_fds; + struct ustctl_consumer_channel *ust_channel; + assert(channel); assert(attr); - assert(chanp); + assert(ust_chanp); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " @@ -361,17 +458,64 @@ static int create_ust_channel(struct ustctl_consumer_channel_attr *attr, attr->num_subbuf, attr->switch_timer_interval, attr->read_timer_interval, attr->output, attr->type); - channel = ustctl_create_channel(attr); - if (!channel) { + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) + nr_stream_fds = 1; + else + nr_stream_fds = ustctl_get_nr_stream_per_channel(); + stream_fds = zmalloc(nr_stream_fds * sizeof(*stream_fds)); + if (!stream_fds) { + ret = -1; + goto error_alloc; + } + for (i = 0; i < nr_stream_fds; i++) { + stream_fds[i] = open_ust_stream_fd(channel, attr, i); + if (stream_fds[i] < 0) { + ret = -1; + goto error_open; + } + } + ust_channel = ustctl_create_channel(attr, stream_fds, nr_stream_fds); + if (!ust_channel) { ret = -1; goto error_create; } - - *chanp = channel; + channel->nr_stream_fds = nr_stream_fds; + channel->stream_fds = stream_fds; + *ust_chanp = ust_channel; return 0; error_create: +error_open: + for (j = i - 1; j >= 0; j--) { + int closeret; + + closeret = close(stream_fds[j]); + if (closeret) { + PERROR("close"); + } + if (channel->shm_path[0]) { + char shm_path[PATH_MAX]; + + closeret = get_stream_shm_path(shm_path, + channel->shm_path, j); + if (closeret) { + ERR("Cannot get stream shm path"); + } + closeret = run_as_unlink(shm_path, + channel->uid, channel->gid); + if (closeret) { + PERROR("unlink %s", shm_path); + } + } + } + /* Try to rmdir all directories under shm_path root. */ + if (channel->root_shm_path[0]) { + (void) run_as_recursive_rmdir(channel->root_shm_path, + channel->uid, channel->gid); + } + free(stream_fds); +error_alloc: return ret; } @@ -433,7 +577,7 @@ static int send_sessiond_channel(int sock, if (relayd_error) { *relayd_error = 1; } - ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } if (net_seq_idx == -1ULL) { net_seq_idx = stream->net_seq_idx; @@ -525,7 +669,7 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, channel->nb_init_stream_left = 0; /* The reply msg status is handled in the following call. */ - ret = create_ust_channel(attr, &channel->uchan); + ret = create_ust_channel(channel, attr, &channel->uchan); if (ret < 0) { goto end; } @@ -859,7 +1003,7 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - uint64_t max_stream_size, struct lttng_consumer_local_data *ctx) + uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -941,12 +1085,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, /* * The original value is sent back if max stream size is larger than - * the possible size of the snapshot. Also, we asume that the session + * the possible size of the snapshot. Also, we assume that the session * daemon should never send a maximum stream size that is lower than * subbuffer size. */ - consumed_pos = consumer_get_consumed_maxsize(consumed_pos, - produced_pos, max_stream_size); + consumed_pos = consumer_get_consume_start_pos(consumed_pos, + produced_pos, nb_packets_per_stream, + stream->max_sb_size); while (consumed_pos < produced_pos) { ssize_t read_len; @@ -1023,7 +1168,12 @@ error: } /* - * Receive the metadata updates from the sessiond. + * Receive the metadata updates from the sessiond. Supports receiving + * overlapping metadata, but is needs to always belong to a contiguous + * range starting from 0. + * Be careful about the locks held when calling this function: it needs + * the metadata cache flush to concurrently progress in order to + * complete. */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, struct lttng_consumer_channel *channel, @@ -1146,7 +1296,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { DBG("Unable to find relayd %" PRIu64, index); - ret_code = LTTNG_ERR_NO_CONSUMER; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } /* @@ -1208,7 +1358,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.ask_channel.tracefile_count, msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor, - msg.u.ask_channel.live_timer_interval); + msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.root_shm_path, + msg.u.ask_channel.shm_path); if (!channel) { goto end_channel_error; } @@ -1327,7 +1479,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); - ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -1435,6 +1587,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); + if (!len) { + /* + * There is nothing to receive. We have simply + * checked whether the channel can be found. + */ + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + goto end_msg_sessiond; + } + /* Tell session daemon we are ready to receive the metadata. */ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { @@ -1483,17 +1644,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ctx); if (ret < 0) { ERR("Snapshot metadata failed"); - ret_code = LTTNG_ERR_UST_META_FAIL; + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; } } else { ret = snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel.max_stream_size, + msg.u.snapshot_channel.nb_packets_per_stream, ctx); if (ret < 0) { ERR("Snapshot channel failed"); - ret_code = LTTNG_ERR_UST_CHAN_FAIL; + ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } } @@ -1662,14 +1823,49 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) { + int i; + assert(chan); assert(chan->uchan); if (chan->switch_timer_enabled == 1) { consumer_timer_switch_stop(chan); } + for (i = 0; i < chan->nr_stream_fds; i++) { + int ret; + + ret = close(chan->stream_fds[i]); + if (ret) { + PERROR("close"); + } + if (chan->shm_path[0]) { + char shm_path[PATH_MAX]; + + ret = get_stream_shm_path(shm_path, chan->shm_path, i); + if (ret) { + ERR("Cannot get stream shm path"); + } + ret = run_as_unlink(shm_path, chan->uid, chan->gid); + if (ret) { + PERROR("unlink %s", shm_path); + } + } + } + /* Try to rmdir all directories under shm_path root. */ + if (chan->root_shm_path[0]) { + (void) run_as_recursive_rmdir(chan->root_shm_path, + chan->uid, chan->gid); + } +} + +void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) +{ + assert(chan); + assert(chan->uchan); + consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); + free(chan->stream_fds); } void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) @@ -1768,7 +1964,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contiguous + if (stream->chan->metadata_cache->max_offset == stream->ust_metadata_pushed) { ret = 0; goto end; @@ -1776,7 +1972,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contiguous + stream->chan->metadata_cache->max_offset - stream->ust_metadata_pushed); assert(write_len != 0); if (write_len < 0) { @@ -1786,7 +1982,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - assert(stream->chan->metadata_cache->contiguous >= + assert(stream->chan->metadata_cache->max_offset >= stream->ust_metadata_pushed); ret = write_len; @@ -1800,7 +1996,9 @@ end: * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. * - * Metadata stream lock MUST be acquired. + * Metadata stream lock is held here, but we need to release it when + * interacting with sessiond, else we cause a deadlock with live + * awaiting on metadata to be pushed out. * * Return 0 if new metadatda is available, EAGAIN if the metadata stream * is empty or a negative value on error. @@ -1814,6 +2012,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, assert(ctx); assert(metadata); + pthread_mutex_unlock(&metadata->lock); /* * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. @@ -1822,6 +2021,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (ret < 0) { goto end; } + pthread_mutex_lock(&metadata->lock); ret = commit_one_metadata_packet(metadata); if (ret <= 0) { @@ -1881,7 +2081,7 @@ static int notify_if_more_data(struct lttng_consumer_stream *stream, goto end; } - ret = ustctl_put_next_subbuf(ustream); + ret = ustctl_put_subbuf(ustream); assert(!ret); /* This stream still has data. Flag it and wake up the data thread. */ @@ -2048,7 +2248,23 @@ retry: /* * In live, block until all the metadata is sent. */ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_ust_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } + if (err < 0) { goto end; } @@ -2129,7 +2345,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) uint64_t contiguous, pushed; /* Ease our life a bit. */ - contiguous = stream->chan->metadata_cache->contiguous; + contiguous = stream->chan->metadata_cache->max_offset; pushed = stream->ust_metadata_pushed; /* @@ -2144,7 +2360,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, contiguous, pushed); - assert(((int64_t) contiguous - pushed) >= 0); + assert(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { ret = 1; /* Data is pending */ @@ -2258,6 +2474,10 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * function or any of its callees. Timers have a very strict locking * semantic with respect to teardown. Failure to respect this semantic * introduces deadlocks. + * + * DON'T hold the metadata lock when calling this function, else this + * can cause deadlock involving consumer awaiting for metadata to be + * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, int timer, int wait) @@ -2383,3 +2603,15 @@ end: pthread_mutex_unlock(&ctx->metadata_socket_lock); return ret; } + +/* + * Return the ustctl call for the get stream id. + */ +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, + uint64_t *stream_id) +{ + assert(stream); + assert(stream_id); + + return ustctl_get_stream_id(stream->ustream, stream_id); +}