+ unsigned int consumed_old;
+ int err, ret=0;
+
+
+ err = ioctl(pair->channel, RELAYFS_GET_SUBBUF,
+ &consumed_old);
+ printf("cookie : %u\n", consumed_old);
+ if(err != 0) {
+ ret = errno;
+ perror("Reserving sub buffer failed (everything is normal)");
+ goto get_error;
+ }
+
+ err = TEMP_FAILURE_RETRY(write(pair->trace,
+ pair->mmap
+ + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)),
+ pair->subbuf_size));
+
+ if(err < 0) {
+ ret = errno;
+ perror("Error in writing to file");
+ goto write_error;
+ }
+
+
+write_error:
+ err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old);
+ if(err != 0) {
+ ret = errno;
+ if(errno == -EFAULT) {
+ perror("Error in unreserving sub buffer\n");
+ } else if(errno == -EIO) {
+ perror("Reader has been pushed by the writer, last subbuffer corrupted.");
+ /* FIXME : we may delete the last written buffer if we wish. */
+ }
+ goto get_error;
+ }
+
+get_error:
+ return ret;
+}
+
+
+
+int map_channels(struct channel_trace_fd *fd_pairs)
+{
+ int i,j;
+ int ret=0;
+
+ if(fd_pairs->num_pairs <= 0) {
+ printf("No channel to read\n");
+ goto end;
+ }
+
+ /* Get the subbuf sizes and number */
+
+ for(i=0;i<fd_pairs->num_pairs;i++) {
+ struct fd_pair *pair = &fd_pairs->pair[i];
+
+ ret = ioctl(pair->channel, RELAYFS_GET_N_SUBBUFS,
+ &pair->n_subbufs);
+ if(ret != 0) {
+ perror("Error in getting the number of subbuffers");
+ goto end;
+ }
+ ret = ioctl(pair->channel, RELAYFS_GET_SUBBUF_SIZE,
+ &pair->subbuf_size);
+ if(ret != 0) {
+ perror("Error in getting the size of the subbuffers");
+ goto end;
+ }
+ ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */
+ if(ret != 0) {
+ perror("Error in mutex init");
+ goto end;
+ }
+ }
+
+ /* Mmap each FD */
+ for(i=0;i<fd_pairs->num_pairs;i++) {
+ struct fd_pair *pair = &fd_pairs->pair[i];
+
+ pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ,
+ MAP_SHARED, pair->channel, 0);
+ if(pair->mmap == MAP_FAILED) {
+ perror("Mmap error");
+ goto munmap;
+ }
+ }
+
+ goto end; /* success */
+
+ /* Error handling */
+ /* munmap only the successfully mmapped indexes */
+munmap:
+ /* Munmap each FD */
+ for(j=0;j<i;j++) {
+ struct fd_pair *pair = &fd_pairs->pair[j];
+ int err_ret;
+
+ err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
+ if(err_ret != 0) {
+ perror("Error in munmap");
+ }
+ ret |= err_ret;
+ }
+
+end:
+ return ret;
+
+
+}
+
+
+int unmap_channels(struct channel_trace_fd *fd_pairs)
+{
+ int j;
+ int ret=0;
+
+ /* Munmap each FD */
+ for(j=0;j<fd_pairs->num_pairs;j++) {
+ struct fd_pair *pair = &fd_pairs->pair[j];
+ int err_ret;
+
+ err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
+ if(err_ret != 0) {
+ perror("Error in munmap");
+ }
+ ret |= err_ret;
+ err_ret = pthread_mutex_destroy(&pair->mutex);
+ if(err_ret != 0) {
+ perror("Error in mutex destroy");
+ }
+ ret |= err_ret;
+ }
+
+ return ret;
+}
+
+
+/* read_channels
+ *
+ * Thread worker.
+ *
+ * Read the relayfs channels and write them in the paired tracefiles.
+ *
+ * @fd_pairs : paired channels and trace files.
+ *
+ * returns (void*)0 on success, (void*)-1 on error.
+ *
+ * Note that the high priority polled channels are consumed first. We then poll
+ * again to see if these channels are still in priority. Only when no
+ * high priority channel is left, we start reading low priority channels.
+ *
+ * Note that a channel is considered high priority when the buffer is almost
+ * full.
+ */
+
+void * read_channels(void *arg)
+{
+ struct pollfd *pollfd;
+ int i,j;
+ int num_rdy, num_hup;
+ int high_prio;
+ int ret = 0;
+ struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg;
+
+ /* Start polling the FD */
+
+ pollfd = malloc(fd_pairs->num_pairs * sizeof(struct pollfd));
+
+ /* Note : index in pollfd is the same index as fd_pair->pair */
+ for(i=0;i<fd_pairs->num_pairs;i++) {
+ pollfd[i].fd = fd_pairs->pair[i].channel;
+ pollfd[i].events = POLLIN|POLLPRI;
+ }
+
+ while(1) {
+ high_prio = 0;
+ num_hup = 0;
+#ifdef DEBUG
+ printf("Press a key for next poll...\n");
+ char buf[1];
+ read(STDIN_FILENO, &buf, 1);
+ printf("Next poll (polling %d fd) :\n", fd_pairs->num_pairs);
+#endif //DEBUG
+
+ /* Have we received a signal ? */
+ if(quit_program) break;
+
+ num_rdy = poll(pollfd, fd_pairs->num_pairs, -1);
+ if(num_rdy == -1) {
+ perror("Poll error");
+ goto free_fd;
+ }
+
+ printf("Data received\n");
+
+ for(i=0;i<fd_pairs->num_pairs;i++) {
+ switch(pollfd[i].revents) {
+ case POLLERR:
+ printf("Error returned in polling fd %d.\n", pollfd[i].fd);
+ num_hup++;
+ break;
+ case POLLHUP:
+ printf("Polling fd %d tells it has hung up.\n", pollfd[i].fd);
+ num_hup++;
+ break;
+ case POLLNVAL:
+ printf("Polling fd %d tells fd is not open.\n", pollfd[i].fd);
+ num_hup++;
+ break;
+ case POLLPRI:
+ if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+ printf("Urgent read on fd %d\n", pollfd[i].fd);
+ /* Take care of high priority channels first. */
+ high_prio = 1;
+ /* it's ok to have an unavailable subbuffer */
+ ret = read_subbuffer(&fd_pairs->pair[i]);
+ if(ret == -EAGAIN) ret = 0;
+
+ ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+ if(ret)
+ printf("Error in mutex unlock : %s\n", strerror(ret));
+ }
+ break;
+ }
+ }
+ /* If every FD has hung up, we end the read loop here */
+ if(num_hup == fd_pairs->num_pairs) break;
+
+ if(!high_prio) {
+ for(i=0;i<fd_pairs->num_pairs;i++) {
+ switch(pollfd[i].revents) {
+ case POLLIN:
+ if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+ /* Take care of low priority channels. */
+ printf("Normal read on fd %d\n", pollfd[i].fd);
+ /* it's ok to have an unavailable subbuffer */
+ ret = read_subbuffer(&fd_pairs->pair[i]);
+ if(ret == -EAGAIN) ret = 0;
+
+ ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+ if(ret)
+ printf("Error in mutex unlock : %s\n", strerror(ret));
+ }
+ break;
+ }
+ }
+ }
+
+ }
+
+free_fd:
+ free(pollfd);
+
+end:
+ return (void*)ret;