00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef AlpsKnowledgeBrokerMPI_h_
00024 #define AlpsKnowledgeBrokerMPI_h_
00025
00026 #include <cmath>
00027 #include <iosfwd>
00028
00029
00030
00031
00032 #include "mpi.h"
00033
00034 #include "AlpsEnumProcessT.h"
00035 #include "AlpsKnowledge.h"
00036 #include "AlpsKnowledgeBroker.h"
00037 #include "AlpsParams.h"
00038
00039
00040
00041 class AlpsKnowledgeBrokerMPI : public AlpsKnowledgeBroker {
00042
00043 private:
00045 AlpsKnowledgeBrokerMPI(const AlpsKnowledgeBrokerMPI&);
00046 AlpsKnowledgeBrokerMPI& operator=(const AlpsKnowledgeBrokerMPI&);
00047
00048 protected:
00049
00055 int processNum_;
00056
00058 int hubNum_;
00059
00061 int globalRank_;
00062
00064 MPI_Comm clusterComm_;
00065
00067 MPI_Comm hubComm_;
00068
00070 MPI_Group hubGroup_;
00071
00073 int clusterSize_;
00074
00076 int userClusterSize_;
00077
00079 int clusterRank_;
00080
00082 int* hubRanks_;
00083
00085 int myHubRank_;
00086
00088 int masterRank_;
00089
00091 AlpsProcessType processType_;
00092
00094 AlpsProcessType* processTypeList_;
00095
00097 bool hubWork_;
00098
00100 MPI_Request subTreeRequest_;
00101
00103 MPI_Request solRequestL_;
00104 MPI_Request solRequestR_;
00105
00107 MPI_Request modelKnowRequestL_;
00108 MPI_Request modelKnowRequestR_;
00109
00111 MPI_Request forwardRequestL_;
00112 MPI_Request forwardRequestR_;
00114
00120 double incumbentValue_;
00121
00123 int incumbentID_;
00124
00127 bool updateIncumbent_;
00129
00135 double workQuality_;
00136
00138 double clusterWorkQuality_;
00139
00141 double systemWorkQuality_;
00142
00144 double* hubWorkQualities_;
00145
00148 double* workerWorkQualities_;
00149
00151 double workQuantity_;
00152
00154 double clusterWorkQuantity_;
00155
00157 double systemWorkQuantity_;
00158
00160 double systemWorkQuantityForce_;
00161
00163 double* hubWorkQuantities_;
00164
00167 double* workerWorkQuantities_;
00168
00170 bool* workerReported_;
00171
00173 bool* hubReported_;
00174
00176 bool allHubReported_;
00177
00179 int masterDoBalance_;
00180
00182 int hubDoBalance_;
00183
00185 int* workerNodeProcesseds_;
00186
00188 int clusterNodeProcessed_;
00189
00191 int* hubNodeProcesseds_;
00193
00199 int sendCount_;
00200
00202 int recvCount_;
00203
00206 int clusterSendCount_;
00207
00210 int clusterRecvCount_;
00211
00213 int systemSendCount_;
00214
00216 int systemRecvCount_;
00218
00223 int masterIndexBatch_;
00225
00231 AlpsTimer masterTimer_;
00232
00234 AlpsTimer hubTimer_;
00235
00237 AlpsTimer workerTimer_;
00238
00240 double rampUpTime_;
00241
00243 double rampDownTime_;
00244
00246 double idleTime_;
00247
00249 double msgTime_;
00250
00252 AlpsPsStats psStats_;
00254
00256 bool forceTerminate_;
00257
00259 bool blockTermCheck_;
00260
00262 bool blockHubReport_;
00263
00265 bool blockWorkerReport_;
00266
00268 bool blockAskForWork_;
00269
00271 char *attachBuffer_;
00272
00274 char *largeBuffer_;
00275
00277 char *largeBuffer2_;
00278
00280 char *smallBuffer_;
00281
00284 double masterBalancePeriod_;
00285
00288 double hubReportPeriod_;
00289
00291 int modelGenID_;
00292
00294 int modelGenPos_;
00295
00297 AlpsSubTree* rampUpSubTree_;
00298
00300 int unitWorkNodes_;
00301
00303 int haltSearch_;
00304
00305 protected:
00306
00308 void init();
00309
00316 void masterMain(AlpsTreeNode* root);
00317
00320 void hubMain();
00321
00324 void workerMain();
00326
00328
00329
00330 AlpsReturnStatus doOneUnitWork(int unitWork,
00331 double unitTime,
00332 AlpsExitStatus & exitStatus,
00333 int & numNodesProcessed,
00334 int & numNodesBranched,
00335 int & numNodesDiscarded,
00336 int & numNodesPartial,
00337 int & depth,
00338 bool & betterSolution);
00339
00341 void processMessages(char *&buffer,
00342 MPI_Status &status,
00343 MPI_Request &request);
00344
00346 void rootInitMaster(AlpsTreeNode* root);
00347 void rootInitHub();
00348 void rootInitWorker();
00349
00351 void spiralMaster(AlpsTreeNode* root);
00352 void spiralHub();
00353 void spiralWorker();
00354
00355
00356
00361 void masterAskHubDonate(int donorID,
00362 int receiverID,
00363 double receiverWorkload);
00364
00366 void hubAskWorkerDonate(int donorID,
00367 int receiverID,
00368 double receiverWorkload);
00369
00371 void updateWorkloadInfo();
00372
00373 virtual int getNumNodeLeftSystem()
00374 { return static_cast<int>(systemWorkQuantity_); }
00375
00377 void donateWork(char*& buf,
00378 int tag,
00379 MPI_Status* status,
00380 int recvID = -1,
00381 double recvWL = 0.0);
00382
00384 void hubAllocateDonation(char*& buf, MPI_Status* status);
00385
00387 void hubBalanceWorkers();
00388
00390 void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);
00391
00393
00394 void hubReportStatus(int tag, MPI_Comm comm);
00395
00397
00398 void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
00399
00401 void hubsShareWork(char*& buf, MPI_Status* status);
00402
00404 void masterBalanceHubs();
00405
00407
00408 void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
00409
00411 void refreshSysStatus();
00412
00414 void refreshClusterStatus();
00415
00417
00418 void workerReportStatus(int tag, MPI_Comm comm);
00420
00421
00422
00428 void workerAskIndices();
00429
00431 void workerRecvIndices(char *&bufLarge);
00432
00434 void masterSendIndices(char *&bufLarge);
00436
00437
00438
00444 void broadcastModel(const int id, const int source);
00445
00447 void sendIncumbent();
00448
00451 bool unpackSetIncumbent(char*& buf, MPI_Status* status);
00452
00454 void collectBestSolution(int destination);
00455
00458 void tellMasterRecv();
00459
00462
00463 void tellHubRecv();
00464
00469 void packEncoded(AlpsEncoded* enc,
00470 char*& buf,
00471 int& size,
00472 int& position,
00473 MPI_Comm comm);
00474
00476 AlpsEncoded* unpackEncoded(char*& buf,
00477 int& position,
00478 MPI_Comm comm,
00479 int size = -1);
00480
00483
00484 void receiveSizeBuf(char*& buf,
00485 int sender,
00486 int tag,
00487 MPI_Comm comm,
00488 MPI_Status* status);
00489
00492
00493 void receiveRampUpNode(int sender,
00494 MPI_Comm comm,
00495 MPI_Status* status);
00496
00499 void receiveSubTree(char*& buf, int sender, MPI_Status* status);
00500
00502
00503 void sendSizeBuf(char*& buf,
00504 int size,
00505 int position,
00506 const int target,
00507 const int tag,
00508 MPI_Comm comm);
00509
00512
00513 void sendRampUpNode(const int target, MPI_Comm comm);
00514
00517 void sendNodeModelGen(int receiver, int doUnitWork);
00518
00520 bool sendSubTree(const int target, AlpsSubTree*& st, int tag);
00521
00523
00524 void sendFinishInit(const int target, MPI_Comm comm);
00526
00528 void deleteSubTrees();
00529
00530
00531 void forwardModelKnowledge();
00532
00534
00535 void sendModelKnowledge(MPI_Comm comm, int receiver=-1);
00536
00538
00539 void receiveModelKnowledge(MPI_Comm comm);
00540
00545 void incSendCount(const char* how, int s = 1);
00547 void decSendCount(const char* how, int s = 1);
00549 void incRecvCount(const char* how, int s = 1);
00551 void decRecvCount(const char* how, int s = 1);
00553
00555 void masterForceHubTerm();
00556
00558 void hubForceWorkerTerm();
00559
00561 void changeWorkingSubTree(double & changeWorkThreshold);
00562
00564 void sendErrorCodeToMaster(int errorCode);
00565
00567 void recvErrorCode(char *& bufLarge);
00568
00570 void spiralRecvProcessNode();
00571
00573 void spiralDonateNode();
00574
00575 public:
00576
00579 AlpsKnowledgeBrokerMPI()
00580 :
00581 AlpsKnowledgeBroker()
00582 {
00583 init();
00584 }
00585
00587 AlpsKnowledgeBrokerMPI(int argc,
00588 char* argv[],
00589 AlpsModel& model)
00590 :
00591 AlpsKnowledgeBroker()
00592 {
00593 init();
00594 initializeSearch(argc, argv, model);
00595 }
00596
00598 ~AlpsKnowledgeBrokerMPI();
00599
00601 virtual int getProcRank() const { return globalRank_; }
00602
00604 virtual int getMasterRank() const { return masterRank_; }
00605
00607 virtual AlpsProcessType getProcType() const { return processType_; }
00608
00621 void initializeSearch(int argc, char* argv[], AlpsModel& model);
00622
00624 void search(AlpsModel *model);
00625
00633 void rootSearch(AlpsTreeNode* root);
00634
00638 virtual double getIncumbentValue() const {
00639 double bestObj = ALPS_OBJ_MAX;
00640 if (AlpsKnowledgeBroker::hasKnowledge(AlpsKnowledgeTypeSolution)) {
00641 bestObj = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
00642 if (incumbentValue_ > bestObj) {
00643 return bestObj;
00644 }
00645 }
00646 return incumbentValue_;
00647 }
00648
00650 virtual double getBestQuality() const {
00651 if (globalRank_ == masterRank_) {
00652 if (getNumKnowledges(AlpsKnowledgeTypeSolution) > 0) {
00653 return getBestKnowledge(AlpsKnowledgeTypeSolution).second;
00654 }
00655 else {
00656 return ALPS_OBJ_MAX;
00657 }
00658 }
00659 else {
00660 return ALPS_OBJ_MAX;
00661 }
00662 }
00663
00665 virtual double getBestEstimateQuality() { return systemWorkQuality_; }
00666
00668 virtual void printBestSolution(char* outputFile = 0) const;
00669
00671 virtual void searchLog();
00673
00674
00675
00680 void sendKnowledge(AlpsKnowledgeType type,
00681 int sender,
00682 int receiver,
00683 char *& msgBuffer,
00684 int msgSize,
00685 int msgTag,
00686 MPI_Comm comm,
00687 bool blocking);
00688
00690 void receiveKnowledge(AlpsKnowledgeType type,
00691 int sender,
00692 int receiver,
00693 char *& msgBuffer,
00694 int msgSize,
00695 int msgTag,
00696 MPI_Comm comm,
00697 MPI_Status* status,
00698 bool blocking);
00699
00701 void requestKnowledge(AlpsKnowledgeType type,
00702 int sender,
00703 int receiver,
00704 char *& msgBuffer,
00705 int msgSize,
00706 int msgTag,
00707 MPI_Comm comm,
00708 bool blocking);
00710
00711 };
00712 #endif
00713
00714