#include <inttypes.h>
#include <signal.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
#include "consumer.h"
#include "consumer-stream.h"
-#include "../bin/lttng-consumerd/health-consumerd.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
static void notify_health_quit_pipe(int *pipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(pipe[1], "4", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(pipe[1], "4", 1);
+ if (ret < 1) {
PERROR("write consumer health quit");
}
}
enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
- int ret;
+ ssize_t ret;
memset(&msg, 0, sizeof(msg));
msg.action = action;
msg.chan = chan;
msg.key = key;
- do {
- ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+ if (ret < sizeof(msg)) {
+ PERROR("notify_channel_pipe write error");
+ }
}
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
- int ret;
+ ssize_t ret;
- do {
- ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
- } while (ret < 0 && errno == EINTR);
- if (ret > 0) {
- *action = msg.action;
- *chan = msg.chan;
- *key = msg.key;
+ ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+ if (ret < sizeof(msg)) {
+ ret = -1;
+ goto error;
}
- return ret;
+ *action = msg.action;
+ *chan = msg.chan;
+ *key = msg.key;
+error:
+ return (int) ret;
}
/*
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
- channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ switch (output) {
+ case LTTNG_EVENT_SPLICE:
+ channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ assert(0);
+ free(channel);
+ channel = NULL;
+ goto end;
+ }
+
/*
* In monitor mode, the streams associated with the channel will be put in
* a special list ONLY owned by this channel. So, the refcount is set to 1
*/
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
- int ret;
+ ssize_t ret;
+
consumer_quit = 1;
- do {
- ret = write(ctx->consumer_should_quit[1], "4", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+ if (ret < 1) {
PERROR("write consumer quit");
}
struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd, unsigned long padding)
{
- int ret;
+ ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
hdr.stream_id = htobe64(stream->relayd_stream_id);
hdr.padding_size = htobe32(padding);
- do {
- ret = write(fd, (void *) &hdr, sizeof(hdr));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != sizeof(hdr)) {
+ ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+ if (ret < sizeof(hdr)) {
/*
* This error means that the fd's end is closed so ignore the perror
* not to clubber the error output since this can happen in a normal
stream->relayd_stream_id, padding);
end:
- return ret;
+ return (int) ret;
}
/*
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
}
while (len > 0) {
- do {
- ret = write(outfd, mmap_base + mmap_offset, len);
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(outfd, mmap_base + mmap_offset, len);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
- if (ret < 0) {
+ if (ret < len) {
/*
* This is possible if the fd is closed on the other side (outfd)
* or any write problem. It can be verbose a bit for a normal
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream, sizeof(stream));
- if (pipe_len < 0) {
- ERR("read metadata stream, ret: %zd", pipe_len);
+ if (pipe_len < sizeof(stream)) {
+ PERROR("read metadata stream");
/*
* Continue here to handle the rest of the streams.
*/
DBG("consumer_data_pipe wake up");
pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
&new_stream, sizeof(new_stream));
- if (pipe_readlen < 0) {
- ERR("Consumer data pipe ret %zd", pipe_readlen);
+ if (pipe_readlen < sizeof(new_stream)) {
+ PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
}
uint64_t relayd_session_id)
{
int fd = -1, ret = -1, relayd_created = 0;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
}
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
+ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
assert(sock >= 0);
if (!channel) {
- msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {
- msg.ret_code = LTTNG_OK;
+ msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
msg.key = channel->key;
msg.stream_count = channel->streams.count;
}