Fix: add missing rcu read side lock
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
CommitLineData
48842b30
DG
1/*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
d14d33bf
AM
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
48842b30 7 *
d14d33bf
AM
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
48842b30 12 *
d14d33bf
AM
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
48842b30
DG
16 */
17
18#define _GNU_SOURCE
19#include <errno.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <unistd.h>
24
990570ed 25#include <common/common.h>
db758600 26#include <common/consumer.h>
990570ed 27#include <common/defaults.h>
48842b30 28
00e2e675 29#include "consumer.h"
48842b30
DG
30#include "ust-consumer.h"
31
32/*
37278a1e 33 * Send a single channel to the consumer using command ADD_CHANNEL.
48842b30 34 */
f50f23d9
DG
35static int send_channel(struct consumer_socket *sock,
36 struct ust_app_channel *uchan)
48842b30 37{
8010679a 38 int ret, fd;
37278a1e 39 struct lttcomm_consumer_msg msg;
48842b30 40
37278a1e
DG
41 /* Safety net */
42 assert(uchan);
f50f23d9 43 assert(sock);
37278a1e 44
f50f23d9 45 if (sock->fd < 0) {
37278a1e
DG
46 ret = -EINVAL;
47 goto error;
48 }
48842b30 49
37278a1e
DG
50 DBG2("Sending channel %s to UST consumer", uchan->name);
51
52 consumer_init_channel_comm_msg(&msg,
00e2e675
DG
53 LTTNG_CONSUMER_ADD_CHANNEL,
54 uchan->obj->shm_fd,
55 uchan->attr.subbuf_size,
56 uchan->obj->memory_map_size,
c30aaa51
MD
57 uchan->name,
58 uchan->streams.count);
00e2e675 59
ca03de58
DG
60 health_code_update(&health_thread_cmd);
61
37278a1e 62 ret = consumer_send_channel(sock, &msg);
48842b30 63 if (ret < 0) {
48842b30
DG
64 goto error;
65 }
00e2e675 66
ca03de58
DG
67 health_code_update(&health_thread_cmd);
68
8010679a 69 fd = uchan->obj->shm_fd;
00e2e675 70 ret = consumer_send_fds(sock, &fd, 1);
48842b30 71 if (ret < 0) {
48842b30
DG
72 goto error;
73 }
74
ca03de58
DG
75 health_code_update(&health_thread_cmd);
76
37278a1e
DG
77error:
78 return ret;
79}
80
81/*
82 * Send a single stream to the consumer using ADD_STREAM command.
83 */
f50f23d9
DG
84static int send_channel_stream(struct consumer_socket *sock,
85 struct ust_app_channel *uchan, struct ust_app_session *usess,
030a66fa 86 struct ust_app_stream *stream, struct consumer_output *consumer,
f50f23d9 87 const char *pathname)
37278a1e
DG
88{
89 int ret, fds[2];
90 struct lttcomm_consumer_msg msg;
91
92 /* Safety net */
93 assert(uchan);
94 assert(usess);
95 assert(stream);
96 assert(consumer);
f50f23d9 97 assert(sock);
37278a1e
DG
98
99 DBG2("Sending stream %d of channel %s to kernel consumer",
100 stream->obj->shm_fd, uchan->name);
101
102 consumer_init_stream_comm_msg(&msg,
103 LTTNG_CONSUMER_ADD_STREAM,
104 uchan->obj->shm_fd,
105 stream->obj->shm_fd,
106 LTTNG_CONSUMER_ACTIVE_STREAM,
107 DEFAULT_UST_CHANNEL_OUTPUT,
108 stream->obj->memory_map_size,
109 usess->uid,
110 usess->gid,
111 consumer->net_seq_index,
112 0, /* Metadata flag unset */
113 stream->name,
ca22feea
DG
114 pathname,
115 usess->id);
37278a1e 116
ca03de58
DG
117 health_code_update(&health_thread_cmd);
118
37278a1e
DG
119 /* Send stream and file descriptor */
120 fds[0] = stream->obj->shm_fd;
121 fds[1] = stream->obj->wait_fd;
122 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
123 if (ret < 0) {
124 goto error;
125 }
126
ca03de58
DG
127 health_code_update(&health_thread_cmd);
128
37278a1e
DG
129error:
130 return ret;
131}
132
133/*
134 * Send all stream fds of UST channel to the consumer.
135 */
f50f23d9 136static int send_channel_streams(struct consumer_socket *sock,
37278a1e
DG
137 struct ust_app_channel *uchan, struct ust_app_session *usess,
138 struct consumer_output *consumer)
139{
140 int ret;
141 char tmp_path[PATH_MAX];
142 const char *pathname;
030a66fa 143 struct ust_app_stream *stream, *tmp;
37278a1e 144
f50f23d9
DG
145 assert(sock);
146
37278a1e
DG
147 DBG("Sending streams of channel %s to UST consumer", uchan->name);
148
149 ret = send_channel(sock, uchan);
150 if (ret < 0) {
151 goto error;
152 }
153
00e2e675
DG
154 /* Get the right path name destination */
155 if (consumer->type == CONSUMER_DST_LOCAL) {
156 /* Set application path to the destination path */
a4b92340
DG
157 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
158 consumer->dst.trace_path, consumer->subdir, usess->path);
00e2e675
DG
159 if (ret < 0) {
160 PERROR("snprintf stream path");
161 goto error;
162 }
163 pathname = tmp_path;
164 DBG3("UST local consumer tracefile path: %s", pathname);
165 } else {
166 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
37278a1e 167 consumer->subdir, usess->path);
00e2e675
DG
168 if (ret < 0) {
169 PERROR("snprintf stream path");
170 goto error;
171 }
172 pathname = tmp_path;
173 DBG3("UST network consumer subdir path: %s", pathname);
174 }
175
d80a6244 176 cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) {
48842b30 177 if (!stream->obj->shm_fd) {
5af2f756 178 continue;
48842b30 179 }
48842b30 180
37278a1e 181 ret = send_channel_stream(sock, uchan, usess, stream, consumer,
00e2e675 182 pathname);
48842b30 183 if (ret < 0) {
48842b30
DG
184 goto error;
185 }
48842b30 186 }
48842b30 187
00e2e675 188 DBG("UST consumer channel streams sent");
48842b30
DG
189
190 return 0;
191
192error:
193 return ret;
194}
195
196/*
37278a1e 197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
48842b30 198 */
f50f23d9
DG
199static int send_metadata(struct consumer_socket *sock,
200 struct ust_app_session *usess, struct consumer_output *consumer)
48842b30 201{
37278a1e 202 int ret, fd, fds[2];
00e2e675
DG
203 char tmp_path[PATH_MAX];
204 const char *pathname;
37278a1e 205 struct lttcomm_consumer_msg msg;
48842b30 206
37278a1e
DG
207 /* Safety net */
208 assert(usess);
209 assert(consumer);
f50f23d9 210 assert(sock);
48842b30 211
f50f23d9
DG
212 if (sock->fd < 0) {
213 ERR("Consumer socket is negative (%d)", sock->fd);
7753dea8
MD
214 return -EINVAL;
215 }
216
37278a1e
DG
217 if (usess->metadata->obj->shm_fd == 0) {
218 ERR("Metadata obj shm_fd is 0");
219 ret = -1;
220 goto error;
221 }
48842b30 222
37278a1e 223 DBG("UST consumer sending metadata stream fd");
00e2e675 224
37278a1e
DG
225 consumer_init_channel_comm_msg(&msg,
226 LTTNG_CONSUMER_ADD_CHANNEL,
227 usess->metadata->obj->shm_fd,
228 usess->metadata->attr.subbuf_size,
229 usess->metadata->obj->memory_map_size,
c30aaa51
MD
230 "metadata",
231 1);
37278a1e 232
ca03de58
DG
233 health_code_update(&health_thread_cmd);
234
37278a1e
DG
235 ret = consumer_send_channel(sock, &msg);
236 if (ret < 0) {
237 goto error;
238 }
239
ca03de58
DG
240 health_code_update(&health_thread_cmd);
241
37278a1e
DG
242 /* Sending metadata shared memory fd */
243 fd = usess->metadata->obj->shm_fd;
244 ret = consumer_send_fds(sock, &fd, 1);
245 if (ret < 0) {
246 goto error;
247 }
00e2e675 248
ca03de58
DG
249 health_code_update(&health_thread_cmd);
250
37278a1e
DG
251 /* Get correct path name destination */
252 if (consumer->type == CONSUMER_DST_LOCAL) {
253 /* Set application path to the destination path */
a4b92340
DG
254 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
255 consumer->dst.trace_path, consumer->subdir, usess->path);
48842b30 256 if (ret < 0) {
37278a1e 257 PERROR("snprintf stream path");
48842b30
DG
258 goto error;
259 }
37278a1e 260 pathname = tmp_path;
48842b30 261
37278a1e
DG
262 /* Create directory */
263 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
264 usess->uid, usess->gid);
265 if (ret < 0) {
266 if (ret != -EEXIST) {
267 ERR("Trace directory creation error");
00e2e675
DG
268 goto error;
269 }
48842b30 270 }
37278a1e
DG
271 } else {
272 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
273 consumer->subdir, usess->path);
48842b30 274 if (ret < 0) {
37278a1e 275 PERROR("snprintf metadata path");
48842b30
DG
276 goto error;
277 }
37278a1e
DG
278 pathname = tmp_path;
279 }
280
281 consumer_init_stream_comm_msg(&msg,
282 LTTNG_CONSUMER_ADD_STREAM,
283 usess->metadata->obj->shm_fd,
284 usess->metadata->stream_obj->shm_fd,
285 LTTNG_CONSUMER_ACTIVE_STREAM,
286 DEFAULT_UST_CHANNEL_OUTPUT,
287 usess->metadata->stream_obj->memory_map_size,
288 usess->uid,
289 usess->gid,
290 consumer->net_seq_index,
291 1, /* Flag metadata set */
292 "metadata",
ca22feea
DG
293 pathname,
294 usess->id);
37278a1e 295
ca03de58
DG
296 health_code_update(&health_thread_cmd);
297
37278a1e
DG
298 /* Send stream and file descriptor */
299 fds[0] = usess->metadata->stream_obj->shm_fd;
300 fds[1] = usess->metadata->stream_obj->wait_fd;
301 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
302 if (ret < 0) {
303 goto error;
304 }
305
ca03de58
DG
306 health_code_update(&health_thread_cmd);
307
37278a1e
DG
308error:
309 return ret;
310}
311
312/*
313 * Send all stream fds of the UST session to the consumer.
314 */
173af62f
DG
315int ust_consumer_send_session(struct ust_app_session *usess,
316 struct consumer_output *consumer, struct consumer_socket *sock)
37278a1e
DG
317{
318 int ret = 0;
37278a1e
DG
319 struct lttng_ht_iter iter;
320 struct ust_app_channel *ua_chan;
321
173af62f 322 assert(usess);
a4b92340
DG
323
324 if (consumer == NULL || sock == NULL) {
325 /* There is no consumer so just ignoring the command. */
326 DBG("UST consumer does not exist. Not sending streams");
327 return 0;
328 }
37278a1e 329
173af62f
DG
330 DBG("Sending metadata stream fd to consumer on %d", sock->fd);
331
332 pthread_mutex_lock(sock->lock);
37278a1e
DG
333
334 /* Sending metadata information to the consumer */
f50f23d9 335 ret = send_metadata(sock, usess, consumer);
37278a1e
DG
336 if (ret < 0) {
337 goto error;
48842b30
DG
338 }
339
340 /* Send each channel fd streams of session */
341 rcu_read_lock();
bec39940
DG
342 cds_lfht_for_each_entry(usess->channels->ht, &iter.iter, ua_chan,
343 node.node) {
aeb96892
DG
344 /*
345 * Indicate that the channel was not created on the tracer side so skip
346 * sending unexisting streams.
347 */
348 if (ua_chan->obj == NULL) {
349 continue;
350 }
351
f50f23d9 352 ret = send_channel_streams(sock, ua_chan, usess, consumer);
48842b30 353 if (ret < 0) {
5485f822 354 rcu_read_unlock();
48842b30
DG
355 goto error;
356 }
48842b30
DG
357 }
358 rcu_read_unlock();
359
360 DBG("consumer fds (metadata and channel streams) sent");
361
173af62f
DG
362 /* All good! */
363 ret = 0;
48842b30
DG
364
365error:
173af62f 366 pthread_mutex_unlock(sock->lock);
48842b30
DG
367 return ret;
368}
This page took 0.046055 seconds and 4 git commands to generate.