src/common/parallel_com.cc

00001 
00002 
00003 /****************************
00004 **
00005 **      parallel_com.cc
00006 **      Parallel Communication
00007 **              by Kyle Ellrott
00008 **
00009 **
00010 *****************************/
00011 
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <unistd.h>
00015 #include <string.h>
00016 
00017 #include "parallel_com.h"
00018 
00019 #ifdef __comm_selected
00020 
00021 
#ifdef _USE_MPI
/* This process's rank in MPI_COMM_WORLD and the number of processes in
 * that communicator; both are filled in by parallel_init(). */
int rank;
int size;
#endif
00025 
00026 
/**
 * Start the communication layer.
 *
 * With MPI enabled this calls MPI_Init() and, on success, stores the
 * world size and this process's rank in the file-level `size` and `rank`.
 *
 * @param argc  pointer to main()'s argc, passed through to MPI_Init()
 * @param argv  pointer to main()'s argv, passed through to MPI_Init()
 * @return 0 on success (also when MPI is not compiled in), 1 when
 *         MPI_Init() does not report MPI_SUCCESS.
 */
int parallel_init(int *argc, char ***argv) {
  int status;
#ifdef _USE_PVM
  /* NOTE(review): parent_tid, n, nworkers, rcode and tids are not declared
   * in this function -- this stanza looks like an unfinished sketch;
   * confirm before building with _USE_PVM. */
  if (parent_tid == PvmNoParent) {
    for (n = 0; n < nworkers; n++) {
      rcode = pvm_spawn("calcpi", NULL, PvmTaskDefault, "", nworkers, tids[n]);
    }
  }
#endif
#ifdef _USE_MPI
  status = MPI_Init( argc, argv );
  if (status != MPI_SUCCESS) {
    return 1;
  }
  MPI_Comm_size( MPI_COMM_WORLD, &size );
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );
  return 0;
#endif

  return 0;
}
00051 
00052 
/**
 * Shut down the communication layer.
 *
 * The function is declared to return int, so it must return a value on
 * every path; it uses the same convention as parallel_init().
 *
 * @return 0 on success (also when MPI is not compiled in), 1 when
 *         MPI_Finalize() does not report MPI_SUCCESS.
 */
int parallel_close(void) {
#ifdef _USE_PVM
  /* Nothing is torn down for PVM. */
#endif
#ifdef _USE_MPI
  if (MPI_Finalize() != MPI_SUCCESS) {
    return 1;
  }
#endif
  return 0;
}
00061 
/**
 * Tell whether this process is the master.
 *
 * @return 1 when MPI is compiled in and this process has rank 0,
 *         0 otherwise (always 0 without MPI).
 */
int parallel_master_task(void) {
  int is_master = 0;
#ifdef _USE_MPI
  is_master = (rank == 0) ? 1 : 0;
#endif
  return is_master;
}
00071 
/**
 * Tell whether this process is a slave (worker).
 *
 * @return 1 when MPI is compiled in and this process's rank is not 0,
 *         0 otherwise (always 0 without MPI).
 */
int parallel_slave_task(void) {
  int is_slave = 0;
#ifdef _USE_MPI
  is_slave = (rank != 0) ? 1 : 0;
#endif
  return is_slave;
}
00082 
00083 
/**
 * Number of processes taking part in the run.
 *
 * @return the MPI_COMM_WORLD size cached by parallel_init() when MPI is
 *         compiled in; 1 (just the calling process) otherwise, so the
 *         function never falls off its end without a value.
 */
int parallel_group_size(void) {
#ifdef _USE_MPI
  return size;
#else
  return 1;
#endif
}
00089 
00090 
/**
 * Send a NUL-terminated string as two messages: first its length
 * (terminator included) as one MPI_INT, then the characters.
 *
 * @param str   string to send; must not be NULL
 * @param dest  destination rank
 * @param tag   message tag used for both messages
 * @return MPI_SUCCESS when both sends succeed, otherwise the MPI error
 *         code of the failing step (MPI_ERR_BUFFER for a NULL string,
 *         MPI_ERR_COUNT when the length does not fit in an int);
 *         -1 when MPI is not compiled in (nothing is sent).
 */
int parallel_send_text(char *str, int dest, int tag) {
#ifdef _USE_MPI
  size_t text_len;
  int length, rc;

  if (str == NULL) {
    return MPI_ERR_BUFFER;
  }
  text_len = strlen(str) + 1;
  length = (int) text_len;
  /* Reject lengths that do not survive the size_t -> int conversion. */
  if (length <= 0 || (size_t) length != text_len) {
    return MPI_ERR_COUNT;
  }

  /* Do not send the payload if the length header could not be sent. */
  rc = MPI_Send( &length, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
  if (rc != MPI_SUCCESS) {
    return rc;
  }
  return MPI_Send(str, length, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
#else
  (void) str;
  (void) dest;
  (void) tag;
  return -1;
#endif
}
00098 
00099 
00100 
/**
 * Receive a string sent as a length header (one MPI_INT) followed by
 * that many characters.
 *
 * @param str  out: newly allocated, NUL-terminated string that the caller
 *             must free(); set to NULL on every failure path
 * @param src  source rank (may be a wildcard)
 * @param tag  message tag (may be a wildcard)
 * @return MPI_SUCCESS on success, otherwise an MPI error code
 *         (MPI_ERR_COUNT for a negative announced length, MPI_ERR_OTHER
 *         when the buffer cannot be allocated -- in that case the payload
 *         message is left unreceived); -1 when MPI is not compiled in.
 */
int parallel_recv_text(char **str, int src, int tag) {
#ifdef _USE_MPI
  int length = 0, alloc_len, rc;
  MPI_Status status;

  *str = NULL;
  rc = MPI_Recv( &length, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
  if (rc != MPI_SUCCESS) {
    return rc;
  }
  if (length < 0) {
    return MPI_ERR_COUNT;
  }

  /* Always allocate at least the terminator; calloc zero-fills, so no
   * separate memset with a possibly wrapped-around size is needed. */
  alloc_len = (length > 0) ? length : 1;
  *str = (char *) calloc( (size_t) alloc_len, 1 );
  if (*str == NULL) {
    return MPI_ERR_OTHER;
  }

  /* Take the payload from the sender/tag the header actually came from,
   * so a wildcard src/tag cannot pair the header with another message. */
  rc = MPI_Recv( *str, length, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG,
                 MPI_COMM_WORLD, &status);
  if (rc != MPI_SUCCESS) {
    free(*str);
    *str = NULL;
    return rc;
  }
  (*str)[alloc_len - 1] = '\0';
  return rc;
#else
  (void) src;
  (void) tag;
  if (str != NULL) {
    *str = NULL;
  }
  return -1;
#endif
}
00111 
/**
 * Send a raw byte buffer as two messages: its size as one MPI_INT, then
 * the bytes as MPI_CHAR.
 *
 * @param data       buffer to send (may be NULL only when data_size is 0)
 * @param data_size  number of bytes; must not be negative
 * @param dest       destination rank
 * @param tag        message tag used for both messages
 * @return MPI_SUCCESS when both sends succeed, otherwise the MPI error
 *         code of the failing step (MPI_ERR_COUNT for a negative size,
 *         MPI_ERR_BUFFER for a NULL buffer with a non-zero size);
 *         -1 when MPI is not compiled in (nothing is sent).
 */
int parallel_send_data(void *data, int data_size, int dest, int tag) {
#ifdef _USE_MPI
  int rc;

  if (data_size < 0) {
    return MPI_ERR_COUNT;
  }
  if (data == NULL && data_size > 0) {
    return MPI_ERR_BUFFER;
  }

  /* Do not send the payload if the size header could not be sent. */
  rc = MPI_Send( &data_size, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
  if (rc != MPI_SUCCESS) {
    return rc;
  }
  return MPI_Send(data, data_size, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
#else
  (void) data;
  (void) data_size;
  (void) dest;
  (void) tag;
  return -1;
#endif
}
00118 
00119 
/**
 * Receive a raw byte buffer sent as a size header (one MPI_INT) followed
 * by that many MPI_CHAR elements.
 *
 * @param data       out: newly allocated buffer the caller must free();
 *                   non-NULL on success even for a zero-length payload,
 *                   NULL on every failure path
 * @param data_size  out: number of bytes received, 0 on failure
 * @param src        source rank (may be a wildcard)
 * @param tag        message tag (may be a wildcard)
 * @return MPI_SUCCESS on success, otherwise an MPI error code
 *         (MPI_ERR_COUNT for a negative announced size, MPI_ERR_OTHER
 *         when the buffer cannot be allocated -- in that case the payload
 *         message is left unreceived); -1 when MPI is not compiled in.
 */
int parallel_recv_data(void **data, int *data_size, int src, int tag) {
#ifdef _USE_MPI
  MPI_Status status;
  int rc;

  *data = NULL;
  rc = MPI_Recv( data_size, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
  if (rc != MPI_SUCCESS) {
    *data_size = 0;
    return rc;
  }
  if (*data_size < 0) {
    *data_size = 0;
    return MPI_ERR_COUNT;
  }

  /* A zero-length payload is legal; allocate one byte so the caller still
   * gets a valid pointer.  calloc zero-fills, replacing the old memset
   * whose (size - 1) length wrapped around for size 0. */
  *data = calloc( (*data_size > 0) ? (size_t) *data_size : 1, 1 );
  if (*data == NULL) {
    *data_size = 0;
    return MPI_ERR_OTHER;
  }

  /* Take the payload from the sender/tag the header actually came from,
   * so a wildcard src/tag cannot pair the header with another message. */
  rc = MPI_Recv( *data, *data_size, MPI_CHAR, status.MPI_SOURCE,
                 status.MPI_TAG, MPI_COMM_WORLD, &status);
  if (rc != MPI_SUCCESS) {
    free(*data);
    *data = NULL;
    *data_size = 0;
  }
  return rc;
#else
  (void) src;
  (void) tag;
  if (data != NULL) {
    *data = NULL;
  }
  if (data_size != NULL) {
    *data_size = 0;
  }
  return -1;
#endif
}
00129 
00130 
00131 
/**
 * Send a single int to another rank.
 *
 * @param num   value to send
 * @param dest  destination rank
 * @param tag   message tag
 * @return the MPI_Send() result when MPI is compiled in; -1 otherwise
 *         (nothing is sent).
 */
int parallel_send_int(int num, int dest, int tag) {
#ifdef _USE_MPI
  return MPI_Send( &num, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
#else
  (void) num;
  (void) dest;
  (void) tag;
  return -1;
#endif
}
00137 
00138 
00139 
/**
 * Receive a single int from another rank.
 *
 * @param num  out: receives the value; left untouched when MPI is not
 *             compiled in
 * @param src  source rank (may be a wildcard)
 * @param tag  message tag (may be a wildcard)
 * @return the MPI_Recv() result when MPI is compiled in; -1 otherwise.
 */
int parallel_recv_int(int *num, int src, int tag) {
#ifdef _USE_MPI
  MPI_Status status;
  return MPI_Recv( num, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
#else
  (void) num;
  (void) src;
  (void) tag;
  return -1;
#endif
}
00146 
00147 
00148 
/**
 * Non-blocking check for a pending message.
 *
 * @param src_val  in: source rank to match, or NULL for any source;
 *                 out (when a message is pending and the pointer is not
 *                 NULL): the rank that sent it
 * @param tag_val  in: tag to match, or NULL for any tag;
 *                 out (when a message is pending and the pointer is not
 *                 NULL): the tag of that message
 * @return 1 when a matching message is pending, 0 otherwise (including
 *         when MPI_Iprobe() fails or MPI is not compiled in; the in/out
 *         arguments are left untouched in those cases).
 */
int parallel_probe( int *src_val, int *tag_val) {
#ifdef _USE_MPI
  MPI_Status status;
  int flag = 0;   /* stays 0 if MPI_Iprobe() fails before setting it */
  int src_num = (src_val == NULL) ? MPI_ANY_SOURCE : *src_val;
  int tag_num = (tag_val == NULL) ? MPI_ANY_TAG : *tag_val;

  if (MPI_Iprobe( src_num, tag_num, MPI_COMM_WORLD, &flag, &status ) != MPI_SUCCESS) {
    return 0;
  }
  if ( !flag ) {
    return 0;
  }

  if ( src_val != NULL ) {
    *src_val = status.MPI_SOURCE;
  }
  if ( tag_val != NULL ) {
    *tag_val = status.MPI_TAG;
  }
  return 1;
#else
  (void) src_val;
  (void) tag_val;
  return 0;
#endif
}
00176 
00177 
00178 
/* Message tags used by parallel_master_work() / parallel_slave_work(). */
enum {
  kRequestTag = 1000,   /* slave -> master: ask for a work item         */
  kWorkTag    = 1050,   /* master -> slave: item index, then payload    */
  kResultTag  = 1100,   /* slave -> master: item index, then result     */
  kRetryTag   = 1125,   /* slave -> master: item index to hand out again */
  kEndTag     = 1150    /* master -> slave: stop working                */
};


/* Per-item state kept by the master in its work_status table. */
enum {
  tsWaiting = 0,        /* not handed out (or handed back for retry)    */
  tsWorking = 1,        /* sent to a slave, no result yet               */
  tsDone    = 2,        /* result received, or request callback gave no payload */
  tsRetry   = 3         /* not referenced by the code visible in this file */
};
00190 
00191 
00192 void parallel_master_work(int work_count, 
00193                       request_function req_func, 
00194                       result_function res_func, 
00195                       void *data) {
00196   int *work_status, i, tmp_src, tmp_tag, working, cur_work;
00197   void *buffer_work, *buffer_result;
00198   int buffer_work_size, buffer_result_size;
00199 
00200   work_status = (int *) malloc(sizeof(int) * work_count );
00201   for ( i = 0; i < work_count; i++ ) {
00202     work_status[i] = tsWaiting;
00203   }
00204 
00205   MPI_Barrier(MPI_COMM_WORLD);
00206   working = work_count;
00207 
00208   do {
00209     tmp_src = PARALLEL_ANY_SOURCE;
00210     tmp_tag = kRequestTag;
00211     if ( parallel_probe(&tmp_src, &tmp_tag)) {
00212       parallel_recv_int(&i, tmp_src, tmp_tag);
00213       tmp_tag = kWorkTag;
00214       for (i = 0; i < work_count; i++) {
00215         if ( work_status[i] == tsWaiting) {
00216           cur_work = i;
00217           buffer_work_size = req_func( cur_work, data, &buffer_work );
00218           if ( buffer_work_size > 0) {
00219             i = work_count;
00220           } else {
00221             work_status[i] = tsDone;
00222             working--;
00223           }       
00224         }
00225       }
00226       if (  buffer_work_size > 0) {
00227         parallel_send_int( cur_work, tmp_src, tmp_tag);
00228         parallel_send_data( buffer_work, buffer_work_size, tmp_src, tmp_tag);
00229         work_status[ cur_work ] = tsWorking;
00230         free(buffer_work);
00231       }
00232     }
00233     tmp_src = PARALLEL_ANY_SOURCE;
00234     tmp_tag = kResultTag;
00235     while ( parallel_probe(&tmp_src, &tmp_tag)) {
00236       parallel_recv_int(&i, tmp_src, tmp_tag);
00237       parallel_recv_data(&buffer_result, &buffer_result_size, tmp_src, tmp_tag);
00238       work_status[ i ] = tsDone;
00239       res_func(  i, data, buffer_result, buffer_result_size);
00240       working--;
00241       free(buffer_result);
00242       tmp_src = PARALLEL_ANY_SOURCE;
00243       tmp_tag = kResultTag;
00244     }
00245     tmp_src = PARALLEL_ANY_SOURCE;
00246     tmp_tag = kRetryTag;
00247     while ( parallel_probe(&tmp_src, &tmp_tag)) {
00248       parallel_recv_int(&i, tmp_src, tmp_tag);
00249       work_status[ i ] = tsWaiting;
00250     }
00251     usleep(1000);
00252   } while (working);
00253   for (i = 1; i <  parallel_group_size(); i++) 
00254     parallel_send_int(0, i, kEndTag );
00255   MPI_Barrier(MPI_COMM_WORLD);
00256 }
00257 
00258 
00259 void parallel_slave_work(work_function work_func, void *data) {
00260   int tmp_src, tmp_tag, buffer_int;
00261   void *buffer_request, *buffer_result;
00262   char working = 1;
00263   int buffer_request_size, buffer_result_size;
00264 
00265   MPI_Barrier(MPI_COMM_WORLD);
00266 
00267   parallel_send_int(1, 0, kRequestTag );
00268   do {
00269     tmp_src = PARALLEL_ANY_SOURCE;
00270     tmp_tag = kWorkTag;
00271     if ( parallel_probe( &tmp_src, &tmp_tag ) ) {
00272       parallel_recv_int( &buffer_int, tmp_src, tmp_tag);
00273       parallel_recv_data( &buffer_request, &buffer_request_size, tmp_src, tmp_tag);
00274       
00275       buffer_result_size = work_func(buffer_int, data, buffer_request, buffer_request_size, &buffer_result);
00276 
00277       if (buffer_result_size == -1) {
00278         parallel_send_int( buffer_int, tmp_src, kRetryTag );
00279         parallel_send_int( 1, 0, kRequestTag );
00280       } else {
00281         parallel_send_int( buffer_int, tmp_src, kResultTag );
00282         parallel_send_data( buffer_result, buffer_result_size, tmp_src, kResultTag );
00283         parallel_send_int( 1, 0, kRequestTag );
00284       }
00285     }
00286     tmp_src = PARALLEL_ANY_SOURCE;
00287     tmp_tag = kEndTag;
00288     if ( parallel_probe( &tmp_src, &tmp_tag ) ) {
00289       parallel_recv_int( &buffer_int, tmp_src, tmp_tag);
00290       working = 0;
00291     }
00292     usleep(1000);
00293   } while (working);
00294   MPI_Barrier(MPI_COMM_WORLD);
00295 }
00296 
00297 
00298 
00299 
00300 #endif

Generated on Wed Apr 11 16:50:49 2007 for open_prospect by  doxygen 1.4.6