Add consumer socket object and relayd commands
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
173af62f
DG
33/*
34 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
35 * be acquired before calling this function and across use of the
36 * returned consumer_socket.
37 */
38struct consumer_socket *consumer_find_socket(int key,
39 struct consumer_output *consumer)
40{
41 struct lttng_ht_iter iter;
42 struct lttng_ht_node_ulong *node;
43 struct consumer_socket *socket = NULL;
44
45 /* Negative keys are lookup failures */
46 if (key < 0) {
47 return NULL;
48 }
49
50 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
51 &iter);
52 node = lttng_ht_iter_get_node_ulong(&iter);
53 if (node != NULL) {
54 socket = caa_container_of(node, struct consumer_socket, node);
55 }
56
57 return socket;
58}
59
60/*
61 * Allocate a new consumer_socket and return the pointer.
62 */
63struct consumer_socket *consumer_allocate_socket(int fd)
64{
65 struct consumer_socket *socket = NULL;
66
67 socket = zmalloc(sizeof(struct consumer_socket));
68 if (socket == NULL) {
69 PERROR("zmalloc consumer socket");
70 goto error;
71 }
72
73 socket->fd = fd;
74 lttng_ht_node_init_ulong(&socket->node, fd);
75
76error:
77 return socket;
78}
79
80/*
81 * Add consumer socket to consumer output object. Read side lock must be
82 * acquired before calling this function.
83 */
84void consumer_add_socket(struct consumer_socket *sock,
85 struct consumer_output *consumer)
86{
87 assert(sock);
88 assert(consumer);
89
90 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
91}
92
93/*
94 * Delte consumer socket to consumer output object. Read side lock must be
95 * acquired before calling this function.
96 */
97void consumer_del_socket(struct consumer_socket *sock,
98 struct consumer_output *consumer)
99{
100 int ret;
101 struct lttng_ht_iter iter;
102
103 assert(sock);
104 assert(consumer);
105
106 iter.iter.node = &sock->node.node;
107 ret = lttng_ht_del(consumer->socks, &iter);
108 assert(!ret);
109}
110
111/*
112 * RCU destroy call function.
113 */
114static void destroy_socket_rcu(struct rcu_head *head)
115{
116 struct lttng_ht_node_ulong *node =
117 caa_container_of(head, struct lttng_ht_node_ulong, head);
118 struct consumer_socket *socket =
119 caa_container_of(node, struct consumer_socket, node);
120
121 free(socket);
122}
123
124/*
125 * Destroy and free socket pointer in a call RCU. Read side lock must be
126 * acquired before calling this function.
127 */
128void consumer_destroy_socket(struct consumer_socket *sock)
129{
130 assert(sock);
131
132 /*
133 * We DO NOT close the file descriptor here since it is global to the
134 * session daemon and is closed only if the consumer dies.
135 */
136
137 call_rcu(&sock->node.head, destroy_socket_rcu);
138}
139
00e2e675
DG
140/*
141 * Allocate and assign data to a consumer_output object.
142 *
143 * Return pointer to structure.
144 */
145struct consumer_output *consumer_create_output(enum consumer_dst_type type)
146{
147 struct consumer_output *output = NULL;
148
149 output = zmalloc(sizeof(struct consumer_output));
150 if (output == NULL) {
151 PERROR("zmalloc consumer_output");
152 goto error;
153 }
154
155 /* By default, consumer output is enabled */
156 output->enabled = 1;
157 output->type = type;
158 output->net_seq_index = -1;
173af62f
DG
159
160 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
161
162error:
163 return output;
164}
165
166/*
167 * Delete the consumer_output object from the list and free the ptr.
168 */
169void consumer_destroy_output(struct consumer_output *obj)
170{
171 if (obj == NULL) {
172 return;
173 }
174
173af62f
DG
175 if (obj->socks) {
176 struct lttng_ht_iter iter;
177 struct consumer_socket *socket;
178
179 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
180 consumer_destroy_socket(socket);
181 }
00e2e675 182 }
173af62f 183
00e2e675
DG
184 free(obj);
185}
186
187/*
188 * Copy consumer output and returned the newly allocated copy.
189 */
190struct consumer_output *consumer_copy_output(struct consumer_output *obj)
191{
173af62f
DG
192 struct lttng_ht_iter iter;
193 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
194 struct consumer_output *output;
195
196 assert(obj);
197
198 output = consumer_create_output(obj->type);
199 if (output == NULL) {
200 goto error;
201 }
202
203 memcpy(output, obj, sizeof(struct consumer_output));
204
173af62f
DG
205 /* Copy sockets */
206 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
207
208 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
209 /* Create new socket object. */
210 copy_sock = consumer_allocate_socket(socket->fd);
211 if (copy_sock == NULL) {
212 goto malloc_error;
213 }
214
215 copy_sock->lock = socket->lock;
216 consumer_add_socket(copy_sock, output);
217 }
218
00e2e675
DG
219error:
220 return output;
173af62f
DG
221
222malloc_error:
223 consumer_destroy_output(output);
224 return NULL;
00e2e675
DG
225}
226
227/*
228 * Set network URI to the consumer output object.
229 *
230 * Return 0 on success. Negative value on error.
231 */
232int consumer_set_network_uri(struct consumer_output *obj,
233 struct lttng_uri *uri)
234{
235 int ret;
236 char tmp_path[PATH_MAX];
237 char hostname[HOST_NAME_MAX];
238 struct lttng_uri *dst_uri = NULL;
239
240 /* Code flow error safety net. */
241 assert(obj);
242 assert(uri);
243
244 switch (uri->stype) {
245 case LTTNG_STREAM_CONTROL:
246 dst_uri = &obj->dst.net.control;
247 obj->dst.net.control_isset = 1;
248 if (uri->port == 0) {
249 /* Assign default port. */
250 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
251 }
252 break;
253 case LTTNG_STREAM_DATA:
254 dst_uri = &obj->dst.net.data;
255 obj->dst.net.data_isset = 1;
256 if (uri->port == 0) {
257 /* Assign default port. */
258 uri->port = DEFAULT_NETWORK_DATA_PORT;
259 }
260 break;
261 default:
262 ERR("Set network uri type unknown %d", uri->stype);
263 goto error;
264 }
265
266 ret = uri_compare(dst_uri, uri);
267 if (!ret) {
268 /* Same URI, don't touch it and return success. */
269 DBG3("URI network compare are the same");
270 goto end;
271 }
272
273 /* URIs were not equal, replacing it. */
274 memset(dst_uri, 0, sizeof(struct lttng_uri));
275 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
276 obj->type = CONSUMER_DST_NET;
277
278 /* Handle subdir and add hostname in front. */
279 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
280 /* Get hostname to append it in the pathname */
281 ret = gethostname(hostname, sizeof(hostname));
282 if (ret < 0) {
283 PERROR("gethostname. Fallback on default localhost");
284 strncpy(hostname, "localhost", sizeof(hostname));
285 }
286 hostname[sizeof(hostname) - 1] = '\0';
287
288 /* Setup consumer subdir if none present in the control URI */
289 if (strlen(dst_uri->subdir) == 0) {
290 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
291 hostname, obj->subdir);
292 } else {
293 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
294 hostname, dst_uri->subdir);
295 }
296 if (ret < 0) {
297 PERROR("snprintf set consumer uri subdir");
298 goto error;
299 }
300
301 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
302 DBG3("Consumer set network uri subdir path %s", tmp_path);
303 }
304
305end:
306 return 0;
307
308error:
309 return -1;
310}
311
/*
 * Send nb_fd file descriptors from the fds array to the consumer over the
 * unix socket sock.
 *
 * Return the value of lttcomm_send_fds_unix_sock(); a negative value is
 * treated as an error and logged.
 */
int consumer_send_fds(int sock, int *fds, size_t nb_fd)
{
	int status;

	assert(fds != NULL);
	assert(nb_fd > 0);

	status = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
	if (status < 0) {
		PERROR("send consumer fds");
	}

	return status;
}
331
332/*
333 * Consumer send channel communication message structure to consumer.
334 */
335int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
336{
337 int ret;
338
339 assert(msg);
340 assert(sock >= 0);
341
342 ret = lttcomm_send_unix_sock(sock, msg,
343 sizeof(struct lttcomm_consumer_msg));
344 if (ret < 0) {
345 PERROR("send consumer channel");
346 goto error;
347 }
348
349error:
350 return ret;
351}
352
353/*
354 * Init channel communication message structure.
355 */
356void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
357 enum lttng_consumer_command cmd,
358 int channel_key,
359 uint64_t max_sb_size,
360 uint64_t mmap_len,
361 const char *name)
362{
363 assert(msg);
364
365 /* TODO: Args validation */
366
367 /* Zeroed structure */
368 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
369
370 /* Send channel */
371 msg->cmd_type = cmd;
372 msg->u.channel.channel_key = channel_key;
373 msg->u.channel.max_sb_size = max_sb_size;
374 msg->u.channel.mmap_len = mmap_len;
375}
376
377/*
378 * Init stream communication message structure.
379 */
380void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
381 enum lttng_consumer_command cmd,
382 int channel_key,
383 int stream_key,
384 uint32_t state,
385 enum lttng_event_output output,
386 uint64_t mmap_len,
387 uid_t uid,
388 gid_t gid,
389 int net_index,
390 unsigned int metadata_flag,
391 const char *name,
392 const char *pathname)
393{
394 assert(msg);
395
396 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
397
398 /* TODO: Args validation */
399
400 msg->cmd_type = cmd;
401 msg->u.stream.channel_key = channel_key;
402 msg->u.stream.stream_key = stream_key;
403 msg->u.stream.state = state;
404 msg->u.stream.output = output;
405 msg->u.stream.mmap_len = mmap_len;
406 msg->u.stream.uid = uid;
407 msg->u.stream.gid = gid;
408 msg->u.stream.net_index = net_index;
409 msg->u.stream.metadata_flag = metadata_flag;
410 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
411 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
412 strncpy(msg->u.stream.path_name, pathname,
413 sizeof(msg->u.stream.path_name));
414 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
415}
416
417/*
418 * Send stream communication structure to the consumer.
419 */
420int consumer_send_stream(int sock, struct consumer_output *dst,
421 struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
422{
423 int ret;
424
425 assert(msg);
426 assert(dst);
427
428 switch (dst->type) {
429 case CONSUMER_DST_NET:
430 /* Consumer should send the stream on the network. */
431 msg->u.stream.net_index = dst->net_seq_index;
432 break;
433 case CONSUMER_DST_LOCAL:
434 /* Add stream file name to stream path */
435 strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
436 strncat(msg->u.stream.path_name, msg->u.stream.name,
437 sizeof(msg->u.stream.path_name));
438 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
439 /* Indicate that the stream is NOT network */
440 msg->u.stream.net_index = -1;
441 break;
442 default:
443 ERR("Consumer unknown output type (%d)", dst->type);
444 ret = -1;
445 goto error;
446 }
447
448 /* Send on socket */
449 ret = lttcomm_send_unix_sock(sock, msg,
450 sizeof(struct lttcomm_consumer_msg));
451 if (ret < 0) {
452 PERROR("send consumer stream");
453 goto error;
454 }
455
456 ret = consumer_send_fds(sock, fds, nb_fd);
457 if (ret < 0) {
458 goto error;
459 }
460
461error:
462 return ret;
463}
37278a1e
DG
464
465/*
466 * Send relayd socket to consumer associated with a session name.
467 *
468 * On success return positive value. On error, negative value.
469 */
470int consumer_send_relayd_socket(int consumer_sock,
471 struct lttcomm_sock *sock, struct consumer_output *consumer,
472 enum lttng_stream_type type)
473{
474 int ret;
475 struct lttcomm_consumer_msg msg;
476
477 /* Code flow error. Safety net. */
478 assert(sock);
479 assert(consumer);
480
481 /* Bail out if consumer is disabled */
482 if (!consumer->enabled) {
483 ret = LTTCOMM_OK;
484 goto error;
485 }
486
487 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
488 /*
489 * Assign network consumer output index using the temporary consumer since
490 * this call should only be made from within a set_consumer_uri() function
491 * call in the session daemon.
492 */
493 msg.u.relayd_sock.net_index = consumer->net_seq_index;
494 msg.u.relayd_sock.type = type;
495 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
496
173af62f 497 DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
37278a1e
DG
498 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
499 if (ret < 0) {
500 PERROR("send consumer relayd socket info");
501 goto error;
502 }
503
504 DBG3("Sending relayd socket file descriptor to consumer");
505 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
506 if (ret < 0) {
507 goto error;
508 }
509
510 DBG2("Consumer relayd socket sent");
511
512error:
513 return ret;
514}
173af62f
DG
515
516/*
517 * Send destroy relayd command to consumer.
518 *
519 * On success return positive value. On error, negative value.
520 */
521int consumer_send_destroy_relayd(struct consumer_socket *sock,
522 struct consumer_output *consumer)
523{
524 int ret;
525 struct lttcomm_consumer_msg msg;
526
527 assert(consumer);
528 assert(sock);
529
530 DBG2("Sending destroy relayd command to consumer...");
531
532 /* Bail out if consumer is disabled */
533 if (!consumer->enabled) {
534 ret = LTTCOMM_OK;
535 DBG3("Consumer is disabled");
536 goto error;
537 }
538
539 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
540 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
541
542 pthread_mutex_lock(sock->lock);
543 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
544 pthread_mutex_unlock(sock->lock);
545 if (ret < 0) {
546 PERROR("send consumer destroy relayd command");
547 goto error;
548 }
549
550 DBG2("Consumer send destroy relayd command done");
551
552error:
553 return ret;
554}
This page took 0.042171 seconds and 4 git commands to generate.