X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=be78e256f489661eda948fc8b143a25b69c23a0c;hp=242b05b3d6bb9c65939ea72a900a84fa74d382b7;hb=c869f647b0c4476645ab9ee01e362401fb8c1e42;hpb=e316aad5fbbe3782872083cb68dfdd58bccea811 diff --git a/src/common/consumer.c b/src/common/consumer.c index 242b05b3d..be78e256f 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key, return stream; } -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) +void consumer_steal_stream_key(int key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream( lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); lttng_ht_node_init_ulong(&stream->node, stream->key); + /* + * The cpu number is needed before using any ustctl_* actions. Ignored for + * the kernel so the value does not matter. + */ + pthread_mutex_lock(&consumer_data.lock); + stream->cpu = stream->chan->cpucount++; + pthread_mutex_unlock(&consumer_data.lock); + DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - stream->cpu = stream->chan->cpucount++; - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); /* Check and cleanup relayd */ @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) consumer_data.stream_count++; consumer_data.need_update = 1; -error: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -1585,12 +1570,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto free_stream; } - rcu_read_lock(); - iter.iter.node = &stream->waitfd_node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); - rcu_read_unlock(); - pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1611,6 +1590,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto end; } + rcu_read_lock(); + iter.iter.node = &stream->waitfd_node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + rcu_read_unlock(); + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -1697,27 +1682,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->wait_fd, ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - /* * From here, refcounts are updated so be _careful_ when returning an error * after this point. @@ -1747,7 +1711,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); rcu_read_unlock(); -error: pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -1756,7 +1719,7 @@ error: * Thread polls on metadata file descriptor and write them on disk or on the * network. */ -void *lttng_consumer_thread_poll_metadata(void *data) +void *consumer_thread_metadata_poll(void *data) { int ret, i, pollfd; uint32_t revents, nb_fd; @@ -1888,12 +1851,15 @@ restart: lttng_ustconsumer_on_stream_hangup(stream); /* We just flushed the stream now read it. */ - len = ctx->on_buffer_ready(stream, ctx); - /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { - rcu_read_unlock(); - goto end; - } + do { + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); } lttng_poll_del(&events, stream->wait_fd); @@ -1909,7 +1875,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { rcu_read_unlock(); goto end; } else if (len > 0) { @@ -1939,29 +1905,19 @@ end: * This thread polls the fds in the set to consume the data and write * it to tracefile if necessary. */ -void *lttng_consumer_thread_poll_fds(void *data) +void *consumer_thread_data_poll(void *data) { int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the streams */ - struct lttng_consumer_stream **local_stream = NULL; + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; - pthread_t metadata_thread; - void *status; rcu_register_thread(); - /* Start metadata polling thread */ - ret = pthread_create(&metadata_thread, NULL, - lttng_consumer_thread_poll_metadata, (void *) ctx); - if (ret < 0) { - PERROR("pthread_create metadata thread"); - goto end; - } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { @@ -2042,13 +1998,35 @@ void *lttng_consumer_thread_poll_fds(void *data) */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - char tmp; DBG("consumer_poll_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream, + sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + + /* + * If the stream is NULL, just ignore it. It's also possible that + * the sessiond poll thread changed the consumer_quit state and is + * waking us up to test it. + */ + if (new_stream == NULL) { + continue; + } + + ret = consumer_add_stream(new_stream); + if (ret) { + ERR("Consumer add stream %d failed. Continuing", + new_stream->key); + /* + * At this point, if the add_stream fails, it is not in the + * hash table thus passing the NULL value here. + */ + consumer_del_stream(new_stream, NULL); + } + + /* Continue to update the local streams and handle prio ones */ continue; } @@ -2059,7 +2037,7 @@ void *lttng_consumer_thread_poll_fds(void *data) high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2082,7 +2060,7 @@ void *lttng_consumer_thread_poll_fds(void *data) DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2145,19 +2123,13 @@ end: /* * Close the write side of the pipe so epoll_wait() in - * lttng_consumer_thread_poll_metadata can catch it. The thread is - * monitoring the read side of the pipe. If we close them both, epoll_wait - * strangely does not return and could create a endless wait period if the - * pipe is the only tracked fd in the poll set. The thread will take care - * of closing the read side. + * consumer_thread_metadata_poll can catch it. The thread is monitoring the + * read side of the pipe. If we close them both, epoll_wait strangely does + * not return and could create a endless wait period if the pipe is the + * only tracked fd in the poll set. The thread will take care of closing + * the read side. */ close(ctx->consumer_metadata_pipe[1]); - if (ret) { - ret = pthread_join(metadata_thread, &status); - if (ret < 0) { - PERROR("pthread_join metadata thread"); - } - } rcu_unregister_thread(); return NULL; @@ -2167,7 +2139,7 @@ end: * This thread listens on the consumerd socket and receives the file * descriptors from the session daemon. */ -void *lttng_consumer_thread_receive_fds(void *data) +void *consumer_thread_sessiond_poll(void *data) { int sock, client_socket, ret; /* @@ -2274,19 +2246,16 @@ end: consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). + * Notify the data poll thread to poll back again and test the + * consumer_quit state to quit gracefully. */ do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); + struct lttng_consumer_stream *null_stream = NULL; + + ret = write(ctx->consumer_poll_pipe[1], &null_stream, + sizeof(null_stream)); } while (ret < 0 && errno == EINTR); + rcu_unregister_thread(); return NULL; }