static
int notify_thread_pipe(int wpipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(wpipe, "!", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(wpipe, "!", 1);
+ if (ret < 1) {
PERROR("write poll pipe");
}
static void notify_health_quit_pipe(int *pipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(pipe[1], "4", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(pipe[1], "4", 1);
+ if (ret < 1) {
PERROR("write relay health quit");
}
}
static
void *relay_thread_dispatcher(void *data)
{
- int ret, err = -1;
+ int err = -1;
+ ssize_t ret;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
* call is blocking so we can be assured that the data will be read
* at some point in time or wait to the end of the world :)
*/
- do {
- ret = write(relay_cmd_pipe[1], relay_cmd,
- sizeof(struct relay_command));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(relay_cmd_pipe[1], relay_cmd,
+ sizeof(struct relay_command));
free(relay_cmd);
- if (ret < 0 || ret != sizeof(struct relay_command)) {
+ if (ret < sizeof(struct relay_command)) {
PERROR("write cmd pipe");
goto error;
}
* lookup failure on the live thread side of a stream indicates
* that the viewer stream index received value should be used.
*/
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
vstream->total_index_received = stream->total_index_received;
+ vstream->close_write_flag = 1;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
}
/* Cleanup index of that stream. */
*/
static int write_padding_to_file(int fd, uint32_t size)
{
- int ret = 0;
+ ssize_t ret = 0;
char *zeros;
if (size == 0) {
goto end;
}
- do {
- ret = write(fd, zeros, size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != size) {
+ ret = lttng_write(fd, zeros, size);
+ if (ret < size) {
PERROR("write padding to file");
}
struct relay_command *cmd)
{
int ret = htobe32(LTTNG_OK);
+ ssize_t size_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
goto end_unlock;
}
- do {
- ret = write(metadata_stream->fd, metadata_struct->payload,
- payload_size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != payload_size) {
+ size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
+ payload_size);
+ if (size_ret < payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
goto end_unlock;
int relay_process_data(struct relay_command *cmd)
{
int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
struct relay_stream *stream;
struct lttcomm_relayd_data_hdr data_hdr;
uint64_t stream_id;
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
stream->tracefile_size) {
+ struct relay_viewer_stream *vstream;
+ uint64_t new_id;
+
+ new_id = (stream->tracefile_count_current + 1) %
+ stream->tracefile_count;
+ /*
+ * When we wrap-around back to 0, we start overwriting old
+ * trace data.
+ */
+ if (!stream->tracefile_overwrite && new_id == 0) {
+ stream->tracefile_overwrite = 1;
+ }
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
+ if (stream->tracefile_overwrite) {
+ stream->oldest_tracefile_id =
+ (stream->oldest_tracefile_id + 1) %
+ stream->tracefile_count;
+ }
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * The viewer is reading a file about to be
+ * overwritten. Close the FDs it is
+ * currently using and let it handle the fault.
+ */
+ if (vstream->tracefile_count_current == new_id) {
+ pthread_mutex_lock(&vstream->overwrite_lock);
+ vstream->abort_flag = 1;
+ pthread_mutex_unlock(&vstream->overwrite_lock);
+ DBG("Streaming side setting abort_flag on stream %s_%lu\n",
+ stream->channel_name, new_id);
+ } else if (vstream->tracefile_count_current ==
+ stream->tracefile_count_current) {
+ /*
+ * The reader and writer were in the
+ * same trace file, inform the viewer
+ * that no new index will ever be added
+ * to this file.
+ */
+ vstream->close_write_flag = 1;
+ }
+ }
ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_count,
relayd_uid, relayd_gid, stream->fd,
&(stream->tracefile_count_current), &stream->fd);
+ stream->total_index_received = 0;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
if (ret < 0) {
ERR("Rotating stream output file");
goto end_rcu_unlock;
}
/* Write data to stream output fd. */
- do {
- ret = write(stream->fd, data_buffer, data_size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != data_size) {
+ size_ret = lttng_write(stream->fd, data_buffer, data_size);
+ if (size_ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
goto end_rcu_unlock;
struct lttng_ht *relay_connections_ht)
{
struct relay_command *relay_connection;
- int ret;
+ ssize_t ret;
relay_connection = zmalloc(sizeof(struct relay_command));
if (relay_connection == NULL) {
PERROR("Relay command zmalloc");
goto error;
}
- do {
- ret = read(fd, relay_connection, sizeof(struct relay_command));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct relay_command)) {
+ ret = lttng_read(fd, relay_connection, sizeof(struct relay_command));
+ if (ret < sizeof(struct relay_command)) {
PERROR("read relay cmd pipe");
goto error_read;
}