1 /* Parallel sorting utility.
4 * Based on Michel Vallieres' serial implementation.
19 //#define DEBUG_TIMING
20 //#define MERGE_TYPE 0
22 #define TAG_UNSORTED 100
23 #define TAG_SORTED 101
/*
 * Write the values of `array` to `stream`, formatted with "%g".
 *
 * Two output shapes are visible below:
 *   - abbreviated: the first `num_shown` and the last `num_shown` values,
 *     tab-separated, with a "..." marker between them; `num_shown` is capped
 *     at array_size / 2 so the head and tail never overlap;
 *   - full: every value on its own line.
 *
 * NOTE(review): the condition selecting between the two shapes is not
 * visible in this listing. master() passes num_shown = 0 for the final dump
 * to stdout, so presumably a non-positive num_shown selects the full
 * listing -- confirm.
 */
25 void printarray(FILE * stream, int array_size, double *array, int num_shown)
/* Clamp so the head and tail windows stay disjoint. */
30 if (num_shown > array_size / 2)
31 num_shown = array_size / 2;
/* Head: the first num_shown values. */
32 for (i = 0; i < num_shown; i++)
33 fprintf(stream, "%g\t", array[i]);
34 fprintf(stream, "...\n...\t");
/* Tail: the last num_shown values, indexed back from the end. */
35 for (i = num_shown; i > 0; i--)
36 fprintf(stream, "%g\t", array[array_size - i]);
37 fprintf(stream, "\n");
/* Full listing: one value per line. */
39 for (i = 0; i < array_size; i++)
40 fprintf(stream, "%g\n", array[i]);
/*
 * Compute a check value over the `array_size` elements of `array`.
 *
 * NOTE(review): the loop body and the return statement are not visible in
 * this listing. Callers print the result as "sum of %d elements", so this
 * presumably returns the sum of the elements -- confirm.
 */
44 double checkarray(int array_size, double *array)
50 for (i = 0; i < array_size; i++)
/*
 * Obtain the input stream named by `file_name`.
 *
 * The name "-" is special-cased; any other name is opened for reading with
 * fopen(), and a message is written to stderr if that fails.
 *
 * NOTE(review): the body of the "-" branch and what happens after a failed
 * fopen() are not visible in this listing. "-" presumably selects stdin --
 * confirm, and confirm that callers never receive a NULL stream.
 */
55 FILE *open_file(const char *file_name)
59 if (strcmp("-", file_name) == 0) {
61 } else if ((fp = fopen(file_name, "r")) == NULL) {
62 fprintf(stderr, "error in opening data file %s\n", file_name);
/*
 * Read the element count from the data header, which has the form "# <n>".
 *
 * fp: stream positioned at the start of the header.
 *
 * Returns the count on success. If the header is missing, malformed, or
 * gives a negative count, a message is written to stderr and 0 is returned,
 * so the caller simply proceeds with an empty data set instead of using an
 * indeterminate value.
 */
int read_length(FILE * fp)
{
	int array_size = 0;

	/* fscanf reports how many conversions succeeded; exactly one is needed. */
	if (fscanf(fp, "# %d", &array_size) != 1) {
		fprintf(stderr, "error reading number of elements\n");
		return 0;
	}
	if (array_size < 0) {
		fprintf(stderr, "invalid number of elements %d\n", array_size);
		return 0;
	}
	return array_size;
}
/*
 * Allocate uninitialised storage for `array_size` elements of
 * `element_size` bytes each and store the pointer in *pArray.
 *
 * element_size: size of one element in bytes.
 * array_size:   number of elements; must not be negative.
 * pArray:       receives the new block; release it with free().
 *
 * Never returns with *pArray == NULL: a negative count, a byte count that
 * does not fit in size_t, or an out-of-memory condition is reported on
 * stderr and terminates the process, because callers use the result
 * without checking it. A zero-length request yields a valid, freeable
 * pointer even on platforms where malloc(0) returns NULL.
 */
void allocate_array(size_t element_size, int array_size, void **pArray)
{
	size_t bytes;

	/* Reject sizes whose byte count would wrap around in size_t. */
	if (array_size < 0
	    || (element_size != 0
		&& (size_t)array_size > (size_t)-1 / element_size)) {
		fprintf(stderr, "could not allocate %d elements of %zu bytes\n",
			array_size, element_size);
		exit(EXIT_FAILURE);
	}

	bytes = element_size * (size_t)array_size;

	/* Ask for at least one byte so an empty array is not a failure. */
	*pArray = malloc(bytes != 0 ? bytes : 1);
	if (*pArray == NULL) {
		/* bytes is a size_t, so it must be printed with %zu. */
		fprintf(stderr, "could not allocate %zu bytes\n", bytes);
		exit(EXIT_FAILURE);
	}
}
/*
 * Read `array_size` floating-point values from `fp` into `array`, in order,
 * using the "%lf" conversion.
 *
 * NOTE(review): the result of fscanf() is ignored, so on a short or
 * malformed input the remaining elements keep whatever the buffer held
 * before. The value returned by this function is not visible in this
 * listing, and the visible callers ignore it -- confirm the intended
 * contract before relying on it.
 */
86 int read_data_block(FILE * fp, int array_size, double *array)
91 fprintf(stderr, "Reading %d points\n", array_size);
93 for (i = 0; i < array_size; i++) {
94 fscanf(fp, "%lf", &(array[i]));
/*
 * Merge variant built on repeated insertion into `array`.
 *
 * The local block arrays[0] (residual_size values) is copied to the front
 * of `array`; then each value of chunks 1..size-1 (chunk_size values each)
 * is placed by scanning forward past the values <= it and shifting the rest
 * of the filled region up by one slot.
 *
 * NOTE(review): the declarations, the initialisation of `k` and `n`, the
 * load of `x` and the store into array[k] are not visible in this listing;
 * `n` is presumably the number of slots filled so far -- confirm.
 */
100 void merge(int array_size, double *array,
101 int size, int chunk_size, int residual_size, double **arrays)
/*
 * Seed the output with the local block.
 * NOTE(review): master() points arrays[0] into `array` itself; memcpy
 * requires non-overlapping regions, so confirm the two ranges cannot
 * overlap for small inputs (or use memmove).
 */
106 if (residual_size > 0) {
107 memcpy(array, arrays[0], sizeof(double) * residual_size);
109 fprintf(stderr, "chunk %d merge complete\n", 0);
110 #endif /* DEBUG_MERGE */
/* Insert the values of every remote chunk, one chunk at a time. */
113 for (i = 1; i < size; i++) {
116 for (j = 0; j < chunk_size; j++) {
/* Advance to the first filled slot holding a value greater than x. */
118 while (k < n && array[k] <= x) {
124 "shift %d elements to insert %g at %d\n",
126 #endif /* DEBUG_MERGE */
/* Open a gap at k; memmove is used because the ranges overlap. */
127 memmove(&(array[k + 1]), &(array[k]),
128 sizeof(double) * (n - k));
132 fprintf(stderr, " inserted %g at %d of %d\n", x, k, n);
133 #endif /* DEBUG_MERGE */
138 fprintf(stderr, "chunk %d merge complete\n", i);
139 #endif /* DEBUG_MERGE */
143 #else /* MERGE_TYPE 0 */
/*
 * K-way merge of already-sorted chunks into `array`.
 *
 * array_size:    number of values to produce in `array`.
 * array:         destination buffer of at least array_size doubles.
 * size:          number of chunks, i.e. the length of `arrays`.
 * chunk_size:    length of each of arrays[1] .. arrays[size - 1].
 * residual_size: length of arrays[0]; may be 0, in which case arrays[0]
 *                is never read.
 * arrays:        the sorted chunks. arrays[0] may live in the last
 *                residual_size slots of `array`: output slot i is written
 *                only after i values have been consumed, so the write
 *                position never passes an unread value stored there.
 *
 * Equal values are taken from the lowest-numbered chunk first. The merge
 * stops early if every chunk is exhausted before array_size values have
 * been produced. Terminates the process if the cursor table cannot be
 * allocated.
 */
void merge(int array_size, double *array,
	   int size, int chunk_size, int residual_size, double **arrays)
{
	int i, j, minj, limit;
	int *p;			/* p[j]: next unread index of chunk j, -1 once exhausted */
	double min = 0.0;

	if (size <= 0 || array_size <= 0)
		return;

	p = calloc((size_t)size, sizeof *p);
	if (p == NULL) {
		fprintf(stderr, "could not allocate %d merge cursors\n", size);
		exit(EXIT_FAILURE);
	}

	/* Chunks with nothing in them are exhausted before the merge starts. */
	if (residual_size <= 0) {
		p[0] = -1;
#ifdef DEBUG_MERGE
		fprintf(stderr, "chunk %d merge complete\n", 0);
#endif /* DEBUG_MERGE */
	}
	if (chunk_size <= 0) {
		for (j = 1; j < size; j++)
			p[j] = -1;
	}

	for (i = 0; i < array_size; i++) {
		/* Pick the smallest head among the chunks that still have data. */
		minj = -1;
		for (j = 0; j < size; j++) {
			if (p[j] < 0)
				continue;
			if (minj < 0 || arrays[j][p[j]] < min) {
				minj = j;
				min = arrays[j][p[j]];
			}
		}
		if (minj < 0)
			break;	/* every chunk is exhausted */

		array[i] = min;
#ifdef DEBUG_MERGE
		fprintf(stderr, "Merging arrays[%d][%d]=%g as element %d\n",
			minj, p[minj], array[i], i);
#endif /* DEBUG_MERGE */
		p[minj]++;

		/*
		 * Chunk 0 holds residual_size values, every other chunk holds
		 * chunk_size. The bound is selected by the chunk that was just
		 * consumed (minj), not by the scan index.
		 */
		limit = (minj == 0) ? residual_size : chunk_size;
		if (p[minj] >= limit) {
			p[minj] = -1;
#ifdef DEBUG_MERGE
			fprintf(stderr, "chunk %d merge complete\n", minj);
#endif /* DEBUG_MERGE */
		}
	}

	free(p);
}
187 #endif /* MERGE_TYPE */
/*
 * Body of the wall-clock helper compiled under DEBUG_TIMING (its header is
 * not visible in this listing).
 * NOTE(review): what happens after the error message is not visible.
 */
195 if ((err = gettimeofday(&tv, NULL)) != 0) {
196 fprintf(stderr, "error getting time of day.\n");
/* Seconds as a double: whole seconds plus microseconds scaled by 1e-6. */
199 return tv.tv_sec + tv.tv_usec / 1e6;
201 #endif /* DEBUG_TIMING */
/*
 * Coordinator: reads the data set from `file_name`, hands one chunk to each
 * of the other size - 1 ranks, sorts the leftover values locally, collects
 * the sorted chunks, merges everything into one array and prints it.
 *
 * rank:      this process's rank; used as the broadcast root.
 * size:      total number of ranks in MPI_COMM_WORLD.
 * file_name: input name passed to open_file().
 *
 * NOTE(review): many lines of this function (declarations, debug guards,
 * the tails of the MPI calls, the free loop) are not visible in this
 * listing; the comments below describe only what is shown.
 */
203 void master(int rank, int size, const char *file_name)
205 int array_size, chunk_size, residual_size, i, err;
206 double *array, **arrays;
210 double t_start, t_sorted, t_merged;
211 #endif /* DEBUG_TIMING */
213 fp = open_file(file_name);
217 #endif /* DEBUG_TIMING */
220 fprintf(stderr, "reading number of elements from %s\n", file_name);
222 array_size = read_length(fp);
/*
 * Split the data evenly over the size - 1 other ranks; the remainder stays
 * on this rank. main() reports "requires at least 2 nodes", so the divisor
 * is presumably never zero here -- confirm.
 */
223 chunk_size = array_size / (size - 1);
224 residual_size = array_size % (size - 1);
227 "break %d elements into %d chunks of %d with %d remaining\n",
228 array_size, size - 1, chunk_size, residual_size);
232 fprintf(stderr, "broadcasting chunk size\n");
/* Every rank needs chunk_size to size its receive buffer. */
234 MPI_Bcast(&chunk_size, 1, MPI_INT, rank, MPI_COMM_WORLD);
/* One slot per rank; slot 0 is reserved for the local (residual) block. */
236 allocate_array(sizeof(double *), size, (void **)&arrays);
/* Read each chunk from the input and send it to rank i. */
238 for (i = 1; i < size; i++) {
239 allocate_array(sizeof(double), chunk_size,
240 (void **)&(arrays[i]));
242 fprintf(stderr, "allocate chunk %d at %p\n", i, arrays[i]);
244 read_data_block(fp, chunk_size, arrays[i]);
246 fprintf(stderr, "sending data chunk to node %d\n", i);
247 printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
248 fprintf(stderr, "check: sum of %d elements = %g\n",
249 chunk_size, checkarray(chunk_size, arrays[i]));
251 MPI_Ssend(arrays[i], chunk_size, MPI_DOUBLE, i, TAG_UNSORTED,
256 fprintf(stderr, "allocate memory for the whole array\n");
258 allocate_array(sizeof(double), array_size, (void **)&array);
259 if (residual_size > 0) {
260 /* use the full array's tail for the local data */
/*
 * NOTE(review): a tail of residual_size elements starts at index
 * array_size - residual_size; the extra "- 1" places the local block one
 * slot earlier. With a merge that fills `array` from the front while still
 * reading arrays[0], the write position can then reach the last unread
 * local value before it has been consumed -- confirm and correct.
 */
261 arrays[0] = &(array[array_size - 1 - residual_size]);
262 read_data_block(fp, residual_size, arrays[0]);
/* Sort the leftover values on this rank. */
270 if (residual_size > 0) {
272 fprintf(stderr, "sort the local array\n");
274 sort(residual_size, arrays[0]);
/*
 * Collect one sorted chunk per worker. MPI_ANY_SOURCE means chunks are
 * stored in arrival order, which need not match the rank they were sent
 * to; all chunks have the same length, so any buffer fits any reply.
 */
277 for (i = 1; i < size; i++) {
278 err = MPI_Recv(arrays[i], chunk_size, MPI_DOUBLE,
279 MPI_ANY_SOURCE, TAG_SORTED, MPI_COMM_WORLD,
283 "error receiving sorted block from %d\n",
288 fprintf(stderr, "received sorted data chunk from node %d\n",
290 printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
291 fprintf(stderr, "check: sum of %d elements = %g\n",
292 chunk_size, checkarray(chunk_size, arrays[i]));
298 #endif /* DEBUG_TIMING */
301 fprintf(stderr, "merge sorted chunks\n");
/* Combine the local block and the size - 1 sorted chunks into `array`. */
303 merge(array_size, array, size, chunk_size, residual_size, arrays);
307 #endif /* DEBUG_TIMING */
310 fprintf(stderr, "free chunks and chunk holder\n");
312 for (i = 1; i < size; i++) {
318 /* print final array */
319 fprintf(stderr, "the array after sorting is:\n");
320 printarray(stderr, array_size, array, NUM_SHOWN);
321 fprintf(stderr, "check: sum of %d elements = %g\n",
322 array_size, checkarray(array_size, array));
327 "sorted %d elements in %g seconds and merged in %g seconds\n",
328 array_size, t_sorted - t_start, t_merged - t_sorted);
329 #endif /* DEBUG_TIMING */
/* The result itself goes to stdout; num_shown = 0 requests every value. */
331 printarray(stdout, array_size, array, 0);
/*
 * Worker side of the exchange (the function header is not visible in this
 * listing): learn the chunk size from the broadcast, receive one unsorted
 * chunk from the master rank, sort it, and send it back.
 */
338 int chunk_size, master_rank = 0, err;
/* Matches the broadcast issued by the master; the root is rank 0. */
342 MPI_Bcast(&chunk_size, 1, MPI_INT, master_rank, MPI_COMM_WORLD);
344 allocate_array(sizeof(double), chunk_size, (void **)&array);
345 err = MPI_Recv(array, chunk_size, MPI_DOUBLE, master_rank, TAG_UNSORTED,
346 MPI_COMM_WORLD, &status);
348 fprintf(stderr, "Error receiving unsorted block on %d\n", rank);
352 sort(chunk_size, array);
/* Return the sorted chunk under the tag the master waits for. */
354 MPI_Ssend(array, chunk_size, MPI_DOUBLE, master_rank, TAG_SORTED,
/*
 * Program entry point: initialise MPI, find this process's rank and the
 * number of ranks, and run the master role.
 *
 * NOTE(review): argument parsing, the rank test that selects between the
 * master and worker roles, and MPI shutdown are not visible in this
 * listing.
 */
360 int main(int argc, char *argv[])
/* Default input name; open_file() special-cases "-". */
363 const char *file_name = "-";
365 MPI_Init(&argc, &argv);
366 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
367 MPI_Comm_size(MPI_COMM_WORLD, &size);
/* The master divides the data by size - 1, so a lone rank cannot run. */
370 fprintf(stderr, "%s requires at least 2 nodes\n", argv[0]);
377 master(rank, size, file_name);
386 /* LocalWords: Vallieres stdlib mpi NUM