X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=363fa4eda6c5ba0dc75296d4c9f7d764c80b21da;hp=761ce93753667f1b93d6f151c93a68d014855e7f;hb=7735ef9e674217413a63bd4a09a93ac0958fe58a;hpb=6197aea7399cfe3bb67f8602ba4c3122867ecf52 diff --git a/src/common/consumer.c b/src/common/consumer.c index 761ce9375..363fa4eda 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -274,12 +275,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); + /* Closing streams requires to lock the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_close_stream(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Unable to close stream on the relayd. Continuing"); - /* Continue here. There is nothing we can do for the relayd.*/ + DBG("Unable to close stream on the relayd. Continuing"); + /* + * Continue here. There is nothing we can do for the relayd. + * Chances are that the relayd has closed the socket so we just + * continue cleaning up. + */ } /* Both conditions are met, we destroy the relayd. */ @@ -434,8 +442,10 @@ end: } /* - * Add relayd socket to global consumer data hashtable. + * Add relayd socket to global consumer data hashtable. RCU read side lock MUST + * be acquired before calling this. */ + int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) { int ret = 0; @@ -447,20 +457,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) goto end; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) relayd->net_seq_idx), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { - rcu_read_unlock(); /* Relayd already exist. Ignore the insertion */ goto end; } lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); - rcu_read_unlock(); - end: return ret; } @@ -1087,7 +1092,7 @@ static int write_relayd_metadata_id(int fd, PERROR("write metadata stream id"); goto end; } - DBG("Metadata stream id %zu written before data", + DBG("Metadata stream id %" PRIu64 " written before data", stream->relayd_stream_id); end: @@ -1095,7 +1100,11 @@ end: } /* - * Mmap the ring buffer, read it and write the data to the tracefile. + * Mmap the ring buffer, read it and write the data to the tracefile. This is a + * core function for writing trace buffers to either the local filesystem or + * the network. + * + * Careful review MUST be put if any changes occur! * * Returns the number of bytes written */ @@ -1168,12 +1177,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( written = ret; goto end; } - - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= sizeof(stream->relayd_stream_id); } } /* Else, use the default set before which is the filesystem. */ @@ -1190,14 +1193,14 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } goto end; } else if (ret > len) { - PERROR("Error in file write (ret %ld > len %lu)", ret, len); + PERROR("Error in file write (ret %zd > len %lu)", ret, len); written += ret; goto end; } else { len -= ret; mmap_offset += ret; } - DBG("Consumer mmap write() ret %ld (len %lu)", ret, len); + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1838,3 +1841,95 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } + +/* + * Process the ADD_RELAYD command receive by a consumer. + * + * This will create a relayd socket pair and add it to the relayd hash table. + * The caller MUST acquire a RCU read side lock before calling it. + */ +int consumer_add_relayd_socket(int net_seq_idx, int sock_type, + struct lttng_consumer_local_data *ctx, int sock, + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) +{ + int fd, ret = -1; + struct consumer_relayd_sock_pair *relayd; + + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(net_seq_idx); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto error; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = -EINTR; + goto error; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + ret = -1; + goto error; + } + + /* Copy socket information and received FD */ + switch (sock_type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, relayd_sock); + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto error; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, relayd_sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto error; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type (%d)", sock_type); + goto error; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + /* All good! */ + ret = 0; + +error: + return ret; +}