/*___INFO__MARK_BEGIN__*/ /************************************************************************* * * The Contents of this file are made available subject to the terms of * the Sun Industry Standards Source License Version 1.2 * * Sun Microsystems Inc., March, 2001 * * * Sun Industry Standards Source License Version 1.2 * ================================================= * The contents of this file are subject to the Sun Industry Standards * Source License Version 1.2 (the "License"); You may not use this file * except in compliance with the License. You may obtain a copy of the * License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html * * Software provided under this License is provided on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, * WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, * MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. * See the License for the specific provisions governing your rights and * obligations concerning the Software. * * The Initial Developer of the Original Code is: Sun Microsystems, Inc. * * Copyright: 2001 by Sun Microsystems, Inc. * * All Rights Reserved. * ************************************************************************/ /*___INFO__MARK_END__*/ #include #include #include #include #include #include #include "rmon/sgermon.h" #include "uti/sge_log.h" #include "uti/sge_prog.h" #include "uti/sge_unistd.h" #include "uti/sge_uidgid.h" #include "uti/sge_os.h" #include "uti/sge_hostname.h" #include "uti/sge_bootstrap.h" #include "uti/sge_spool.h" #include "uti/setup_path.h" #include "uti/config_file.h" #include "evm/sge_event_master.h" #include "spool/sge_spooling.h" #include "gdi/qm_name.h" #include "sgeobj/parse.h" #include "sgeobj/sge_all_listsL.h" #include "sgeobj/sge_host.h" #include "sgeobj/sge_utility.h" #include "sgeobj/sge_answer.h" #include "sgeobj/sge_job.h" #include "sgeobj/sge_resource_quota.h" #include "sgeobj/sge_qinstance.h" #include "sgeobj/sge_qinstance_state.h" #include "sgeobj/sge_cqueue.h" #include "sgeobj/sge_userprj.h" #include "sgeobj/sge_manop.h" #include "sgeobj/sge_centry.h" #include "sgeobj/sge_userset.h" #include "sgeobj/sge_conf.h" #include "sched/sge_sched.h" #include "sge.h" #include "sge_resource_quota_qmaster.h" #include "sge_advance_reservation_qmaster.h" #include "sge_qinstance_qmaster.h" #include "sge_qmod_qmaster.h" #include "sge_persistence_qmaster.h" #include "sge_sharetree_qmaster.h" #include "sge_host_qmaster.h" #include "sge_pe_qmaster.h" #include "sge_cqueue_qmaster.h" #include "sge_job_qmaster.h" #include "sge_task_depend.h" #include "sched_conf_qmaster.h" #include "configuration_qmaster.h" #include "lock.h" #include "usage.h" #include "shutdown.h" #include "sge_give_jobs.h" #include "msg_daemons_common.h" #include "msg_qmaster.h" #include "msg_common.h" #include "spool/sge_spooling.h" #include "sgeobj/sge_resource_quota.h" #include "sge_resource_quota_qmaster.h" #include "sge_advance_reservation_qmaster.h" #include "sge_qinstance_qmaster.h" #include "uti/sge_time.h" struct cmplx_tmp { char *name; char *shortcut; u_long32 valtype; u_long32 relop; u_long32 consumable; char *valdefault; u_long32 requestable; char *urgency_weight; }; static void process_cmdline(char**); static lList* parse_cmdline_qmaster(char**, lList**); static lList* parse_qmaster(lList**, u_long32*); static void qmaster_init(sge_gdi_ctx_class_t *ctx, char**); static void communication_setup(sge_gdi_ctx_class_t *ctx); static bool is_qmaster_already_running(const char *qmaster_spool_dir); static void qmaster_lock_and_shutdown(void **ctx_ref, int); static int setup_qmaster(sge_gdi_ctx_class_t *ctx); static int remove_invalid_job_references(bool job_spooling, int user, object_description *object_base); static int debit_all_jobs_from_qs(void); static void init_categories(void); /****** qmaster/setup_qmaster/sge_setup_qmaster() ****************************** * NAME * sge_setup_qmaster() -- setup qmaster * * SYNOPSIS * int sge_setup_qmaster(char* anArgv[]) * * FUNCTION * Process commandline arguments. Remove qmaster lock file. Write qmaster * host to the 'act_qmaster' file. Initialize qmaster and reporting. Write * qmaster PID file. * * NOTE: Before this function is invoked, qmaster must become admin user. * * INPUTS * char* anArgv[] - commandline argument vector * * RESULT * 0 - success * * NOTES * MT-NOTE: sge_setup_qmaster() is NOT MT safe! * MT-NOTE: * MT-NOTE: This function must be called exclusively, with the qmaster main * MT-NOTE: thread being the *only* active thread. In other words, do not * MT-NOTE: invoke this function after any additional thread (directly or * MT-NOTE: indirectly) has been created. * * Do *not* write the qmaster pid file, before 'qmaster_init()' did return * successfully. Otherwise, if we do have a running qmaster and a second * qmaster is started (illegally) on the same host, the second qmaster will * overwrite the pid of the qmaster started first. The second qmaster will * detect it's insubordinate doing and terminate itself, thus leaving behind * a useless pid. * *******************************************************************************/ int sge_setup_qmaster(sge_gdi_ctx_class_t *ctx, char* anArgv[]) { char err_str[1024]; const char *qualified_hostname = ctx->get_qualified_hostname(ctx); const char *act_qmaster_file = ctx->get_act_qmaster_file(ctx); DENTER(TOP_LAYER, "sge_setup_qmaster"); umask(022); /* this needs a better solution */ process_cmdline(anArgv); INFO((SGE_EVENT, MSG_STARTUP_BEGINWITHSTARTUP)); qmaster_unlock(QMASTER_LOCK_FILE); if (write_qm_name(qualified_hostname, act_qmaster_file, err_str)) { ERROR((SGE_EVENT, "%s\n", err_str)); SGE_EXIT(NULL, 1); } qmaster_init(ctx, anArgv); sge_write_pid(QMASTER_PID_FILE); DEXIT; return 0; } /* sge_setup_qmaster() */ /****** qmaster/setup_qmaster/sge_qmaster_thread_init() ************************ * NAME * sge_qmaster_thread_init() -- Initialize a qmaster thread. * * SYNOPSIS * int sge_qmaster_thread_init(bool switch_to_admin_user) * * FUNCTION * Subsume functions which need to be called immediately after thread * startup. This function does make sure that the thread local data * structures do contain reasonable values. * * INPUTS * bool switch_to_admin_user - become admin user if set to true * * RESULT * 0 - success * * NOTES * MT-NOTE: sge_qmaster_thread_init() is MT safe * MT-NOTE: * MT-NOTE: sge_qmaster_thread_init() should be invoked at the beginning * MT-NOTE: of a thread function. * *******************************************************************************/ int sge_qmaster_thread_init(sge_gdi_ctx_class_t **ctx_ref, u_long32 prog_id, u_long32 thread_id, bool switch_to_admin_user) { const char *admin_user = NULL; lList *alp = NULL; sge_gdi_ctx_class_t *ctx = NULL; DENTER(TOP_LAYER, "sge_qmaster_thread_init"); lInit(nmv); if (sge_setup2(ctx_ref, prog_id, thread_id, &alp, true) != AE_OK) { answer_list_output(&alp); SGE_EXIT((void**)ctx_ref, 1); } ctx = *ctx_ref; ctx->reresolve_qualified_hostname(ctx); DEBUG((SGE_EVENT,"%s: qualified hostname \"%s\"\n", SGE_FUNC, ctx->get_qualified_hostname(ctx))); admin_user = ctx->get_admin_user(ctx); if (switch_to_admin_user == true) { char str[1024]; if (sge_set_admin_username(admin_user, str) == -1) { CRITICAL((SGE_EVENT, str)); SGE_EXIT((void**)ctx_ref, 1); } if (sge_switch2admin_user()) { CRITICAL((SGE_EVENT, MSG_ERROR_CANTSWITCHTOADMINUSER)); SGE_EXIT((void**)ctx_ref, 1); } } DEXIT; return 0; } /* sge_qmaster_thread_init() */ /****** qmaster/setup_qmaster/sge_setup_job_resend() *************************** * NAME * sge_setup_job_resend() -- Setup job resend events. * * SYNOPSIS * void sge_setup_job_resend(void) * * FUNCTION * Register a job resend event for each job or array task which does have a * 'JTRANSFERING' status. * * INPUTS * void - none * * RESULT * void - none * * NOTES * MT-NOTE: sge_setup_job_resend() is not MT safe * *******************************************************************************/ void sge_setup_job_resend(void) { lListElem *job = NULL; object_description *object_base = object_type_get_object_description(); DENTER(TOP_LAYER, "sge_setup_job_resend"); job = lFirst(*object_base[SGE_TYPE_JOB].list); while (NULL != job) { lListElem *task; u_long32 job_num; job_num = lGetUlong(job, JB_job_number); task = lFirst(lGetList(job, JB_ja_tasks)); while (NULL != task) { if (lGetUlong(task, JAT_status) == JTRANSFERING) { lListElem *granted_queue, *qinstance, *host; const char *qname; u_long32 task_num, when; te_event_t ev; task_num = lGetUlong(task, JAT_task_number); granted_queue = lFirst(lGetList(task, JAT_granted_destin_identifier_list)); qname = lGetString(granted_queue, JG_qname); qinstance = cqueue_list_locate_qinstance(*object_base[SGE_TYPE_CQUEUE].list, qname); host = host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, lGetHost(qinstance, QU_qhostname)); when = lGetUlong(task, JAT_start_time); when += MAX(load_report_interval(host), MAX_JOB_DELIVER_TIME); ev = te_new_event((time_t)when, TYPE_JOB_RESEND_EVENT, ONE_TIME_EVENT, job_num, task_num, "job-resend_event"); te_add_event(ev); te_free_event(&ev); DPRINTF(("Did add job resend for "sge_u32"/"sge_u32" at %d\n", job_num, task_num, when)); } task = lNext(task); } job = lNext(job); } DEXIT; return; } /* sge_setup_job_resend() */ /****** setup_qmaster/sge_process_qmaster_cmdline() **************************** * NAME * sge_process_qmaster_cmdline() -- global available function for qmaster * * SYNOPSIS * void sge_process_qmaster_cmdline(char**anArgv) * * FUNCTION * This function simply calls the static function process_cmdline() * * INPUTS * char**anArgv - command line arguments from main() * * NOTES * MT-NOTE: sge_process_qmaster_cmdline() is NOT MT safe * * SEE ALSO * qmaster/setup_qmaster/process_cmdline() *******************************************************************************/ void sge_process_qmaster_cmdline(char**anArgv) { process_cmdline(anArgv); } /****** qmaster/setup_qmaster/process_cmdline() ******************************** * NAME * process_cmdline() -- Handle command line arguments * * SYNOPSIS * static void process_cmdline(char **anArgv) * * FUNCTION * Handle command line arguments. Parse argument vector and handle options. * * INPUTS * char **anArgv - pointer to agrument vector * * RESULT * void - none * * NOTES * MT-NOTE: process_cmdline() is NOT MT safe. * *******************************************************************************/ static void process_cmdline(char **anArgv) { lList *alp, *pcmdline; lListElem *aep; u_long32 help = 0; DENTER(TOP_LAYER, "process_cmdline"); alp = pcmdline = NULL; alp = parse_cmdline_qmaster(&anArgv[1], &pcmdline); if(alp) { /* ** high level parsing error! show answer list */ for_each(aep, alp) { fprintf(stderr, "%s", lGetString(aep, AN_text)); } lFreeList(&alp); lFreeList(&pcmdline); SGE_EXIT(NULL, 1); } alp = parse_qmaster(&pcmdline, &help); if(alp) { /* ** low level parsing error! show answer list */ for_each(aep, alp) { fprintf(stderr, "%s", lGetString(aep, AN_text)); } lFreeList(&alp); lFreeList(&pcmdline); SGE_EXIT(NULL, 1); } if(help) { /* user wanted to see help. we can exit */ lFreeList(&pcmdline); SGE_EXIT(NULL, 0); } DEXIT; return; } /* process_cmdline */ /****** qmaster/setup_qmaster/parse_cmdline_qmaster() ************************** * NAME * parse_cmdline_qmaster() -- Parse command line arguments * * SYNOPSIS * static lList* parse_cmdline_qmaster(char **argv, lList **ppcmdline) * * FUNCTION * Decompose argument vector. Handle options and option arguments. * * INPUTS * char **argv - pointer to argument vector * lList **ppcmdline - pointer to lList pointer which does contain the * command line arguments upon return. * * RESULT * lList* - pointer to answer list * * NOTES * MT-NOTE: parse_cmdline_qmaster() is MT safe. * *******************************************************************************/ static lList *parse_cmdline_qmaster(char **argv, lList **ppcmdline ) { char **sp; char **rp; stringT str; lList *alp = NULL; DENTER(TOP_LAYER, "parse_cmdline_qmaster"); rp = argv; while(*(sp=rp)) { /* -help */ if ((rp = parse_noopt(sp, "-help", NULL, ppcmdline, &alp)) != sp) continue; /* oops */ sprintf(str, MSG_PARSE_INVALIDOPTIONARGUMENTX_S, *sp); printf("%s\n", *sp); sge_usage(QMASTER, stderr); answer_list_add(&alp, str, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR); DEXIT; return alp; } DEXIT; return alp; } /* parse_cmdline_qmaster() */ /****** qmaster/setup_qmaster/parse_qmaster() ********************************** * NAME * parse_qmaster() -- Process options. * * SYNOPSIS * static lList* parse_qmaster(lList **ppcmdline, u_long32 *help) * * FUNCTION * Process options * * INPUTS * lList **ppcmdline - list of options * u_long32 *help - flag is set upon return if help has been requested * * RESULT * lList* - answer list * * NOTES * MT-NOTE: parse_qmaster() is not MT safe. * *******************************************************************************/ static lList *parse_qmaster(lList **ppcmdline, u_long32 *help ) { stringT str; lList *alp = NULL; int usageshowed = 0; DENTER(TOP_LAYER, "parse_qmaster"); /* Loop over all options. Only valid options can be in the ppcmdline list. */ while(lGetNumberOfElem(*ppcmdline)) { /* -help */ if(parse_flag(ppcmdline, "-help", &alp, help)) { usageshowed = 1; sge_usage(QMASTER, stdout); break; } } if(lGetNumberOfElem(*ppcmdline)) { sprintf(str, MSG_PARSE_TOOMANYOPTIONS); if(!usageshowed) sge_usage(QMASTER, stderr); answer_list_add(&alp, str, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR); DEXIT; return alp; } DEXIT; return alp; } /* parse_qmaster() */ /****** qmaster/setup_qmaster/qmaster_init() *********************************** * NAME * qmaster_init() -- Initialize qmaster * * SYNOPSIS * static void qmaster_init(char **anArgv) * * FUNCTION * Initialize qmaster. Do general setup and communication setup. * * INPUTS * char **anArgv - process argument vector * * RESULT * void - none * * NOTES * MT-NOTE: qmaster_init() is NOT MT safe. * *******************************************************************************/ static void qmaster_init(sge_gdi_ctx_class_t *ctx, char **anArgv) { DENTER(TOP_LAYER, "qmaster_init"); if (setup_qmaster(ctx)) { CRITICAL((SGE_EVENT, MSG_STARTUP_SETUPFAILED)); SGE_EXIT(NULL, 1); } ctx->set_exit_func(ctx, qmaster_lock_and_shutdown); communication_setup(ctx); starting_up(); /* write startup info message to message file */ DRETURN_VOID; } /* qmaster_init() */ /****** qmaster/setup_qmaster/communication_setup() **************************** * NAME * communication_setup() -- set up communication * * SYNOPSIS * static void communication_setup(void) * * FUNCTION * Initialize qmaster communication. * * This function will fail, if the configured qmaster port is already in * use. * * This could happen if either qmaster has been terminated shortly before * and the operating system did not get around to free the port or there * is a qmaster already running. * * INPUTS * void - none * * RESULT * void - none * * NOTES * MT-NOTE: communication_setup() is NOT MT safe * *******************************************************************************/ static void communication_setup(sge_gdi_ctx_class_t *ctx) { cl_com_handle_t* com_handle = NULL; char* qmaster_params = NULL; #if defined(IRIX) struct rlimit64 qmaster_rlimits; #else struct rlimit qmaster_rlimits; #endif const char *qualified_hostname = ctx->get_qualified_hostname(ctx); u_long32 qmaster_port = ctx->get_sge_qmaster_port(ctx); const char *qmaster_spool_dir = ctx->get_qmaster_spool_dir(ctx); DENTER(TOP_LAYER, "communication_setup"); DEBUG((SGE_EVENT,"my resolved hostname name is: \"%s\"\n", qualified_hostname)); com_handle = cl_com_get_handle(prognames[QMASTER], 1); if (com_handle == NULL) { ERROR((SGE_EVENT, "port "sge_u32" already bound\n", qmaster_port)); if (is_qmaster_already_running(qmaster_spool_dir) == true) { char *host = NULL; int res = -1; res = cl_com_gethostname(&host, NULL, NULL,NULL); CRITICAL((SGE_EVENT, MSG_QMASTER_FOUNDRUNNINGQMASTERONHOSTXNOTSTARTING_S, ((CL_RETVAL_OK == res ) ? host : "unknown"))); if (CL_RETVAL_OK == res) { FREE(host); } } SGE_EXIT(NULL, 1); } if (com_handle) { unsigned long max_connections = 0; u_long32 old_ll = 0; /* * re-check file descriptor limits for qmaster */ #if defined(IRIX) getrlimit64(RLIMIT_NOFILE, &qmaster_rlimits); #else getrlimit(RLIMIT_NOFILE, &qmaster_rlimits); #endif /* save old debug log level and set log level to INFO */ old_ll = log_state_get_log_level(); /* enable max connection close mode */ cl_com_set_max_connection_close_mode(com_handle, CL_ON_MAX_COUNT_CLOSE_AUTOCLOSE_CLIENTS); cl_com_get_max_connections(com_handle, &max_connections); /* add local host to allowed host list */ cl_com_add_allowed_host(com_handle,com_handle->local->comp_host); /* check dynamic event client count */ mconf_set_max_dynamic_event_clients(sge_set_max_dynamic_event_clients(mconf_get_max_dynamic_event_clients())); /* log startup info into qmaster messages file */ log_state_set_log_level(LOG_INFO); INFO((SGE_EVENT, MSG_QMASTER_FD_HARD_LIMIT_SETTINGS_U, sge_u32c(qmaster_rlimits.rlim_max))); INFO((SGE_EVENT, MSG_QMASTER_FD_SOFT_LIMIT_SETTINGS_U, sge_u32c(qmaster_rlimits.rlim_cur))); INFO((SGE_EVENT, MSG_QMASTER_MAX_FILE_DESCRIPTORS_LIMIT_U, sge_u32c(max_connections))); INFO((SGE_EVENT, MSG_QMASTER_MAX_EVC_LIMIT_U, sge_u32c(mconf_get_max_dynamic_event_clients()))); log_state_set_log_level(old_ll); } cl_commlib_set_connection_param(cl_com_get_handle(prognames[QMASTER], 1), HEARD_FROM_TIMEOUT, mconf_get_max_unheard()); /* fetching qmaster_params and begin to parse */ qmaster_params = mconf_get_qmaster_params(); /* updating the commlib paramterlist with new or changed parameters */ cl_com_update_parameter_list(qmaster_params); DPRINTF(("received qmaster_params are: %s\n", qmaster_params)); FREE(qmaster_params); /* now enable qmaster communication */ cl_commlib_set_global_param(CL_COMMLIB_DELAYED_LISTEN, CL_FALSE); DEXIT; return; } /* communication_setup() */ /****** qmaster/setup_qmaster/is_qmaster_already_running() ********************* * NAME * is_qmaster_already_running() -- is qmaster already running * * SYNOPSIS * static bool is_qmaster_already_running(void) * * FUNCTION * Check, whether there is running qmaster already. * * INPUTS * void - none * * RESULT * true - running qmaster detected. * false - otherwise * * NOTES * MT-NOTE: is_qmaster_already_running() is not MT safe * * BUGS * This function will only work, if the PID found in the qmaster PID file * either does belong to a running qmaster or no process at all. * * Of course PID's will be reused. This is, however, not a problem because * of the very specifc situation in which this function is called. * *******************************************************************************/ static bool is_qmaster_already_running(const char *qmaster_spool_dir) { enum { NULL_SIGNAL = 0 }; bool res = true; char pidfile[SGE_PATH_MAX] = { '\0' }; pid_t pid = 0; DENTER(TOP_LAYER, "is_qmaster_already_running"); sprintf(pidfile, "%s/%s", qmaster_spool_dir, QMASTER_PID_FILE); if ((pid = sge_readpid(pidfile)) == 0) { DEXIT; return false; } res = (kill(pid, NULL_SIGNAL) == 0) ? true: false; DEXIT; return res; } /* is_qmaster_already_running() */ static void sge_propagate_queue_suspension(object_description *object_base, lListElem *jep, dstring *cqueue_name, dstring *host_domain) { const lListElem *gdil_ep, *cq, *qi; lListElem *jatep; for_each (jatep, lGetList(jep, JB_ja_tasks)) { u_long32 jstate = lGetUlong(jatep, JAT_state); bool is_suspended = false; for_each (gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) { if (!cqueue_name_split(lGetString(gdil_ep, JG_qname), cqueue_name, host_domain, NULL, NULL)) { continue; } if (!(cq = lGetElemStr(*object_base[SGE_TYPE_CQUEUE].list, CQ_name, sge_dstring_get_string(cqueue_name))) || !(qi = lGetElemHost(lGetList(cq, CQ_qinstances), QU_qhostname, sge_dstring_get_string(host_domain)))) continue; if (qinstance_state_is_manual_suspended(qi) || qinstance_state_is_susp_on_sub(qi) || qinstance_state_is_cal_suspended(qi)) { is_suspended = true; break; } } if (is_suspended) jstate |= JSUSPENDED_ON_SUBORDINATE; else jstate &= ~JSUSPENDED_ON_SUBORDINATE; lSetUlong(jatep, JAT_state, jstate); } } /****** qmaster/setup_qmaster/qmaster_lock_and_shutdown() *************************** * NAME * qmaster_lock_and_shutdown() -- Acquire qmaster lock file and shutdown * * SYNOPSIS * static void qmaster_lock_and_shutdown(int anExitValue) * * FUNCTION * qmaster exit function. This version MUST NOT be used, if the current * working directory is NOT the spool directory. Other components do rely * on finding the lock file in the spool directory. * * INPUTS * int anExitValue - exit value * * RESULT * void - none * * EXAMPLE * ??? * * NOTES * MT-NOTE: qmaster_lock_and_shutdown() is MT safe * *******************************************************************************/ static void qmaster_lock_and_shutdown(void **ctx_ref, int anExitValue) { DENTER(TOP_LAYER, "qmaster_lock_and_shutdown"); if (anExitValue == 0) { if (qmaster_lock(QMASTER_LOCK_FILE) == -1) { CRITICAL((SGE_EVENT, MSG_QMASTER_LOCKFILE_ALREADY_EXISTS)); } } sge_gdi2_shutdown(ctx_ref); DEXIT; return; } /* qmaster_lock_and_shutdown() */ static int setup_qmaster(sge_gdi_ctx_class_t *ctx) { lListElem *jep, *ep, *tmpqep; static bool first = true; lListElem *spooling_context = NULL; lList *answer_list = NULL; time_t time_start, time_end; monitoring_t monitor; object_description *object_base = object_type_get_object_description(); const char *qualified_hostname = NULL; bool job_spooling = ctx->get_job_spooling(ctx); DENTER(TOP_LAYER, "setup_qmaster"); if (first) { first = false; } else { CRITICAL((SGE_EVENT, MSG_SETUP_SETUPMAYBECALLEDONLYATSTARTUP)); DEXIT; return -1; } /* register our error function for use in replace_params() */ config_errfunc = set_error; /* * Initialize Master lists and hash tables, if necessary */ /** This is part is making the scheduler a * lot slower that it was before. This was an enhancement introduced * in cvs revision 1.35 * It might be added again, when hte hashing problem on large job lists * with only a view owners is solved. */ #if 0 if (*(object_base[SGE_TYPE_JOB].list) == NULL) { *(object_base[SGE_TYPE_JOB].list) = lCreateList("Master_Job_List", JB_Type); } cull_hash_new(*(object_base[SGE_TYPE_JOB].list), JB_owner, 0); #endif if (!sge_initialize_persistence(ctx, &answer_list)) { answer_list_output(&answer_list); DRETURN(-1); } else { answer_list_output(&answer_list); spooling_context = spool_get_default_context(); } if (sge_read_configuration(ctx, spooling_context, object_base[SGE_TYPE_CONFIG].list, answer_list) != 0) { DRETURN(-1); } mconf_set_new_config(true); /* get aliased hostname from commd */ ctx->reresolve_qualified_hostname(ctx); qualified_hostname = ctx->get_qualified_hostname(ctx); DEBUG((SGE_EVENT,"ctx->get_qualified_hostname(ctx) returned \"%s\"\n", qualified_hostname)); /* ** read in all objects and check for correctness */ DPRINTF(("Complex Attributes----------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_CENTRY].list, SGE_TYPE_CENTRY); answer_list_output(&answer_list); /* * for release 6.2u5 the "job to core"- binding feature has been added * that needs some additional complex entries. We check here if those * entries exist and create them silently if they are not there. Only * this prevents from creating a update procedure. * * TODO: As soon as there is a release where an update procedure is * available we should put that code there and remove it here. */ { struct cmplx_tmp new_complexes[] = { {"m_core", "core", 1, CMPLXLE_OP, CONSUMABLE_NO, "0", REQU_YES, "0"}, {"m_socket", "socket", 1, CMPLXLE_OP, CONSUMABLE_NO, "0", REQU_YES, "0"}, {"m_topology", "topo", 9, CMPLXEQ_OP, CONSUMABLE_NO, NULL, REQU_YES, "0"}, {"m_topology_inuse", "utopo", 9, CMPLXEQ_OP, CONSUMABLE_NO, NULL, REQU_YES, "0"}, {NULL, NULL, 0, 0, 0, NULL, 0, 0} }; int i; for (i = 0; new_complexes[i].name != NULL; i++) { lList *centry_list = *(object_base[SGE_TYPE_CENTRY].list); lListElem *entry_long = lGetElemStr(centry_list, CE_name, new_complexes[i].name); lListElem *entry_short = lGetElemStr(centry_list, CE_shortcut, new_complexes[i].shortcut); if (entry_long == NULL && entry_short == NULL) { lListElem *new_centry = lCreateElem(CE_Type); lSetString(new_centry, CE_name, new_complexes[i].name); lSetString(new_centry, CE_shortcut, new_complexes[i].shortcut); lSetString(new_centry, CE_default, new_complexes[i].valdefault); lSetString(new_centry, CE_urgency_weight, new_complexes[i].urgency_weight); lSetUlong(new_centry, CE_valtype, new_complexes[i].valtype); lSetUlong(new_centry, CE_relop, new_complexes[i].relop); lSetUlong(new_centry, CE_consumable, new_complexes[i].consumable); lSetUlong(new_centry, CE_requestable, new_complexes[i].requestable); /* append and spool the object */ lAppendElem(centry_list, new_centry); spool_write_object(NULL, spool_get_default_context(), new_centry, lGetString(new_centry, CE_name), SGE_TYPE_CENTRY, false); } } } DPRINTF(("host_list----------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_EXECHOST].list, SGE_TYPE_EXECHOST); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_ADMINHOST].list, SGE_TYPE_ADMINHOST); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_SUBMITHOST].list, SGE_TYPE_SUBMITHOST); answer_list_output(&answer_list); if (!host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, SGE_TEMPLATE_NAME)) { /* add an exec host "template" */ if (sge_add_host_of_type(ctx, SGE_TEMPLATE_NAME, SGE_EH_LIST, &monitor)) ERROR((SGE_EVENT, MSG_CONFIG_ADDINGHOSTTEMPLATETOEXECHOSTLIST)); } /* add host "global" to Master_Exechost_List as an exec host */ if (!host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, SGE_GLOBAL_NAME)) { /* add an exec host "global" */ if (sge_add_host_of_type(ctx, SGE_GLOBAL_NAME, SGE_EH_LIST, &monitor)) ERROR((SGE_EVENT, MSG_CONFIG_ADDINGHOSTGLOBALTOEXECHOSTLIST)); } /* add qmaster host to Master_Adminhost_List as an administrativ host */ if (!host_list_locate(*object_base[SGE_TYPE_ADMINHOST].list, qualified_hostname)) { if (sge_add_host_of_type(ctx, qualified_hostname, SGE_AH_LIST, &monitor)) { DRETURN(-1); } } DPRINTF(("manager_list----------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_MANAGER].list, SGE_TYPE_MANAGER); answer_list_output(&answer_list); if (!manop_is_manager("root")) { ep = lAddElemStr(object_base[SGE_TYPE_MANAGER].list, UM_name, "root", UM_Type); if (!spool_write_object(&answer_list, spooling_context, ep, "root", SGE_TYPE_MANAGER, job_spooling)) { answer_list_output(&answer_list); CRITICAL((SGE_EVENT, MSG_CONFIG_CANTWRITEMANAGERLIST)); DRETURN(-1); } } for_each(ep, *object_base[SGE_TYPE_MANAGER].list) { DPRINTF(("%s\n", lGetString(ep, UM_name))); } DPRINTF(("host group definitions-----------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_HGROUP].list, SGE_TYPE_HGROUP); answer_list_output(&answer_list); DPRINTF(("operator_list----------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_OPERATOR].list, SGE_TYPE_OPERATOR); answer_list_output(&answer_list); if (!manop_is_operator("root")) { ep = lAddElemStr(object_base[SGE_TYPE_OPERATOR].list, UO_name, "root", UO_Type); if (!spool_write_object(&answer_list, spooling_context, ep, "root", SGE_TYPE_OPERATOR, job_spooling)) { answer_list_output(&answer_list); CRITICAL((SGE_EVENT, MSG_CONFIG_CANTWRITEOPERATORLIST)); DEXIT; return -1; } } for_each(ep, *object_base[SGE_TYPE_OPERATOR].list) { DPRINTF(("%s\n", lGetString(ep, UO_name))); } DPRINTF(("userset_list------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_USERSET].list, SGE_TYPE_USERSET); answer_list_output(&answer_list); DPRINTF(("calendar list ------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_CALENDAR].list, SGE_TYPE_CALENDAR); answer_list_output(&answer_list); DPRINTF(("resource quota list -----------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_RQS].list, SGE_TYPE_RQS); answer_list_output(&answer_list); #ifndef __SGE_NO_USERMAPPING__ DPRINTF(("administrator user mapping-----------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_CUSER].list, SGE_TYPE_CUSER); answer_list_output(&answer_list); #endif DPRINTF(("cluster_queue_list---------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_CQUEUE].list, SGE_TYPE_CQUEUE); answer_list_output(&answer_list); cqueue_list_set_unknown_state(*(object_base[SGE_TYPE_CQUEUE].list), NULL, false, true); /* * Initialize cached values for each qinstance: * - fullname * - suspend_on_subordinate */ for_each(tmpqep, *(object_type_get_master_list(SGE_TYPE_CQUEUE))) { lList *qinstance_list = lGetList(tmpqep, CQ_qinstances); lListElem *qinstance = NULL; lList *aso_list = NULL; lListElem *aso = NULL; /* * Update cluster queue configuration from pre-6.2u5 to 6.2u5, i.e. from * cluster queues without slotwise suspend on subordinate to such with * slotwise ssos. */ aso_list = lGetList(tmpqep, CQ_subordinate_list); if (aso_list != NULL) { /* This cluster queue has a list of subordinate lists (possibly one * for each host).*/ for_each (aso, aso_list) { lListElem *elem; int pos; lList *so_list = lGetList(aso, ASOLIST_value); /* Every element of the ASOLIST should have a SOLIST, but we * check it to be sure. */ if (so_list != NULL) { /* In each subordinate list, all elements must have the same * SO_slots_sum value, so it's enough to look in the first * element. */ elem = lFirst(so_list); pos = lGetPosViaElem(elem, SO_slots_sum, SGE_NO_ABORT); if (pos == -1) { /* In the subordinate list of the cluster queue there is no * SO_slots_sum field yet, so we have to create a new * subordinate list, copy the values from the old one and * initialize the new fields, remove the old subordinate list * from the cluster queue and add the new one instead. */ lList *new_so_list = NULL; lListElem *so = NULL; lListElem *new_so = NULL; const char *so_list_name = NULL; so_list_name = lGetListName(so_list); new_so_list = lCreateList(so_list_name, SO_Type); for_each (so, so_list) { new_so = lCreateElem(SO_Type); lSetString(new_so, SO_name, lGetString(so, SO_name)); lSetUlong(new_so, SO_threshold, lGetUlong(so, SO_threshold)); lSetUlong(new_so, SO_slots_sum, 0); lSetUlong(new_so, SO_seq_no, 0); lSetUlong(new_so, SO_action, 0); lAppendElem(new_so_list, new_so); } lSetList(aso, ASOLIST_value, new_so_list); } } } } for_each(qinstance, qinstance_list) { qinstance_set_full_name(qinstance); sge_qmaster_qinstance_state_set_susp_on_sub(qinstance, false); } } DPRINTF(("pe_list---------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_PE].list, SGE_TYPE_PE); answer_list_output(&answer_list); DPRINTF(("ckpt_list---------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_CKPT].list, SGE_TYPE_CKPT); answer_list_output(&answer_list); DPRINTF(("advance reservation list -----------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_AR].list, SGE_TYPE_AR); answer_list_output(&answer_list); /* initialize cached advance reservations structures */ { lListElem *ar; for_each(ar, *object_base[SGE_TYPE_AR].list) { ar_initialize_reserved_queue_list(ar); } } DPRINTF(("job_list-----------------------------------\n")); /* measure time needed to read job database */ time_start = time(0); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_JOB].list, SGE_TYPE_JOB); time_end = time(0); answer_list_output(&answer_list); { u_long32 saved_logginglevel = log_state_get_log_level(); log_state_set_log_level(LOG_INFO); INFO((SGE_EVENT, MSG_QMASTER_READ_JDB_WITH_X_ENTR_IN_Y_SECS_UU, sge_u32c(lGetNumberOfElem(*object_base[SGE_TYPE_JOB].list)), sge_u32c(time_end - time_start))); log_state_set_log_level(saved_logginglevel); } { dstring cqueue_name = DSTRING_INIT; dstring host_domain = DSTRING_INIT; for_each(jep, *(object_base[SGE_TYPE_JOB].list)) { DPRINTF(("JOB "sge_u32" PRIORITY %d\n", lGetUlong(jep, JB_job_number), (int)lGetUlong(jep, JB_priority) - BASE_PRIORITY)); /* doing this operation we need the complete job list read in */ job_suc_pre(jep); /* also do this for array dependency predecessors */ job_suc_pre_ad(jep); /* array successor jobs need to have their cache rebuilt. this will do nothing spectacular if the AD reqest list for this job is empty. */ sge_task_depend_init(jep, &answer_list); centry_list_fill_request(lGetList(jep, JB_hard_resource_list), NULL, *object_base[SGE_TYPE_CENTRY].list, false, true, false); /* need to update JSUSPENDED_ON_SUBORDINATE since task spooling is not triggered upon queue un/-suspension */ sge_propagate_queue_suspension(object_base, jep, &cqueue_name, &host_domain); } sge_dstring_free(&cqueue_name); sge_dstring_free(&host_domain); } if (!ctx->get_job_spooling(ctx)) { lList *answer_list = NULL; dstring buffer = DSTRING_INIT; INFO((SGE_EVENT, "job spooling is disabled - removing spooled jobs")); ctx->set_job_spooling(ctx, true); for_each(jep, *object_base[SGE_TYPE_JOB].list) { u_long32 job_id = lGetUlong(jep, JB_job_number); sge_dstring_clear(&buffer); if (lGetString(jep, JB_exec_file) != NULL) { if (spool_read_script(&answer_list, job_id, jep) == true) { spool_delete_script(&answer_list, job_id, jep); } else { printf("could not read in script file\n"); } } spool_delete_object(&answer_list, spool_get_default_context(), SGE_TYPE_JOB, job_get_key(job_id, 0, NULL, &buffer), job_spooling); } answer_list_output(&answer_list); sge_dstring_free(&buffer); ctx->set_job_spooling(ctx, true); } /* if the job is in state running we have to register each slot in a queue, in the resource quota sets and in the parallel environment if the job is a parallel one */ debit_all_jobs_from_qs(); debit_all_jobs_from_pes(*object_base[SGE_TYPE_PE].list); /* * Initialize cached values for each qinstance: * - update suspend on subordinate state according to running jobs * - update cached QI values. */ for_each(tmpqep, *(object_type_get_master_list(SGE_TYPE_CQUEUE))) { cqueue_mod_qinstances(ctx, tmpqep, NULL, tmpqep, true, false, &monitor); } /* rebuild signal resend events */ rebuild_signal_events(); DPRINTF(("user list-----------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_USER].list, SGE_TYPE_USER); answer_list_output(&answer_list); remove_invalid_job_references(job_spooling, 1, object_base); DPRINTF(("project list-----------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_PROJECT].list, SGE_TYPE_PROJECT); answer_list_output(&answer_list); remove_invalid_job_references(job_spooling, 0, object_base); DPRINTF(("scheduler config -----------------------------------\n")); sge_read_sched_configuration(ctx, spooling_context, &answer_list); answer_list_output(&answer_list); DPRINTF(("share tree list-----------------------------------\n")); spool_read_list(&answer_list, spooling_context, object_base[SGE_TYPE_SHARETREE].list, SGE_TYPE_SHARETREE); answer_list_output(&answer_list); ep = lFirst(*object_base[SGE_TYPE_SHARETREE].list); if (ep) { lList *alp = NULL; lList *found = NULL; check_sharetree(&alp, ep, *object_base[SGE_TYPE_USER].list, *object_base[SGE_TYPE_PROJECT].list, NULL, &found); lFreeList(&found); lFreeList(&alp); } init_categories(); DRETURN(0); } /****** setup_qmaster/remove_invalid_job_references() ************************** * NAME * remove_invalid_job_references() -- ??? * * SYNOPSIS * static int remove_invalid_job_references(int user, object_description * *object_base) * * FUNCTION * get rid of still debited per job usage contained * in user or project object if the job is no longer existing * * INPUTS * int user - work on users * object_description *object_base - master list table * * RESULT * static int - always 0 * * NOTES * MT-NOTE: remove_invalid_job_references() is not MT safe * *******************************************************************************/ static int remove_invalid_job_references(bool job_spooling, int user, object_description *object_base) { lListElem *up, *upu, *next; u_long32 jobid; int object_key = user ? UU_name : PR_name; lList *object_list = user ? *object_base[SGE_TYPE_USER].list : *object_base[SGE_TYPE_PROJECT].list; sge_object_type object_type = user ? SGE_TYPE_USER : SGE_TYPE_PROJECT; const char *object_name = user ? MSG_OBJ_USER : MSG_OBJ_PRJ; int debited_job_usage_key = user ? UU_debited_job_usage : PR_debited_job_usage; DENTER(TOP_LAYER, "remove_invalid_job_references"); for_each(up, object_list) { int spool_me = 0; next = lFirst(lGetList(up, debited_job_usage_key)); while ((upu=next)) { next = lNext(upu); jobid = lGetUlong(upu, UPU_job_number); if (!job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)), jobid)) { lRemoveElem(lGetList(up, debited_job_usage_key), &upu); WARNING((SGE_EVENT, "removing reference to no longer existing job "sge_u32" of %s "SFQ"\n", jobid, object_name, lGetString(up, object_key))); spool_me = 1; } } if (spool_me) { lList *answer_list = NULL; spool_write_object(&answer_list, spool_get_default_context(), up, lGetString(up, object_key), object_type, job_spooling); answer_list_output(&answer_list); } } DEXIT; return 0; } static int debit_all_jobs_from_qs() { lListElem *gdi; u_long32 slots; const char *queue_name; lListElem *next_jep = NULL; lListElem *jep = NULL; lListElem *qep = NULL; lListElem *next_jatep = NULL; lListElem *jatep = NULL; int ret = 0; object_description *object_base = object_type_get_object_description(); lList *master_centry_list = *object_base[SGE_TYPE_CENTRY].list; lList *master_cqueue_list = *object_base[SGE_TYPE_CQUEUE].list; lList *master_ar_list = *object_base[SGE_TYPE_AR].list; lList *master_rqs_list = *object_base[SGE_TYPE_RQS].list; DENTER(TOP_LAYER, "debit_all_jobs_from_qs"); next_jep = lFirst(*(object_type_get_master_list(SGE_TYPE_JOB))); while ((jep=next_jep)) { /* may be we have to delete this job */ next_jep = lNext(jep); next_jatep = lFirst(lGetList(jep, JB_ja_tasks)); while ((jatep = next_jatep)) { bool master_task = true; next_jatep = lNext(jatep); /* don't look at states - we only trust in "granted destin. ident. list" */ for_each (gdi, lGetList(jatep, JAT_granted_destin_identifier_list)) { u_long32 ar_id = lGetUlong(jep, JB_ar); lListElem *ar = NULL; queue_name = lGetString(gdi, JG_qname); slots = lGetUlong(gdi, JG_slots); if (!(qep = cqueue_list_locate_qinstance(master_cqueue_list, queue_name))) { ERROR((SGE_EVENT, MSG_CONFIG_CANTFINDQUEUEXREFERENCEDINJOBY_SU, queue_name, sge_u32c(lGetUlong(jep, JB_job_number)))); lRemoveElem(lGetList(jep, JB_ja_tasks), &jatep); } else if (ar_id != 0 && (ar = lGetElemUlong(master_ar_list, AR_id, ar_id)) == NULL) { ERROR((SGE_EVENT, MSG_CONFIG_CANTFINDARXREFERENCEDINJOBY_UU, sge_u32c(ar_id), sge_u32c(lGetUlong(jep, JB_job_number)))); lRemoveElem(lGetList(jep, JB_ja_tasks), &jatep); } else { /* debit in all layers */ lListElem *rqs = NULL; debit_host_consumable(jep, host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, "global"), master_centry_list, slots, master_task); debit_host_consumable(jep, host_list_locate( *object_base[SGE_TYPE_EXECHOST].list, lGetHost(qep, QU_qhostname)), master_centry_list, slots, master_task); qinstance_debit_consumable(qep, jep, master_centry_list, slots, master_task); for_each (rqs, master_rqs_list) { rqs_debit_consumable(rqs, jep, gdi, lGetString(jatep, JAT_granted_pe), master_centry_list, *object_base[SGE_TYPE_USERSET].list, *object_base[SGE_TYPE_HGROUP].list, slots, master_task); } if (ar != NULL) { lListElem *queue = lGetSubStr(ar, QU_full_name, lGetString(gdi, JG_qname), AR_reserved_queues); if (queue != NULL) { qinstance_debit_consumable(queue, jep, master_centry_list, slots, master_task); } else { ERROR((SGE_EVENT, "job "sge_U32CFormat" runs in queue "SFQ" not reserved by AR "sge_U32CFormat, sge_u32c(lGetUlong(jep, JB_job_number)), lGetString(gdi, JG_qname), sge_u32c(ar_id))); } } } master_task = false; } } } DRETURN(ret); } /****** setup_qmaster/init_categories() **************************************** * NAME * init_categories() -- Initialize usersets/projects wrts categories * * SYNOPSIS * static void init_categories(void) * * FUNCTION * Initialize usersets/projects wrts categories. * * NOTES * MT-NOTE: init_categories() is not MT safe *******************************************************************************/ static void init_categories(void) { const lListElem *cq, *pe, *hep, *ep; lListElem *acl, *prj, *rqs; lList *u_list = NULL, *p_list = NULL; lList *master_project_list = (*object_type_get_master_list(SGE_TYPE_PROJECT)); lList *master_userset_list = (*object_type_get_master_list(SGE_TYPE_USERSET)); bool all_projects = false; bool all_usersets = false; /* * collect a list of references to usersets/projects used in * the resource quota sets */ for_each (rqs, *object_type_get_master_list(SGE_TYPE_RQS)) { if (!all_projects && !rqs_diff_projects(rqs, NULL, &p_list, NULL, master_project_list)) { all_projects = true; } if (!all_usersets && !rqs_diff_usersets(rqs, NULL, &u_list, NULL, master_userset_list)) { all_usersets = true; } if (all_usersets && all_projects) { break; } } /* * collect list of references to usersets/projects used as ACL * with queue_conf(5), host_conf(5) and sge_pe(5) */ for_each (cq, *object_type_get_master_list(SGE_TYPE_CQUEUE)) { cqueue_diff_projects(cq, NULL, &p_list, NULL); cqueue_diff_usersets(cq, NULL, &u_list, NULL); } for_each (pe, *object_type_get_master_list(SGE_TYPE_PE)) { pe_diff_usersets(pe, NULL, &u_list, NULL); } for_each (hep, *object_type_get_master_list(SGE_TYPE_EXECHOST)) { host_diff_projects(hep, NULL, &p_list, NULL); host_diff_usersets(hep, NULL, &u_list, NULL); } /* * now set categories flag with usersets/projects used as ACL */ for_each(ep, p_list) if ((prj = prj_list_locate(master_project_list, lGetString(ep, PR_name)))) lSetBool(prj, PR_consider_with_categories, true); for_each(ep, u_list) if ((acl = userset_list_locate(master_userset_list, lGetString(ep, US_name)))) lSetBool(acl, US_consider_with_categories, true); lFreeList(&p_list); lFreeList(&u_list); }