Divide read subbuffer consumer function
authorDavid Goulet <dgoulet@efficios.com>
Tue, 31 Jul 2012 16:35:39 +0000 (12:35 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Tue, 31 Jul 2012 16:35:58 +0000 (12:35 -0400)
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c

index 1863cddc5757e6e1788642dd788629303a890881..761ce93753667f1b93d6f151c93a68d014855e7f 100644 (file)
@@ -529,28 +529,19 @@ error:
  *
  * Return destination file descriptor or negative value on error.
  */
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
-               size_t data_size)
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+               size_t data_size, struct consumer_relayd_sock_pair *relayd)
 {
        int outfd = -1, ret;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttcomm_relayd_data_hdr data_hdr;
 
        /* Safety net */
        assert(stream);
+       assert(relayd);
 
        /* Reset data header */
        memset(&data_hdr, 0, sizeof(data_hdr));
 
-       rcu_read_lock();
-       /* Get relayd reference of the stream. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd == NULL) {
-               /* Stream is either local or corrupted */
-               goto error;
-       }
-
-       DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
        if (stream->metadata_flag) {
                /* Caller MUST acquire the relayd control socket lock */
                ret = relayd_send_metadata(&relayd->control_sock, data_size);
@@ -578,7 +569,6 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        }
 
 error:
-       rcu_read_unlock();
        return outfd;
 }
 
@@ -1078,6 +1068,32 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        free(ctx);
 }
 
+/*
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+               struct lttng_consumer_stream *stream,
+               struct consumer_relayd_sock_pair *relayd)
+{
+       int ret;
+       uint64_t metadata_id;
+
+       metadata_id = htobe64(stream->relayd_stream_id);
+       do {
+               ret = write(fd, (void *) &metadata_id,
+                               sizeof(stream->relayd_stream_id));
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0) {
+               PERROR("write metadata stream id");
+               goto end;
+       }
+       DBG("Metadata stream id %zu written before data",
+                       stream->relayd_stream_id);
+
+end:
+       return ret;
+}
+
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile.
  *
@@ -1092,7 +1108,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        /* RCU lock for the relayd pointer */
@@ -1141,25 +1156,19 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        netlen += sizeof(stream->relayd_stream_id);
                }
 
-               ret = consumer_handle_stream_before_relayd(stream, netlen);
+               ret = write_relayd_stream_header(stream, netlen, relayd);
                if (ret >= 0) {
                        /* Use the returned socket. */
                        outfd = ret;
 
                        /* Write metadata stream id before payload */
                        if (stream->metadata_flag) {
-                               metadata_id = htobe64(stream->relayd_stream_id);
-                               do {
-                                       ret = write(outfd, (void *) &metadata_id,
-                                                       sizeof(stream->relayd_stream_id));
-                               } while (ret < 0 && errno == EINTR);
+                               ret = write_relayd_metadata_id(outfd, stream, relayd);
                                if (ret < 0) {
-                                       PERROR("write metadata stream id");
                                        written = ret;
                                        goto end;
                                }
-                               DBG("Metadata stream id %zu written before data",
-                                               stream->relayd_stream_id);
+
                                /*
                                 * We do this so the return value can match the len passed as
                                 * argument to this function.
@@ -1226,7 +1235,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        switch (consumer_data.type) {
@@ -1260,18 +1268,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                 */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
 
-               metadata_id = htobe64(stream->relayd_stream_id);
-               do {
-                       ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
-                                       sizeof(stream->relayd_stream_id));
-               } while (ret < 0 && errno == EINTR);
+               ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1],
+                               stream, relayd);
                if (ret < 0) {
-                       PERROR("write metadata stream id");
                        written = ret;
                        goto end;
                }
-               DBG("Metadata stream id %zu written before data",
-                               stream->relayd_stream_id);
        }
 
        while (len > 0) {
@@ -1302,15 +1304,13 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                written -= sizeof(stream->relayd_stream_id);
                        }
 
-                       ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+                       ret = write_relayd_stream_header(stream, ret_splice, relayd);
                        if (ret >= 0) {
                                /* Use the returned socket. */
                                outfd = ret;
                        } else {
-                               if (outfd == -1) {
-                                       ERR("Remote relayd disconnected. Stopping");
-                                       goto end;
-                               }
+                               ERR("Remote relayd disconnected. Stopping");
+                               goto end;
                        }
                }
 
This page took 0.028696 seconds and 4 git commands to generate.