Fix: consumer relay sender RCU usage
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <lttng/ust-ctl.h>
31
32 #include <common/common.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/relayd/relayd.h>
35 #include <common/compat/fcntl.h>
36
37 #include "ust-consumer.h"
38
39 extern struct lttng_consumer_global_data consumer_data;
40 extern int consumer_poll_timeout;
41 extern volatile int consumer_quit;
42
43 /*
44 * Mmap the ring buffer, read it and write the data to the tracefile.
45 *
46 * Returns the number of bytes written, else negative value on error.
47 */
48 ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
49 struct lttng_consumer_local_data *ctx,
50 struct lttng_consumer_stream *stream, unsigned long len)
51 {
52 unsigned long mmap_offset;
53 long ret = 0, written = 0;
54 off_t orig_offset = stream->out_fd_offset;
55 int outfd = stream->out_fd;
56 uint64_t metadata_id;
57 struct consumer_relayd_sock_pair *relayd = NULL;
58
59 /* RCU lock for the relayd pointer */
60 rcu_read_lock();
61
62 /* Flag that the current stream if set for network streaming. */
63 if (stream->net_seq_idx != -1) {
64 relayd = consumer_find_relayd(stream->net_seq_idx);
65 if (relayd == NULL) {
66 ERR("Cannot find relay for network stream\n");
67 goto end;
68 }
69 }
70
71 /* get the offset inside the fd to mmap */
72 ret = ustctl_get_mmap_read_offset(stream->chan->handle,
73 stream->buf, &mmap_offset);
74 if (ret != 0) {
75 errno = -ret;
76 PERROR("ustctl_get_mmap_read_offset");
77 written = ret;
78 goto end;
79 }
80
81 /* Handle stream on the relayd if the output is on the network */
82 if (relayd) {
83 if (stream->metadata_flag) {
84 /* Only lock if metadata since we use the control socket. */
85 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
86 }
87
88 ret = consumer_handle_stream_before_relayd(stream, len);
89 if (ret >= 0) {
90 outfd = ret;
91
92 /* Write metadata stream id before payload */
93 if (stream->metadata_flag) {
94 metadata_id = htobe64(stream->relayd_stream_id);
95 do {
96 ret = write(outfd, (void *) &metadata_id,
97 sizeof(stream->relayd_stream_id));
98 } while (ret < 0 && errno == EINTR);
99 if (ret < 0) {
100 PERROR("write metadata stream id");
101 written = ret;
102 goto end;
103 }
104 DBG("Metadata stream id %zu written before data",
105 stream->relayd_stream_id);
106 }
107 }
108 /* Else, use the default set before which is the filesystem. */
109 }
110
111 while (len > 0) {
112 do {
113 ret = write(outfd, stream->mmap_base + mmap_offset, len);
114 } while (ret < 0 && errno == EINTR);
115 if (ret < 0) {
116 PERROR("Error in file write");
117 if (written == 0) {
118 written = ret;
119 }
120 goto end;
121 } else if (ret > len) {
122 PERROR("ret %ld > len %lu", ret, len);
123 written += ret;
124 goto end;
125 } else {
126 len -= ret;
127 mmap_offset += ret;
128 }
129 DBG("UST mmap write() ret %ld (len %lu)", ret, len);
130
131 /* This call is useless on a socket so better save a syscall. */
132 if (!relayd) {
133 /* This won't block, but will start writeout asynchronously */
134 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
135 SYNC_FILE_RANGE_WRITE);
136 stream->out_fd_offset += ret;
137 }
138 written += ret;
139 }
140 lttng_consumer_sync_trace_file(stream, orig_offset);
141
142 end:
143 if (relayd && stream->metadata_flag) {
144 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
145 }
146 rcu_read_unlock();
147 return written;
148 }
149
/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * Splice output is not implemented by this UST consumer: none of the
 * arguments are used and the call always fails.
 *
 * Returns -ENOSYS.
 */
ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	const ssize_t not_implemented = -ENOSYS;

	(void) ctx;
	(void) stream;
	(void) len;

	return not_implemented;
}
161
162 /*
163 * Take a snapshot for a specific fd
164 *
165 * Returns 0 on success, < 0 on error
166 */
167 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
168 struct lttng_consumer_stream *stream)
169 {
170 int ret = 0;
171
172 ret = ustctl_snapshot(stream->chan->handle, stream->buf);
173 if (ret != 0) {
174 errno = -ret;
175 PERROR("Getting sub-buffer snapshot.");
176 }
177
178 return ret;
179 }
180
181 /*
182 * Get the produced position
183 *
184 * Returns 0 on success, < 0 on error
185 */
186 int lttng_ustconsumer_get_produced_snapshot(
187 struct lttng_consumer_local_data *ctx,
188 struct lttng_consumer_stream *stream,
189 unsigned long *pos)
190 {
191 int ret;
192
193 ret = ustctl_snapshot_get_produced(stream->chan->handle,
194 stream->buf, pos);
195 if (ret != 0) {
196 errno = -ret;
197 PERROR("kernctl_snapshot_get_produced");
198 }
199
200 return ret;
201 }
202
203 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
204 int sock, struct pollfd *consumer_sockpoll)
205 {
206 ssize_t ret;
207 struct lttcomm_consumer_msg msg;
208
209 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
210 if (ret != sizeof(msg)) {
211 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
212 return ret;
213 }
214 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
215 return -ENOENT;
216 }
217
218 /* relayd need RCU read-side lock */
219 rcu_read_lock();
220
221 switch (msg.cmd_type) {
222 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
223 {
224 int fd;
225 struct consumer_relayd_sock_pair *relayd;
226
227 DBG("UST Consumer adding relayd socket");
228
229 /* Get relayd reference if exists. */
230 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
231 if (relayd == NULL) {
232 /* Not found. Allocate one. */
233 relayd = consumer_allocate_relayd_sock_pair(
234 msg.u.relayd_sock.net_index);
235 if (relayd == NULL) {
236 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
237 goto end_nosignal;
238 }
239 }
240
241 /* Poll on consumer socket. */
242 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
243 return -EINTR;
244 }
245
246 /* Get relayd socket from session daemon */
247 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
248 if (ret != sizeof(fd)) {
249 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
250 goto end_nosignal;
251 }
252
253 /* Copy socket information and received FD */
254 switch (msg.u.relayd_sock.type) {
255 case LTTNG_STREAM_CONTROL:
256 /* Copy received lttcomm socket */
257 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
258 ret = lttcomm_create_sock(&relayd->control_sock);
259 if (ret < 0) {
260 goto end_nosignal;
261 }
262
263 /* Close the created socket fd which is useless */
264 close(relayd->control_sock.fd);
265
266 /* Assign new file descriptor */
267 relayd->control_sock.fd = fd;
268 break;
269 case LTTNG_STREAM_DATA:
270 /* Copy received lttcomm socket */
271 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
272 ret = lttcomm_create_sock(&relayd->data_sock);
273 if (ret < 0) {
274 goto end_nosignal;
275 }
276
277 /* Close the created socket fd which is useless */
278 close(relayd->data_sock.fd);
279
280 /* Assign new file descriptor */
281 relayd->data_sock.fd = fd;
282 break;
283 default:
284 ERR("Unknown relayd socket type");
285 goto end_nosignal;
286 }
287
288 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
289 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
290 relayd->net_seq_idx, fd);
291
292 /*
293 * Add relayd socket pair to consumer data hashtable. If object already
294 * exists or on error, the function gracefully returns.
295 */
296 consumer_add_relayd(relayd);
297
298 goto end_nosignal;
299 }
300 case LTTNG_CONSUMER_ADD_CHANNEL:
301 {
302 struct lttng_consumer_channel *new_channel;
303 int fds[1];
304 size_t nb_fd = 1;
305
306 /* block */
307 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
308 return -EINTR;
309 }
310 ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
311 if (ret != sizeof(fds)) {
312 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
313 return ret;
314 }
315
316 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
317
318 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
319 fds[0], -1,
320 msg.u.channel.mmap_len,
321 msg.u.channel.max_sb_size);
322 if (new_channel == NULL) {
323 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
324 goto end_nosignal;
325 }
326 if (ctx->on_recv_channel != NULL) {
327 ret = ctx->on_recv_channel(new_channel);
328 if (ret == 0) {
329 consumer_add_channel(new_channel);
330 } else if (ret < 0) {
331 goto end_nosignal;
332 }
333 } else {
334 consumer_add_channel(new_channel);
335 }
336 goto end_nosignal;
337 }
338 case LTTNG_CONSUMER_ADD_STREAM:
339 {
340 struct lttng_consumer_stream *new_stream;
341 int fds[2];
342 size_t nb_fd = 2;
343 struct consumer_relayd_sock_pair *relayd = NULL;
344
345 /* block */
346 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
347 return -EINTR;
348 }
349 ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
350 if (ret != sizeof(fds)) {
351 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
352 return ret;
353 }
354
355 assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
356 new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
357 msg.u.stream.stream_key,
358 fds[0], fds[1],
359 msg.u.stream.state,
360 msg.u.stream.mmap_len,
361 msg.u.stream.output,
362 msg.u.stream.path_name,
363 msg.u.stream.uid,
364 msg.u.stream.gid,
365 msg.u.stream.net_index,
366 msg.u.stream.metadata_flag);
367 if (new_stream == NULL) {
368 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
369 goto end;
370 }
371
372 /* The stream is not metadata. Get relayd reference if exists. */
373 relayd = consumer_find_relayd(msg.u.stream.net_index);
374 if (relayd != NULL) {
375 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
376 /* Add stream on the relayd */
377 ret = relayd_add_stream(&relayd->control_sock,
378 msg.u.stream.name, msg.u.stream.path_name,
379 &new_stream->relayd_stream_id);
380 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
381 if (ret < 0) {
382 goto end;
383 }
384 } else if (msg.u.stream.net_index != -1) {
385 ERR("Network sequence index %d unknown. Not adding stream.",
386 msg.u.stream.net_index);
387 free(new_stream);
388 goto end;
389 }
390
391 if (ctx->on_recv_stream != NULL) {
392 ret = ctx->on_recv_stream(new_stream);
393 if (ret == 0) {
394 consumer_add_stream(new_stream);
395 } else if (ret < 0) {
396 goto end;
397 }
398 } else {
399 consumer_add_stream(new_stream);
400 }
401
402 DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu",
403 msg.u.stream.path_name, fds[0], fds[1],
404 new_stream->relayd_stream_id);
405 break;
406 }
407 case LTTNG_CONSUMER_UPDATE_STREAM:
408 {
409 return -ENOSYS;
410 #if 0
411 if (ctx->on_update_stream != NULL) {
412 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
413 if (ret == 0) {
414 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
415 } else if (ret < 0) {
416 goto end;
417 }
418 } else {
419 consumer_change_stream_state(msg.u.stream.stream_key,
420 msg.u.stream.state);
421 }
422 #endif
423 break;
424 }
425 default:
426 break;
427 }
428 end:
429 /*
430 * Wake-up the other end by writing a null byte in the pipe
431 * (non-blocking). Important note: Because writing into the
432 * pipe is non-blocking (and therefore we allow dropping wakeup
433 * data, as long as there is wakeup data present in the pipe
434 * buffer to wake up the other end), the other end should
435 * perform the following sequence for waiting:
436 * 1) empty the pipe (reads).
437 * 2) perform update operation.
438 * 3) wait on the pipe (poll).
439 */
440 do {
441 ret = write(ctx->consumer_poll_pipe[1], "", 1);
442 } while (ret < 0 && errno == EINTR);
443 end_nosignal:
444 rcu_read_unlock();
445 return 0;
446 }
447
448 int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
449 {
450 struct lttng_ust_object_data obj;
451
452 obj.handle = -1;
453 obj.shm_fd = chan->shm_fd;
454 obj.wait_fd = chan->wait_fd;
455 obj.memory_map_size = chan->mmap_len;
456 chan->handle = ustctl_map_channel(&obj);
457 if (!chan->handle) {
458 return -ENOMEM;
459 }
460 chan->wait_fd_is_copy = 1;
461 chan->shm_fd = -1;
462
463 return 0;
464 }
465
466 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
467 {
468 ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
469 stream->hangup_flush_done = 1;
470 }
471
472 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
473 {
474 ustctl_unmap_channel(chan->handle);
475 }
476
477 int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
478 {
479 struct lttng_ust_object_data obj;
480 int ret;
481
482 obj.handle = -1;
483 obj.shm_fd = stream->shm_fd;
484 obj.wait_fd = stream->wait_fd;
485 obj.memory_map_size = stream->mmap_len;
486 ret = ustctl_add_stream(stream->chan->handle, &obj);
487 if (ret)
488 return ret;
489 stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
490 if (!stream->buf)
491 return -EBUSY;
492 /* ustctl_open_stream_read has closed the shm fd. */
493 stream->wait_fd_is_copy = 1;
494 stream->shm_fd = -1;
495
496 stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
497 if (!stream->mmap_base) {
498 return -EINVAL;
499 }
500
501 return 0;
502 }
503
504 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
505 {
506 ustctl_close_stream_read(stream->chan->handle, stream->buf);
507 }
508
509
510 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
511 struct lttng_consumer_local_data *ctx)
512 {
513 unsigned long len;
514 int err;
515 long ret = 0;
516 struct lttng_ust_shm_handle *handle;
517 struct lttng_ust_lib_ring_buffer *buf;
518 char dummy;
519 ssize_t readlen;
520
521 DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
522 stream->wait_fd, stream->key);
523
524 /* We can consume the 1 byte written into the wait_fd by UST */
525 if (!stream->hangup_flush_done) {
526 do {
527 readlen = read(stream->wait_fd, &dummy, 1);
528 } while (readlen == -1 && errno == EINTR);
529 if (readlen == -1) {
530 ret = readlen;
531 goto end;
532 }
533 }
534
535 buf = stream->buf;
536 handle = stream->chan->handle;
537 /* Get the next subbuffer */
538 err = ustctl_get_next_subbuf(handle, buf);
539 if (err != 0) {
540 ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
541 /*
542 * This is a debug message even for single-threaded consumer,
543 * because poll() have more relaxed criterions than get subbuf,
544 * so get_subbuf may fail for short race windows where poll()
545 * would issue wakeups.
546 */
547 DBG("Reserving sub buffer failed (everything is normal, "
548 "it is due to concurrency)");
549 goto end;
550 }
551 assert(stream->output == LTTNG_EVENT_MMAP);
552 /* read the used subbuffer size */
553 err = ustctl_get_padded_subbuf_size(handle, buf, &len);
554 assert(err == 0);
555 /* write the subbuffer to the tracefile */
556 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
557 if (ret != len) {
558 /*
559 * display the error but continue processing to try
560 * to release the subbuffer
561 */
562 ERR("Error writing to tracefile");
563 }
564 err = ustctl_put_next_subbuf(handle, buf);
565 assert(err == 0);
566 end:
567 return ret;
568 }
569
570 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
571 {
572 int ret;
573
574 /* Opening the tracefile in write mode */
575 if (stream->path_name != NULL && stream->net_seq_idx == -1) {
576 ret = run_as_open(stream->path_name,
577 O_WRONLY|O_CREAT|O_TRUNC,
578 S_IRWXU|S_IRWXG|S_IRWXO,
579 stream->uid, stream->gid);
580 if (ret < 0) {
581 ERR("Opening %s", stream->path_name);
582 PERROR("open");
583 goto error;
584 }
585 stream->out_fd = ret;
586 }
587
588 /* we return 0 to let the library handle the FD internally */
589 return 0;
590
591 error:
592 return ret;
593 }
This page took 0.040278 seconds and 4 git commands to generate.