X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=b0b926bb01a1f65c58c9d25da47bc980b4125e83;hp=c909548907d78c435392c4fa3c428ac0bab51d67;hb=9d0482e7fb70c4deaebe57872cbd75a6a1c213fb;hpb=212d67a218a0e805950f85bd95143a996e957322 diff --git a/src/common/consumer.c b/src/common/consumer.c index c90954890..b0b926bb0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -18,6 +18,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -250,6 +252,32 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) return channel; } +/* + * There is a possibility that the consumer does not have enough time between + * the close of the channel on the session daemon and the cleanup in here thus + * once we have a channel add with an existing key, we know for sure that this + * channel will eventually get cleaned up by all streams being closed. + * + * This function just nullifies the already existing channel key. + */ +static void steal_channel_key(uint64_t key) +{ + struct lttng_consumer_channel *channel; + + rcu_read_lock(); + channel = consumer_find_channel(key); + if (channel) { + channel->key = (uint64_t) -1ULL; + /* + * We don't want the lookup to match, but we still need to iterate on + * this channel when iterating over the hash table. Just change the + * node key. + */ + channel->node.key = (uint64_t) -1ULL; + } + rcu_read_unlock(); +} + static void free_channel_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = @@ -979,43 +1007,35 @@ end: /* * Add a channel to the global list protected by a mutex. * - * On success 0 is returned else a negative value. + * Always return 0 indicating success. */ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { - int ret = 0; - struct lttng_ht_node_u64 *node; - struct lttng_ht_iter iter; - pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->timer_lock); - rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { - /* Channel already exist. Ignore the insertion */ - ERR("Consumer add channel key %" PRIu64 " already exists!", - channel->key); - ret = -EEXIST; - goto end; - } + /* + * This gives us a guarantee that the channel we are about to add to the + * channel hash table will be unique. See this function comment on the why + * we need to steel the channel key at this stage. + */ + steal_channel_key(channel->key); + rcu_read_lock(); lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node); - -end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); - if (!ret && channel->wait_fd != -1 && - channel->type == CONSUMER_CHANNEL_TYPE_DATA) { + if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } - return ret; + + return 0; } /* @@ -1073,12 +1093,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, */ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; + + (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe); + (*pollfd)[i + 1].events = POLLIN | POLLPRI; return i; } /* - * Poll on the should_quit pipe and the command socket return -1 on error and - * should exit, 0 if data is available on the command socket + * Poll on the should_quit pipe and the command socket return -1 on + * error, 1 if should exit, 0 if data is available on the command socket */ int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll) { @@ -1094,16 +1117,13 @@ restart: goto restart; } PERROR("Poll error"); - goto exit; + return -1; } if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) { DBG("consumer_should_quit wake up"); - goto exit; + return 1; } return 0; - -exit: - return -1; } /* @@ -1272,18 +1292,17 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + ctx->consumer_wakeup_pipe = lttng_pipe_open(0); + if (!ctx->consumer_wakeup_pipe) { + goto error_wakeup_pipe; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); goto error_quit_pipe; } - ret = pipe(ctx->consumer_thread_pipe); - if (ret < 0) { - PERROR("Error creating thread pipe"); - goto error_thread_pipe; - } - ret = pipe(ctx->consumer_channel_pipe); if (ret < 0) { PERROR("Error creating channel pipe"); @@ -1295,22 +1314,15 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } - ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe); - if (ret < 0) { - goto error_splice_pipe; - } - return ctx; -error_splice_pipe: - lttng_pipe_destroy(ctx->consumer_metadata_pipe); error_metadata_pipe: utils_close_pipe(ctx->consumer_channel_pipe); error_channel_pipe: - utils_close_pipe(ctx->consumer_thread_pipe); -error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); +error_wakeup_pipe: lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); @@ -1378,6 +1390,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + if (!ctx) { + return; + } + destroy_data_stream_ht(data_ht); destroy_metadata_stream_ht(metadata_ht); @@ -1389,12 +1405,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); - utils_close_pipe(ctx->consumer_splice_metadata_pipe); unlink(ctx->consumer_command_sock_path); free(ctx); @@ -1415,7 +1430,7 @@ static int write_relayd_metadata_id(int fd, ret = lttng_write(fd, (void *) &hdr, sizeof(hdr)); if (ret < sizeof(hdr)) { /* - * This error means that the fd's end is closed so ignore the perror + * This error means that the fd's end is closed so ignore the PERROR * not to clubber the error output since this can happen in a normal * code path. */ @@ -1457,7 +1472,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( { unsigned long mmap_offset; void *mmap_base; - ssize_t ret = 0, written = 0; + ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ int outfd = stream->out_fd; @@ -1481,9 +1496,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); - if (ret != 0) { + if (ret < 0) { + ret = -errno; PERROR("tracer ctl get_mmap_read_offset"); - written = -errno; goto end; } break; @@ -1492,13 +1507,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -EPERM; + ret = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); if (ret != 0) { PERROR("tracer ctl get_mmap_read_offset"); - written = ret; + ret = -EINVAL; goto end; } break; @@ -1522,30 +1537,20 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } ret = write_relayd_stream_header(stream, netlen, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + /* Use the returned socket. */ + outfd = ret; - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); - if (ret < 0) { - written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { - relayd_hang_up = 1; - goto write_error; - } - goto end; - } - } - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + if (ret < 0) { relayd_hang_up = 1; goto write_error; } - /* Else, use the default set before which is the filesystem. */ } } else { /* No streaming, we have to set the len with the full padding */ @@ -1602,13 +1607,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * amount written. */ if (ret < 0) { - written = -errno; - } else { - written = ret; + ret = -errno; } + relayd_hang_up = 1; /* Socket operation failed. We consider the relayd dead */ - if (errno == EPIPE || errno == EINVAL) { + if (errno == EPIPE || errno == EINVAL || errno == EBADF) { /* * This is possible if the fd is closed on the other side * (outfd) or any write problem. It can be verbose a bit for a @@ -1616,16 +1620,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * abruptly. This can happen so set this to a DBG statement. */ DBG("Consumer mmap write detected relayd hang up"); - relayd_hang_up = 1; - goto write_error; + } else { + /* Unhandled error, print it and stop function right now. */ + PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); } - - /* Unhandled error, print it and stop function right now. */ - PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); - goto end; + goto write_error; } stream->output_written += ret; - written = ret; /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1652,7 +1653,7 @@ end: } rcu_read_unlock(); - return written; + return ret; } /* @@ -1697,25 +1698,15 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { - ret = -EPIPE; + written = -ret; goto end; } } - - /* - * Choose right pipe for splice. Metadata and trace data are handled by - * different threads hence the use of two pipes in order not to race or - * corrupt the written data. - */ - if (stream->metadata_flag) { - splice_pipe = ctx->consumer_splice_metadata_pipe; - } else { - splice_pipe = ctx->consumer_thread_pipe; - } + splice_pipe = stream->splice_pipe; /* Write metadata stream id before payload */ if (relayd) { - int total_len = len; + unsigned long total_len = len; if (stream->metadata_flag) { /* @@ -1728,31 +1719,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( padding); if (ret < 0) { written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + relayd_hang_up = 1; + goto write_error; } total_len += sizeof(struct lttcomm_relayd_metadata_payload); } ret = write_relayd_stream_header(stream, total_len, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + if (ret < 0) { + written = ret; + relayd_hang_up = 1; + goto write_error; } + /* Use the returned socket. */ + outfd = ret; } else { /* No streaming, we have to set the len with the full padding */ len += padding; @@ -1769,6 +1750,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->out_fd, &(stream->tracefile_count_current), &stream->out_fd); if (ret < 0) { + written = ret; ERR("Rotating output file"); goto end; } @@ -1780,6 +1762,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->chan->tracefile_size, stream->tracefile_count_current); if (ret < 0) { + written = ret; goto end; } stream->index_fd = ret; @@ -1802,54 +1785,43 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( DBG("splice chan to pipe, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } + written = -ret; PERROR("Error in relay splice"); goto splice_error; } /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - size_t metadata_payload_size = - sizeof(struct lttcomm_relayd_metadata_payload); + if (relayd && stream->metadata_flag) { + size_t metadata_payload_size = + sizeof(struct lttcomm_relayd_metadata_payload); - /* Update counter to fit the spliced data */ - ret_splice += metadata_payload_size; - len += metadata_payload_size; - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= metadata_payload_size; - } + /* Update counter to fit the spliced data */ + ret_splice += metadata_payload_size; + len += metadata_payload_size; + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= metadata_payload_size; } /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("Consumer splice pipe to file, ret %zd", ret_splice); + DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", + outfd, ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } - /* Socket operation failed. We consider the relayd dead */ - if (errno == EBADF || errno == EPIPE || errno == ESPIPE) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - PERROR("Error in file splice"); - goto splice_error; + written = -ret; + relayd_hang_up = 1; + goto write_error; } else if (ret_splice > len) { /* * We don't expect this code path to be executed but you never know * so this is an extra protection agains a buggy splice(). */ - written += ret_splice; ret = errno; + written += ret_splice; PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len); goto splice_error; @@ -2209,18 +2181,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - health_code_update(); - - /* Only the metadata pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); health_poll_entry(); + DBG("Metadata poll wait"); ret = lttng_poll_wait(&events, -1); + DBG("Metadata poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2228,7 +2195,10 @@ restart: ERR("Poll EINTR catched"); goto restart; } - goto error; + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } + goto end; } nb_fd = ret; @@ -2240,6 +2210,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -2356,7 +2331,6 @@ restart: /* All is OK */ err = 0; -error: end: DBG("Metadata poll thread exiting"); @@ -2421,16 +2395,18 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* allocate for all fds + 1 for the consumer_data_pipe */ - pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + /* + * Allocate for all fds +1 for the consumer_data_pipe and +1 for + * wake up pipe. + */ + pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - /* allocate for all fds + 1 for the consumer_data_pipe */ - local_stream = zmalloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 2) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2457,9 +2433,9 @@ void *consumer_thread_data_poll(void *data) } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 1); + DBG("polling on %d fd", nb_fd + 2); health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 1, -1); + num_rdy = poll(pollfd, nb_fd + 2, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -2508,6 +2484,20 @@ void *consumer_thread_data_poll(void *data) continue; } + /* Handle wakeup pipe. */ + if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) { + char dummy; + ssize_t pipe_readlen; + + pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, + sizeof(dummy)); + if (pipe_readlen < 0) { + PERROR("Consumer data wakeup pipe"); + } + /* We've been awakened to handle stream(s). */ + ctx->has_wakeup = 0; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { health_code_update(); @@ -2546,7 +2536,8 @@ void *consumer_thread_data_poll(void *data) continue; } if ((pollfd[i].revents & POLLIN) || - local_stream[i]->hangup_flush_done) { + local_stream[i]->hangup_flush_done || + local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ @@ -2762,18 +2753,13 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { - health_code_update(); - - /* Only the channel pipe is set */ - if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { - err = 0; /* All is OK */ - goto end; - } - restart: - DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); + DBG("Channel poll wait"); health_poll_entry(); ret = lttng_poll_wait(&events, -1); + DBG("Channel poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { @@ -2781,6 +2767,9 @@ restart: ERR("Poll EINTR catched"); goto restart; } + if (LTTNG_POLL_GETNB(&events) == 0) { + err = 0; /* All is OK */ + } goto end; } @@ -2793,10 +2782,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ if (!revents) { + /* No activity for this FD (poll implementation). */ continue; } + if (pollfd == ctx->consumer_channel_pipe[0]) { if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Channel thread pipe hung up"); @@ -2960,8 +2950,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx, assert(ctx); assert(sockpoll); - if (lttng_consumer_poll_socket(sockpoll) < 0) { - ret = -1; + ret = lttng_consumer_poll_socket(sockpoll); + if (ret) { goto error; } DBG("Metadata connection on client_socket"); @@ -3030,7 +3020,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].fd = client_socket; consumer_sockpoll[1].events = POLLIN | POLLPRI; - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Connection on client_socket"); @@ -3047,7 +3042,11 @@ void *consumer_thread_sessiond_poll(void *data) * command unix socket. */ ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } @@ -3068,15 +3067,15 @@ void *consumer_thread_sessiond_poll(void *data) health_poll_entry(); ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Incoming command on sock"); ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll); - if (ret == -ENOENT) { - DBG("Received STOP command"); - goto end; - } if (ret <= 0) { /* * This could simply be a session daemon quitting. Don't output @@ -3294,7 +3293,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, } /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + /* Needing to exit in the middle of a command: error. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; goto error_nosignal; @@ -3548,15 +3549,6 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { - /* - * An empty output file is not valid. We need at least one packet - * generated per stream, even if it contains no event, so it - * contains at least one packet header. - */ - if (stream->output_written == 0) { - pthread_mutex_unlock(&stream->lock); - goto data_pending; - } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) { @@ -3629,6 +3621,7 @@ int consumer_send_status_msg(int sock, int ret_code) { struct lttcomm_consumer_status_msg msg; + memset(&msg, 0, sizeof(msg)); msg.ret_code = ret_code; return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); @@ -3646,6 +3639,7 @@ int consumer_send_status_channel(int sock, assert(sock >= 0); + memset(&msg, 0, sizeof(msg)); if (!channel) { msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } else {