Backport HT fixes from urcu upstream
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
3bd1e081
MD
21#include <poll.h>
22#include <pthread.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/mman.h>
26#include <sys/socket.h>
27#include <sys/types.h>
28#include <unistd.h>
dbb5dfe6 29#include <sys/stat.h>
3bd1e081 30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
10a8a223 33#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 34#include <common/sessiond-comm/relayd.h>
dbb5dfe6 35#include <common/compat/fcntl.h>
00e2e675 36#include <common/relayd/relayd.h>
0857097f 37
10a8a223 38#include "kernel-consumer.h"
3bd1e081
MD
39
40extern struct lttng_consumer_global_data consumer_data;
41extern int consumer_poll_timeout;
42extern volatile int consumer_quit;
43
3bd1e081
MD
44/*
45 * Take a snapshot for a specific fd
46 *
47 * Returns 0 on success, < 0 on error
48 */
49int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
50 struct lttng_consumer_stream *stream)
51{
52 int ret = 0;
53 int infd = stream->wait_fd;
54
55 ret = kernctl_snapshot(infd);
56 if (ret != 0) {
87dc6a9c 57 errno = -ret;
3bd1e081
MD
58 perror("Getting sub-buffer snapshot.");
59 }
60
61 return ret;
62}
63
64/*
65 * Get the produced position
66 *
67 * Returns 0 on success, < 0 on error
68 */
69int lttng_kconsumer_get_produced_snapshot(
70 struct lttng_consumer_local_data *ctx,
71 struct lttng_consumer_stream *stream,
72 unsigned long *pos)
73{
74 int ret;
75 int infd = stream->wait_fd;
76
77 ret = kernctl_snapshot_get_produced(infd, pos);
78 if (ret != 0) {
87dc6a9c 79 errno = -ret;
3bd1e081
MD
80 perror("kernctl_snapshot_get_produced");
81 }
82
83 return ret;
84}
85
86int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
87 int sock, struct pollfd *consumer_sockpoll)
88{
89 ssize_t ret;
90 struct lttcomm_consumer_msg msg;
91
92 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
93 if (ret != sizeof(msg)) {
f2fc6720 94 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD);
3bd1e081
MD
95 return ret;
96 }
97 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
98 return -ENOENT;
99 }
100
b0b335c8
MD
101 /* relayd needs RCU read-side protection */
102 rcu_read_lock();
103
3bd1e081 104 switch (msg.cmd_type) {
00e2e675
DG
105 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
106 {
107 int fd;
108 struct consumer_relayd_sock_pair *relayd;
109
110 DBG("Consumer adding relayd socket");
111
112 /* Get relayd reference if exists. */
113 relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
114 if (relayd == NULL) {
115 /* Not found. Allocate one. */
116 relayd = consumer_allocate_relayd_sock_pair(
117 msg.u.relayd_sock.net_index);
118 if (relayd == NULL) {
119 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
120 goto end_nosignal;
121 }
122 }
123
124 /* Poll on consumer socket. */
125 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
126 return -EINTR;
127 }
128
129 /* Get relayd socket from session daemon */
130 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
131 if (ret != sizeof(fd)) {
132 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
133 goto end_nosignal;
134 }
135
136 /* Copy socket information and received FD */
137 switch (msg.u.relayd_sock.type) {
138 case LTTNG_STREAM_CONTROL:
139 /* Copy received lttcomm socket */
140 lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
141
142 ret = lttcomm_create_sock(&relayd->control_sock);
143 if (ret < 0) {
144 goto end_nosignal;
145 }
146
147 /* Close the created socket fd which is useless */
148 close(relayd->control_sock.fd);
149
150 /* Assign new file descriptor */
151 relayd->control_sock.fd = fd;
152 break;
153 case LTTNG_STREAM_DATA:
154 /* Copy received lttcomm socket */
155 lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
156 ret = lttcomm_create_sock(&relayd->data_sock);
157 if (ret < 0) {
158 goto end_nosignal;
159 }
160
161 /* Close the created socket fd which is useless */
162 close(relayd->data_sock.fd);
163
164 /* Assign new file descriptor */
165 relayd->data_sock.fd = fd;
166 break;
167 default:
168 ERR("Unknown relayd socket type");
169 goto end_nosignal;
170 }
171
172 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
173 msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
174 relayd->net_seq_idx, fd);
175
176 /*
177 * Add relayd socket pair to consumer data hashtable. If object already
178 * exists or on error, the function gracefully returns.
179 */
180 consumer_add_relayd(relayd);
181
182 goto end_nosignal;
183 }
3bd1e081
MD
184 case LTTNG_CONSUMER_ADD_CHANNEL:
185 {
186 struct lttng_consumer_channel *new_channel;
187
188 DBG("consumer_add_channel %d", msg.u.channel.channel_key);
189 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
190 -1, -1,
191 msg.u.channel.mmap_len,
192 msg.u.channel.max_sb_size);
193 if (new_channel == NULL) {
194 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
195 goto end_nosignal;
196 }
197 if (ctx->on_recv_channel != NULL) {
198 ret = ctx->on_recv_channel(new_channel);
199 if (ret == 0) {
200 consumer_add_channel(new_channel);
201 } else if (ret < 0) {
202 goto end_nosignal;
203 }
204 } else {
205 consumer_add_channel(new_channel);
206 }
207 goto end_nosignal;
208 }
209 case LTTNG_CONSUMER_ADD_STREAM:
210 {
f2fc6720 211 int fd;
00e2e675
DG
212 struct consumer_relayd_sock_pair *relayd = NULL;
213 struct lttng_consumer_stream *new_stream;
3bd1e081
MD
214
215 /* block */
216 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
217 return -EINTR;
218 }
00e2e675
DG
219
220 /* Get stream file descriptor from socket */
f2fc6720
MD
221 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
222 if (ret != sizeof(fd)) {
3bd1e081
MD
223 lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
224 return ret;
225 }
3bd1e081 226
3bd1e081
MD
227 new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
228 msg.u.stream.stream_key,
f2fc6720 229 fd, fd,
3bd1e081
MD
230 msg.u.stream.state,
231 msg.u.stream.mmap_len,
232 msg.u.stream.output,
6df2e2c9
MD
233 msg.u.stream.path_name,
234 msg.u.stream.uid,
00e2e675
DG
235 msg.u.stream.gid,
236 msg.u.stream.net_index,
237 msg.u.stream.metadata_flag);
3bd1e081
MD
238 if (new_stream == NULL) {
239 lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
240 goto end;
241 }
00e2e675
DG
242
243 /* The stream is not metadata. Get relayd reference if exists. */
244 relayd = consumer_find_relayd(msg.u.stream.net_index);
245 if (relayd != NULL) {
246 /* Add stream on the relayd */
247 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
248 ret = relayd_add_stream(&relayd->control_sock,
249 msg.u.stream.name, msg.u.stream.path_name,
250 &new_stream->relayd_stream_id);
251 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
252 if (ret < 0) {
253 goto end;
254 }
255 } else if (msg.u.stream.net_index != -1) {
256 ERR("Network sequence index %d unknown. Not adding stream.",
257 msg.u.stream.net_index);
258 free(new_stream);
259 goto end;
260 }
261
3bd1e081
MD
262 if (ctx->on_recv_stream != NULL) {
263 ret = ctx->on_recv_stream(new_stream);
264 if (ret == 0) {
265 consumer_add_stream(new_stream);
266 } else if (ret < 0) {
267 goto end;
268 }
269 } else {
270 consumer_add_stream(new_stream);
271 }
00e2e675
DG
272
273 DBG("Kernel consumer_add_stream (%d)", fd);
3bd1e081
MD
274 break;
275 }
276 case LTTNG_CONSUMER_UPDATE_STREAM:
277 {
278 if (ctx->on_update_stream != NULL) {
279 ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
280 if (ret == 0) {
281 consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
282 } else if (ret < 0) {
283 goto end;
284 }
285 } else {
286 consumer_change_stream_state(msg.u.stream.stream_key,
287 msg.u.stream.state);
288 }
289 break;
290 }
291 default:
292 break;
293 }
294end:
04fdd819
MD
295 /*
296 * Wake-up the other end by writing a null byte in the pipe
297 * (non-blocking). Important note: Because writing into the
298 * pipe is non-blocking (and therefore we allow dropping wakeup
299 * data, as long as there is wakeup data present in the pipe
300 * buffer to wake up the other end), the other end should
301 * perform the following sequence for waiting:
302 * 1) empty the pipe (reads).
303 * 2) perform update operation.
304 * 3) wait on the pipe (poll).
305 */
306 do {
307 ret = write(ctx->consumer_poll_pipe[1], "", 1);
6f94560a 308 } while (ret < 0 && errno == EINTR);
3bd1e081 309end_nosignal:
b0b335c8 310 rcu_read_unlock();
3bd1e081
MD
311 return 0;
312}
d41f73b7
MD
313
314/*
315 * Consume data on a file descriptor and write it on a trace file.
316 */
4078b776 317ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
318 struct lttng_consumer_local_data *ctx)
319{
320 unsigned long len;
321 int err;
4078b776 322 ssize_t ret = 0;
d41f73b7
MD
323 int infd = stream->wait_fd;
324
325 DBG("In read_subbuffer (infd : %d)", infd);
326 /* Get the next subbuffer */
327 err = kernctl_get_next_subbuf(infd);
328 if (err != 0) {
d41f73b7
MD
329 /*
330 * This is a debug message even for single-threaded consumer,
331 * because poll() have more relaxed criterions than get subbuf,
332 * so get_subbuf may fail for short race windows where poll()
333 * would issue wakeups.
334 */
335 DBG("Reserving sub buffer failed (everything is normal, "
336 "it is due to concurrency)");
337 goto end;
338 }
339
340 switch (stream->output) {
341 case LTTNG_EVENT_SPLICE:
342 /* read the whole subbuffer */
343 err = kernctl_get_padded_subbuf_size(infd, &len);
344 if (err != 0) {
87dc6a9c 345 errno = -ret;
d41f73b7
MD
346 perror("Getting sub-buffer len failed.");
347 goto end;
348 }
349
350 /* splice the subbuffer to the tracefile */
351 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
47e81c02 352 if (ret != len) {
d41f73b7
MD
353 /*
354 * display the error but continue processing to try
355 * to release the subbuffer
356 */
00e2e675
DG
357 ERR("Error splicing to tracefile (ret: %ld != len: %ld)",
358 ret, len);
d41f73b7 359 }
47e81c02 360
d41f73b7
MD
361 break;
362 case LTTNG_EVENT_MMAP:
363 /* read the used subbuffer size */
364 err = kernctl_get_padded_subbuf_size(infd, &len);
365 if (err != 0) {
87dc6a9c 366 errno = -ret;
d41f73b7
MD
367 perror("Getting sub-buffer len failed.");
368 goto end;
369 }
370 /* write the subbuffer to the tracefile */
371 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
47e81c02 372 if (ret != len) {
d41f73b7
MD
373 /*
374 * display the error but continue processing to try
375 * to release the subbuffer
376 */
377 ERR("Error writing to tracefile");
378 }
379 break;
380 default:
381 ERR("Unknown output method");
382 ret = -1;
383 }
384
385 err = kernctl_put_next_subbuf(infd);
386 if (err != 0) {
87dc6a9c 387 errno = -ret;
d41f73b7
MD
388 if (errno == EFAULT) {
389 perror("Error in unreserving sub buffer\n");
390 } else if (errno == EIO) {
391 /* Should never happen with newer LTTng versions */
392 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
393 }
394 goto end;
395 }
396
397end:
398 return ret;
399}
400
401int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
402{
403 int ret;
404
405 /* Opening the tracefile in write mode */
00e2e675 406 if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
e11d277b 407 ret = run_as_open(stream->path_name,
60b6c79c
MD
408 O_WRONLY|O_CREAT|O_TRUNC,
409 S_IRWXU|S_IRWXG|S_IRWXO,
410 stream->uid, stream->gid);
d41f73b7
MD
411 if (ret < 0) {
412 ERR("Opening %s", stream->path_name);
413 perror("open");
414 goto error;
415 }
416 stream->out_fd = ret;
417 }
418
419 if (stream->output == LTTNG_EVENT_MMAP) {
420 /* get the len of the mmap region */
421 unsigned long mmap_len;
422
423 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
424 if (ret != 0) {
87dc6a9c 425 errno = -ret;
d41f73b7
MD
426 perror("kernctl_get_mmap_len");
427 goto error_close_fd;
428 }
429 stream->mmap_len = (size_t) mmap_len;
430
431 stream->mmap_base = mmap(NULL, stream->mmap_len,
432 PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
433 if (stream->mmap_base == MAP_FAILED) {
434 perror("Error mmaping");
435 ret = -1;
436 goto error_close_fd;
437 }
438 }
439
440 /* we return 0 to let the library handle the FD internally */
441 return 0;
442
443error_close_fd:
444 {
445 int err;
446
447 err = close(stream->out_fd);
448 assert(!err);
449 }
450error:
451 return ret;
452}
453
This page took 0.050818 seconds and 4 git commands to generate.