diff options
Diffstat (limited to '')
-rw-r--r-- | pigz/yarn.c | 365 |
1 files changed, 365 insertions, 0 deletions
diff --git a/pigz/yarn.c b/pigz/yarn.c new file mode 100644 index 000000000..0562e7010 --- /dev/null +++ b/pigz/yarn.c @@ -0,0 +1,365 @@ +/* yarn.c -- generic thread operations implemented using pthread functions + * Copyright (C) 2008 Mark Adler + * Version 1.1 26 Oct 2008 Mark Adler + * For conditions of distribution and use, see copyright notice in yarn.h + */ + +/* Basic thread operations implemented using the POSIX pthread library. All + pthread references are isolated within this module to allow alternate + implementations with other thread libraries. See yarn.h for the description + of these operations. */ + +/* Version history: + 1.0 19 Oct 2008 First version + 1.1 26 Oct 2008 No need to set the stack size -- remove + Add yarn_abort() function for clean-up on error exit + */ + +/* for thread portability */ +#define _POSIX_PTHREAD_SEMANTICS +#define _REENTRANT + +/* external libraries and entities referenced */ +#include <stdio.h> /* fprintf(), stderr */ +#include <stdlib.h> /* exit(), malloc(), free(), NULL */ +#include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */ + /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), + PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), + pthread_self(), pthread_equal(), + pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), + pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), + pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), + pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */ +#include <errno.h> /* ENOMEM, EAGAIN, EINVAL */ + +/* interface definition */ +#include "yarn.h" + +/* constants */ +#define local static /* for non-exported functions and globals */ + +/* error handling external globals, resettable by application */ +char *yarn_prefix = "yarn"; +void (*yarn_abort)(int) = NULL; + + +/* immediately exit -- use for errors that shouldn't ever happen */ +local void fail(int err) +{ + fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix, + err == ENOMEM ? "out of memory" : "internal pthread error", err); + if (yarn_abort != NULL) + yarn_abort(err); + exit(err == ENOMEM || err == EAGAIN ? err : EINVAL); +} + +/* memory handling routines provided by user -- if none are provided, malloc() + and free() are used, which are therefore assumed to be thread-safe */ +typedef void *(*malloc_t)(size_t); +typedef void (*free_t)(void *); +local malloc_t my_malloc_f = malloc; +local free_t my_free = free; + +/* use user-supplied allocation routines instead of malloc() and free() */ +void yarn_mem(malloc_t lease, free_t vacate) +{ + my_malloc_f = lease; + my_free = vacate; +} + +/* memory allocation that cannot fail (from the point of view of the caller) */ +local void *my_malloc(size_t size) +{ + void *block; + + if ((block = my_malloc_f(size)) == NULL) + fail(ENOMEM); + return block; +} + +/* -- lock functions -- */ + +struct lock_s { + pthread_mutex_t mutex; + pthread_cond_t cond; + long value; +}; + +lock *new_lock(long initial) +{ + int ret; + lock *bolt; + + bolt = my_malloc(sizeof(struct lock_s)); + if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) || + (ret = pthread_cond_init(&(bolt->cond), NULL))) + fail(ret); + bolt->value = initial; + return bolt; +} + +void possess(lock *bolt) +{ + int ret; + + if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0) + fail(ret); +} + +void release(lock *bolt) +{ + int ret; + + if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0) + fail(ret); +} + +void twist(lock *bolt, enum twist_op op, long val) +{ + int ret; + + if (op == TO) + bolt->value = val; + else if (op == BY) + bolt->value += val; + if ((ret = pthread_cond_broadcast(&(bolt->cond))) || + (ret = pthread_mutex_unlock(&(bolt->mutex)))) + fail(ret); +} + +#define until(a) while(!(a)) + +void wait_for(lock *bolt, enum wait_op op, long val) +{ + int ret; + + switch (op) { + case TO_BE: + until (bolt->value == val) + if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) + fail(ret); + break; + case NOT_TO_BE: + until (bolt->value != val) + if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) + fail(ret); + break; + case TO_BE_MORE_THAN: + until (bolt->value > val) + if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) + fail(ret); + break; + case TO_BE_LESS_THAN: + until (bolt->value < val) + if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) + fail(ret); + } +} + +long peek_lock(lock *bolt) +{ + return bolt->value; +} + +void free_lock(lock *bolt) +{ + int ret; + if ((ret = pthread_cond_destroy(&(bolt->cond))) || + (ret = pthread_mutex_destroy(&(bolt->mutex)))) + fail(ret); + my_free(bolt); +} + +/* -- thread functions (uses lock functions above) -- */ + +struct thread_s { + pthread_t id; + int done; /* true if this thread has exited */ + thread *next; /* for list of all launched threads */ +}; + +/* list of threads launched but not joined, count of threads exited but not + joined (incremented by ignition() just before exiting) */ +local lock threads_lock = { + PTHREAD_MUTEX_INITIALIZER, + PTHREAD_COND_INITIALIZER, + 0 /* number of threads exited but not joined */ +}; +local thread *threads = NULL; /* list of extant threads */ + +/* structure in which to pass the probe and its payload to ignition() */ +struct capsule { + void (*probe)(void *); + void *payload; +}; + +/* mark the calling thread as done and alert join_all() */ +local void reenter(void *dummy) +{ + thread *match, **prior; + pthread_t me; + + /* find this thread in the threads list by matching the thread id */ + me = pthread_self(); + possess(&(threads_lock)); + prior = &(threads); + while ((match = *prior) != NULL) { + if (pthread_equal(match->id, me)) + break; + prior = &(match->next); + } + if (match == NULL) + fail(EINVAL); + + /* mark this thread as done and move it to the head of the list */ + match->done = 1; + if (threads != match) { + *prior = match->next; + match->next = threads; + threads = match; + } + + /* update the count of threads to be joined and alert join_all() */ + twist(&(threads_lock), BY, +1); +} + +/* all threads go through this routine so that just before the thread exits, + it marks itself as done in the threads list and alerts join_all() so that + the thread resources can be released -- use cleanup stack so that the + marking occurs even if the thread is cancelled */ +local void *ignition(void *arg) +{ + struct capsule *capsule = arg; + + /* run reenter() before leaving */ + pthread_cleanup_push(reenter, NULL); + + /* execute the requested function with argument */ + capsule->probe(capsule->payload); + my_free(capsule); + + /* mark this thread as done and let join_all() know */ + pthread_cleanup_pop(1); + + /* exit thread */ + return NULL; +} + +/* not all POSIX implementations create threads as joinable by default, so that + is made explicit here */ +thread *launch(void (*probe)(void *), void *payload) +{ + int ret; + thread *th; + struct capsule *capsule; + pthread_attr_t attr; + + /* construct the requested call and argument for the ignition() routine + (allocated instead of automatic so that we're sure this will still be + there when ignition() actually starts up -- ignition() will free this + allocation) */ + capsule = my_malloc(sizeof(struct capsule)); + capsule->probe = probe; + capsule->payload = payload; + + /* assure this thread is in the list before join_all() or ignition() looks + for it */ + possess(&(threads_lock)); + + /* create the thread and call ignition() from that thread */ + th = my_malloc(sizeof(struct thread_s)); + if ((ret = pthread_attr_init(&attr)) || + (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) || + (ret = pthread_create(&(th->id), &attr, ignition, capsule)) || + (ret = pthread_attr_destroy(&attr))) + fail(ret); + + /* put the thread in the threads list for join_all() */ + th->done = 0; + th->next = threads; + threads = th; + release(&(threads_lock)); + return th; +} + +void join(thread *ally) +{ + int ret; + thread *match, **prior; + + /* wait for thread to exit and return its resources */ + if ((ret = pthread_join(ally->id, NULL)) != 0) + fail(ret); + + /* find the thread in the threads list */ + possess(&(threads_lock)); + prior = &(threads); + while ((match = *prior) != NULL) { + if (match == ally) + break; + prior = &(match->next); + } + if (match == NULL) + fail(EINVAL); + + /* remove thread from list and update exited count, free thread */ + if (match->done) + threads_lock.value--; + *prior = match->next; + release(&(threads_lock)); + my_free(ally); +} + +/* This implementation of join_all() only attempts to join threads that have + announced that they have exited (see ignition()). When there are many + threads, this is faster than waiting for some random thread to exit while a + bunch of other threads have already exited. */ +int join_all(void) +{ + int ret, count; + thread *match, **prior; + + /* grab the threads list and initialize the joined count */ + count = 0; + possess(&(threads_lock)); + + /* do until threads list is empty */ + while (threads != NULL) { + /* wait until at least one thread has reentered */ + wait_for(&(threads_lock), NOT_TO_BE, 0); + + /* find the first thread marked done (should be at or near the top) */ + prior = &(threads); + while ((match = *prior) != NULL) { + if (match->done) + break; + prior = &(match->next); + } + if (match == NULL) + fail(EINVAL); + + /* join the thread (will be almost immediate), remove from the threads + list, update the reenter count, and free the thread */ + if ((ret = pthread_join(match->id, NULL)) != 0) + fail(ret); + threads_lock.value--; + *prior = match->next; + my_free(match); + count++; + } + + /* let go of the threads list and return the number of threads joined */ + release(&(threads_lock)); + return count; +} + +/* cancel and join the thread -- the thread will cancel when it gets to a file + operation, a sleep or pause, or a condition wait */ +void destruct(thread *off_course) +{ + int ret; + + if ((ret = pthread_cancel(off_course->id)) != 0) + fail(ret); + join(off_course); +} |