projects
/
userspace-rcu.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
workqueue: add approximate upper bound to queue length
[userspace-rcu.git]
/
urcu
/
workqueue-fifo.h
diff --git
a/urcu/workqueue-fifo.h
b/urcu/workqueue-fifo.h
index a2bbd909fdb1ae3923c8743d155752dceb016802..13d9278a5beca9baa46ea9ef3cc7f3b1981091e0 100644
(file)
--- a/
urcu/workqueue-fifo.h
+++ b/
urcu/workqueue-fifo.h
@@
-36,6
+36,11
@@
enum urcu_accept_ret {
URCU_ACCEPT_SHUTDOWN = 1,
};
URCU_ACCEPT_SHUTDOWN = 1,
};
+enum urcu_enqueue_ret {
+ URCU_ENQUEUE_OK = 0,
+ URCU_ENQUEUE_FULL = 1,
+};
+
/*
* We use RCU to steal work from siblings. Therefore, one of RCU flavors
* need to be included before this header. All worker that participate
/*
* We use RCU to steal work from siblings. Therefore, one of RCU flavors
* need to be included before this header. All worker that participate
@@
-59,6
+64,9
@@
struct urcu_workqueue {
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
+ /* Maximum number of work entries (approximate). 0 means infinite. */
+ unsigned long nr_work_max;
+ unsigned long nr_work; /* Current number of work items */
bool shutdown; /* Shutdown performed */
};
bool shutdown; /* Shutdown performed */
};
@@
-73,6
+81,7
@@
struct urcu_worker {
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
+ struct urcu_workqueue *queue;
int flags; /* enum urcu_worker_flags */
};
int flags; /* enum urcu_worker_flags */
};
@@
-81,20
+90,32
@@
enum urcu_worker_flags {
};
static inline
};
static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+ unsigned long max_queue_len)
{
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
pthread_mutex_init(&queue->sibling_lock, NULL);
{
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
pthread_mutex_init(&queue->sibling_lock, NULL);
+ queue->nr_work_max = max_queue_len;
+ queue->nr_work = 0;
queue->shutdown = false;
}
static inline
queue->shutdown = false;
}
static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+ struct urcu_work *work)
{
bool was_empty;
{
bool was_empty;
-
+ unsigned long nr_work_max;
+
+ nr_work_max = queue->nr_work_max;
+ if (nr_work_max) {
+ /* Approximate max queue size. */
+ if (uatomic_read(&queue->nr_work) >= nr_work_max)
+ return URCU_ENQUEUE_FULL;
+ uatomic_inc(&queue->nr_work);
+ }
cds_wfcq_node_init(&work->node);
/* Enqueue work. */
cds_wfcq_node_init(&work->node);
/* Enqueue work. */
@@
-118,6
+139,7
@@
void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
(void) urcu_dequeue_wake_single(&queue->waitqueue);
rcu_read_unlock(); /* Protect stack dequeue */
}
(void) urcu_dequeue_wake_single(&queue->waitqueue);
rcu_read_unlock(); /* Protect stack dequeue */
}
+ return URCU_ENQUEUE_OK;
}
static inline
}
static inline
@@
-133,13
+155,15
@@
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
}
static inline
}
static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+ struct urcu_worker *worker, int flags)
{
cds_wfcq_init(&worker->head, &worker->tail);
worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
worker->own = NULL;
worker->wait_node.node.next = NULL;
{
cds_wfcq_init(&worker->head, &worker->tail);
worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
worker->own = NULL;
worker->wait_node.node.next = NULL;
+ worker->queue = queue;
}
static inline
}
static inline
@@
-358,9
+382,9
@@
end:
}
static inline
}
static inline
-enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
- struct urcu_worker *worker)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
{
+ struct urcu_workqueue *queue = worker->queue;
enum cds_wfcq_ret wfcq_ret;
bool has_work;
enum cds_wfcq_ret wfcq_ret;
bool has_work;
@@
-430,15
+454,15
@@
do_work:
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
struct cds_wfcq_node *node;
struct cds_wfcq_node *node;
+ struct urcu_work *work;
if (worker->own) {
if (worker->own) {
- struct urcu_work *work;
-
/* Process our own work entry. */
work = worker->own;
worker->own = NULL;
/* Process our own work entry. */
work = worker->own;
worker->own = NULL;
-
return work
;
+
goto end
;
}
/*
* If we are registered for work stealing, we need to dequeue
}
/*
* If we are registered for work stealing, we need to dequeue
@@
-459,7
+483,11
@@
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
}
if (!node)
return NULL;
}
if (!node)
return NULL;
- return caa_container_of(node, struct urcu_work, node);
+ work = caa_container_of(node, struct urcu_work, node);
+end:
+ if (queue->nr_work_max)
+ uatomic_dec(&queue->nr_work);
+ return work;
}
static inline
}
static inline
This page took
0.024815 seconds
and
4
git commands to generate.