Libevhtp 1.2.13
thread.c
Go to the documentation of this file.
1#define _GNU_SOURCE
2#include <stdio.h>
3#include <stdlib.h>
4#include <stdint.h>
5#include <limits.h>
6#ifndef WIN32
7#include <sys/queue.h>
8#endif
9
10#include <sys/ioctl.h>
11#include <unistd.h>
12#include <pthread.h>
13
14#include <event2/event.h>
15#include <event2/thread.h>
16
17#include "internal.h"
18#include "evhtp/thread.h"
19
20typedef struct evthr_cmd evthr_cmd_t;
21typedef struct evthr_pool_slist evthr_pool_slist_t;
22
23struct evthr_cmd {
24 uint8_t stop;
25 void * args;
26 evthr_cb cb;
27} __attribute__((packed));
28
29TAILQ_HEAD(evthr_pool_slist, evthr);
30
31struct evthr_pool {
32#ifdef EVTHR_SHARED_PIPE
33 int rdr;
34 int wdr;
35#endif
38};
39
40struct evthr {
41 int rdr;
42 int wdr;
43 char err;
44 ev_t * event;
45 evbase_t * evbase;
46 pthread_mutex_t lock;
47 pthread_t * thr;
48 evthr_init_cb init_cb;
49 evthr_exit_cb exit_cb;
50 void * arg;
51 void * aux;
52
53#ifdef EVTHR_SHARED_PIPE
54 int pool_rdr;
55 struct event * shared_pool_ev;
56#endif
57 TAILQ_ENTRY(evthr) next;
58};
59
60#define _evthr_read(thr, cmd, sock) \
61 (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0
62
63static void
64_evthr_read_cmd(evutil_socket_t sock, short which, void * args)
65{
66 evthr_t * thread;
67 evthr_cmd_t cmd;
68 int stopped;
69
70 if (!(thread = (evthr_t *)args)) {
71 return;
72 }
73
74 stopped = 0;
75
76 if (evhtp_likely(_evthr_read(thread, &cmd, sock) == 1)) {
77 stopped = cmd.stop;
78
79 if (evhtp_likely(cmd.cb != NULL)) {
80 (cmd.cb)(thread, cmd.args, thread->arg);
81 }
82 }
83
84 if (evhtp_unlikely(stopped == 1)) {
85 event_base_loopbreak(thread->evbase);
86 }
87
88 return;
89} /* _evthr_read_cmd */
90
91static void *
93{
94 evthr_t * thread;
95
96 if (!(thread = (evthr_t *)args)) {
97 return NULL;
98 }
99
100 if (thread == NULL || thread->thr == NULL) {
101 pthread_exit(NULL);
102 }
103
104 thread->evbase = event_base_new();
105 thread->event = event_new(thread->evbase, thread->rdr,
106 EV_READ | EV_PERSIST, _evthr_read_cmd, args);
107
108 event_add(thread->event, NULL);
109
110#ifdef EVTHR_SHARED_PIPE
111 if (thread->pool_rdr > 0) {
112 thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
113 EV_READ | EV_PERSIST, _evthr_read_cmd, args);
114 event_add(thread->shared_pool_ev, NULL);
115 }
116
117#endif
118
119 pthread_mutex_lock(&thread->lock);
120 if (thread->init_cb != NULL) {
121 (thread->init_cb)(thread, thread->arg);
122 }
123
124 pthread_mutex_unlock(&thread->lock);
125
126 event_base_loop(thread->evbase, 0);
127
128 pthread_mutex_lock(&thread->lock);
129 if (thread->exit_cb != NULL) {
130 (thread->exit_cb)(thread, thread->arg);
131 }
132
133 pthread_mutex_unlock(&thread->lock);
134
135 if (thread->err == 1) {
136 fprintf(stderr, "FATAL ERROR!\n");
137 }
138
139 pthread_exit(NULL);
140} /* _evthr_loop */
141
142evthr_res
143evthr_defer(evthr_t * thread, evthr_cb cb, void * arg)
144{
145 evthr_cmd_t cmd = {
146 .cb = cb,
147 .args = arg,
148 .stop = 0
149 };
150
151 if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
152 return EVTHR_RES_RETRY;
153 }
154
155 return EVTHR_RES_OK;
156}
157
158evthr_res
159evthr_stop(evthr_t * thread)
160{
161 evthr_cmd_t cmd = {
162 .cb = NULL,
163 .args = NULL,
164 .stop = 1
165 };
166
167 if (send(thread->wdr, &cmd, sizeof(evthr_cmd_t), 0) < 0) {
168 return EVTHR_RES_RETRY;
169 }
170
171 pthread_join(*thread->thr, NULL);
172 return EVTHR_RES_OK;
173}
174
175evbase_t *
176evthr_get_base(evthr_t * thr)
177{
178 return thr ? thr->evbase : NULL;
179}
180
181void
182evthr_set_aux(evthr_t * thr, void * aux)
183{
184 if (thr) {
185 thr->aux = aux;
186 }
187}
188
189void *
190evthr_get_aux(evthr_t * thr)
191{
192 return thr ? thr->aux : NULL;
193}
194
195int
196evthr_set_initcb(evthr_t * thr, evthr_init_cb cb)
197{
198 if (thr == NULL) {
199 return -1;
200 }
201
202 thr->init_cb = cb;
203
204 return 0;
205}
206
207int
208evthr_set_exitcb(evthr_t * thr, evthr_exit_cb cb)
209{
210 if (thr == NULL) {
211 return -1;
212 }
213
214 thr->exit_cb = cb;
215
216 return 0;
217}
218
219static evthr_t *
220_evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args)
221{
222 evthr_t * thread;
223 int fds[2];
224
225 if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
226 return NULL;
227 }
228
229 evutil_make_socket_nonblocking(fds[0]);
230 evutil_make_socket_nonblocking(fds[1]);
231
232 if (!(thread = calloc(sizeof(evthr_t), 1))) {
233 return NULL;
234 }
235
236 thread->thr = malloc(sizeof(pthread_t));
237 thread->arg = args;
238 thread->rdr = fds[0];
239 thread->wdr = fds[1];
240
241 thread->init_cb = init_cb;
242 thread->exit_cb = exit_cb;
243
244 if (pthread_mutex_init(&thread->lock, NULL)) {
245 evthr_free(thread);
246 return NULL;
247 }
248
249 return thread;
250} /* evthr_new */
251
252evthr_t *
253evthr_new(evthr_init_cb init_cb, void * args)
254{
255 return _evthr_new(init_cb, NULL, args);
256}
257
258evthr_t *
259evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args)
260{
261 return _evthr_new(init_cb, exit_cb, args);
262}
263
264int
265evthr_start(evthr_t * thread)
266{
267 if (thread == NULL || thread->thr == NULL) {
268 return -1;
269 }
270
271 if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
272 return -1;
273 }
274
275 return 0;
276}
277
278void
279evthr_free(evthr_t * thread)
280{
281 if (thread == NULL) {
282 return;
283 }
284
285 if (thread->rdr > 0) {
286 close(thread->rdr);
287 }
288
289 if (thread->wdr > 0) {
290 close(thread->wdr);
291 }
292
293 if (thread->thr) {
294 free(thread->thr);
295 }
296
297 if (thread->event) {
298 event_free(thread->event);
299 }
300
301#ifdef EVTHR_SHARED_PIPE
302 if (thread->shared_pool_ev) {
303 event_free(thread->shared_pool_ev);
304 }
305
306#endif
307
308 if (thread->evbase) {
309 event_base_free(thread->evbase);
310 }
311
312 free(thread);
313} /* evthr_free */
314
315void
316evthr_pool_free(evthr_pool_t * pool)
317{
318 evthr_t * thread;
319 evthr_t * save;
320
321 if (pool == NULL) {
322 return;
323 }
324
325 TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
326 TAILQ_REMOVE(&pool->threads, thread, next);
327
328 evthr_free(thread);
329 }
330
331 free(pool);
332}
333
334evthr_res
335evthr_pool_stop(evthr_pool_t * pool)
336{
337 evthr_t * thr;
338 evthr_t * save;
339
340 if (pool == NULL) {
341 return EVTHR_RES_FATAL;
342 }
343
344 TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
345 evthr_stop(thr);
346 }
347
348 return EVTHR_RES_OK;
349}
350
351static inline int
352get_backlog_(evthr_t * thread)
353{
354 int backlog = 0;
355
356 ioctl(thread->rdr, FIONREAD, &backlog);
357
358 return (int)(backlog / sizeof(evthr_cmd_t));
359}
360
361evthr_res
362evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
363{
364#ifdef EVTHR_SHARED_PIPE
365 evthr_cmd_t cmd = {
366 .cb = cb,
367 .args = arg,
368 .stop = 0
369 };
370
371 if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
372 return EVTHR_RES_RETRY;
373 }
374
375 return EVTHR_RES_OK;
376#endif
377 evthr_t * thread = NULL;
378 evthr_t * min_thread = NULL;
379 int min_backlog = 0;
380
381 if (pool == NULL) {
382 return EVTHR_RES_FATAL;
383 }
384
385 if (cb == NULL) {
386 return EVTHR_RES_NOCB;
387 }
388
389
390 TAILQ_FOREACH(thread, &pool->threads, next) {
391 int backlog = get_backlog_(thread);
392
393 if (backlog == 0) {
394 min_thread = thread;
395 break;
396 }
397
398 if (min_thread == NULL || backlog < min_backlog) {
399 min_thread = thread;
400 min_backlog = backlog;
401 }
402 }
403
404 return evthr_defer(min_thread, cb, arg);
405} /* evthr_pool_defer */
406
407static evthr_pool_t *
408_evthr_pool_new(int nthreads,
409 evthr_init_cb init_cb,
410 evthr_exit_cb exit_cb,
411 void * shared)
412{
413 evthr_pool_t * pool;
414 int i;
415
416#ifdef EVTHR_SHARED_PIPE
417 int fds[2];
418#endif
419
420 if (nthreads == 0) {
421 return NULL;
422 }
423
424 if (!(pool = calloc(sizeof(evthr_pool_t), 1))) {
425 return NULL;
426 }
427
428 pool->nthreads = nthreads;
429 TAILQ_INIT(&pool->threads);
430
431#ifdef EVTHR_SHARED_PIPE
432 if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
433 return NULL;
434 }
435
436 evutil_make_socket_nonblocking(fds[0]);
437 evutil_make_socket_nonblocking(fds[1]);
438
439 pool->rdr = fds[0];
440 pool->wdr = fds[1];
441#endif
442
443 for (i = 0; i < nthreads; i++) {
444 evthr_t * thread;
445
446 if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
447 evthr_pool_free(pool);
448 return NULL;
449 }
450
451#ifdef EVTHR_SHARED_PIPE
452 thread->pool_rdr = fds[0];
453#endif
454
455 TAILQ_INSERT_TAIL(&pool->threads, thread, next);
456 }
457
458 return pool;
459} /* _evthr_pool_new */
460
461evthr_pool_t *
462evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared)
463{
464 return _evthr_pool_new(nthreads, init_cb, NULL, shared);
465}
466
467evthr_pool_t *
469 evthr_init_cb init_cb,
470 evthr_exit_cb exit_cb, void * shared)
471{
472 return _evthr_pool_new(nthreads, init_cb, exit_cb, shared);
473}
474
475int
476evthr_pool_start(evthr_pool_t * pool)
477{
478 evthr_t * evthr = NULL;
479
480 if (pool == NULL) {
481 return -1;
482 }
483
484 TAILQ_FOREACH(evthr, &pool->threads, next) {
485 if (evthr_start(evthr) < 0) {
486 return -1;
487 }
488
489 usleep(5000);
490 }
491
492 return 0;
493}
#define TAILQ_FOREACH_SAFE(var, head, field, tvar)
Definition: internal.h:22
#define evhtp_unlikely(x)
Definition: internal.h:18
#define evhtp_likely(x)
Definition: internal.h:17
void * args
Definition: thread.c:25
uint8_t stop
Definition: thread.c:24
evthr_cb cb
Definition: thread.c:26
int nthreads
Definition: thread.c:36
evthr_pool_slist_t threads
Definition: thread.c:37
Definition: thread.c:40
evbase_t * evbase
Definition: thread.c:45
void * aux
Definition: thread.c:51
char err
Definition: thread.c:43
evthr_exit_cb exit_cb
Definition: thread.c:49
int rdr
Definition: thread.c:41
pthread_mutex_t lock
Definition: thread.c:46
void * arg
Definition: thread.c:50
int wdr
Definition: thread.c:42
pthread_t * thr
Definition: thread.c:47
ev_t * event
Definition: thread.c:44
evthr_init_cb init_cb
Definition: thread.c:48
int evthr_start(evthr_t *thread)
Definition: thread.c:265
void evthr_set_aux(evthr_t *thr, void *aux)
Definition: thread.c:182
TAILQ_HEAD(evthr_pool_slist, evthr)
static int get_backlog_(evthr_t *thread)
Definition: thread.c:352
evthr_res evthr_stop(evthr_t *thread)
Definition: thread.c:159
evthr_res evthr_defer(evthr_t *thread, evthr_cb cb, void *arg)
Definition: thread.c:143
void evthr_pool_free(evthr_pool_t *pool)
Definition: thread.c:316
evthr_t * evthr_new(evthr_init_cb init_cb, void *args)
Definition: thread.c:253
static void * _evthr_loop(void *args)
Definition: thread.c:92
evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb init_cb, void *shared)
Definition: thread.c:462
static void _evthr_read_cmd(evutil_socket_t sock, short which, void *args)
Definition: thread.c:64
evthr_t * evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:259
void * evthr_get_aux(evthr_t *thr)
Definition: thread.c:190
int evthr_set_initcb(evthr_t *thr, evthr_init_cb cb)
Definition: thread.c:196
struct evthr_pool_slist evthr_pool_slist_t
Definition: thread.c:21
void evthr_free(evthr_t *thread)
Definition: thread.c:279
struct evthr_pool __attribute__
static evthr_pool_t * _evthr_pool_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:408
evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:468
evthr_cb cb
Definition: thread.c:2
evbase_t * evthr_get_base(evthr_t *thr)
Definition: thread.c:176
evthr_res evthr_pool_defer(evthr_pool_t *pool, evthr_cb cb, void *arg)
Definition: thread.c:362
#define _evthr_read(thr, cmd, sock)
Definition: thread.c:60
void * args
Definition: thread.c:1
int evthr_set_exitcb(evthr_t *thr, evthr_exit_cb cb)
Definition: thread.c:208
int evthr_pool_start(evthr_pool_t *pool)
Definition: thread.c:476
evthr_res evthr_pool_stop(evthr_pool_t *pool)
Definition: thread.c:335
static evthr_t * _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:220