uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
goto end;
}
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
+ unsigned long netlen = len;
+
/*
* Lock the control socket for the complete duration of the function
* since from this point on we will use the socket.
if (stream->metadata_flag) {
/* Metadata requires the control socket. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ netlen += sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, len);
+ ret = consumer_handle_stream_before_relayd(stream, netlen);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
do {
ret = write(outfd, (void *) &metadata_id,
sizeof(stream->relayd_stream_id));
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- } while (errno == EINTR);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata stream id");
+ written = ret;
+ goto end;
+ }
DBG("Metadata stream id %zu written before data",
stream->relayd_stream_id);
/*
}
while (len > 0) {
- ret = write(outfd, stream->mmap_base + mmap_offset, len);
+ do {
+ ret = write(outfd, stream->mmap_base + mmap_offset, len);
+ } while (ret < 0 && errno == EINTR);
if (ret < 0) {
- if (errno == EINTR) {
- /* restart the interrupted system call */
- continue;
- } else {
- perror("Error in file write");
- if (written == 0) {
- written = ret;
- }
- goto end;
+ perror("Error in file write");
+ if (written == 0) {
+ written = ret;
}
+ goto end;
} else if (ret > len) {
perror("Error in file write");
written += ret;
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
}
}
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
/* Write metadata stream id before payload */
if (stream->metadata_flag && relayd) {
/*
*/
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ metadata_id = htobe64(stream->relayd_stream_id);
do {
- metadata_id = htobe64(stream->relayd_stream_id);
ret = write(ctx->consumer_thread_pipe[1],
(void *) &metadata_id,
sizeof(stream->relayd_stream_id));
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- } while (errno == EINTR);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata stream id");
+ written = ret;
+ goto end;
+ }
DBG("Metadata stream id %zu written before data",
stream->relayd_stream_id);
}
return -ENOENT;
}
+ /* relayd needs RCU read-side protection */
+ rcu_read_lock();
+
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
*/
do {
ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret == -1UL && errno == EINTR);
+ } while (ret < 0 && errno == EINTR);
end_nosignal:
+ rcu_read_unlock();
return 0;
}