X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=e56afa78c78f020e6fc8d71a1d5b8a3c1a6ec5f6;hb=ad0b0d2321fc4e798a2aaf7c17568e3afdf41834;hp=34751969dbedee0f6f35a08a057f836d32e609d8;hpb=b4a650f369fb14961249093a0763631509130f18;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 34751969d..e56afa78c 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -150,6 +150,31 @@ error: return (int) ret; } +/* + * Cleanup the stream list of a channel. Those streams are not yet globally + * visible + */ +static void clean_channel_stream_list(struct lttng_consumer_channel *channel) +{ + struct lttng_consumer_stream *stream, *stmp; + + assert(channel); + + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + /* + * Once a stream is added to this list, the buffers were created so we + * have a guarantee that this call will succeed. Setting the monitor + * mode to 0 so we don't lock nor try to delete the stream from the + * global hash table. + */ + stream->monitor = 0; + consumer_stream_destroy(stream, NULL); + } +} + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -292,23 +317,14 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream, *stmp; DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - cds_list_del(&stream->send_node); - /* - * Once a stream is added to this list, the buffers were created so - * we have a guarantee that this call will succeed. - */ - consumer_stream_destroy(stream, NULL); - } + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(channel); if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); @@ -1441,7 +1457,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( { unsigned long mmap_offset; void *mmap_base; - ssize_t ret = 0, written = 0; + ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ int outfd = stream->out_fd; @@ -1465,9 +1481,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); - if (ret != 0) { + if (ret < 0) { + ret = -errno; PERROR("tracer ctl get_mmap_read_offset"); - written = -errno; goto end; } break; @@ -1476,13 +1492,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -EPERM; + ret = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); if (ret != 0) { PERROR("tracer ctl get_mmap_read_offset"); - written = ret; + ret = -EINVAL; goto end; } break; @@ -1506,30 +1522,20 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } ret = write_relayd_stream_header(stream, netlen, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + /* Use the returned socket. */ + outfd = ret; - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); - if (ret < 0) { - written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { - relayd_hang_up = 1; - goto write_error; - } - goto end; - } - } - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + if (ret < 0) { relayd_hang_up = 1; goto write_error; } - /* Else, use the default set before which is the filesystem. */ } } else { /* No streaming, we have to set the len with the full padding */ @@ -1586,13 +1592,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * amount written. */ if (ret < 0) { - written = -errno; - } else { - written = ret; + ret = -errno; } + relayd_hang_up = 1; /* Socket operation failed. We consider the relayd dead */ - if (errno == EPIPE || errno == EINVAL) { + if (errno == EPIPE || errno == EINVAL || errno == EBADF) { /* * This is possible if the fd is closed on the other side * (outfd) or any write problem. It can be verbose a bit for a @@ -1600,16 +1605,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * abruptly. This can happen so set this to a DBG statement. */ DBG("Consumer mmap write detected relayd hang up"); - relayd_hang_up = 1; - goto write_error; + } else { + /* Unhandled error, print it and stop function right now. */ + PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); } - - /* Unhandled error, print it and stop function right now. */ - PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); - goto end; + goto write_error; } stream->output_written += ret; - written = ret; /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1636,7 +1638,7 @@ end: } rcu_read_unlock(); - return written; + return ret; } /* @@ -1681,7 +1683,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { - ret = -EPIPE; + written = -ret; goto end; } } @@ -1699,7 +1701,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Write metadata stream id before payload */ if (relayd) { - int total_len = len; + unsigned long total_len = len; if (stream->metadata_flag) { /* @@ -1712,31 +1714,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( padding); if (ret < 0) { written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + relayd_hang_up = 1; + goto write_error; } total_len += sizeof(struct lttcomm_relayd_metadata_payload); } ret = write_relayd_stream_header(stream, total_len, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + if (ret < 0) { + written = ret; + relayd_hang_up = 1; + goto write_error; } + /* Use the returned socket. */ + outfd = ret; } else { /* No streaming, we have to set the len with the full padding */ len += padding; @@ -1753,6 +1745,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->out_fd, &(stream->tracefile_count_current), &stream->out_fd); if (ret < 0) { + written = ret; ERR("Rotating output file"); goto end; } @@ -1764,6 +1757,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->chan->tracefile_size, stream->tracefile_count_current); if (ret < 0) { + written = ret; goto end; } stream->index_fd = ret; @@ -1786,28 +1780,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( DBG("splice chan to pipe, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } + written = -ret; PERROR("Error in relay splice"); goto splice_error; } /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - size_t metadata_payload_size = - sizeof(struct lttcomm_relayd_metadata_payload); + if (relayd && stream->metadata_flag) { + size_t metadata_payload_size = + sizeof(struct lttcomm_relayd_metadata_payload); - /* Update counter to fit the spliced data */ - ret_splice += metadata_payload_size; - len += metadata_payload_size; - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= metadata_payload_size; - } + /* Update counter to fit the spliced data */ + ret_splice += metadata_payload_size; + len += metadata_payload_size; + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= metadata_payload_size; } /* Splice data out */ @@ -1816,24 +1806,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( DBG("Consumer splice pipe to file, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } - /* Socket operation failed. We consider the relayd dead */ - if (errno == EBADF || errno == EPIPE || errno == ESPIPE) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - PERROR("Error in file splice"); - goto splice_error; + written = -ret; + relayd_hang_up = 1; + goto write_error; } else if (ret_splice > len) { /* * We don't expect this code path to be executed but you never know * so this is an extra protection agains a buggy splice(). */ - written += ret_splice; ret = errno; + written += ret_splice; PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len); goto splice_error; @@ -2817,8 +2799,6 @@ restart: break; case CONSUMER_CHANNEL_DEL: { - struct lttng_consumer_stream *stream, *stmp; - /* * This command should never be called if the channel * has streams monitored by either the data or metadata @@ -2845,14 +2825,9 @@ restart: break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, - send_node) { - health_code_update(); - /* Remove from the list and destroy it. */ - cds_list_del(&stream->send_node); - consumer_stream_destroy(stream, NULL); - } + health_code_update(); + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(chan); break; default: ERR("Unknown consumer_data type");