Fix: leak of reply buffer on data pending check
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31
32 static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
34 {
35 int ret;
36 char tmp_path[PATH_MAX];
37 char *pathname = NULL;
38
39 assert(consumer);
40
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
47 PERROR("snprintf kernel channel path");
48 goto error;
49 }
50 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
51 if (!pathname) {
52 PERROR("lttng_strndup");
53 goto error;
54 }
55
56 /* Create directory */
57 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
58 if (ret < 0) {
59 if (errno != EEXIST) {
60 ERR("Trace directory creation error");
61 goto error;
62 }
63 }
64 DBG3("Kernel local consumer tracefile path: %s", pathname);
65 } else {
66 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
67 if (ret < 0) {
68 PERROR("snprintf kernel metadata path");
69 goto error;
70 }
71 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
72 if (!pathname) {
73 PERROR("lttng_strndup");
74 goto error;
75 }
76 DBG3("Kernel network consumer subdir path: %s", pathname);
77 }
78
79 return pathname;
80
81 error:
82 free(pathname);
83 return NULL;
84 }
85
86 /*
87 * Sending a single channel to the consumer with command ADD_CHANNEL.
88 */
89 int kernel_consumer_add_channel(struct consumer_socket *sock,
90 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
91 unsigned int monitor)
92 {
93 int ret;
94 char *pathname;
95 struct lttcomm_consumer_msg lkm;
96 struct consumer_output *consumer;
97
98 /* Safety net */
99 assert(channel);
100 assert(session);
101 assert(session->consumer);
102
103 consumer = session->consumer;
104
105 DBG("Kernel consumer adding channel %s to kernel consumer",
106 channel->channel->name);
107
108 if (monitor) {
109 pathname = create_channel_path(consumer, session->uid, session->gid);
110 } else {
111 /* Empty path. */
112 pathname = strdup("");
113 }
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
118
119 /* Prep channel message structure */
120 consumer_init_channel_comm_msg(&lkm,
121 LTTNG_CONSUMER_ADD_CHANNEL,
122 channel->fd,
123 session->id,
124 pathname,
125 session->uid,
126 session->gid,
127 consumer->net_seq_index,
128 channel->channel->name,
129 channel->stream_count,
130 channel->channel->attr.output,
131 CONSUMER_CHANNEL_TYPE_DATA,
132 channel->channel->attr.tracefile_size,
133 channel->channel->attr.tracefile_count,
134 monitor,
135 channel->channel->attr.live_timer_interval);
136
137 health_code_update();
138
139 ret = consumer_send_channel(sock, &lkm);
140 if (ret < 0) {
141 goto error;
142 }
143
144 health_code_update();
145
146 error:
147 free(pathname);
148 return ret;
149 }
150
151 /*
152 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
153 */
154 int kernel_consumer_add_metadata(struct consumer_socket *sock,
155 struct ltt_kernel_session *session, unsigned int monitor)
156 {
157 int ret;
158 char *pathname;
159 struct lttcomm_consumer_msg lkm;
160 struct consumer_output *consumer;
161
162 /* Safety net */
163 assert(session);
164 assert(session->consumer);
165 assert(sock);
166
167 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
168
169 /* Get consumer output pointer */
170 consumer = session->consumer;
171
172 if (monitor) {
173 pathname = create_channel_path(consumer, session->uid, session->gid);
174 } else {
175 /* Empty path. */
176 pathname = strdup("");
177 }
178 if (!pathname) {
179 ret = -1;
180 goto error;
181 }
182
183 /* Prep channel message structure */
184 consumer_init_channel_comm_msg(&lkm,
185 LTTNG_CONSUMER_ADD_CHANNEL,
186 session->metadata->fd,
187 session->id,
188 pathname,
189 session->uid,
190 session->gid,
191 consumer->net_seq_index,
192 DEFAULT_METADATA_NAME,
193 1,
194 DEFAULT_KERNEL_CHANNEL_OUTPUT,
195 CONSUMER_CHANNEL_TYPE_METADATA,
196 0, 0,
197 monitor, 0);
198
199 health_code_update();
200
201 ret = consumer_send_channel(sock, &lkm);
202 if (ret < 0) {
203 goto error;
204 }
205
206 health_code_update();
207
208 /* Prep stream message structure */
209 consumer_init_stream_comm_msg(&lkm,
210 LTTNG_CONSUMER_ADD_STREAM,
211 session->metadata->fd,
212 session->metadata_stream_fd,
213 0); /* CPU: 0 for metadata. */
214
215 health_code_update();
216
217 /* Send stream and file descriptor */
218 ret = consumer_send_stream(sock, consumer, &lkm,
219 &session->metadata_stream_fd, 1);
220 if (ret < 0) {
221 goto error;
222 }
223
224 health_code_update();
225
226 error:
227 free(pathname);
228 return ret;
229 }
230
231 /*
232 * Sending a single stream to the consumer with command ADD_STREAM.
233 */
234 int kernel_consumer_add_stream(struct consumer_socket *sock,
235 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
236 struct ltt_kernel_session *session, unsigned int monitor)
237 {
238 int ret;
239 struct lttcomm_consumer_msg lkm;
240 struct consumer_output *consumer;
241
242 assert(channel);
243 assert(stream);
244 assert(session);
245 assert(session->consumer);
246 assert(sock);
247
248 DBG("Sending stream %d of channel %s to kernel consumer",
249 stream->fd, channel->channel->name);
250
251 /* Get consumer output pointer */
252 consumer = session->consumer;
253
254 /* Prep stream consumer message */
255 consumer_init_stream_comm_msg(&lkm,
256 LTTNG_CONSUMER_ADD_STREAM,
257 channel->fd,
258 stream->fd,
259 stream->cpu);
260
261 health_code_update();
262
263 /* Send stream and file descriptor */
264 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
265 if (ret < 0) {
266 goto error;
267 }
268
269 health_code_update();
270
271 error:
272 return ret;
273 }
274
275 /*
276 * Sending the notification that all streams were sent with STREAMS_SENT.
277 */
278 int kernel_consumer_streams_sent(struct consumer_socket *sock,
279 struct ltt_kernel_session *session, uint64_t channel_key)
280 {
281 int ret;
282 struct lttcomm_consumer_msg lkm;
283 struct consumer_output *consumer;
284
285 assert(sock);
286 assert(session);
287
288 DBG("Sending streams_sent");
289 /* Get consumer output pointer */
290 consumer = session->consumer;
291
292 /* Prep stream consumer message */
293 consumer_init_streams_sent_comm_msg(&lkm,
294 LTTNG_CONSUMER_STREAMS_SENT,
295 channel_key, consumer->net_seq_index);
296
297 health_code_update();
298
299 /* Send stream and file descriptor */
300 ret = consumer_send_msg(sock, &lkm);
301 if (ret < 0) {
302 goto error;
303 }
304
305 error:
306 return ret;
307 }
308
309 /*
310 * Send all stream fds of kernel channel to the consumer.
311 */
312 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
313 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
314 unsigned int monitor)
315 {
316 int ret;
317 struct ltt_kernel_stream *stream;
318
319 /* Safety net */
320 assert(channel);
321 assert(session);
322 assert(session->consumer);
323 assert(sock);
324
325 /* Bail out if consumer is disabled */
326 if (!session->consumer->enabled) {
327 ret = LTTNG_OK;
328 goto error;
329 }
330
331 DBG("Sending streams of channel %s to kernel consumer",
332 channel->channel->name);
333
334 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
335 if (ret < 0) {
336 goto error;
337 }
338
339 /* Send streams */
340 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
341 if (!stream->fd) {
342 continue;
343 }
344
345 /* Add stream on the kernel consumer side. */
346 ret = kernel_consumer_add_stream(sock, channel, stream, session,
347 monitor);
348 if (ret < 0) {
349 goto error;
350 }
351 }
352
353 error:
354 return ret;
355 }
356
357 /*
358 * Send all stream fds of the kernel session to the consumer.
359 */
360 int kernel_consumer_send_session(struct consumer_socket *sock,
361 struct ltt_kernel_session *session)
362 {
363 int ret, monitor = 0;
364 struct ltt_kernel_channel *chan;
365
366 /* Safety net */
367 assert(session);
368 assert(session->consumer);
369 assert(sock);
370
371 /* Bail out if consumer is disabled */
372 if (!session->consumer->enabled) {
373 ret = LTTNG_OK;
374 goto error;
375 }
376
377 /* Don't monitor the streams on the consumer if in flight recorder. */
378 if (session->output_traces) {
379 monitor = 1;
380 }
381
382 DBG("Sending session stream to kernel consumer");
383
384 if (session->metadata_stream_fd >= 0 && session->metadata) {
385 ret = kernel_consumer_add_metadata(sock, session, monitor);
386 if (ret < 0) {
387 goto error;
388 }
389 }
390
391 /* Send channel and streams of it */
392 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
393 ret = kernel_consumer_send_channel_stream(sock, chan, session,
394 monitor);
395 if (ret < 0) {
396 goto error;
397 }
398 if (monitor) {
399 /*
400 * Inform the relay that all the streams for the
401 * channel were sent.
402 */
403 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
404 if (ret < 0) {
405 goto error;
406 }
407 }
408 }
409
410 DBG("Kernel consumer FDs of metadata and channel streams sent");
411
412 session->consumer_fds_sent = 1;
413 return 0;
414
415 error:
416 return ret;
417 }
418
419 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
420 struct ltt_kernel_channel *channel)
421 {
422 int ret;
423 struct lttcomm_consumer_msg msg;
424
425 assert(channel);
426 assert(socket);
427
428 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
429
430 memset(&msg, 0, sizeof(msg));
431 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
432 msg.u.destroy_channel.key = channel->fd;
433
434 pthread_mutex_lock(socket->lock);
435 health_code_update();
436
437 ret = consumer_send_msg(socket, &msg);
438 if (ret < 0) {
439 goto error;
440 }
441
442 error:
443 health_code_update();
444 pthread_mutex_unlock(socket->lock);
445 return ret;
446 }
447
448 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
449 struct ltt_kernel_metadata *metadata)
450 {
451 int ret;
452 struct lttcomm_consumer_msg msg;
453
454 assert(metadata);
455 assert(socket);
456
457 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
458
459 memset(&msg, 0, sizeof(msg));
460 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
461 msg.u.destroy_channel.key = metadata->fd;
462
463 pthread_mutex_lock(socket->lock);
464 health_code_update();
465
466 ret = consumer_send_msg(socket, &msg);
467 if (ret < 0) {
468 goto error;
469 }
470
471 error:
472 health_code_update();
473 pthread_mutex_unlock(socket->lock);
474 return ret;
475 }
This page took 0.039445 seconds and 4 git commands to generate.