ustd: convert to new ustcomm api
[ust.git] / ustd / ustd.c
CommitLineData
1f8b0dff
PMF
1/*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
3796af9b
PMF
18#define _GNU_SOURCE
19
20#include <sys/types.h>
21#include <sys/shm.h>
688760ef
PMF
22#include <fcntl.h>
23#include <unistd.h>
3a7b90de 24#include <pthread.h>
3796af9b
PMF
25
26#include <stdlib.h>
27#include <stdio.h>
28#include <string.h>
29
30#include "localerr.h"
31#include "ustcomm.h"
32
811e4b93
PMF
/* Global list head intended to hold every buffer_info (linked through its
 * `list` member). Nothing in this file adds to it yet: the list_add() call
 * in add_buffer() is commented out. */
33struct list_head buffers = LIST_HEAD_INIT(buffers);
34
3796af9b
PMF
35struct buffer_info {
36 char *name;
37 pid_t pid;
4e2a8808 38 struct ustcomm_connection conn;
3796af9b
PMF
39
40 int shmid;
41 void *mem;
42 int memlen;
43
811e4b93
PMF
44 int n_subbufs;
45 int subbuf_size;
46
47 int file_fd; /* output file */
688760ef
PMF
48
49 struct list_head list;
50
51 long consumed_old;
3796af9b
PMF
52};
53
3a7b90de
PMF
54/* return value: 0 = subbuffer is finished, it won't produce data anymore
55 * 1 = got subbuffer successfully
56 * <0 = error
57 */
3796af9b 58
688760ef
PMF
59int get_subbuffer(struct buffer_info *buf)
60{
61 char *send_msg;
62 char *received_msg;
63 char *rep_code;
64 int retval;
65 int result;
66
67 asprintf(&send_msg, "get_subbuffer %s", buf->name);
3bb56863 68 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
688760ef 69 if(result < 0) {
3bb56863 70 ERR("get_subbuffer: ustcomm_send_request failed");
688760ef
PMF
71 return -1;
72 }
73 free(send_msg);
74
75 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 76 if(result != 2 && result != 1) {
688760ef
PMF
77 ERR("unable to parse response to get_subbuffer");
78 return -1;
79 }
3a7b90de
PMF
80
81 DBG("received msg is %s", received_msg);
688760ef
PMF
82
83 if(!strcmp(rep_code, "OK")) {
84 DBG("got subbuffer %s", buf->name);
85 retval = 1;
86 }
3a7b90de
PMF
87 else if(nth_token_is(received_msg, "END", 0) == 1) {
88 return 0;
89 }
688760ef 90 else {
3a7b90de
PMF
91 DBG("error getting subbuffer %s", buf->name);
92 retval = -1;
688760ef
PMF
93 }
94
3a7b90de
PMF
95 /* FIMXE: free correctly the stuff */
96 free(received_msg);
688760ef
PMF
97 free(rep_code);
98 return retval;
99}
100
101int put_subbuffer(struct buffer_info *buf)
102{
103 char *send_msg;
104 char *received_msg;
105 char *rep_code;
106 int retval;
107 int result;
108
109 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
3bb56863 110 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
688760ef
PMF
111 if(result < 0) {
112 ERR("put_subbuffer: send_message failed");
113 return -1;
114 }
115 free(send_msg);
116
117 result = sscanf(received_msg, "%as", &rep_code);
118 if(result != 1) {
119 ERR("unable to parse response to put_subbuffer");
120 return -1;
121 }
122 free(received_msg);
123
124 if(!strcmp(rep_code, "OK")) {
125 DBG("subbuffer put %s", buf->name);
126 retval = 1;
127 }
128 else {
129 ERR("invalid response to put_subbuffer");
130 }
131
132 free(rep_code);
133 return retval;
134}
135
/*
 * Write exactly `count` bytes from `buf` to `fd`, calling write() again
 * after every short write until everything has been written.
 *
 * Returns `count` on success. If any write() call returns 0 or -1, that
 * value is returned as is (errno is the one set by write()), even if
 * earlier calls already wrote part of the data.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;		/* same type as write()'s result: no narrowing */

	for(;;) {
		result = write(fd, bufc, count);
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc - (const char *) buf;
}
156
3a7b90de
PMF
157void *consumer_thread(void *arg)
158{
159 struct buffer_info *buf = (struct buffer_info *) arg;
160 int result;
161
162 for(;;) {
163 result = get_subbuffer(buf);
164 if(result == -1) {
165 ERR("error getting subbuffer");
166 continue;
167 }
168 if(result == 0) {
169 /* this is done */
170 break;
171 }
172
173 /* write data to file */
174 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
175 if(result == -1) {
176 PERROR("write");
177 /* FIXME: maybe drop this trace */
178 }
179
180 result = put_subbuffer(buf);
181 if(result == -1) {
182 ERR("error putting subbuffer");
183 break;
184 }
185 }
186
187 DBG("thread for buffer %s is stopping", buf->name);
188
189 return NULL;
190}
191
192int add_buffer(pid_t pid, char *bufname)
193{
194 struct buffer_info *buf;
195 char *send_msg;
196 char *received_msg;
197 int result;
198 char *tmp;
199 int fd;
200 pthread_t thr;
201
202 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
203 if(buf == NULL) {
204 ERR("add_buffer: insufficient memory");
205 return -1;
206 }
207
208 buf->name = bufname;
209 buf->pid = pid;
210
4e2a8808
PMF
211 /* connect to app */
212 result = ustcomm_connect_app(buf->pid, &buf->conn);
213 if(result) {
214 ERR("unable to connect to process");
215 return -1;
216 }
217
3a7b90de
PMF
218 /* get shmid */
219 asprintf(&send_msg, "get_shmid %s", buf->name);
4e2a8808 220 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
221 free(send_msg);
222 DBG("got buffer name %s", buf->name);
223
224 result = sscanf(received_msg, "%d", &buf->shmid);
225 if(result != 1) {
226 ERR("unable to parse response to get_shmid");
227 return -1;
228 }
229 free(received_msg);
230 DBG("got shmid %d", buf->shmid);
231
232 /* get n_subbufs */
233 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
4e2a8808 234 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
235 free(send_msg);
236
237 result = sscanf(received_msg, "%d", &buf->n_subbufs);
238 if(result != 1) {
239 ERR("unable to parse response to get_n_subbufs");
240 return -1;
241 }
242 free(received_msg);
243 DBG("got n_subbufs %d", buf->n_subbufs);
244
245 /* get subbuf size */
246 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
4e2a8808 247 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
248 free(send_msg);
249
250 result = sscanf(received_msg, "%d", &buf->subbuf_size);
251 if(result != 1) {
252 ERR("unable to parse response to get_subbuf_size");
253 return -1;
254 }
255 free(received_msg);
256 DBG("got subbuf_size %d", buf->subbuf_size);
257
258 /* attach memory */
259 buf->mem = shmat(buf->shmid, NULL, 0);
260 if(buf->mem == (void *) 0) {
261 perror("shmat");
262 return -1;
263 }
264 DBG("successfully attached memory");
265
266 /* open file for output */
267 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
268 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
269 if(result == -1) {
270 PERROR("open");
271 return -1;
272 }
273 buf->file_fd = fd;
274 free(tmp);
275
276 //list_add(&buf->list, &buffers);
277
278 pthread_create(&thr, NULL, consumer_thread, buf);
279
280 return 0;
281}
282
3796af9b
PMF
283int main(int argc, char **argv)
284{
285 struct ustcomm_ustd ustd;
286 int result;
287
288 result = ustcomm_init_ustd(&ustd);
289 if(result == -1) {
290 ERR("failed to initialize socket");
291 return 1;
292 }
293
688760ef 294 /* app loop */
3796af9b
PMF
295 for(;;) {
296 char *recvbuf;
297
3a7b90de 298 /* check for requests on our public socket */
688760ef
PMF
299 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
300 if(result == -1) {
301 ERR("error in ustcomm_ustd_recv_message");
302 continue;
303 }
304 if(result > 0) {
305 if(!strncmp(recvbuf, "collect", 7)) {
306 pid_t pid;
307 char *bufname;
308 int result;
3796af9b 309
688760ef
PMF
310 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
311 if(result != 2) {
312 fprintf(stderr, "parsing error: %s\n", recvbuf);
313 }
3796af9b 314
688760ef
PMF
315 result = add_buffer(pid, bufname);
316 if(result < 0) {
317 ERR("error in add_buffer");
318 continue;
319 }
3796af9b
PMF
320 }
321
688760ef 322 free(recvbuf);
3796af9b 323 }
3796af9b
PMF
324 }
325
326 return 0;
327}
This page took 0.034268 seconds and 4 git commands to generate.