Cygwin: Fixup of wait pipe hangup commit
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <lttng/ust-ctl.h>
31
32 #include <common/common.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/compat/fcntl.h>
35
36 #include "ust-consumer.h"
37
38 extern struct lttng_consumer_global_data consumer_data;
39 extern int consumer_poll_timeout;
40 extern volatile int consumer_quit;
41
42 /*
43 * Mmap the ring buffer, read it and write the data to the tracefile.
44 *
45 * Returns the number of bytes written, else negative value on error.
46 */
47 ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
48 struct lttng_consumer_local_data *ctx,
49 struct lttng_consumer_stream *stream, unsigned long len)
50 {
51 unsigned long mmap_offset;
52 long ret = 0, written = 0;
53 off_t orig_offset = stream->out_fd_offset;
54 int outfd = stream->out_fd;
55
56 /* get the offset inside the fd to mmap */
57 ret = ustctl_get_mmap_read_offset(stream->chan->handle,
58 stream->buf, &mmap_offset);
59 if (ret != 0) {
60 errno = -ret;
61 PERROR("ustctl_get_mmap_read_offset");
62 written = ret;
63 goto end;
64 }
65 while (len > 0) {
66 ret = write(outfd, stream->mmap_base + mmap_offset, len);
67 if (ret < 0) {
68 if (errno == EINTR) {
69 /* restart the interrupted system call */
70 continue;
71 } else {
72 PERROR("Error in file write");
73 if (written == 0) {
74 written = ret;
75 }
76 goto end;
77 }
78 } else if (ret > len) {
79 PERROR("Error in file write");
80 written += ret;
81 goto end;
82 } else {
83 len -= ret;
84 mmap_offset += ret;
85 }
86 /* This won't block, but will start writeout asynchronously */
87 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
88 SYNC_FILE_RANGE_WRITE);
89 stream->out_fd_offset += ret;
90 written += ret;
91 }
92 lttng_consumer_sync_trace_file(stream, orig_offset);
93 end:
94 return written;
95 }
96
97 /*
98 * Splice the data from the ring buffer to the tracefile.
99 *
100 * Returns the number of bytes spliced.
101 */
102 ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
103 struct lttng_consumer_local_data *ctx,
104 struct lttng_consumer_stream *stream, unsigned long len)
105 {
106 return -ENOSYS;
107 }
108
109 /*
110 * Take a snapshot for a specific fd
111 *
112 * Returns 0 on success, < 0 on error
113 */
114 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
115 struct lttng_consumer_stream *stream)
116 {
117 int ret = 0;
118
119 ret = ustctl_snapshot(stream->chan->handle, stream->buf);
120 if (ret != 0) {
121 errno = -ret;
122 PERROR("Getting sub-buffer snapshot.");
123 }
124
125 return ret;
126 }
127
128 /*
129 * Get the produced position
130 *
131 * Returns 0 on success, < 0 on error
132 */
133 int lttng_ustconsumer_get_produced_snapshot(
134 struct lttng_consumer_local_data *ctx,
135 struct lttng_consumer_stream *stream,
136 unsigned long *pos)
137 {
138 int ret;
139
140 ret = ustctl_snapshot_get_produced(stream->chan->handle,
141 stream->buf, pos);
142 if (ret != 0) {
143 errno = -ret;
144 PERROR("kernctl_snapshot_get_produced");
145 }
146
147 return ret;
148 }
149
150 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
151 int sock, struct pollfd *consumer_sockpoll)
152 {
153 ssize_t ret;
154 struct lttcomm_consumer_msg msg;
155
156 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
157 if (ret != sizeof(msg)) {
158 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
159 return ret;
160 }
161 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
162 return -ENOENT;
163 }
164
165 switch (msg.cmd_type) {
166 case LTTNG_CONSUMER_ADD_CHANNEL:
167 {
168 struct lttng_consumer_channel *new_channel;
169 int fds[1];
170 char *path;
171
172 /* block */
173 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
174 return -EINTR;
175 }
176
177 path = lttcomm_recv_string(sock);
178
179 if (!path) {
180 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
181 return -1;
182 }
183
184 DBG("consumer_add_channel received path %s", path);
185
186 fds[0] = open(path, O_RDWR);
187
188 if (fds[0] < 0) {
189 DBG("consumer_add_channel open error on path %s", path);
190 free(path);
191 return -1;
192 }
193
194 if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) < 0) {
195 DBG("consumer_add_channel fcntl error");
196 free(path);
197 return -1;
198 }
199
200 free(path);
201
202 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
203
204 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
205 fds[0], -1,
206 msg.u.channel.mmap_len,
207 msg.u.channel.max_sb_size);
208 if (new_channel == NULL) {
209 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
210 goto end_nosignal;
211 }
212 if (ctx->on_recv_channel != NULL) {
213 ret = ctx->on_recv_channel(new_channel);
214 if (ret == 0) {
215 consumer_add_channel(new_channel);
216 } else if (ret < 0) {
217 goto end_nosignal;
218 }
219 } else {
220 consumer_add_channel(new_channel);
221 }
222 goto end_nosignal;
223 }
224 case LTTNG_CONSUMER_ADD_STREAM:
225 {
226 struct lttng_consumer_stream *new_stream;
227 int fds[2], i;
228 char *shm_path, *wait_pipe_path;
229
230 /* block */
231 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
232 return -EINTR;
233 }
234
235 shm_path = lttcomm_recv_string(sock);
236
237 if (!shm_path) {
238 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
239 return -1;
240 }
241
242 wait_pipe_path = lttcomm_recv_string(sock);
243
244 if (!wait_pipe_path) {
245 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
246 free(shm_path);
247 return -1;
248 }
249
250 DBG("consumer_add_stream received path %s", shm_path);
251 DBG("consumer_add_stream received path %s", wait_pipe_path);
252
253 fds[0] = open(shm_path, O_RDWR);
254
255 if (fds[0] < 0) {
256 DBG("consumer_add_stream open error on path %s", shm_path);
257 free(shm_path);
258 free(wait_pipe_path);
259 return -1;
260 }
261
262 fds[1] = open(wait_pipe_path, O_RDONLY);
263
264 if (fds[1] < 0) {
265 DBG("consumer_add_stream open error on path %s", wait_pipe_path);
266 PERROR("open");
267 free(shm_path);
268 free(wait_pipe_path);
269 return -1;
270 }
271
272 free(shm_path);
273 free(wait_pipe_path);
274
275 for (i = 0; i < 2; ++i) {
276 if (fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0) {
277 DBG("consumer_add_stream fcntl error");
278 return -1;
279 }
280 }
281
282 DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
283 fds[0], fds[1]);
284 assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
285 new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
286 msg.u.stream.stream_key,
287 fds[0], fds[1],
288 msg.u.stream.state,
289 msg.u.stream.mmap_len,
290 msg.u.stream.output,
291 msg.u.stream.path_name,
292 msg.u.stream.uid,
293 msg.u.stream.gid);
294 if (new_stream == NULL) {
295 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
296 goto end;
297 }
298 if (ctx->on_recv_stream != NULL) {
299 ret = ctx->on_recv_stream(new_stream);
300 if (ret == 0) {
301 consumer_add_stream(new_stream);
302 } else if (ret < 0) {
303 goto end;
304 }
305 } else {
306 consumer_add_stream(new_stream);
307 }
308 break;
309 }
310 case LTTNG_CONSUMER_UPDATE_STREAM:
311 {
312 return -ENOSYS;
313 #if 0
314 if (ctx->on_update_stream != NULL) {
315 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
316 if (ret == 0) {
317 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
318 } else if (ret < 0) {
319 goto end;
320 }
321 } else {
322 consumer_change_stream_state(msg.u.stream.stream_key,
323 msg.u.stream.state);
324 }
325 #endif
326 break;
327 }
328 default:
329 break;
330 }
331 end:
332 /*
333 * Wake-up the other end by writing a null byte in the pipe
334 * (non-blocking). Important note: Because writing into the
335 * pipe is non-blocking (and therefore we allow dropping wakeup
336 * data, as long as there is wakeup data present in the pipe
337 * buffer to wake up the other end), the other end should
338 * perform the following sequence for waiting:
339 * 1) empty the pipe (reads).
340 * 2) perform update operation.
341 * 3) wait on the pipe (poll).
342 */
343 do {
344 ret = write(ctx->consumer_poll_pipe[1], "", 1);
345 } while (ret == -1UL && errno == EINTR);
346 end_nosignal:
347
348 /*
349 * Return 1 to indicate success since the 0 value can be a socket shutdown
350 * during the recv() or send() call.
351 */
352 return 1;
353 }
354
355 int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
356 {
357 struct lttng_ust_object_data obj;
358
359 obj.handle = -1;
360 obj.shm_fd = chan->shm_fd;
361 obj.wait_fd = chan->wait_fd;
362 obj.memory_map_size = chan->mmap_len;
363 chan->handle = ustctl_map_channel(&obj);
364 if (!chan->handle) {
365 return -ENOMEM;
366 }
367 chan->wait_fd_is_copy = 1;
368 chan->shm_fd = -1;
369
370 return 0;
371 }
372
373 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
374 {
375 ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
376 stream->hangup_flush_done = 1;
377 }
378
379 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
380 {
381 ustctl_unmap_channel(chan->handle);
382 }
383
384 int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
385 {
386 struct lttng_ust_object_data obj;
387 int ret;
388
389 obj.handle = -1;
390 obj.shm_fd = stream->shm_fd;
391 obj.wait_fd = stream->wait_fd;
392 obj.memory_map_size = stream->mmap_len;
393 ret = ustctl_add_stream(stream->chan->handle, &obj);
394 if (ret)
395 return ret;
396 stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
397 if (!stream->buf)
398 return -EBUSY;
399 /* ustctl_open_stream_read has closed the shm fd. */
400 stream->wait_fd_is_copy = 1;
401 stream->shm_fd = -1;
402
403 stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
404 if (!stream->mmap_base) {
405 return -EINVAL;
406 }
407
408 return 0;
409 }
410
411 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
412 {
413 ustctl_close_stream_read(stream->chan->handle, stream->buf);
414 }
415
416
417 int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
418 struct lttng_consumer_local_data *ctx)
419 {
420 ssize_t readlen;
421 char dummy;
422
423 DBG("In check_pipe (wait_fd: %d, stream key: %d)\n",
424 stream->wait_fd, stream->key);
425
426 /* We consume the 1 byte written into the wait_fd by UST */
427
428 do {
429 readlen = read(stream->wait_fd, &dummy, 1);
430 } while (readlen == -1 && errno == EINTR);
431 if (readlen == -1) {
432 return -1; /* error */
433 }
434
435 DBG("Read %zu byte from pipe: %c\n", readlen, readlen ? dummy : '\0');
436
437 if (readlen == 0)
438 return 1; /* POLLHUP */
439
440 return 0; /* no error nor HUP */
441
442 }
443
444 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
445 struct lttng_consumer_local_data *ctx)
446 {
447 unsigned long len;
448 int err;
449 long ret = 0;
450 struct lttng_ust_shm_handle *handle;
451 struct lttng_ust_lib_ring_buffer *buf;
452 char dummy;
453 ssize_t readlen;
454
455 DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
456 stream->wait_fd, stream->key);
457
458 /* We can consume the 1 byte written into the wait_fd by UST */
459 if (!stream->hangup_flush_done) {
460 do {
461 readlen = read(stream->wait_fd, &dummy, 1);
462 } while (readlen == -1 && errno == EINTR);
463 if (readlen == -1) {
464 ret = readlen;
465 goto end;
466 }
467 DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
468 }
469
470 buf = stream->buf;
471 handle = stream->chan->handle;
472 /* Get the next subbuffer */
473 err = ustctl_get_next_subbuf(handle, buf);
474 if (err != 0) {
475 ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
476 /*
477 * This is a debug message even for single-threaded consumer,
478 * because poll() have more relaxed criterions than get subbuf,
479 * so get_subbuf may fail for short race windows where poll()
480 * would issue wakeups.
481 */
482 DBG("Reserving sub buffer failed (everything is normal, "
483 "it is due to concurrency)");
484 goto end;
485 }
486 assert(stream->output == LTTNG_EVENT_MMAP);
487 /* read the used subbuffer size */
488 err = ustctl_get_padded_subbuf_size(handle, buf, &len);
489 assert(err == 0);
490 /* write the subbuffer to the tracefile */
491 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
492 if (ret != len) {
493 /*
494 * display the error but continue processing to try
495 * to release the subbuffer
496 */
497 ERR("Error writing to tracefile");
498 }
499 err = ustctl_put_next_subbuf(handle, buf);
500 assert(err == 0);
501 end:
502 return ret;
503 }
504
505 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
506 {
507 int ret;
508
509 /* Opening the tracefile in write mode */
510 if (stream->path_name != NULL) {
511 ret = run_as_open(stream->path_name,
512 O_WRONLY|O_CREAT|O_TRUNC,
513 S_IRWXU|S_IRWXG|S_IRWXO,
514 stream->uid, stream->gid);
515 if (ret < 0) {
516 ERR("Opening %s", stream->path_name);
517 PERROR("open");
518 goto error;
519 }
520 stream->out_fd = ret;
521 }
522
523 /* we return 0 to let the library handle the FD internally */
524 return 0;
525
526 error:
527 return ret;
528 }
This page took 0.040475 seconds and 4 git commands to generate.