a85dfe0b047089be39cdd0a1d86c99258814f66f
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <inttypes.h>
29 #include <unistd.h>
30 #include <sys/stat.h>
31
32 #include <common/common.h>
33 #include <common/kernel-ctl/kernel-ctl.h>
34 #include <common/sessiond-comm/sessiond-comm.h>
35 #include <common/sessiond-comm/relayd.h>
36 #include <common/compat/fcntl.h>
37 #include <common/relayd/relayd.h>
38
39 #include "kernel-consumer.h"
40
41 extern struct lttng_consumer_global_data consumer_data;
42 extern int consumer_poll_timeout;
43 extern volatile int consumer_quit;
44
45 /*
46 * Take a snapshot for a specific fd
47 *
48 * Returns 0 on success, < 0 on error
49 */
50 int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
51 struct lttng_consumer_stream *stream)
52 {
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
57 if (ret != 0) {
58 errno = -ret;
59 perror("Getting sub-buffer snapshot.");
60 }
61
62 return ret;
63 }
64
65 /*
66 * Get the produced position
67 *
68 * Returns 0 on success, < 0 on error
69 */
70 int lttng_kconsumer_get_produced_snapshot(
71 struct lttng_consumer_local_data *ctx,
72 struct lttng_consumer_stream *stream,
73 unsigned long *pos)
74 {
75 int ret;
76 int infd = stream->wait_fd;
77
78 ret = kernctl_snapshot_get_produced(infd, pos);
79 if (ret != 0) {
80 errno = -ret;
81 perror("kernctl_snapshot_get_produced");
82 }
83
84 return ret;
85 }
86
87 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
88 int sock, struct pollfd *consumer_sockpoll)
89 {
90 ssize_t ret;
91 struct lttcomm_consumer_msg msg;
92
93 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
94 if (ret != sizeof(msg)) {
95 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
96 return ret;
97 }
98 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
99 return -ENOENT;
100 }
101
102 /* relayd needs RCU read-side protection */
103 rcu_read_lock();
104
105 switch (msg.cmd_type) {
106 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
107 {
108 int fd;
109 struct consumer_relayd_sock_pair *relayd;
110
111 DBG("Consumer adding relayd socket");
112
113 /* Get relayd reference if exists. */
114 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
115 if (relayd == NULL) {
116 /* Not found. Allocate one. */
117 relayd = consumer_allocate_relayd_sock_pair(
118 msg.u.relayd_sock.net_index);
119 if (relayd == NULL) {
120 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
121 goto end_nosignal;
122 }
123 }
124
125 /* Poll on consumer socket. */
126 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
127 rcu_read_unlock();
128 return -EINTR;
129 }
130
131 /* Get relayd socket from session daemon */
132 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
133 if (ret != sizeof(fd)) {
134 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
135 goto end_nosignal;
136 }
137
138 /* Copy socket information and received FD */
139 switch (msg.u.relayd_sock.type) {
140 case LTTNG_STREAM_CONTROL:
141 /* Copy received lttcomm socket */
142 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
143
144 ret = lttcomm_create_sock(&relayd->control_sock);
145 if (ret < 0) {
146 goto end_nosignal;
147 }
148
149 /* Close the created socket fd which is useless */
150 close(relayd->control_sock.fd);
151
152 /* Assign new file descriptor */
153 relayd->control_sock.fd = fd;
154 break;
155 case LTTNG_STREAM_DATA:
156 /* Copy received lttcomm socket */
157 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
158 ret = lttcomm_create_sock(&relayd->data_sock);
159 if (ret < 0) {
160 goto end_nosignal;
161 }
162
163 /* Close the created socket fd which is useless */
164 close(relayd->data_sock.fd);
165
166 /* Assign new file descriptor */
167 relayd->data_sock.fd = fd;
168 break;
169 default:
170 ERR("Unknown relayd socket type");
171 goto end_nosignal;
172 }
173
174 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
175 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
176 relayd->net_seq_idx, fd);
177
178 /*
179 * Add relayd socket pair to consumer data hashtable. If object already
180 * exists or on error, the function gracefully returns.
181 */
182 consumer_add_relayd(relayd);
183
184 goto end_nosignal;
185 }
186 case LTTNG_CONSUMER_ADD_CHANNEL:
187 {
188 struct lttng_consumer_channel *new_channel;
189
190 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
191 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
192 -1, -1,
193 msg.u.channel.mmap_len,
194 msg.u.channel.max_sb_size);
195 if (new_channel == NULL) {
196 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
197 goto end_nosignal;
198 }
199 if (ctx->on_recv_channel != NULL) {
200 ret = ctx->on_recv_channel(new_channel);
201 if (ret == 0) {
202 consumer_add_channel(new_channel);
203 } else if (ret < 0) {
204 goto end_nosignal;
205 }
206 } else {
207 consumer_add_channel(new_channel);
208 }
209 goto end_nosignal;
210 }
211 case LTTNG_CONSUMER_ADD_STREAM:
212 {
213 int fd;
214 struct consumer_relayd_sock_pair *relayd = NULL;
215 struct lttng_consumer_stream *new_stream;
216
217 /* block */
218 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
219 rcu_read_unlock();
220 return -EINTR;
221 }
222
223 /* Get stream file descriptor from socket */
224 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
225 if (ret != sizeof(fd)) {
226 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
227 rcu_read_unlock();
228 return ret;
229 }
230
231 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
232 msg.u.stream.stream_key,
233 fd, fd,
234 msg.u.stream.state,
235 msg.u.stream.mmap_len,
236 msg.u.stream.output,
237 msg.u.stream.path_name,
238 msg.u.stream.uid,
239 msg.u.stream.gid,
240 msg.u.stream.net_index,
241 msg.u.stream.metadata_flag);
242 if (new_stream == NULL) {
243 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
244 goto end_nosignal;
245 }
246
247 /* The stream is not metadata. Get relayd reference if exists. */
248 relayd = consumer_find_relayd(msg.u.stream.net_index);
249 if (relayd != NULL) {
250 /* Add stream on the relayd */
251 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
252 ret = relayd_add_stream(&relayd->control_sock,
253 msg.u.stream.name, msg.u.stream.path_name,
254 &new_stream->relayd_stream_id);
255 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
256 if (ret < 0) {
257 goto end_nosignal;
258 }
259 } else if (msg.u.stream.net_index != -1) {
260 ERR("Network sequence index %d unknown. Not adding stream.",
261 msg.u.stream.net_index);
262 free(new_stream);
263 goto end_nosignal;
264 }
265
266 if (ctx->on_recv_stream != NULL) {
267 ret = ctx->on_recv_stream(new_stream);
268 if (ret == 0) {
269 consumer_add_stream(new_stream);
270 } else if (ret < 0) {
271 goto end_nosignal;
272 }
273 } else {
274 consumer_add_stream(new_stream);
275 }
276
277 DBG("Kernel consumer_add_stream (%d)", fd);
278 break;
279 }
280 case LTTNG_CONSUMER_UPDATE_STREAM:
281 {
282 rcu_read_unlock();
283 return -ENOSYS;
284 }
285 case LTTNG_CONSUMER_DESTROY_RELAYD:
286 {
287 struct consumer_relayd_sock_pair *relayd;
288
289 DBG("Kernel consumer destroying relayd %" PRIu64,
290 msg.u.destroy_relayd.net_seq_idx);
291
292 /* Get relayd reference if exists. */
293 relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
294 if (relayd == NULL) {
295 ERR("Unable to find relayd %" PRIu64,
296 msg.u.destroy_relayd.net_seq_idx);
297 goto end_nosignal;
298 }
299
300 /* Set destroy flag for this object */
301 uatomic_set(&relayd->destroy_flag, 1);
302
303 /* Destroy the relayd if refcount is 0 else set the destroy flag. */
304 if (uatomic_read(&relayd->refcount) == 0) {
305 consumer_destroy_relayd(relayd);
306 }
307 goto end_nosignal;
308 }
309 default:
310 goto end_nosignal;
311 }
312
313 /*
314 * Wake-up the other end by writing a null byte in the pipe (non-blocking).
315 * Important note: Because writing into the pipe is non-blocking (and
316 * therefore we allow dropping wakeup data, as long as there is wakeup data
317 * present in the pipe buffer to wake up the other end), the other end
318 * should perform the following sequence for waiting:
319 *
320 * 1) empty the pipe (reads).
321 * 2) perform update operation.
322 * 3) wait on the pipe (poll).
323 */
324 do {
325 ret = write(ctx->consumer_poll_pipe[1], "", 1);
326 } while (ret < 0 && errno == EINTR);
327 end_nosignal:
328 rcu_read_unlock();
329 return 0;
330 }
331
332 /*
333 * Consume data on a file descriptor and write it on a trace file.
334 */
335 ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
336 struct lttng_consumer_local_data *ctx)
337 {
338 unsigned long len;
339 int err;
340 ssize_t ret = 0;
341 int infd = stream->wait_fd;
342
343 DBG("In read_subbuffer (infd : %d)", infd);
344 /* Get the next subbuffer */
345 err = kernctl_get_next_subbuf(infd);
346 if (err != 0) {
347 /*
348 * This is a debug message even for single-threaded consumer,
349 * because poll() have more relaxed criterions than get subbuf,
350 * so get_subbuf may fail for short race windows where poll()
351 * would issue wakeups.
352 */
353 DBG("Reserving sub buffer failed (everything is normal, "
354 "it is due to concurrency)");
355 goto end;
356 }
357
358 switch (stream->output) {
359 case LTTNG_EVENT_SPLICE:
360 /* read the whole subbuffer */
361 err = kernctl_get_padded_subbuf_size(infd, &len);
362 if (err != 0) {
363 errno = -ret;
364 perror("Getting sub-buffer len failed.");
365 goto end;
366 }
367
368 /* splice the subbuffer to the tracefile */
369 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
370 if (ret != len) {
371 /*
372 * display the error but continue processing to try
373 * to release the subbuffer
374 */
375 ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
376 ret, len);
377 }
378
379 break;
380 case LTTNG_EVENT_MMAP:
381 /* read the used subbuffer size */
382 err = kernctl_get_padded_subbuf_size(infd, &len);
383 if (err != 0) {
384 errno = -ret;
385 perror("Getting sub-buffer len failed.");
386 goto end;
387 }
388 /* write the subbuffer to the tracefile */
389 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
390 if (ret != len) {
391 /*
392 * display the error but continue processing to try
393 * to release the subbuffer
394 */
395 ERR("Error writing to tracefile");
396 }
397 break;
398 default:
399 ERR("Unknown output method");
400 ret = -1;
401 }
402
403 err = kernctl_put_next_subbuf(infd);
404 if (err != 0) {
405 errno = -ret;
406 if (errno == EFAULT) {
407 perror("Error in unreserving sub buffer\n");
408 } else if (errno == EIO) {
409 /* Should never happen with newer LTTng versions */
410 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
411 }
412 goto end;
413 }
414
415 end:
416 return ret;
417 }
418
419 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
420 {
421 int ret;
422
423 /* Opening the tracefile in write mode */
424 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
425 ret = run_as_open(stream->path_name,
426 O_WRONLY|O_CREAT|O_TRUNC,
427 S_IRWXU|S_IRWXG|S_IRWXO,
428 stream->uid, stream->gid);
429 if (ret < 0) {
430 ERR("Opening %s", stream->path_name);
431 perror("open");
432 goto error;
433 }
434 stream->out_fd = ret;
435 }
436
437 if (stream->output == LTTNG_EVENT_MMAP) {
438 /* get the len of the mmap region */
439 unsigned long mmap_len;
440
441 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
442 if (ret != 0) {
443 errno = -ret;
444 perror("kernctl_get_mmap_len");
445 goto error_close_fd;
446 }
447 stream->mmap_len = (size_t) mmap_len;
448
449 stream->mmap_base = mmap(NULL, stream->mmap_len,
450 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
451 if (stream->mmap_base == MAP_FAILED) {
452 perror("Error mmaping");
453 ret = -1;
454 goto error_close_fd;
455 }
456 }
457
458 /* we return 0 to let the library handle the FD internally */
459 return 0;
460
461 error_close_fd:
462 {
463 int err;
464
465 err = close(stream->out_fd);
466 assert(!err);
467 }
468 error:
469 return ret;
470 }
471
This page took 0.039203 seconds and 4 git commands to generate.