#include <poll.h>
#include <unistd.h>
#include <sys/mman.h>
+#include <assert.h>
+#include <ltt-kconsumerd.h>
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
#include <lttng/lttng-kconsumerd.h>
-
-#include "lttngerr.h"
-#include "kernelctl.h"
-#include "ltt-kconsumerd.h"
-#include "lttng-sessiond-comm.h"
+#include <lttngerr.h>
/* the two threads (receive fd and poll) */
static pthread_t threads[2];
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
ret = errno;
- perror("Reserving sub buffer failed (everything is normal, "
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
goto end;
}
return ret;
}
+static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (kconsumerd_fd->path_name != NULL) {
+ ret = open(kconsumerd_fd->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (ret < 0) {
+ ERR("Opening %s", kconsumerd_fd->path_name);
+ perror("open");
+ goto error;
+ }
+ kconsumerd_fd->out_fd = ret;
+ }
+
+ if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
+ /* get the len of the mmap region */
+ unsigned long mmap_len;
+
+ ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &mmap_len);
+ if (ret != 0) {
+ ret = errno;
+ perror("kernctl_get_mmap_len");
+ goto error_close_fd;
+ }
+ kconsumerd_fd->mmap_len = (size_t) mmap_len;
+
+ kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
+ PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
+ if (kconsumerd_fd->mmap_base == MAP_FAILED) {
+ perror("Error mmaping");
+ ret = -1;
+ goto error_close_fd;
+ }
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error_close_fd:
+ {
+ int err;
+
+ err = close(kconsumerd_fd->out_fd);
+ assert(!err);
+ }
+error:
+ return ret;
+}
+
/*
* main
*/
snprintf(command_sock_path, PATH_MAX,
KCONSUMERD_CMD_SOCK_PATH);
}
- /* create the pipe to wake to receiving thread when needed */
- ctx = lttng_kconsumerd_create(read_subbuffer);
+ /* create the consumer instance with and assign the callbacks */
+ ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
if (ctx == NULL) {
goto error;
}