--- /dev/null
+sorting
+=======
+
+Various sorting algorithms.
+
+Manifest
+--------
+
+=========== ===============================================
+README This file.
+Makefile Automate building and cleanup.
+data.py Generate random data for the 'data' file.
+scaling.py Script to generate time-scaling data.
+main.c Command line framework for a sorting algorithm.
+sort.h Header declaring sort function syntax.
+bubble.c Bubble sort implementation of sort.h.
+quicksort.c Quicksort implementation of sort.h.
+=========== ===============================================
+
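+The executables share a single sorting interface: main.c calls one
+sort() function, declared in sort.h and implemented by bubble.c and
+quicksort.c.  A minimal sketch of that declaration, inferred from the
+calls in main.c (the actual header may differ slightly):
+
+    /* sort array_size doubles in place, in ascending order */
+    void sort(int array_size, double *array);
+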
+Build
+-----
+
+Just run
+
+ $ make
+
+which also builds a random data file 'data'. To build with the DEBUG
+macro defined (to enable some stderr printouts in main.c), run
+
+ $ make CFLAGS=-DDEBUG
+
+To get timing information, compile with
+
+ $ make CFLAGS=-DDEBUG_TIMING
+
+Although the chunk-merge portion of the sort is generally much quicker
+than the chunk sorting itself, it is less clear which chunk-merge
+algorithm is fastest.  Set the MERGE_TYPE macro (0 selects the default
+selection merge, 1 the insertion merge; see main.c) to compare them,
+for example:
+
+ $ make clean && make "CFLAGS=-DDEBUG_TIMING"
+ $ ./data.py 100000 | mpiexec -n 3 ./parallel-quicksort >/dev/null
+ sorted 100000 elements in 20.9942 seconds and merged in 0.0027411 seconds
+ $ make clean && make "CFLAGS=-DDEBUG_TIMING -DMERGE_TYPE=1"
+ $ ./data.py 100000 | mpiexec -n 3 ./parallel-quicksort >/dev/null
+ sorted 100000 elements in 21.3503 seconds and merged in 1.07004 seconds
+
+Remove auto-generated files with
+
+ $ make clean
+
+Usage
+-----
+
+Start the Message Passing Daemons with
+
+ $ mpdboot -n 4 -f mpd.hosts
+
+which spawns a daemon (`mpd`) on the first four hosts in `mpd.hosts`.
+Then run your program (e.g. `simplest_message`) with
+
+ $ mpiexec -n 4 ./simplest_message
+
+When you're done, clean up the daemons with
+
+ $ mpdallexit
+
+Examples for each of the executables in this package:
+
+ $ mpiexec -n 4 ./parallel-quicksort data
+ $ mpiexec -n 4 ./parallel-bubble data
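+
+Both executables read the input format that main.c expects (and that
+data.py presumably generates): a '# N' header giving the element
+count, followed by N whitespace-separated doubles.  A hypothetical
+four-element input would look like
+
+    # 4
+    0.42 0.17 0.93 0.05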
+
+Timing
+------
+
+Time 8191 data points on 4 xphy nodes with
+
+ $ mpdboot -n 4 -f <(seq 1 4 | sed 's/^/xphy/')
+ $ time mpiexec -n 4 ./parallel-bubble data > /dev/null
+ $ time mpiexec -n 4 ./parallel-quicksort data > /dev/null
+ $ mpdallexit
+
+parallel-quicksort takes 0.210 s and parallel-bubble takes 0.302 s.
+
+TODO: `make scaling` doesn't work yet.
+Once it does, you will be able to generate scaling graphs for all
+executables built by the Makefile with
+
+ $ make scaling
+
+which generates `*-scaling.dat` and `*-scaling.png` for each
+executable.
--- /dev/null
+/* Parallel sorting utility.
+ *
+ * W. Trevor King.
+ * Based on Michel Vallieres' serial implementation.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <float.h>
+#include <sys/time.h>
+#include <mpi.h>
+
+#include "sort.h"
+
+#define NUM_SHOWN 3
+//#define DEBUG
+//#define DEBUG_MERGE
+//#define DEBUG_TIMING
+//#define MERGE_TYPE 0
+
+#define TAG_UNSORTED 100
+#define TAG_SORTED 101
+
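+/* Print `array` to `stream`.  With num_shown > 0, print a tab-separated
+ * preview of the first and last num_shown elements; with num_shown <= 0,
+ * print every element on its own line.
+ */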
+void printarray(FILE * stream, int array_size, double *array, int num_shown)
+{
+ int i;
+
+ if (num_shown > 0) {
+ if (num_shown > array_size / 2)
+ num_shown = array_size / 2;
+ for (i = 0; i < num_shown; i++)
+ fprintf(stream, "%g\t", array[i]);
+ fprintf(stream, "...\n...\t");
+ for (i = num_shown; i > 0; i--)
+ fprintf(stream, "%g\t", array[array_size - i]);
+ fprintf(stream, "\n");
+ } else {
+ for (i = 0; i < array_size; i++)
+ fprintf(stream, "%g\n", array[i]);
+ }
+}
+
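+/* Return the sum of the array elements, used as a quick checksum when
+ * comparing the data before and after sorting.
+ */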
+double checkarray(int array_size, double *array)
+{
+ int i;
+ double sum;
+
+ sum = 0.0;
+ for (i = 0; i < array_size; i++)
+ sum = sum + array[i];
+ return sum;
+}
+
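+/* Open file_name for reading, treating "-" as stdin.  Exit on failure. */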
+FILE *open_file(const char *file_name)
+{
+ FILE *fp;
+
+ if (strcmp("-", file_name) == 0) {
+ fp = stdin;
+ } else if ((fp = fopen(file_name, "r")) == NULL) {
+ fprintf(stderr, "error in opening data file %s\n", file_name);
+ exit(EXIT_FAILURE);
+ }
+ return fp;
+}
+
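+/* Read the element count from the data file's '# N' header line. */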
+int read_length(FILE * fp)
+{
+ int array_size;
+
+	if (fscanf(fp, "# %d", &array_size) != 1) {
+		fprintf(stderr, "error reading array length header\n");
+		exit(EXIT_FAILURE);
+	}
+ return array_size;
+}
+
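+/* Allocate array_size elements of element_size bytes each, exiting on
+ * allocation failure.
+ */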
+void allocate_array(size_t element_size, int array_size, void **pArray)
+{
+ *pArray = (void *)malloc(element_size * array_size);
+ if (*pArray == NULL) {
+		fprintf(stderr, "could not allocate %zu bytes\n",
+			element_size * array_size);
+ exit(EXIT_FAILURE);
+ }
+}
+
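+/* Read array_size whitespace-separated doubles from fp into array. */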
+void read_data_block(FILE * fp, int array_size, double *array)
+{
+ int i;
+
+#ifdef DEBUG
+ fprintf(stderr, "Reading %d points\n", array_size);
+#endif /* DEBUG */
+ for (i = 0; i < array_size; i++) {
+ fscanf(fp, "%lf", &(array[i]));
+ }
+}
+
+#if MERGE_TYPE == 1
+
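+/* Insertion merge: copy the residual chunk to the head of `array`,
+ * then insert each element of the remaining sorted chunks in place,
+ * shifting the already-merged prefix with memmove() as needed.
+ */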
+void merge(int array_size, double *array,
+ int size, int chunk_size, int residual_size, double **arrays)
+{
+ int n, i, j, k;
+ double *p, x;
+
+ if (residual_size > 0) {
+ memcpy(array, arrays[0], sizeof(double) * residual_size);
+#ifdef DEBUG_MERGE
+ fprintf(stderr, "chunk %d merge complete\n", 0);
+#endif /* DEBUG_MERGE */
+ }
+ n = residual_size;
+ for (i = 1; i < size; i++) {
+ p = arrays[i];
+ k = 0;
+ for (j = 0; j < chunk_size; j++) {
+ x = p[j];
+ while (k < n && array[k] <= x) {
+ k++;
+ }
+ if (k < n) {
+#ifdef DEBUG_MERGE
+ fprintf(stderr,
+ "shift %d elements to insert %g at %d\n",
+ n - k, x, k);
+#endif /* DEBUG_MERGE */
+ memmove(&(array[k + 1]), &(array[k]),
+ sizeof(double) * (n - k));
+ }
+ array[k] = x;
+#ifdef DEBUG_MERGE
+ fprintf(stderr, " inserted %g at %d of %d\n", x, k, n);
+#endif /* DEBUG_MERGE */
+ k++;
+ n++;
+ }
+#ifdef DEBUG_MERGE
+ fprintf(stderr, "chunk %d merge complete\n", i);
+#endif /* DEBUG_MERGE */
+ }
+}
+
+#else /* MERGE_TYPE 0 */
+
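+/* Selection merge (default): keep a cursor into each sorted chunk and,
+ * for every output position, copy the smallest element currently under
+ * a cursor; a cursor of -1 marks an exhausted chunk.
+ */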
+void merge(int array_size, double *array,
+ int size, int chunk_size, int residual_size, double **arrays)
+{
+ int i, j, minj, *p;
+ double min;
+
+	allocate_array(sizeof(int), size, (void **)&p);
+	memset(p, 0, sizeof(int) * size);
+ if (residual_size == 0) {
+ p[0] = -1;
+#ifdef DEBUG_MERGE
+ fprintf(stderr, "chunk %d merge complete\n", 0);
+#endif /* DEBUG_MERGE */
+ }
+
+	for (i = 0; i < array_size; i++) {
+		min = DBL_MAX;
+ for (j = 0; j < size; j++) {
+ if (p[j] >= 0 && arrays[j][p[j]] < min) {
+ minj = j;
+ min = arrays[j][p[j]];
+ }
+ }
+ array[i] = arrays[minj][p[minj]];
+#ifdef DEBUG_MERGE
+ fprintf(stderr, "Merging arrays[%d][%d]=%g as element %d\n",
+ minj, p[minj], array[i], i);
+#endif /* DEBUG_MERGE */
+ p[minj]++;
+		if ((minj == 0 && p[minj] == residual_size)
+		    || p[minj] >= chunk_size) {
+ p[minj] = -1;
+#ifdef DEBUG_MERGE
+ fprintf(stderr, "chunk %d merge complete\n", minj);
+#endif /* DEBUG_MERGE */
+ }
+ }
+
+ free(p);
+}
+
+#endif /* MERGE_TYPE */
+
+#ifdef DEBUG_TIMING
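+/* Return the current wall-clock time in seconds. */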
+double utime()
+{
+ int err;
+ struct timeval tv;
+
+ if ((err = gettimeofday(&tv, NULL)) != 0) {
+ fprintf(stderr, "error getting time of day.\n");
+ exit(EXIT_FAILURE);
+ }
+ return tv.tv_sec + tv.tv_usec / 1e6;
+}
+#endif /* DEBUG_TIMING */
+
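+/* Rank 0: read the input, send equal-sized chunks to the other ranks,
+ * sort any residual elements locally, receive the sorted chunks back,
+ * and merge everything into a single sorted array printed on stdout.
+ */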
+void master(int rank, int size, const char *file_name)
+{
+ int array_size, chunk_size, residual_size, i, err;
+ double *array, **arrays;
+ FILE *fp;
+ MPI_Status status;
+#ifdef DEBUG_TIMING
+ double t_start, t_sorted, t_merged;
+#endif /* DEBUG_TIMING */
+
+ fp = open_file(file_name);
+
+#ifdef DEBUG_TIMING
+ t_start = utime();
+#endif /* DEBUG_TIMING */
+
+#ifdef DEBUG
+ fprintf(stderr, "reading number of elements from %s\n", file_name);
+#endif /* DEBUG */
+ array_size = read_length(fp);
+ chunk_size = array_size / (size - 1);
+ residual_size = array_size % (size - 1);
+#ifdef DEBUG
+ fprintf(stderr,
+ "break %d elements into %d chunks of %d with %d remaining\n",
+ array_size, size - 1, chunk_size, residual_size);
+#endif /* DEBUG */
+
+#ifdef DEBUG
+ fprintf(stderr, "broadcasting chunk size\n");
+#endif /* DEBUG */
+ MPI_Bcast(&chunk_size, 1, MPI_INT, rank, MPI_COMM_WORLD);
+
+ allocate_array(sizeof(double *), size, (void **)&arrays);
+
+ for (i = 1; i < size; i++) {
+ allocate_array(sizeof(double), chunk_size,
+ (void **)&(arrays[i]));
+#ifdef DEBUG
+ fprintf(stderr, "allocate chunk %d at %p\n", i, arrays[i]);
+#endif /* DEBUG */
+ read_data_block(fp, chunk_size, arrays[i]);
+#ifdef DEBUG
+ fprintf(stderr, "sending data chunk to node %d\n", i);
+ printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
+ fprintf(stderr, "check: sum of %d elements = %g\n",
+ chunk_size, checkarray(chunk_size, arrays[i]));
+#endif /* DEBUG */
+ MPI_Ssend(arrays[i], chunk_size, MPI_DOUBLE, i, TAG_UNSORTED,
+ MPI_COMM_WORLD);
+ }
+
+#ifdef DEBUG
+ fprintf(stderr, "allocate memory for the whole array\n");
+#endif /* DEBUG */
+ allocate_array(sizeof(double), array_size, (void **)&array);
+ if (residual_size > 0) {
+ /* use the full array's tail for the local data */
+		arrays[0] = &(array[array_size - residual_size]);
+ read_data_block(fp, residual_size, arrays[0]);
+ } else {
+ arrays[0] = NULL;
+ }
+
+ if (fp != stdin)
+ fclose(fp);
+
+ if (residual_size > 0) {
+#ifdef DEBUG
+ fprintf(stderr, "sort the local array\n");
+#endif /* DEBUG */
+ sort(residual_size, arrays[0]);
+ }
+
+ for (i = 1; i < size; i++) {
+#ifdef DEBUG
+ fprintf(stderr, "receive sorted data chunk from node %d\n", i);
+#endif /* DEBUG */
+ err = MPI_Recv(arrays[i], chunk_size, MPI_DOUBLE, i, TAG_SORTED,
+ MPI_COMM_WORLD, &status);
+ if (err != 0) {
+ fprintf(stderr,
+ "error receiving sorted block from %d\n", i);
+ exit(EXIT_FAILURE);
+ }
+#ifdef DEBUG
+ printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
+ fprintf(stderr, "check: sum of %d elements = %g\n",
+ chunk_size, checkarray(chunk_size, arrays[i]));
+#endif /* DEBUG */
+ }
+
+#ifdef DEBUG_TIMING
+ t_sorted = utime();
+#endif /* DEBUG_TIMING */
+
+#ifdef DEBUG
+ fprintf(stderr, "merge sorted chunks\n");
+#endif /* DEBUG */
+ merge(array_size, array, size, chunk_size, residual_size, arrays);
+
+#ifdef DEBUG_TIMING
+ t_merged = utime();
+#endif /* DEBUG_TIMING */
+
+#ifdef DEBUG
+ fprintf(stderr, "free chunks and chunk holder\n");
+#endif /* DEBUG */
+ for (i = 1; i < size; i++) {
+ free(arrays[i]);
+ }
+ free(arrays);
+
+#ifdef DEBUG
+ /* print final array */
+ fprintf(stderr, "the array after sorting is:\n");
+ printarray(stderr, array_size, array, NUM_SHOWN);
+ fprintf(stderr, "check: sum of %d elements = %g\n",
+ array_size, checkarray(array_size, array));
+#endif /* DEBUG */
+
+#ifdef DEBUG_TIMING
+ fprintf(stderr,
+ "sorted %d elements in %g seconds and merged in %g seconds\n",
+ array_size, t_sorted - t_start, t_merged - t_sorted);
+#endif /* DEBUG_TIMING */
+
+ printarray(stdout, array_size, array, 0);
+
+ free(array);
+}
+
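+/* Ranks 1..size-1: receive one unsorted chunk from the master, sort it
+ * with the linked sort() implementation, and send it back.
+ */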
+void slave(int rank)
+{
+ int chunk_size, master_rank = 0, err;
+ double *array;
+ MPI_Status status;
+
+ MPI_Bcast(&chunk_size, 1, MPI_INT, master_rank, MPI_COMM_WORLD);
+
+ allocate_array(sizeof(double), chunk_size, (void **)&array);
+ err = MPI_Recv(array, chunk_size, MPI_DOUBLE, master_rank, TAG_UNSORTED,
+ MPI_COMM_WORLD, &status);
+ if (err != 0) {
+ fprintf(stderr, "Error receiving unsorted block on %d\n", rank);
+ exit(EXIT_FAILURE);
+ }
+
+ sort(chunk_size, array);
+
+ MPI_Ssend(array, chunk_size, MPI_DOUBLE, master_rank, TAG_SORTED,
+ MPI_COMM_WORLD);
+
+ free(array);
+}
+
+int main(int argc, char *argv[])
+{
+ int rank, size;
+ const char *file_name = "-";
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	if (size < 2) {
+		fprintf(stderr, "%s requires at least 2 nodes\n", argv[0]);
+		MPI_Finalize();
+		return EXIT_FAILURE;
+	}
+
+ if (rank == 0) {
+ if (argc > 1)
+ file_name = argv[1];
+ master(rank, size, file_name);
+ } else {
+ slave(rank);
+ }
+
+ MPI_Finalize();
+ return EXIT_SUCCESS;
+}
+
+/* LocalWords: Vallieres stdlib mpi NUM
+ */