Add solution to sorting assignment.
[parallel_computing.git] / assignments / archive / sorting / soln / main.c
1 /* Parallel sorting utility.
2  *
3  * W. Trevor King.
4  * Based on Michel Vallieres' serial implementation.
5  */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <float.h>
11 #include <sys/time.h>
12 #include <mpi.h>
13
14 #include "sort.h"
15
16 #define NUM_SHOWN 3
17 //#define DEBUG
18 //#define DEBUG_MERGE
19 //#define DEBUG_TIMING
20 //#define MERGE_TYPE 0
21
22 #define TAG_UNSORTED  100
23 #define TAG_SORTED    101
24
25 void printarray(FILE * stream, int array_size, double *array, int num_shown)
26 {
27         int i;
28
29         if (num_shown > 0) {
30                 if (num_shown > array_size / 2)
31                         num_shown = array_size / 2;
32                 for (i = 0; i < num_shown; i++)
33                         fprintf(stream, "%g\t", array[i]);
34                 fprintf(stream, "...\n...\t");
35                 for (i = num_shown; i > 0; i--)
36                         fprintf(stream, "%g\t", array[array_size - i]);
37                 fprintf(stream, "\n");
38         } else {
39                 for (i = 0; i < array_size; i++)
40                         fprintf(stream, "%g\n", array[i]);
41         }
42 }
43
44 double checkarray(int array_size, double *array)
45 {
46         int i;
47         double sum;
48
49         sum = 0.0;
50         for (i = 0; i < array_size; i++)
51                 sum = sum + array[i];
52         return sum;
53 }
54
55 FILE *open_file(const char *file_name)
56 {
57         FILE *fp;
58
59         if (strcmp("-", file_name) == 0) {
60                 fp = stdin;
61         } else if ((fp = fopen(file_name, "r")) == NULL) {
62                 fprintf(stderr, "error in opening data file %s\n", file_name);
63                 exit(EXIT_FAILURE);
64         }
65         return fp;
66 }
67
68 int read_length(FILE * fp)
69 {
70         int array_size;
71
72         fscanf(fp, "# %d", &array_size);
73         return array_size;
74 }
75
76 void allocate_array(size_t element_size, int array_size, void **pArray)
77 {
78         *pArray = (void *)malloc(element_size * array_size);
79         if (*pArray == NULL) {
80                 fprintf(stderr, "could not allocate %d bytes\n",
81                         element_size * array_size);
82                 exit(EXIT_FAILURE);
83         }
84 }
85
86 int read_data_block(FILE * fp, int array_size, double *array)
87 {
88         int i;
89
90 #ifdef DEBUG
91         fprintf(stderr, "Reading %d points\n", array_size);
92 #endif                          /* DEBUG */
93         for (i = 0; i < array_size; i++) {
94                 fscanf(fp, "%lf", &(array[i]));
95         }
96 }
97
98 #if MERGE_TYPE == 1
99
100 void merge(int array_size, double *array,
101            int size, int chunk_size, int residual_size, double **arrays)
102 {
103         int n, i, j, k;
104         double *p, x;
105
106         if (residual_size > 0) {
107                 memcpy(array, arrays[0], sizeof(double) * residual_size);
108 #ifdef DEBUG_MERGE
109                 fprintf(stderr, "chunk %d merge complete\n", 0);
110 #endif                          /* DEBUG_MERGE */
111         }
112         n = residual_size;
113         for (i = 1; i < size; i++) {
114                 p = arrays[i];
115                 k = 0;
116                 for (j = 0; j < chunk_size; j++) {
117                         x = p[j];
118                         while (k < n && array[k] <= x) {
119                                 k++;
120                         }
121                         if (k < n) {
122 #ifdef DEBUG_MERGE
123                                 fprintf(stderr,
124                                         "shift %d elements to insert %g at %d\n",
125                                         n - k, x, k);
126 #endif                          /* DEBUG_MERGE */
127                                 memmove(&(array[k + 1]), &(array[k]),
128                                         sizeof(double) * (n - k));
129                         }
130                         array[k] = x;
131 #ifdef DEBUG_MERGE
132                         fprintf(stderr, "  inserted %g at %d of %d\n", x, k, n);
133 #endif                          /* DEBUG_MERGE */
134                         k++;
135                         n++;
136                 }
137 #ifdef DEBUG_MERGE
138                 fprintf(stderr, "chunk %d merge complete\n", i);
139 #endif                          /* DEBUG_MERGE */
140         }
141 }
142
143 #else                           /* MERGE_TYPE 0 */
144
145 void merge(int array_size, double *array,
146            int size, int chunk_size, int residual_size, double **arrays)
147 {
148         int i, j, minj, *p;
149         double min;
150
151         allocate_array(sizeof(int *), size, (void **)&p);
152         memset(p, 0, sizeof(int *) * size);
153         if (residual_size == 0) {
154                 p[0] = -1;
155 #ifdef DEBUG_MERGE
156                 fprintf(stderr, "chunk %d merge complete\n", 0);
157 #endif                          /* DEBUG_MERGE */
158         }
159
160         for (i = 0; i < array_size; i++) {
161                 double x;
162                 min = DBL_MAX;
163                 for (j = 0; j < size; j++) {
164                         if (p[j] >= 0 && arrays[j][p[j]] < min) {
165                                 minj = j;
166                                 min = arrays[j][p[j]];
167                         }
168                 }
169                 array[i] = arrays[minj][p[minj]];
170 #ifdef DEBUG_MERGE
171                 fprintf(stderr, "Merging arrays[%d][%d]=%g as element %d\n",
172                         minj, p[minj], array[i], i);
173 #endif                          /* DEBUG_MERGE */
174                 p[minj]++;
175                 if ((j == 0 && p[minj] == residual_size)
176                     || p[minj] >= chunk_size) {
177                         p[minj] = -1;
178 #ifdef DEBUG_MERGE
179                         fprintf(stderr, "chunk %d merge complete\n", minj);
180 #endif                          /* DEBUG_MERGE */
181                 }
182         }
183
184         free(p);
185 }
186
187 #endif                          /* MERGE_TYPE */
188
189 #ifdef DEBUG_TIMING
190 double utime()
191 {
192         int err;
193         struct timeval tv;
194
195         if ((err = gettimeofday(&tv, NULL)) != 0) {
196                 fprintf(stderr, "error getting time of day.\n");
197                 exit(EXIT_FAILURE);
198         }
199         return tv.tv_sec + tv.tv_usec / 1e6;
200 }
201 #endif                          /* DEBUG_TIMING */
202
203 void master(int rank, int size, const char *file_name)
204 {
205         int array_size, chunk_size, residual_size, i, err;
206         double *array, **arrays;
207         FILE *fp;
208         MPI_Status status;
209 #ifdef DEBUG_TIMING
210         double t_start, t_sorted, t_merged;
211 #endif                          /* DEBUG_TIMING */
212
213         fp = open_file(file_name);
214
215 #ifdef DEBUG_TIMING
216         t_start = utime();
217 #endif                          /* DEBUG_TIMING */
218
219 #ifdef DEBUG
220         fprintf(stderr, "reading number of elements from %s\n", file_name);
221 #endif                          /* DEBUG */
222         array_size = read_length(fp);
223         chunk_size = array_size / (size - 1);
224         residual_size = array_size % (size - 1);
225 #ifdef DEBUG
226         fprintf(stderr,
227                 "break %d elements into %d chunks of %d with %d remaining\n",
228                 array_size, size - 1, chunk_size, residual_size);
229 #endif                          /* DEBUG */
230
231 #ifdef DEBUG
232         fprintf(stderr, "broadcasting chunk size\n");
233 #endif                          /* DEBUG */
234         MPI_Bcast(&chunk_size, 1, MPI_INT, rank, MPI_COMM_WORLD);
235
236         allocate_array(sizeof(double *), size, (void **)&arrays);
237
238         for (i = 1; i < size; i++) {
239                 allocate_array(sizeof(double), chunk_size,
240                                (void **)&(arrays[i]));
241 #ifdef DEBUG
242                 fprintf(stderr, "allocate chunk %d at %p\n", i, arrays[i]);
243 #endif                          /* DEBUG */
244                 read_data_block(fp, chunk_size, arrays[i]);
245 #ifdef DEBUG
246                 fprintf(stderr, "sending data chunk to node %d\n", i);
247                 printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
248                 fprintf(stderr, "check: sum of %d elements = %g\n",
249                         chunk_size, checkarray(chunk_size, arrays[i]));
250 #endif                          /* DEBUG */
251                 MPI_Ssend(arrays[i], chunk_size, MPI_DOUBLE, i, TAG_UNSORTED,
252                           MPI_COMM_WORLD);
253         }
254
255 #ifdef DEBUG
256         fprintf(stderr, "allocate memory for the whole array\n");
257 #endif                          /* DEBUG */
258         allocate_array(sizeof(double), array_size, (void **)&array);
259         if (residual_size > 0) {
260                 /* use the full array's tail for the local data */
261                 arrays[0] = &(array[array_size - 1 - residual_size]);
262                 read_data_block(fp, residual_size, arrays[0]);
263         } else {
264                 arrays[0] = NULL;
265         }
266
267         if (fp != stdin)
268                 fclose(fp);
269
270         if (residual_size > 0) {
271 #ifdef DEBUG
272                 fprintf(stderr, "sort the local array\n");
273 #endif                          /* DEBUG */
274                 sort(residual_size, arrays[0]);
275         }
276
277         for (i = 1; i < size; i++) {
278 #ifdef DEBUG
279                 fprintf(stderr, "receive sorted data chunk from node %d\n", i);
280 #endif                          /* DEBUG */
281                 err = MPI_Recv(arrays[i], chunk_size, MPI_DOUBLE, i, TAG_SORTED,
282                                MPI_COMM_WORLD, &status);
283                 if (err != 0) {
284                         fprintf(stderr,
285                                 "error receiving sorted block from %d\n", i);
286                         exit(EXIT_FAILURE);
287                 }
288 #ifdef DEBUG
289                 printarray(stderr, chunk_size, arrays[i], NUM_SHOWN);
290                 fprintf(stderr, "check: sum of %d elements = %g\n",
291                         chunk_size, checkarray(chunk_size, arrays[i]));
292 #endif                          /* DEBUG */
293         }
294
295 #ifdef DEBUG_TIMING
296         t_sorted = utime();
297 #endif                          /* DEBUG_TIMING */
298
299 #ifdef DEBUG
300         fprintf(stderr, "merge sorted chunks\n");
301 #endif                          /* DEBUG */
302         merge(array_size, array, size, chunk_size, residual_size, arrays);
303
304 #ifdef DEBUG_TIMING
305         t_merged = utime();
306 #endif                          /* DEBUG_TIMING */
307
308 #ifdef DEBUG
309         fprintf(stderr, "free chunks and chunk holder\n");
310 #endif                          /* DEBUG */
311         for (i = 1; i < size; i++) {
312                 free(arrays[i]);
313         }
314         free(arrays);
315
316 #ifdef DEBUG
317         /* print final array */
318         fprintf(stderr, "the array after sorting is:\n");
319         printarray(stderr, array_size, array, NUM_SHOWN);
320         fprintf(stderr, "check: sum of %d elements = %g\n",
321                 array_size, checkarray(array_size, array));
322 #endif                          /* DEBUG */
323
324 #ifdef DEBUG_TIMING
325         fprintf(stderr,
326                 "sorted %d elements in %g seconds and merged in %g seconds\n",
327                 array_size, t_sorted - t_start, t_merged - t_sorted);
328 #endif                          /* DEBUG_TIMING */
329
330         printarray(stdout, array_size, array, 0);
331
332         free(array);
333 }
334
335 void slave(int rank)
336 {
337         int chunk_size, master_rank = 0, err;
338         double *array;
339         MPI_Status status;
340
341         MPI_Bcast(&chunk_size, 1, MPI_INT, master_rank, MPI_COMM_WORLD);
342
343         allocate_array(sizeof(double), chunk_size, (void **)&array);
344         err = MPI_Recv(array, chunk_size, MPI_DOUBLE, master_rank, TAG_UNSORTED,
345                        MPI_COMM_WORLD, &status);
346         if (err != 0) {
347                 fprintf(stderr, "Error receiving unsorted block on %d\n", rank);
348                 exit(EXIT_FAILURE);
349         }
350
351         sort(chunk_size, array);
352
353         MPI_Ssend(array, chunk_size, MPI_DOUBLE, master_rank, TAG_SORTED,
354                   MPI_COMM_WORLD);
355
356         free(array);
357 }
358
359 int main(int argc, char *argv[])
360 {
361         int rank, size;
362         const char *file_name = "-";
363
364         MPI_Init(&argc, &argv);
365         MPI_Comm_rank(MPI_COMM_WORLD, &rank);
366         MPI_Comm_size(MPI_COMM_WORLD, &size);
367
368         if (size < 2) {
369                 fprintf(stderr, "%s requires at least 2 nodes\n", argv[0]);
370                 return EXIT_FAILURE;
371         }
372
373         if (rank == 0) {
374                 if (argc > 1)
375                         file_name = argv[1];
376                 master(rank, size, file_name);
377         } else {
378                 slave(rank);
379         }
380
381         MPI_Finalize();
382         return EXIT_SUCCESS;
383 }
384
385 /*  LocalWords:  Vallieres stdlib mpi NUM
386  */