#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include "parallel_com.h"
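/*
 * The communication backend is chosen at compile time: parallel_com.h is
 * assumed to define __comm_selected together with either _USE_MPI or
 * _USE_PVM (and to pull in the matching mpi.h/pvm3.h header).  Only the
 * MPI path below is fully implemented; the PVM path is a sketch.
 */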

#ifdef __comm_selected

#ifdef _USE_MPI
/* Rank of this task and size of MPI_COMM_WORLD, filled in by parallel_init(). */
int rank, size;
#endif

/* Initialise the selected communication layer.  Returns 0 on success. */
int parallel_init(int *argc, char ***argv) {
#ifdef _USE_PVM
    /* PVM path (incomplete sketch): the task without a parent acts as the
     * master and spawns the worker tasks in a single call; pvm_spawn()
     * fills the tids array with the new task ids.  nworkers and tids are
     * expected to be provided elsewhere. */
    if (pvm_parent() == PvmNoParent)
        pvm_spawn("calcpi", NULL, PvmTaskDefault, "", nworkers, tids);
#endif
#ifdef _USE_MPI
    int status = MPI_Init(argc, argv);
    if (status == MPI_SUCCESS) {
        MPI_Comm_size(MPI_COMM_WORLD, &size);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        return 0;
    }
    return 1;
#endif
    return 0;
}


/* Shut down the communication layer.  Returns 0 on success. */
int parallel_close(void) {
#ifdef _USE_PVM
    pvm_exit();
#endif
#ifdef _USE_MPI
    return MPI_Finalize();
#endif
    return 0;
}

/* Nonzero if this task is the master (MPI rank 0). */
int parallel_master_task(void) {
#ifdef _USE_MPI
    return rank == 0;
#endif
    return 0;
}

/* Nonzero if this task is a worker (any MPI rank other than 0). */
int parallel_slave_task(void) {
#ifdef _USE_MPI
    return rank != 0;
#endif
    return 0;
}


/* Number of cooperating tasks (MPI communicator size; 1 without MPI). */
int parallel_group_size(void) {
#ifdef _USE_MPI
    return size;
#endif
    return 1;
}


/* Send a NUL-terminated string: first its length (including the NUL),
 * then the characters themselves. */
int parallel_send_text(char *str, int dest, int tag) {
#ifdef _USE_MPI
    int len = strlen(str) + 1;   /* local length; avoids shadowing the global 'size' */
    MPI_Send(&len, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
    return MPI_Send(str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
#endif
    return -1;
}


/* Receive a string sent with parallel_send_text().  The buffer is
 * allocated here; the caller is responsible for free()ing it. */
int parallel_recv_text(char **str, int src, int tag) {
#ifdef _USE_MPI
    int len;
    MPI_Status status;
    MPI_Recv(&len, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
    *str = (char *)malloc(len);
    memset(*str, 0, len);
    return MPI_Recv(*str, len, MPI_CHAR, src, tag, MPI_COMM_WORLD, &status);
#endif
    return -1;
}

/* Send an opaque buffer: first its size in bytes, then the raw bytes. */
int parallel_send_data(void *data, int data_size, int dest, int tag) {
#ifdef _USE_MPI
    MPI_Send(&data_size, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
    return MPI_Send(data, data_size, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
#endif
    return -1;
}


/* Receive a buffer sent with parallel_send_data().  The buffer is
 * allocated here and its size stored in *data_size; the caller must
 * free() it. */
int parallel_recv_data(void **data, int *data_size, int src, int tag) {
#ifdef _USE_MPI
    MPI_Status status;
    MPI_Recv(data_size, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
    *data = malloc(*data_size);
    memset(*data, 0, *data_size);
    return MPI_Recv(*data, *data_size, MPI_CHAR, src, tag, MPI_COMM_WORLD, &status);
#endif
    return -1;
}
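/*
 * Pairing example (sketch; the tag value 7 is arbitrary):
 *
 *   // sender (e.g. rank 0)
 *   parallel_send_text("hello", 1, 7);
 *
 *   // receiver (rank 1)
 *   char *msg;
 *   parallel_recv_text(&msg, 0, 7);
 *   ...
 *   free(msg);
 */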


/* Send a single integer. */
int parallel_send_int(int num, int dest, int tag) {
#ifdef _USE_MPI
    return MPI_Send(&num, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
#endif
    return -1;
}


/* Receive a single integer. */
int parallel_recv_int(int *num, int src, int tag) {
#ifdef _USE_MPI
    MPI_Status status;
    return MPI_Recv(num, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
#endif
    return -1;
}


/* Non-blocking check for a pending message.  src_val and tag_val may be
 * NULL to match any source/tag; on a hit they are overwritten with the
 * actual source and tag.  Returns 1 if a message is waiting, 0 otherwise. */
int parallel_probe(int *src_val, int *tag_val) {
#ifdef _USE_MPI
    MPI_Status status;
    int flag;
    int src_num, tag_num;

    if (src_val == NULL)
        src_num = MPI_ANY_SOURCE;
    else
        src_num = *src_val;
    if (tag_val == NULL)
        tag_num = MPI_ANY_TAG;
    else
        tag_num = *tag_val;

    MPI_Iprobe(src_num, tag_num, MPI_COMM_WORLD, &flag, &status);

    if (flag) {
        if (src_val != NULL)
            *src_val = status.MPI_SOURCE;
        if (tag_val != NULL)
            *tag_val = status.MPI_TAG;
        return 1;
    }
    return 0;
#endif
    return 0;
}
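/*
 * Polling example (sketch; the tag value 42 is arbitrary and
 * PARALLEL_ANY_SOURCE is expected to come from parallel_com.h):
 *
 *   int src = PARALLEL_ANY_SOURCE, tag = 42, value;
 *   if (parallel_probe(&src, &tag))
 *       parallel_recv_int(&value, src, tag);
 */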


/* Message tags used by the master/worker protocol below. */
#define kRequestTag 1000   /* worker -> master: "give me work" */
#define kWorkTag    1050   /* master -> worker: work item id + payload */
#define kResultTag  1100   /* worker -> master: work item id + result payload */
#define kRetryTag   1125   /* worker -> master: work item failed, reschedule it */
#define kEndTag     1150   /* master -> worker: no more work, shut down */

/* Per-work-item states kept by the master. */
#define tsWaiting 0   /* not yet handed out */
#define tsWorking 1   /* handed to a worker, result pending */
#define tsDone    2   /* result received (or nothing to do) */
#define tsRetry   3   /* currently unused; retried items go back to tsWaiting */
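
/*
 * Master/worker flow implemented by parallel_master_work() and
 * parallel_slave_work():
 *
 *   1. Both sides synchronise on a barrier.
 *   2. Each worker sends kRequestTag; the master answers with kWorkTag
 *      (work id followed by an opaque payload built by req_func).
 *   3. The worker runs work_func and replies with kResultTag (id plus
 *      result payload) or kRetryTag (id only), then requests more work.
 *   4. When every item is done the master sends kEndTag to all workers
 *      and both sides synchronise on a final barrier.
 */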


/* Master side of the work pool: hand out work_count items to the workers,
 * collect their results, and reschedule items that come back with a retry. */
void parallel_master_work(int work_count,
                          request_function req_func,
                          result_function res_func,
                          void *data) {
    int *work_status, i, tmp_src, tmp_tag, working, cur_work, request;
    void *buffer_work, *buffer_result;
    int buffer_work_size, buffer_result_size;

    work_status = (int *) malloc(sizeof(int) * work_count);
    for (i = 0; i < work_count; i++) {
        work_status[i] = tsWaiting;
    }

    MPI_Barrier(MPI_COMM_WORLD);
    working = work_count;

    do {
        /* Answer a pending work request. */
        tmp_src = PARALLEL_ANY_SOURCE;
        tmp_tag = kRequestTag;
        if (parallel_probe(&tmp_src, &tmp_tag)) {
            parallel_recv_int(&request, tmp_src, tmp_tag);
            tmp_tag = kWorkTag;
            /* Find the next waiting item and ask req_func to build its payload.
               A size of 0 or less means there is nothing to send for that item. */
            buffer_work_size = 0;
            for (i = 0; i < work_count; i++) {
                if (work_status[i] == tsWaiting) {
                    cur_work = i;
                    buffer_work_size = req_func(cur_work, data, &buffer_work);
                    if (buffer_work_size > 0) {
                        break;
                    } else {
                        work_status[i] = tsDone;
                        working--;
                    }
                }
            }
            if (buffer_work_size > 0) {
                parallel_send_int(cur_work, tmp_src, tmp_tag);
                parallel_send_data(buffer_work, buffer_work_size, tmp_src, tmp_tag);
                work_status[cur_work] = tsWorking;
                free(buffer_work);
            }
        }
        /* Collect any finished results. */
        tmp_src = PARALLEL_ANY_SOURCE;
        tmp_tag = kResultTag;
        while (parallel_probe(&tmp_src, &tmp_tag)) {
            parallel_recv_int(&i, tmp_src, tmp_tag);
            parallel_recv_data(&buffer_result, &buffer_result_size, tmp_src, tmp_tag);
            work_status[i] = tsDone;
            res_func(i, data, buffer_result, buffer_result_size);
            working--;
            free(buffer_result);
            tmp_src = PARALLEL_ANY_SOURCE;
            tmp_tag = kResultTag;
        }
        /* Reschedule items the workers could not complete. */
        tmp_src = PARALLEL_ANY_SOURCE;
        tmp_tag = kRetryTag;
        while (parallel_probe(&tmp_src, &tmp_tag)) {
            parallel_recv_int(&i, tmp_src, tmp_tag);
            work_status[i] = tsWaiting;
            tmp_src = PARALLEL_ANY_SOURCE;
            tmp_tag = kRetryTag;
        }
        usleep(1000);
    } while (working);

    /* Tell every worker to shut down and wait for them at the barrier. */
    for (i = 1; i < parallel_group_size(); i++)
        parallel_send_int(0, i, kEndTag);
    MPI_Barrier(MPI_COMM_WORLD);
    free(work_status);
}


/* Worker side of the work pool: repeatedly request work, run work_func on
 * each item, and report a result (or ask for a retry) until kEndTag arrives. */
void parallel_slave_work(work_function work_func, void *data) {
    int tmp_src, tmp_tag, buffer_int;
    void *buffer_request, *buffer_result;
    char working = 1;
    int buffer_request_size, buffer_result_size;

    MPI_Barrier(MPI_COMM_WORLD);

    parallel_send_int(1, 0, kRequestTag);
    do {
        tmp_src = PARALLEL_ANY_SOURCE;
        tmp_tag = kWorkTag;
        if (parallel_probe(&tmp_src, &tmp_tag)) {
            parallel_recv_int(&buffer_int, tmp_src, tmp_tag);
            parallel_recv_data(&buffer_request, &buffer_request_size, tmp_src, tmp_tag);

            buffer_result_size = work_func(buffer_int, data, buffer_request,
                                           buffer_request_size, &buffer_result);
            free(buffer_request);   /* allocated by parallel_recv_data() */

            if (buffer_result_size == -1) {
                /* work_func could not handle this item: ask the master to
                   reschedule it, then request something else. */
                parallel_send_int(buffer_int, tmp_src, kRetryTag);
                parallel_send_int(1, 0, kRequestTag);
            } else {
                parallel_send_int(buffer_int, tmp_src, kResultTag);
                parallel_send_data(buffer_result, buffer_result_size, tmp_src, kResultTag);
                /* By the same convention the master uses for req_func's buffer,
                   work_func is expected to return a malloc()ed result. */
                free(buffer_result);
                parallel_send_int(1, 0, kRequestTag);
            }
        }
        /* Check whether the master has told us to stop. */
        tmp_src = PARALLEL_ANY_SOURCE;
        tmp_tag = kEndTag;
        if (parallel_probe(&tmp_src, &tmp_tag)) {
            parallel_recv_int(&buffer_int, tmp_src, tmp_tag);
            working = 0;
        }
        usleep(1000);
    } while (working);
    MPI_Barrier(MPI_COMM_WORLD);
}
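
/*
 * Usage sketch (illustrative only; njobs, calc_request, handle_result,
 * do_work and ctx are hypothetical application names, not part of this file):
 *
 *   int main(int argc, char **argv) {
 *       parallel_init(&argc, &argv);
 *       if (parallel_master_task())
 *           parallel_master_work(njobs, calc_request, handle_result, &ctx);
 *       else
 *           parallel_slave_work(do_work, &ctx);
 *       parallel_close();
 *       return 0;
 *   }
 */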


#endif /* __comm_selected */