X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=47463c6501eddb48c4963d99c15b3d3c510c035c;hp=a85dfe0b047089be39cdd0a1d86c99258814f66f;hb=4cbc1a04e8ac3c1dd4f9a4dc44b56ee8430189f0;hpb=77c7c900d190f7fb4f99a456c767f069da7e72b8 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index a85dfe0b0..47463c650 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -92,7 +92,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -105,82 +105,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { - int fd; - struct consumer_relayd_sock_pair *relayd; - - DBG("Consumer adding relayd socket"); - - /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); - if (relayd == NULL) { - /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair( - msg.u.relayd_sock.net_index); - if (relayd == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end_nosignal; - } - } - - /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { - rcu_read_unlock(); - return -EINTR; - } - - /* Get relayd socket from session daemon */ - ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); - if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); - goto end_nosignal; - } - - /* Copy socket information and received FD */ - switch (msg.u.relayd_sock.type) { - case LTTNG_STREAM_CONTROL: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); - - ret = lttcomm_create_sock(&relayd->control_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->control_sock.fd); - - /* Assign new file descriptor */ - relayd->control_sock.fd = fd; - break; - case LTTNG_STREAM_DATA: - /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); - ret = lttcomm_create_sock(&relayd->data_sock); - if (ret < 0) { - goto end_nosignal; - } - - /* Close the created socket fd which is useless */ - close(relayd->data_sock.fd); - - /* Assign new file descriptor */ - relayd->data_sock.fd = fd; - break; - default: - ERR("Unknown relayd socket type"); - goto end_nosignal; - } - - DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", - msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); - - /* - * Add relayd socket pair to consumer data hashtable. If object already - * exists or on error, the function gracefully returns. - */ - consumer_add_relayd(relayd); - + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -193,7 +120,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -223,7 +150,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -240,7 +167,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.net_index, msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } @@ -284,26 +211,30 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DESTROY_RELAYD: { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; struct consumer_relayd_sock_pair *relayd; - DBG("Kernel consumer destroying relayd %" PRIu64, - msg.u.destroy_relayd.net_seq_idx); + DBG("Kernel consumer destroying relayd %" PRIu64, index); /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + relayd = consumer_find_relayd(index); if (relayd == NULL) { - ERR("Unable to find relayd %" PRIu64, - msg.u.destroy_relayd.net_seq_idx); + ERR("Unable to find relayd %" PRIu64, index); goto end_nosignal; } - /* Set destroy flag for this object */ - uatomic_set(&relayd->destroy_flag, 1); + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); - /* Destroy the relayd if refcount is 0 else set the destroy flag. */ - if (uatomic_read(&relayd->refcount) == 0) { - consumer_destroy_relayd(relayd); - } goto end_nosignal; } default: @@ -326,7 +257,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } /*