unsigned *processed;
};
+static pthread_mutex_t data_request = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t data_ready = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t data_provider = PTHREAD_MUTEX_INITIALIZER;
+static struct thread_params *data_requester;
+
static void *threaded_find_deltas(void *arg)
{
- struct thread_params *p = arg;
- if (p->list_size)
- find_deltas(p->list, p->list_size,
- p->window, p->depth, p->processed);
- return NULL;
+ struct thread_params *me = arg;
+
+ for (;;) {
+ pthread_mutex_lock(&data_request);
+ data_requester = me;
+ pthread_mutex_unlock(&data_provider);
+ pthread_mutex_lock(&data_ready);
+
+ if (!me->list_size)
+ return NULL;
+
+ find_deltas(me->list, me->list_size,
+ me->window, me->depth, me->processed);
+ }
}
-#define NR_THREADS 8
+#define NR_THREADS 4
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{
struct thread_params p[NR_THREADS];
int i, ret;
+ unsigned chunk_size;
+
+ pthread_mutex_lock(&data_provider);
+ pthread_mutex_lock(&data_ready);
for (i = 0; i < NR_THREADS; i++) {
- unsigned sublist_size = list_size / (NR_THREADS - i);
- p[i].list = list;
- p[i].list_size = sublist_size;
p[i].window = window;
p[i].depth = depth;
p[i].processed = processed;
threaded_find_deltas, &p[i]);
if (ret)
die("unable to create thread: %s", strerror(ret));
- list += sublist_size;
- list_size -= sublist_size;
}
- for (i = 0; i < NR_THREADS; i++) {
- pthread_join(p[i].thread, NULL);
- }
+ /* this should be auto-tuned somehow */
+ chunk_size = window * 1000;
+
+ do {
+ unsigned sublist_size = chunk_size;
+ if (sublist_size > list_size)
+ sublist_size = list_size;
+
+ pthread_mutex_lock(&data_provider);
+ data_requester->list = list;
+ data_requester->list_size = sublist_size;
+ pthread_mutex_unlock(&data_ready);
+
+ list += sublist_size;
+ list_size -= sublist_size;
+ if (!sublist_size) {
+ pthread_join(data_requester->thread, NULL);
+ i--;
+ }
+ pthread_mutex_unlock(&data_request);
+ } while (i);
}
#else