X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=ad4b014c6f08dc09f4e47b22e19cbd47330ec5ae;hp=3024af4a75157b92a46a67014800b9eab66c8e64;hb=c30aaa51f34105a7f20b9ceb39866001843db6e6;hpb=77c7c900d190f7fb4f99a456c767f069da7e72b8 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 3024af4a7..ad4b014c6 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -92,6 +92,11 @@ int lttng_ustconsumer_get_produced_snapshot( return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -102,7 +107,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", ret, sizeof(msg)); - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -115,81 +120,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { - int fd; - struct consumer_relayd_sock_pair *relayd; - - DBG("UST Consumer adding relayd socket"); - - /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); - if (relayd == NULL) { - /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair( - msg.u.relayd_sock.net_index); - if (relayd == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end_nosignal; - } - } - - /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { - rcu_read_unlock(); - return -EINTR; - } - - /* Get relayd socket from session daemon */ - ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); - if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - goto end_nosignal; - } - - /* Copy socket information and received FD */ - switch (msg.u.relayd_sock.type) { - case LTTNG_STREAM_CONTROL: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->control_sock.fd); - - /* Assign new file descriptor */ - relayd->control_sock.fd = fd; - break; - case LTTNG_STREAM_DATA: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->data_sock.fd); - - /* Assign new file descriptor */ - relayd->data_sock.fd = fd; - break; - default: - ERR("Unknown relayd socket type"); - goto end_nosignal; - } - - DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", - msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); - - /* - * Add relayd socket pair to consumer data hashtable. If object already - * exists or on error, the function gracefully returns. - */ - consumer_add_relayd(relayd); - + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -207,7 +140,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -217,9 +150,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, fds[0], -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -250,7 +184,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -272,7 +206,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.net_index, msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } @@ -295,14 +229,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { - ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { - goto end_nosignal; + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } + + do { + ret = write(ctx->consumer_metadata_pipe[1], new_stream, + sizeof(struct lttng_consumer_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata pipe"); } } else { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } consumer_add_stream(new_stream); } @@ -313,25 +262,30 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DESTROY_RELAYD: { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; struct consumer_relayd_sock_pair *relayd; - DBG("UST consumer destroying relayd %" PRIu64, - msg.u.destroy_relayd.net_seq_idx); + DBG("UST consumer destroying relayd %" PRIu64, index); /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + relayd = consumer_find_relayd(index); if (relayd == NULL) { - ERR("Unable to find relayd %" PRIu64, msg.u.destroy_relayd.net_seq_idx); + ERR("Unable to find relayd %" PRIu64, index); goto end_nosignal; } - /* Set destroy flag for this object */ - uatomic_set(&relayd->destroy_flag, 1); + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); - /* Destroy the relayd if refcount is 0 else set the destroy flag. */ - if (uatomic_read(&relayd->refcount) == 0) { - consumer_destroy_relayd(relayd); - } goto end_nosignal; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -373,7 +327,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) @@ -441,7 +400,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; long ret = 0; struct lttng_ust_shm_handle *handle; @@ -468,7 +427,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -480,17 +439,34 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } assert(stream->output == LTTNG_EVENT_MMAP); - /* read the used subbuffer size */ + /* Get the full padded subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); assert(err == 0); + + /* Get subbuffer data size (without padding) */ + err = ustctl_get_subbuf_size(handle, buf, &subbuf_size); + assert(err == 0); + + /* Make sure we don't get a subbuffer size bigger than the padded */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding); + /* + * The mmap operation should write subbuf_size amount of data when network + * streaming or the full padding (len) size when we are _not_ streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { /* - * display the error but continue processing to try - * to release the subbuffer + * Display the error but continue processing to try to release the + * subbuffer */ - ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len); + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); + } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0);