X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=08592f6c0b8713f1ce1765a7de4d9237fd8bd81a;hp=27dfe32b8592487e6c5e881604a650085240f536;hb=ccf7af6c78ba7a206baa9d0b9578468a1af734e1;hpb=4d86847e8786d4902dceeb1dff91791112d2c396 diff --git a/src/common/consumer.c b/src/common/consumer.c index 27dfe32b8..08592f6c0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -755,7 +755,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, * this next value, 1 should always be substracted in order to compare * the last seen sequence number on the relayd side to the last sent. */ - data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); + data_hdr.net_seq_num = htobe64(stream->next_net_seq_num); /* Other fields are zeroed previously */ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, @@ -764,6 +764,8 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, goto error; } + ++stream->next_net_seq_num; + /* Set to go on data socket */ outfd = relayd->data_sock.fd; } @@ -2155,7 +2157,7 @@ restart: if (stream == NULL) { /* Check for deleted streams. */ validate_endpoint_status_metadata_stream(&events); - continue; + goto restart; } DBG("Adding metadata stream %d to poll set", @@ -2281,14 +2283,11 @@ void *consumer_thread_data_poll(void *data) */ pthread_mutex_lock(&consumer_data.lock); if (consumer_data.need_update) { - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_stream != NULL) { - free(local_stream); - local_stream = NULL; - } + free(pollfd); + pollfd = NULL; + + free(local_stream); + local_stream = NULL; /* allocate for all fds + 1 for the consumer_data_pipe */ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); @@ -2349,7 +2348,7 @@ void *consumer_thread_data_poll(void *data) * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { - size_t pipe_readlen; + ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); /* Consume 1 byte of pipe data */ @@ -2485,14 +2484,8 @@ void *consumer_thread_data_poll(void *data) } end: DBG("polling thread exiting"); - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_stream != NULL) { - free(local_stream); - local_stream = NULL; - } + free(pollfd); + free(local_stream); /* * Close the write side of the pipe so epoll_wait() in