// ---------------------------------------------------------------------------
// Member definitions for QioArg and QioControl (parallel I/O bookkeeping).
//
// NOTE(review): this text looks extraction-damaged -- the #include targets are
// missing and several bodies below are fused together where text between
// angle brackets appears to have been dropped.  Restore from the pristine
// file before compiling; the notes below describe only what is visible.
//
// Definitions, in order of appearance:
//
//  QioArg::init(file, concur_io_number, chk_prec, file_format,
//               file_int_format, recon_row_3)
//      Copies GJP.Nodes/NodeSites/NodeCoor for dir 0..4 into nodes[],
//      node_sites[] and coor[]; sets bc[0..3] to BND_CND_PRD instead of
//      GJP.Bc(dir); copies the two conf-load addresses from GJP; stores the
//      caller's arguments in ConcurIONumber, FileName, CheckPrecision,
//      FileFpFormat, FileIntFormat and ReconRow3.  Prints two debug lines
//      when UniqueID() is 0.
//      NOTE(review): FileName is filled with strcpy(), which does no bounds
//      check; FileName's capacity is not visible here -- confirm it fits.
//
//  QioControl::QioControl()
//      Initialises num_concur_io(0), do_log(0), cname("QioControl"),
//      io_good(false), GparityReconstructUstarField(true).  The body is
//      damaged and runs into the tail of another function that sums an
//      error count with globalSumInt() and returns it.
//
//  QioControl::broadcastInt(data, fromID) / broadcastFloat(data, fromID)
//      When NumNodes() > 1: every node whose unique_id differs from fromID
//      zeroes *data, then *data is replaced by the global sum, so only the
//      value held by fromID contributes.  An empty-bodied broadcastInt with
//      the same signature precedes the real one in this text
//      (NOTE(review): presumably it sat under a preprocessor conditional
//      that was lost -- confirm).
//
//  QioControl::round(fdata)
//      Truncates to int, then adds 1 if the remainder is >= 0.5 or
//      subtracts 1 if the remainder is < -0.5.
//
//  QioControl::globalSumInt(data)
//      Under PARALLEL: splits the value into upper and lower halves
//      (half the bit width of unsigned int each), sums each half with
//      globalSumFloat() and rounds.  The rest is damaged and fused with
//      the following definitions.
//
//  (function whose header is missing; returns 1)
//      With more than one node: node 0 returns IOCommander(0); every other
//      node repeatedly receives firstID/lastID via broadcastInt() and
//      returns 1 once firstID <= unique_id <= lastID, calling synchronize()
//      between rounds.
//
//  QioControl::finishIOTimeSlot()
//      With more than one node: node 0 returns IOCommander(1); other nodes
//      return 0 if synchronize() < 0, otherwise keep consuming two
//      broadcastInt() values per round until synchronize() < 0.  Returns 0.
//
//  QioControl::IOCommander(caller)
//      Uses num_concur_io as the group size (all nodes if it is <= 0) and
//      computes the number of batches, rounding up.  caller == 0:
//      broadcasts the first group's ID range and returns 1.  Otherwise:
//      broadcasts the ranges of the remaining groups (loop header damaged),
//      then calls synchronize(-1) and returns 0.
//
//  QioControl::buildNodesList(active_num, active_node_list, this_active)
//      *active_num = number of nodes passing a non-zero this_active.  Each
//      round, active nodes offer uniqueID() and the others offer NumNodes();
//      globalMinInt() picks the smallest, which is appended to the list, and
//      the picked node clears its local this_active.
//
//  QioControl::syncError(this_error)
//      Builds the list of nodes reporting an error with buildNodesList() and
//      logs the count through VRB.Flow.  The listing loop and the rest are
//      damaged and fused with the start of the logging-start routine.
//      NOTE(review): fname is "testError()" here, not "syncError()".
//
//  (tail of the logging-start routine; header missing)
//      Stores the current time in log_start, writes "LOG<id>[time] action",
//      remembers the stream position in log_point, writes "Processing" and
//      sets logging = 1.
//
//  QioControl::log(short_note)
//      No-op unless do_log and logging are set.  Seeks back to log_point,
//      writes the seconds elapsed since log_start plus the optional
//      "(short_note)", updates log_point and writes "Processing" again.
//      NOTE(review): error is always 0 when passed to syncError() because
//      the stream check is commented out.
//
//  QioControl::finishLogging(ending_word)
//      Same guard and syncError(0) call as log(); seeks to log_point and
//      writes ending_word if non-null.  The text ends mid-statement.
// ---------------------------------------------------------------------------
#include #include #include #include #include #include #include #include using namespace std; #include #if TARGET == QCDOC #include #endif CPS_START_NAMESPACE ///////////////////////////////////////////////////////////////// // QioArg members//////////////////////////////////////// ///////////////////////////////////////////////////////////////// void QioArg::init(const char * file, const int concur_io_number, const Float chk_prec, const FP_FORMAT file_format, const INT_FORMAT file_int_format, const int recon_row_3) { for(int dir=0;dir<5;dir++) { nodes[dir] = GJP.Nodes(dir); node_sites[dir] = GJP.NodeSites(dir); coor[dir] = GJP.NodeCoor(dir); } // Make it all periodic as NERSC header specifies gauge boundary condition, // 04/03/05 CJ for(int dir=0;dir<4;dir++) bc[dir] = BND_CND_PRD; // bc[dir] = GJP.Bc(dir); StartConfLoadAddr = GJP.StartConfLoadAddr(); StartU1ConfLoadAddr = GJP.StartU1ConfLoadAddr(); // user set params ConcurIONumber = concur_io_number; strcpy(FileName, file); if(!UniqueID()){ printf("QioArg::init copied filename string '%s' from %p to %p\n",file,file,FileName); fflush(stdout); } if(!UniqueID()){ printf("QioArg::init FileName = '%s'\n",FileName); fflush(stdout); } CheckPrecision = chk_prec; FileFpFormat = file_format; FileIntFormat = file_int_format; ReconRow3 = recon_row_3; } ///////////////////////////////////////////////////////////////////// // QioControl members//////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////// QioControl::QioControl() : num_concur_io(0), do_log(0), cname("QioControl"), io_good(false), GparityReconstructUstarField(true) { // cout << "I am on a " << GJP.Xnodes() << "x"<< GJP.Ynodes() << "x"<< GJP.Znodes() // << "x"<< GJP.Tnodes() <<"x"<1) { error = globalSumInt(error); if(error > 0) { VRB.Flow(cname,fname,"Totally %d nodes reported error!\n",error); } } return error; } void QioControl::broadcastInt(int * data, int fromID) const { } void 
QioControl::broadcastInt(int * data, int fromID) const { if(NumNodes() > 1) { if(unique_id != fromID) { *data = 0; } *data = globalSumInt(*data); } } void QioControl::broadcastFloat(Float * data, int fromID) const { if(NumNodes() > 1) { if(unique_id != fromID) { * data = 0; } *data = globalSumFloat(*data); } } int QioControl::round(const Float fdata) const{ int ndata = (int)fdata; if(fdata - ndata >= 0.5) ndata++; if(fdata - ndata < -0.5) ndata--; return ndata; } int QioControl::globalSumInt(const int data) const{ #ifdef PARALLEL // Gsum64Ext gsum; // return gsum.Sum(data); int hfbits = sizeof(unsigned int) * 8 / 2; unsigned int mask = (1 << hfbits) - 1; int sumd = data; int hi = sumd >> hfbits; int lo = sumd & mask; hi = round(globalSumFloat(hi)); lo = round(globalSumFloat(lo)); sumd = (hi<> hfbits; unsigned int lo = sumd & mask; hi = round(globalSumFloat(hi)); lo = round(globalSumFloat(lo)); sumd = (hi< 1) { // using intelligent commander(node-0), dumb server(others) mode if(unique_id == 0) { return IOCommander(0); } else { int firstID, lastID; while(1) { broadcastInt(&firstID); broadcastInt(&lastID); if(unique_id >= firstID && unique_id <= lastID){ // got time slot // printf("Node %d: Got time slot!\n",UniqueID()); return 1; } synchronize(); } } } return 1; } int QioControl::finishIOTimeSlot() const { // printf("Node %d: finishIOTimeSlot()\n",UniqueID()); if(NumNodes() > 1) { if(unique_id == 0) { return IOCommander(1); } else { if(synchronize()<0) return 0; // io finished while(1) { int dummy; broadcastInt(&dummy); broadcastInt(&dummy); if(synchronize()<0) break; } } } return 0; } int QioControl::IOCommander(int caller) const { const char * fname = "IOCommander()"; int totalnodes = NumNodes(); int do_concur_io = num_concur_io; if(do_concur_io <= 0) do_concur_io = totalnodes; int batches = totalnodes / do_concur_io; if(do_concur_io * batches < totalnodes) batches ++; int firstID, lastID; if(caller == 0) { // let node 0 finish its task first (w/ the first batch) 
firstID = 0; lastID = do_concur_io-1; if(lastID > totalnodes-1) lastID = totalnodes-1; printf("Node %d: IOCommander(%d) batches=%d firstID=%d lastID=%d\n",UniqueID(),caller, batches, firstID,lastID); broadcastInt(&firstID); broadcastInt(&lastID); VRB.Flow(cname, fname, "Parallel IO: Group 1, Node %d thru Node %d\n",firstID,lastID); return 1; } else { // now node 0 finished his own io, can control others if(batches==1) { synchronize(-1); // io finished return 0; } for(int i=1;i totalnodes-1) lastID = totalnodes-1; broadcastInt(&firstID); broadcastInt(&lastID); VRB.Flow(cname,fname,"Parallel IO: Group %d, Node %d thru Node %d\n",i+1,firstID,lastID); } synchronize(-1); // io finished return 0; } } void QioControl::buildNodesList(int * active_num, int * active_node_list, int this_active) const { *active_num = globalSumInt(this_active?1:0); for(int i=0;i < *active_num; i++) { int sendid; if(this_active) sendid = uniqueID(); else sendid = NumNodes(); // > all possible uniqueID(); active_node_list[i] = globalMinInt(sendid); if(active_node_list[i] == uniqueID()) this_active = 0; // exclude the nodes already in list } } int QioControl::syncError(int this_error) const { const char * fname = "testError()"; TempBufAlloc nodes_list_buf(NumNodes()*sizeof(int)); int * nodes_list = nodes_list_buf.IntPtr(); int error_nodes; buildNodesList(&error_nodes, nodes_list, this_error); if(error_nodes>0) { VRB.Flow(cname, fname, "%d nodes report error! They are (if more than 10 nodes, only list first 10 ids):\n", error_nodes); for(int i=0;i<10 && i0) { ERR.FileA(cname,fname,logname); } /* prevlogs.open(oldlogname); if(prevlogs.is_open()) { logs << prevlogs.rdbuf(); prevlogs.close(); } logs.clear(); // if prevlogs is empty, the logs may have a error bit set */ /* char logfile[200]; strcpy(logfile,log_dir); strcat(logfile,"/qcdio.log"); logs = Fopen(ADD_ID, logfile, "a"); if(!logs) error = 1; if(testError(error) > 0) { ERR.FileA(cname,fname,logfile); } */ // cout << "start logging..." 
<< endl; // start logging struct timeval tp; gettimeofday(&tp,NULL); log_start = tp.tv_sec; char logtime[100]; strcpy(logtime,ctime(&log_start)); logtime[strlen(logtime)-1] = '\0'; // cut the last '\n' logs << "LOG<" << uniqueID() << ">["<< logtime << "] "; if(action) logs << action; logs<<" : \t"; log_point = logs.tellp(); logs << "Processing" << endl << flush; logging = 1; } void QioControl::log(const char * short_note) { const char * fname = "log()"; int error = 0; if(!do_log || !logging) return; // if(!logs.is_open() || !logs.good()) error = 1; if(syncError(error)>0) { ERR.Hardware(cname,fname,"Wrinting to file qcdio.log.* failed"); } // cout << "continue logging..." << endl; struct timeval tp; gettimeofday(&tp,NULL); time_t tm_elapse = tp.tv_sec - log_start; logs.seekp(log_point); logs << tm_elapse; if(short_note) logs << "(" << short_note << ")"; logs<<"\t"; log_point = logs.tellp(); logs<<"Processing" << endl << flush; } void QioControl::finishLogging(const char * ending_word) { const char * fname = "finishLogging()"; int error = 0; if(!do_log || !logging) return; // if(!logs.is_open() || !logs.good()) error=1; if(syncError(error)>0) { ERR.Hardware(cname,fname,"Closing file qcdio.log.* failed"); } // cout << "finish logging..." << endl; struct timeval tp; gettimeofday(&tp,NULL); time_t tm_elapse = tp.tv_sec - log_start; logs.seekp(log_point); if(ending_word) logs << ending_word; logs<< "["<