Tests: Fix nprocesses applications shutdown
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
CommitLineData
48842b30
DG
1/*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
d14d33bf
AM
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
48842b30 7 *
d14d33bf
AM
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
48842b30 12 *
d14d33bf
AM
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
48842b30
DG
16 */
17
18#define _GNU_SOURCE
19#include <errno.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <unistd.h>
d88aee68 24#include <inttypes.h>
48842b30 25
990570ed 26#include <common/common.h>
db758600 27#include <common/consumer.h>
990570ed 28#include <common/defaults.h>
48842b30 29
00e2e675 30#include "consumer.h"
7972aab2 31#include "health.h"
48842b30
DG
32#include "ust-consumer.h"
33
34/*
ffe60014
DG
35 * Return allocated full pathname of the session using the consumer trace path
36 * and subdir if available. On a successful allocation, the directory of the
37 * trace is created with the session credentials.
38 *
39 * The caller can safely free(3) the returned value. On error, NULL is
40 * returned.
48842b30 41 */
ffe60014
DG
42static char *setup_trace_path(struct consumer_output *consumer,
43 struct ust_app_session *ua_sess)
48842b30 44{
ffe60014
DG
45 int ret;
46 char *pathname;
37278a1e 47
ffe60014
DG
48 assert(consumer);
49 assert(ua_sess);
00e2e675 50
840cb59c 51 health_code_update();
ca03de58 52
ffe60014
DG
53 /* Allocate our self the string to make sure we never exceed PATH_MAX. */
54 pathname = zmalloc(PATH_MAX);
55 if (!pathname) {
48842b30
DG
56 goto error;
57 }
00e2e675 58
ffe60014
DG
59 /* Get correct path name destination */
60 if (consumer->type == CONSUMER_DST_LOCAL) {
61 /* Set application path to the destination path */
62 ret = snprintf(pathname, PATH_MAX, "%s/%s/%s",
63 consumer->dst.trace_path, consumer->subdir, ua_sess->path);
64 if (ret < 0) {
65 PERROR("snprintf channel path");
66 goto error;
67 }
ca03de58 68
ffe60014 69 /* Create directory. Ignore if exist. */
7972aab2
DG
70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
71 ua_sess->euid, ua_sess->egid);
ffe60014
DG
72 if (ret < 0) {
73 if (ret != -EEXIST) {
74 ERR("Trace directory creation error");
75 goto error;
76 }
77 }
78 } else {
79 ret = snprintf(pathname, PATH_MAX, "%s/%s", consumer->subdir,
80 ua_sess->path);
81 if (ret < 0) {
82 PERROR("snprintf channel path");
83 goto error;
84 }
48842b30
DG
85 }
86
ffe60014 87 return pathname;
ca03de58 88
37278a1e 89error:
ffe60014
DG
90 free(pathname);
91 return NULL;
37278a1e
DG
92}
93
94/*
ffe60014
DG
95 * Send a single channel to the consumer using command ADD_CHANNEL.
96 *
7972aab2 97 * Consumer socket lock MUST be acquired before calling this.
37278a1e 98 */
ffe60014
DG
99static int ask_channel_creation(struct ust_app_session *ua_sess,
100 struct ust_app_channel *ua_chan, struct consumer_output *consumer,
7972aab2 101 struct consumer_socket *socket, struct ust_registry_session *registry)
37278a1e 102{
ffe60014 103 int ret;
7972aab2
DG
104 uint32_t chan_id;
105 uint64_t key, chan_reg_key;
ffe60014 106 char *pathname = NULL;
37278a1e 107 struct lttcomm_consumer_msg msg;
7972aab2 108 struct ust_registry_channel *chan_reg;
37278a1e 109
ffe60014
DG
110 assert(ua_sess);
111 assert(ua_chan);
112 assert(socket);
37278a1e 113 assert(consumer);
7972aab2 114 assert(registry);
ffe60014
DG
115
116 DBG2("Asking UST consumer for channel");
117
118 /* Get and create full trace path of session. */
119 pathname = setup_trace_path(consumer, ua_sess);
120 if (!pathname) {
121 ret = -1;
122 goto error;
123 }
124
7972aab2
DG
125 /* Depending on the buffer type, a different channel key is used. */
126 if (ua_sess->buffer_type == LTTNG_BUFFER_PER_UID) {
127 chan_reg_key = ua_chan->tracing_channel_id;
128 } else {
129 chan_reg_key = ua_chan->key;
130 }
131
132 if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) {
133 chan_id = -1U;
134 } else {
135 chan_reg = ust_registry_channel_find(registry, chan_reg_key);
136 assert(chan_reg);
137 chan_id = chan_reg->chan_id;
138 }
139
ffe60014
DG
140 consumer_init_ask_channel_comm_msg(&msg,
141 ua_chan->attr.subbuf_size,
142 ua_chan->attr.num_subbuf,
143 ua_chan->attr.overwrite,
144 ua_chan->attr.switch_timer_interval,
145 ua_chan->attr.read_timer_interval,
146 (int) ua_chan->attr.output,
147 (int) ua_chan->attr.type,
7972aab2 148 ua_sess->tracing_id,
ca22feea 149 pathname,
ffe60014 150 ua_chan->name,
7972aab2
DG
151 ua_sess->euid,
152 ua_sess->egid,
ffe60014
DG
153 consumer->net_seq_index,
154 ua_chan->key,
7972aab2
DG
155 registry->uuid,
156 chan_id);
37278a1e 157
840cb59c 158 health_code_update();
ca03de58 159
ffe60014 160 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
37278a1e
DG
161 if (ret < 0) {
162 goto error;
163 }
164
ffe60014
DG
165 ret = consumer_recv_status_channel(socket, &key,
166 &ua_chan->expected_stream_count);
167 if (ret < 0) {
168 goto error;
169 }
170 /* Communication protocol error. */
171 assert(key == ua_chan->key);
172 /* We need at least one where 1 stream for 1 cpu. */
173 assert(ua_chan->expected_stream_count > 0);
174
d88aee68 175 DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
ffe60014 176 ua_chan->expected_stream_count);
ca03de58 177
37278a1e 178error:
ffe60014
DG
179 free(pathname);
180 health_code_update();
37278a1e
DG
181 return ret;
182}
183
184/*
ffe60014
DG
185 * Ask consumer to create a channel for a given session.
186 *
187 * Returns 0 on success else a negative value.
37278a1e 188 */
ffe60014
DG
189int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
190 struct ust_app_channel *ua_chan, struct consumer_output *consumer,
7972aab2 191 struct consumer_socket *socket, struct ust_registry_session *registry)
37278a1e
DG
192{
193 int ret;
37278a1e 194
ffe60014
DG
195 assert(ua_sess);
196 assert(ua_chan);
197 assert(consumer);
198 assert(socket);
199 assert(socket->fd >= 0);
7972aab2 200 assert(registry);
f50f23d9 201
ffe60014 202 pthread_mutex_lock(socket->lock);
37278a1e 203
7972aab2 204 ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
37278a1e
DG
205 if (ret < 0) {
206 goto error;
207 }
208
48842b30 209error:
ffe60014 210 pthread_mutex_unlock(socket->lock);
48842b30
DG
211 return ret;
212}
213
214/*
ffe60014
DG
215 * Send a get channel command to consumer using the given channel key. The
216 * channel object is populated and the stream list.
217 *
218 * Return 0 on success else a negative value.
48842b30 219 */
ffe60014
DG
220int ust_consumer_get_channel(struct consumer_socket *socket,
221 struct ust_app_channel *ua_chan)
48842b30 222{
ffe60014 223 int ret;
37278a1e 224 struct lttcomm_consumer_msg msg;
48842b30 225
ffe60014
DG
226 assert(ua_chan);
227 assert(socket);
228 assert(socket->fd >= 0);
48842b30 229
ffe60014
DG
230 msg.cmd_type = LTTNG_CONSUMER_GET_CHANNEL;
231 msg.u.get_channel.key = ua_chan->key;
37278a1e 232
ffe60014 233 pthread_mutex_lock(socket->lock);
840cb59c 234 health_code_update();
ca03de58 235
ffe60014
DG
236 /* Send command and wait for OK reply. */
237 ret = consumer_send_msg(socket, &msg);
37278a1e
DG
238 if (ret < 0) {
239 goto error;
240 }
241
ffe60014
DG
242 /* First, get the channel from consumer. */
243 ret = ustctl_recv_channel_from_consumer(socket->fd, &ua_chan->obj);
37278a1e 244 if (ret < 0) {
ffe60014
DG
245 if (ret != -EPIPE) {
246 ERR("Error recv channel from consumer %d with ret %d",
247 socket->fd, ret);
248 } else {
249 DBG3("UST app recv channel from consumer. Consumer is dead.");
250 }
37278a1e
DG
251 goto error;
252 }
00e2e675 253
ffe60014
DG
254 /* Next, get all streams. */
255 while (1) {
256 struct ust_app_stream *stream;
ca03de58 257
ffe60014
DG
258 /* Create UST stream */
259 stream = ust_app_alloc_stream();
260 if (stream == NULL) {
261 ret = -ENOMEM;
48842b30
DG
262 goto error;
263 }
264
ffe60014
DG
265 /* Stream object is populated by this call if successful. */
266 ret = ustctl_recv_stream_from_consumer(socket->fd, &stream->obj);
37278a1e 267 if (ret < 0) {
ffe60014
DG
268 free(stream);
269 if (ret == -LTTNG_UST_ERR_NOENT) {
270 DBG3("UST app consumer has no more stream available");
271 ret = 0;
272 break;
273 }
274 if (ret != -EPIPE) {
275 ERR("Recv stream from consumer %d with ret %d",
276 socket->fd, ret);
277 } else {
278 DBG3("UST app recv stream from consumer. Consumer is dead.");
00e2e675 279 }
48842b30
DG
280 goto error;
281 }
37278a1e 282
ffe60014
DG
283 /* Order is important this is why a list is used. */
284 cds_list_add_tail(&stream->list, &ua_chan->streams.head);
285 ua_chan->streams.count++;
37278a1e 286
ffe60014
DG
287 DBG2("UST app stream %d received succesfully", ua_chan->streams.count);
288 }
289
290 /* This MUST match or else we have a synchronization problem. */
291 assert(ua_chan->expected_stream_count == ua_chan->streams.count);
ca03de58 292
ffe60014
DG
293 /* Wait for confirmation that we can proceed with the streams. */
294 ret = consumer_recv_status_reply(socket);
37278a1e
DG
295 if (ret < 0) {
296 goto error;
297 }
298
299error:
ffe60014
DG
300 health_code_update();
301 pthread_mutex_unlock(socket->lock);
37278a1e
DG
302 return ret;
303}
304
305/*
ffe60014
DG
306 * Send a destroy channel command to consumer using the given channel key.
307 *
308 * Note that this command MUST be used prior to a successful
309 * LTTNG_CONSUMER_GET_CHANNEL because once this command is done successfully,
310 * the streams are dispatched to the consumer threads and MUST be teardown
311 * through the hang up process.
312 *
313 * Return 0 on success else a negative value.
37278a1e 314 */
ffe60014
DG
315int ust_consumer_destroy_channel(struct consumer_socket *socket,
316 struct ust_app_channel *ua_chan)
37278a1e 317{
ffe60014
DG
318 int ret;
319 struct lttcomm_consumer_msg msg;
a4b92340 320
ffe60014
DG
321 assert(ua_chan);
322 assert(socket);
323 assert(socket->fd >= 0);
37278a1e 324
ffe60014
DG
325 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
326 msg.u.destroy_channel.key = ua_chan->key;
173af62f 327
ffe60014
DG
328 pthread_mutex_lock(socket->lock);
329 health_code_update();
37278a1e 330
ffe60014 331 ret = consumer_send_msg(socket, &msg);
37278a1e
DG
332 if (ret < 0) {
333 goto error;
48842b30
DG
334 }
335
ffe60014
DG
336error:
337 health_code_update();
338 pthread_mutex_unlock(socket->lock);
339 return ret;
340}
aeb96892 341
ffe60014
DG
342/*
343 * Send a given stream to UST tracer.
344 *
345 * On success return 0 else a negative value.
346 */
347int ust_consumer_send_stream_to_ust(struct ust_app *app,
348 struct ust_app_channel *channel, struct ust_app_stream *stream)
349{
350 int ret;
351
352 assert(app);
353 assert(stream);
354 assert(channel);
355
356 DBG2("UST consumer send stream to app %d", app->sock);
357
358 /* Relay stream to application. */
359 ret = ustctl_send_stream_to_ust(app->sock, channel->obj, stream->obj);
360 if (ret < 0) {
361 if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
362 ERR("Error ustctl send stream %s to app pid: %d with ret %d",
363 stream->name, app->pid, ret);
364 } else {
365 DBG3("UST app send stream to ust failed. Application is dead.");
48842b30 366 }
ffe60014 367 goto error;
48842b30 368 }
d0b96690 369 channel->handle = channel->obj->handle;
48842b30 370
ffe60014
DG
371error:
372 return ret;
373}
374
375/*
376 * Send channel previously received from the consumer to the UST tracer.
377 *
378 * On success return 0 else a negative value.
379 */
380int ust_consumer_send_channel_to_ust(struct ust_app *app,
381 struct ust_app_session *ua_sess, struct ust_app_channel *channel)
382{
383 int ret;
384
385 assert(app);
386 assert(ua_sess);
387 assert(channel);
388 assert(channel->obj);
389
7972aab2
DG
390 DBG2("UST app send channel to sock %d pid %d (name: %s, key: %" PRIu64 ")",
391 app->sock, app->pid, channel->name, channel->tracing_channel_id);
48842b30 392
ffe60014
DG
393 /* Send stream to application. */
394 ret = ustctl_send_channel_to_ust(app->sock, ua_sess->handle, channel->obj);
395 if (ret < 0) {
396 if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
397 ERR("Error ustctl send channel %s to app pid: %d with ret %d",
398 channel->name, app->pid, ret);
399 } else {
400 DBG3("UST app send channel to ust failed. Application is dead.");
401 }
402 goto error;
403 }
48842b30
DG
404
405error:
406 return ret;
407}
This page took 0.07791 seconds and 4 git commands to generate.