ust: make lttd work
[ust.git] / ustd / ustd.c
CommitLineData
3796af9b
PMF
1#define _GNU_SOURCE
2
3#include <sys/types.h>
4#include <sys/shm.h>
688760ef
PMF
5#include <fcntl.h>
6#include <unistd.h>
3a7b90de 7#include <pthread.h>
3796af9b
PMF
8
9#include <stdlib.h>
10#include <stdio.h>
11#include <string.h>
12
13#include "localerr.h"
14#include "ustcomm.h"
15
811e4b93
PMF
16struct list_head buffers = LIST_HEAD_INIT(buffers);
17
3796af9b
PMF
18struct buffer_info {
19 char *name;
20 pid_t pid;
21
22 int shmid;
23 void *mem;
24 int memlen;
25
811e4b93
PMF
26 int n_subbufs;
27 int subbuf_size;
28
29 int file_fd; /* output file */
688760ef
PMF
30
31 struct list_head list;
32
33 long consumed_old;
3796af9b
PMF
34};
35
3a7b90de
PMF
36/* return value: 0 = subbuffer is finished, it won't produce data anymore
37 * 1 = got subbuffer successfully
38 * <0 = error
39 */
3796af9b 40
688760ef
PMF
41int get_subbuffer(struct buffer_info *buf)
42{
43 char *send_msg;
44 char *received_msg;
45 char *rep_code;
46 int retval;
47 int result;
48
49 asprintf(&send_msg, "get_subbuffer %s", buf->name);
50 result = send_message(buf->pid, send_msg, &received_msg);
51 if(result < 0) {
52 ERR("get_subbuffer: send_message failed");
53 return -1;
54 }
55 free(send_msg);
56
57 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 58 if(result != 2 && result != 1) {
688760ef
PMF
59 ERR("unable to parse response to get_subbuffer");
60 return -1;
61 }
3a7b90de
PMF
62
63 DBG("received msg is %s", received_msg);
688760ef
PMF
64
65 if(!strcmp(rep_code, "OK")) {
66 DBG("got subbuffer %s", buf->name);
67 retval = 1;
68 }
3a7b90de
PMF
69 else if(nth_token_is(received_msg, "END", 0) == 1) {
70 return 0;
71 }
688760ef 72 else {
3a7b90de
PMF
73 DBG("error getting subbuffer %s", buf->name);
74 retval = -1;
688760ef
PMF
75 }
76
3a7b90de
PMF
77 /* FIMXE: free correctly the stuff */
78 free(received_msg);
688760ef
PMF
79 free(rep_code);
80 return retval;
81}
82
83int put_subbuffer(struct buffer_info *buf)
84{
85 char *send_msg;
86 char *received_msg;
87 char *rep_code;
88 int retval;
89 int result;
90
91 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
92 result = send_message(buf->pid, send_msg, &received_msg);
93 if(result < 0) {
94 ERR("put_subbuffer: send_message failed");
95 return -1;
96 }
97 free(send_msg);
98
99 result = sscanf(received_msg, "%as", &rep_code);
100 if(result != 1) {
101 ERR("unable to parse response to put_subbuffer");
102 return -1;
103 }
104 free(received_msg);
105
106 if(!strcmp(rep_code, "OK")) {
107 DBG("subbuffer put %s", buf->name);
108 retval = 1;
109 }
110 else {
111 ERR("invalid response to put_subbuffer");
112 }
113
114 free(rep_code);
115 return retval;
116}
117
118ssize_t patient_write(int fd, const void *buf, size_t count)
119{
120 const char *bufc = (const char *) buf;
121 int result;
122
123 for(;;) {
124 result = write(fd, bufc, count);
125 if(result <= 0) {
126 return result;
127 }
128 count -= result;
129 bufc += result;
130
131 if(count == 0) {
132 break;
133 }
134 }
135
136 return bufc-(const char *)buf;
137}
138
3a7b90de
PMF
139void *consumer_thread(void *arg)
140{
141 struct buffer_info *buf = (struct buffer_info *) arg;
142 int result;
143
144 for(;;) {
145 result = get_subbuffer(buf);
146 if(result == -1) {
147 ERR("error getting subbuffer");
148 continue;
149 }
150 if(result == 0) {
151 /* this is done */
152 break;
153 }
154
155 /* write data to file */
156 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
157 if(result == -1) {
158 PERROR("write");
159 /* FIXME: maybe drop this trace */
160 }
161
162 result = put_subbuffer(buf);
163 if(result == -1) {
164 ERR("error putting subbuffer");
165 break;
166 }
167 }
168
169 DBG("thread for buffer %s is stopping", buf->name);
170
171 return NULL;
172}
173
174int add_buffer(pid_t pid, char *bufname)
175{
176 struct buffer_info *buf;
177 char *send_msg;
178 char *received_msg;
179 int result;
180 char *tmp;
181 int fd;
182 pthread_t thr;
183
184 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
185 if(buf == NULL) {
186 ERR("add_buffer: insufficient memory");
187 return -1;
188 }
189
190 buf->name = bufname;
191 buf->pid = pid;
192
193 /* get shmid */
194 asprintf(&send_msg, "get_shmid %s", buf->name);
195 send_message(pid, send_msg, &received_msg);
196 free(send_msg);
197 DBG("got buffer name %s", buf->name);
198
199 result = sscanf(received_msg, "%d", &buf->shmid);
200 if(result != 1) {
201 ERR("unable to parse response to get_shmid");
202 return -1;
203 }
204 free(received_msg);
205 DBG("got shmid %d", buf->shmid);
206
207 /* get n_subbufs */
208 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
209 send_message(pid, send_msg, &received_msg);
210 free(send_msg);
211
212 result = sscanf(received_msg, "%d", &buf->n_subbufs);
213 if(result != 1) {
214 ERR("unable to parse response to get_n_subbufs");
215 return -1;
216 }
217 free(received_msg);
218 DBG("got n_subbufs %d", buf->n_subbufs);
219
220 /* get subbuf size */
221 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
222 send_message(pid, send_msg, &received_msg);
223 free(send_msg);
224
225 result = sscanf(received_msg, "%d", &buf->subbuf_size);
226 if(result != 1) {
227 ERR("unable to parse response to get_subbuf_size");
228 return -1;
229 }
230 free(received_msg);
231 DBG("got subbuf_size %d", buf->subbuf_size);
232
233 /* attach memory */
234 buf->mem = shmat(buf->shmid, NULL, 0);
235 if(buf->mem == (void *) 0) {
236 perror("shmat");
237 return -1;
238 }
239 DBG("successfully attached memory");
240
241 /* open file for output */
242 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
243 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
244 if(result == -1) {
245 PERROR("open");
246 return -1;
247 }
248 buf->file_fd = fd;
249 free(tmp);
250
251 //list_add(&buf->list, &buffers);
252
253 pthread_create(&thr, NULL, consumer_thread, buf);
254
255 return 0;
256}
257
3796af9b
PMF
258int main(int argc, char **argv)
259{
260 struct ustcomm_ustd ustd;
261 int result;
262
263 result = ustcomm_init_ustd(&ustd);
264 if(result == -1) {
265 ERR("failed to initialize socket");
266 return 1;
267 }
268
688760ef 269 /* app loop */
3796af9b
PMF
270 for(;;) {
271 char *recvbuf;
272
3a7b90de 273 /* check for requests on our public socket */
688760ef
PMF
274 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
275 if(result == -1) {
276 ERR("error in ustcomm_ustd_recv_message");
277 continue;
278 }
279 if(result > 0) {
280 if(!strncmp(recvbuf, "collect", 7)) {
281 pid_t pid;
282 char *bufname;
283 int result;
3796af9b 284
688760ef
PMF
285 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
286 if(result != 2) {
287 fprintf(stderr, "parsing error: %s\n", recvbuf);
288 }
3796af9b 289
688760ef
PMF
290 result = add_buffer(pid, bufname);
291 if(result < 0) {
292 ERR("error in add_buffer");
293 continue;
294 }
3796af9b
PMF
295 }
296
688760ef 297 free(recvbuf);
3796af9b 298 }
3796af9b
PMF
299 }
300
301 return 0;
302}
This page took 0.03611 seconds and 4 git commands to generate.