Update version to v2.1.0-rc1
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
a4b92340
DG
33/*
34 * From a consumer_data structure, allocate and add a consumer socket to the
35 * consumer output.
36 *
37 * Return 0 on success, else negative value on error
38 */
39int consumer_create_socket(struct consumer_data *data,
40 struct consumer_output *output)
41{
42 int ret = 0;
43 struct consumer_socket *socket;
44
45 assert(data);
46
47 if (output == NULL || data->cmd_sock < 0) {
48 /*
49 * Not an error. Possible there is simply not spawned consumer or it's
50 * disabled for the tracing session asking the socket.
51 */
52 goto error;
53 }
54
55 rcu_read_lock();
56 socket = consumer_find_socket(data->cmd_sock, output);
57 rcu_read_unlock();
58 if (socket == NULL) {
59 socket = consumer_allocate_socket(data->cmd_sock);
60 if (socket == NULL) {
61 ret = -1;
62 goto error;
63 }
64
65 socket->lock = &data->lock;
66 rcu_read_lock();
67 consumer_add_socket(socket, output);
68 rcu_read_unlock();
69 }
70
71 DBG3("Consumer socket created (fd: %d) and added to output",
72 data->cmd_sock);
73
74error:
75 return ret;
76}
77
173af62f
DG
78/*
79 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
80 * be acquired before calling this function and across use of the
81 * returned consumer_socket.
82 */
83struct consumer_socket *consumer_find_socket(int key,
84 struct consumer_output *consumer)
85{
86 struct lttng_ht_iter iter;
87 struct lttng_ht_node_ulong *node;
88 struct consumer_socket *socket = NULL;
89
90 /* Negative keys are lookup failures */
a4b92340 91 if (key < 0 || consumer == NULL) {
173af62f
DG
92 return NULL;
93 }
94
95 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
96 &iter);
97 node = lttng_ht_iter_get_node_ulong(&iter);
98 if (node != NULL) {
99 socket = caa_container_of(node, struct consumer_socket, node);
100 }
101
102 return socket;
103}
104
105/*
106 * Allocate a new consumer_socket and return the pointer.
107 */
108struct consumer_socket *consumer_allocate_socket(int fd)
109{
110 struct consumer_socket *socket = NULL;
111
112 socket = zmalloc(sizeof(struct consumer_socket));
113 if (socket == NULL) {
114 PERROR("zmalloc consumer socket");
115 goto error;
116 }
117
118 socket->fd = fd;
119 lttng_ht_node_init_ulong(&socket->node, fd);
120
121error:
122 return socket;
123}
124
125/*
126 * Add consumer socket to consumer output object. Read side lock must be
127 * acquired before calling this function.
128 */
129void consumer_add_socket(struct consumer_socket *sock,
130 struct consumer_output *consumer)
131{
132 assert(sock);
133 assert(consumer);
134
135 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
136}
137
138/*
139 * Delte consumer socket to consumer output object. Read side lock must be
140 * acquired before calling this function.
141 */
142void consumer_del_socket(struct consumer_socket *sock,
143 struct consumer_output *consumer)
144{
145 int ret;
146 struct lttng_ht_iter iter;
147
148 assert(sock);
149 assert(consumer);
150
151 iter.iter.node = &sock->node.node;
152 ret = lttng_ht_del(consumer->socks, &iter);
153 assert(!ret);
154}
155
156/*
157 * RCU destroy call function.
158 */
159static void destroy_socket_rcu(struct rcu_head *head)
160{
161 struct lttng_ht_node_ulong *node =
162 caa_container_of(head, struct lttng_ht_node_ulong, head);
163 struct consumer_socket *socket =
164 caa_container_of(node, struct consumer_socket, node);
165
166 free(socket);
167}
168
169/*
170 * Destroy and free socket pointer in a call RCU. Read side lock must be
171 * acquired before calling this function.
172 */
173void consumer_destroy_socket(struct consumer_socket *sock)
174{
175 assert(sock);
176
177 /*
178 * We DO NOT close the file descriptor here since it is global to the
179 * session daemon and is closed only if the consumer dies.
180 */
181
182 call_rcu(&sock->node.head, destroy_socket_rcu);
183}
184
00e2e675
DG
185/*
186 * Allocate and assign data to a consumer_output object.
187 *
188 * Return pointer to structure.
189 */
190struct consumer_output *consumer_create_output(enum consumer_dst_type type)
191{
192 struct consumer_output *output = NULL;
193
194 output = zmalloc(sizeof(struct consumer_output));
195 if (output == NULL) {
196 PERROR("zmalloc consumer_output");
197 goto error;
198 }
199
200 /* By default, consumer output is enabled */
201 output->enabled = 1;
202 output->type = type;
203 output->net_seq_index = -1;
173af62f
DG
204
205 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
206
207error:
208 return output;
209}
210
211/*
212 * Delete the consumer_output object from the list and free the ptr.
213 */
214void consumer_destroy_output(struct consumer_output *obj)
215{
216 if (obj == NULL) {
217 return;
218 }
219
173af62f
DG
220 if (obj->socks) {
221 struct lttng_ht_iter iter;
222 struct consumer_socket *socket;
223
224 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
225 consumer_destroy_socket(socket);
226 }
00e2e675 227 }
173af62f 228
00e2e675
DG
229 free(obj);
230}
231
232/*
233 * Copy consumer output and returned the newly allocated copy.
234 */
235struct consumer_output *consumer_copy_output(struct consumer_output *obj)
236{
173af62f
DG
237 struct lttng_ht_iter iter;
238 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
239 struct consumer_output *output;
240
241 assert(obj);
242
243 output = consumer_create_output(obj->type);
244 if (output == NULL) {
245 goto error;
246 }
247
248 memcpy(output, obj, sizeof(struct consumer_output));
249
173af62f
DG
250 /* Copy sockets */
251 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
252
253 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
254 /* Create new socket object. */
255 copy_sock = consumer_allocate_socket(socket->fd);
256 if (copy_sock == NULL) {
257 goto malloc_error;
258 }
259
260 copy_sock->lock = socket->lock;
261 consumer_add_socket(copy_sock, output);
262 }
263
00e2e675
DG
264error:
265 return output;
173af62f
DG
266
267malloc_error:
268 consumer_destroy_output(output);
269 return NULL;
00e2e675
DG
270}
271
272/*
273 * Set network URI to the consumer output object.
274 *
ad20f474
DG
275 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
276 * error.
00e2e675
DG
277 */
278int consumer_set_network_uri(struct consumer_output *obj,
279 struct lttng_uri *uri)
280{
281 int ret;
282 char tmp_path[PATH_MAX];
283 char hostname[HOST_NAME_MAX];
284 struct lttng_uri *dst_uri = NULL;
285
286 /* Code flow error safety net. */
287 assert(obj);
288 assert(uri);
289
290 switch (uri->stype) {
291 case LTTNG_STREAM_CONTROL:
292 dst_uri = &obj->dst.net.control;
293 obj->dst.net.control_isset = 1;
294 if (uri->port == 0) {
295 /* Assign default port. */
296 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
297 }
ad20f474 298 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
299 break;
300 case LTTNG_STREAM_DATA:
301 dst_uri = &obj->dst.net.data;
302 obj->dst.net.data_isset = 1;
303 if (uri->port == 0) {
304 /* Assign default port. */
305 uri->port = DEFAULT_NETWORK_DATA_PORT;
306 }
ad20f474 307 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
308 break;
309 default:
310 ERR("Set network uri type unknown %d", uri->stype);
311 goto error;
312 }
313
314 ret = uri_compare(dst_uri, uri);
315 if (!ret) {
316 /* Same URI, don't touch it and return success. */
317 DBG3("URI network compare are the same");
ad20f474 318 goto equal;
00e2e675
DG
319 }
320
321 /* URIs were not equal, replacing it. */
322 memset(dst_uri, 0, sizeof(struct lttng_uri));
323 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
324 obj->type = CONSUMER_DST_NET;
325
326 /* Handle subdir and add hostname in front. */
327 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
328 /* Get hostname to append it in the pathname */
329 ret = gethostname(hostname, sizeof(hostname));
330 if (ret < 0) {
331 PERROR("gethostname. Fallback on default localhost");
332 strncpy(hostname, "localhost", sizeof(hostname));
333 }
334 hostname[sizeof(hostname) - 1] = '\0';
335
336 /* Setup consumer subdir if none present in the control URI */
337 if (strlen(dst_uri->subdir) == 0) {
338 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
339 hostname, obj->subdir);
340 } else {
341 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
342 hostname, dst_uri->subdir);
343 }
344 if (ret < 0) {
345 PERROR("snprintf set consumer uri subdir");
346 goto error;
347 }
348
349 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
350 DBG3("Consumer set network uri subdir path %s", tmp_path);
351 }
352
00e2e675 353 return 0;
ad20f474
DG
354equal:
355 return 1;
00e2e675
DG
356error:
357 return -1;
358}
359
360/*
361 * Send file descriptor to consumer via sock.
362 */
363int consumer_send_fds(int sock, int *fds, size_t nb_fd)
364{
365 int ret;
366
367 assert(fds);
368 assert(nb_fd > 0);
369
370 ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
371 if (ret < 0) {
372 PERROR("send consumer fds");
373 goto error;
374 }
375
376error:
377 return ret;
378}
379
380/*
381 * Consumer send channel communication message structure to consumer.
382 */
383int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
384{
385 int ret;
386
387 assert(msg);
388 assert(sock >= 0);
389
390 ret = lttcomm_send_unix_sock(sock, msg,
391 sizeof(struct lttcomm_consumer_msg));
392 if (ret < 0) {
393 PERROR("send consumer channel");
394 goto error;
395 }
396
397error:
398 return ret;
399}
400
401/*
402 * Init channel communication message structure.
403 */
404void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
405 enum lttng_consumer_command cmd,
406 int channel_key,
407 uint64_t max_sb_size,
408 uint64_t mmap_len,
409 const char *name)
410{
411 assert(msg);
412
413 /* TODO: Args validation */
414
415 /* Zeroed structure */
416 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
417
418 /* Send channel */
419 msg->cmd_type = cmd;
420 msg->u.channel.channel_key = channel_key;
421 msg->u.channel.max_sb_size = max_sb_size;
422 msg->u.channel.mmap_len = mmap_len;
423}
424
425/*
426 * Init stream communication message structure.
427 */
428void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
429 enum lttng_consumer_command cmd,
430 int channel_key,
431 int stream_key,
432 uint32_t state,
433 enum lttng_event_output output,
434 uint64_t mmap_len,
435 uid_t uid,
436 gid_t gid,
437 int net_index,
438 unsigned int metadata_flag,
439 const char *name,
440 const char *pathname)
441{
442 assert(msg);
443
444 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
445
446 /* TODO: Args validation */
447
448 msg->cmd_type = cmd;
449 msg->u.stream.channel_key = channel_key;
450 msg->u.stream.stream_key = stream_key;
451 msg->u.stream.state = state;
452 msg->u.stream.output = output;
453 msg->u.stream.mmap_len = mmap_len;
454 msg->u.stream.uid = uid;
455 msg->u.stream.gid = gid;
456 msg->u.stream.net_index = net_index;
457 msg->u.stream.metadata_flag = metadata_flag;
458 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
459 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
460 strncpy(msg->u.stream.path_name, pathname,
461 sizeof(msg->u.stream.path_name));
462 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
463}
464
465/*
466 * Send stream communication structure to the consumer.
467 */
468int consumer_send_stream(int sock, struct consumer_output *dst,
469 struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
470{
471 int ret;
472
473 assert(msg);
474 assert(dst);
475
476 switch (dst->type) {
477 case CONSUMER_DST_NET:
478 /* Consumer should send the stream on the network. */
479 msg->u.stream.net_index = dst->net_seq_index;
480 break;
481 case CONSUMER_DST_LOCAL:
482 /* Add stream file name to stream path */
483 strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
484 strncat(msg->u.stream.path_name, msg->u.stream.name,
485 sizeof(msg->u.stream.path_name));
486 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
487 /* Indicate that the stream is NOT network */
488 msg->u.stream.net_index = -1;
489 break;
490 default:
491 ERR("Consumer unknown output type (%d)", dst->type);
492 ret = -1;
493 goto error;
494 }
495
496 /* Send on socket */
497 ret = lttcomm_send_unix_sock(sock, msg,
498 sizeof(struct lttcomm_consumer_msg));
499 if (ret < 0) {
500 PERROR("send consumer stream");
501 goto error;
502 }
503
504 ret = consumer_send_fds(sock, fds, nb_fd);
505 if (ret < 0) {
506 goto error;
507 }
508
509error:
510 return ret;
511}
37278a1e
DG
512
513/*
514 * Send relayd socket to consumer associated with a session name.
515 *
516 * On success return positive value. On error, negative value.
517 */
518int consumer_send_relayd_socket(int consumer_sock,
519 struct lttcomm_sock *sock, struct consumer_output *consumer,
520 enum lttng_stream_type type)
521{
522 int ret;
523 struct lttcomm_consumer_msg msg;
524
525 /* Code flow error. Safety net. */
526 assert(sock);
527 assert(consumer);
528
529 /* Bail out if consumer is disabled */
530 if (!consumer->enabled) {
531 ret = LTTCOMM_OK;
532 goto error;
533 }
534
535 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
536 /*
537 * Assign network consumer output index using the temporary consumer since
538 * this call should only be made from within a set_consumer_uri() function
539 * call in the session daemon.
540 */
541 msg.u.relayd_sock.net_index = consumer->net_seq_index;
542 msg.u.relayd_sock.type = type;
543 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
544
173af62f 545 DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
37278a1e
DG
546 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
547 if (ret < 0) {
548 PERROR("send consumer relayd socket info");
549 goto error;
550 }
551
552 DBG3("Sending relayd socket file descriptor to consumer");
553 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
554 if (ret < 0) {
555 goto error;
556 }
557
558 DBG2("Consumer relayd socket sent");
559
560error:
561 return ret;
562}
173af62f
DG
563
564/*
565 * Send destroy relayd command to consumer.
566 *
567 * On success return positive value. On error, negative value.
568 */
569int consumer_send_destroy_relayd(struct consumer_socket *sock,
570 struct consumer_output *consumer)
571{
572 int ret;
573 struct lttcomm_consumer_msg msg;
574
575 assert(consumer);
576 assert(sock);
577
578 DBG2("Sending destroy relayd command to consumer...");
579
580 /* Bail out if consumer is disabled */
581 if (!consumer->enabled) {
582 ret = LTTCOMM_OK;
583 DBG3("Consumer is disabled");
584 goto error;
585 }
586
587 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
588 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
589
590 pthread_mutex_lock(sock->lock);
591 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
592 pthread_mutex_unlock(sock->lock);
593 if (ret < 0) {
594 PERROR("send consumer destroy relayd command");
595 goto error;
596 }
597
598 DBG2("Consumer send destroy relayd command done");
599
600error:
601 return ret;
602}
This page took 0.072907 seconds and 4 git commands to generate.