/*___INFO__MARK_BEGIN__*/ /************************************************************************* * * The Contents of this file are made available subject to the terms of * the Sun Industry Standards Source License Version 1.2 * * Sun Microsystems Inc., March, 2001 * * * Sun Industry Standards Source License Version 1.2 * ================================================= * The contents of this file are subject to the Sun Industry Standards * Source License Version 1.2 (the "License"); You may not use this file * except in compliance with the License. You may obtain a copy of the * License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html * * Software provided under this License is provided on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, * WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS, * MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING. * See the License for the specific provisions governing your rights and * obligations concerning the Software. * * The Initial Developer of the Original Code is: Sun Microsystems, Inc. * * Copyright: 2001 by Sun Microsystems, Inc. * * All Rights Reserved. 
* ************************************************************************/ /*___INFO__MARK_END__*/ #include #include #include #include #include #ifdef SGE_PQS_API #include #endif #include "rmon/sgermon.h" #include "uti/sge_log.h" #include "uti/sge_parse_num_par.h" #include "uti/sge_string.h" #include "uti/sge_hostname.h" #include "uti/sge_time.h" #include "cull/cull.h" #include "sgeobj/sge_range.h" #include "sgeobj/sge_order.h" #include "sgeobj/sge_pe.h" #include "sgeobj/sge_qinstance.h" #include "sgeobj/sge_str.h" #include "sgeobj/sge_ja_task.h" #include "sgeobj/sge_attr.h" #include "sgeobj/sge_host.h" #include "sgeobj/sge_job.h" #include "sgeobj/sge_cqueue.h" #include "sgeobj/sge_qinstance.h" #include "sgeobj/sge_qinstance_type.h" #include "sgeobj/sge_userprj.h" #include "sgeobj/sge_ckpt.h" #include "sgeobj/sge_centry.h" #include "sgeobj/sge_object.h" #include "sgeobj/sge_qinstance_state.h" #include "sgeobj/sge_schedd_conf.h" #include "sgeobj/sge_subordinate.h" #include "sgeobj/sge_qref.h" #include "sgeobj/sge_advance_reservation.h" #include "sgeobj/sge_userset.h" #include "sgeobj/sge_hgroup.h" #include "sgeobj/sge_calendar.h" #include "sge.h" #include "basis_types.h" #include "sge_select_queue.h" #include "sge_complex_schedd.h" #include "sge_resource_utilization.h" #include "valid_queue_user.h" #include "subordinate_schedd.h" #include "sge_range_schedd.h" #include "sge_pe_schedd.h" #include "sge_qeti.h" #include "sort_hosts.h" #include "schedd_monitor.h" #include "schedd_message.h" #include "sge_schedd_text.h" #include "sge_resource_quota_schedd.h" #ifdef SGE_PQS_API #include "sge_pqs_api.h" #endif #include "sge_resource_utilization_RUE_L.h" #include "sge_select_queue_LDR_L.h" #include "sge_select_queue_QRL_L.h" #include "sge_ct_SCT_L.h" #include "sge_ct_REF_L.h" #include "sge_ct_CT_L.h" #include "sge_ct_CCT_L.h" #include "sge_ct_CTI_L.h" #include "sge_message_SME_L.h" #include "sge_message_MES_L.h" #ifdef SGE_PQS_API #include "sge_varL.h" #endif #include 
"msg_common.h" #include "msg_schedd.h" /* -- these implement helpers for the category optimization -------- */ typedef struct { lListElem *category; /* ref to the category */ lListElem *cache; /* ref to the cache object in th category */ bool use_category; /* if true: use the category with immediate dispatch runs only and only if there is more than a single job of that category prevents 'skip_host_list' and 'skip_queue_list' be used with reservation */ bool mod_category; /* if true: update the category with new messages, queues, and hosts */ u_long32 *posible_pe_slots; /* stores all posible slots settings for a pe job with ranges */ bool is_pe_slots_rev; /* if it is true, the posible_pe_slots are stored in the category */ } category_use_t; static void fill_category_use_t(const sge_assignment_t *best, category_use_t *use_category, const char *pe_name); static bool add_pe_slots_to_category(category_use_t *use_category, u_long32 *max_slotsp, lListElem *pe, int min_slots, int max_slots, lList *pe_range); /* -- these implement parallel assignemnt ------------------------- */ static dispatch_t parallel_reservation_max_time_slots(sge_assignment_t *best, int *available_slots); static dispatch_t parallel_maximize_slots_pe(sge_assignment_t *best, int *available_slots); static dispatch_t parallel_assignment(sge_assignment_t *a, category_use_t *use_category, int *available_slots); #ifdef SOLARIS #pragma no_inline(parallel_assignment) #endif static dispatch_t parallel_available_slots(const sge_assignment_t *a, int *slots, int *slots_qend); static dispatch_t parallel_tag_queues_suitable4job(sge_assignment_t *assignment, category_use_t *use_category, int *available_slots); static dispatch_t parallel_global_slots(const sge_assignment_t *a, int *slots, int *slots_qend); static dispatch_t parallel_tag_hosts_queues(sge_assignment_t *a, lListElem *hep, int *slots, int *slots_qend, bool *master_host, category_use_t *use_category, lList **unclear_cqueue_list); #ifdef SOLARIS #pragma 
no_inline(parallel_tag_hosts_queues) #endif static int parallel_max_host_slots(sge_assignment_t *a, lListElem *host); static dispatch_t parallel_queue_slots(sge_assignment_t *a,lListElem *qep, int *slots, int *slots_qend, bool allow_non_requestable); static void clean_up_parallel_job(sge_assignment_t *a); /* -- these implement sequential assignemnt ---------------------- */ static dispatch_t sequential_tag_queues_suitable4job(sge_assignment_t *a); static dispatch_t sequential_queue_time(u_long32 *start, const sge_assignment_t *a, int *violations, lListElem *qep); static dispatch_t sequential_host_time(u_long32 *start, const sge_assignment_t *a, int *violations, lListElem *hep); static dispatch_t sequential_global_time(u_long32 *start_time, const sge_assignment_t *a, int *violations); static dispatch_t match_static_advance_reservation(const sge_assignment_t *a); static int sequential_update_host_order(lList *host_list, lList *queues); /* -- base functions ---------------------------------------------- */ static int compute_soft_violations(const sge_assignment_t *a, lListElem *queue, int violation, lList *load_attr, lList *config_attr, lList *actual_attr, u_long32 layer, double lc_factor, u_long32 tag); static dispatch_t rc_time_by_slots(const sge_assignment_t *a, lList *requested, lList *load_attr, lList *config_attr, lList *actual_attr, lListElem *queue, bool allow_non_requestable, dstring *reason, int slots, u_long32 layer, double lc_factor, u_long32 tag, u_long32 *start_time, const char *object_name); static dispatch_t ri_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend, lList *rue_list, lListElem *request, lList *load_attr, lList *total_list, lListElem *queue, u_long32 layer, double lc_factor, dstring *reason, bool allow_non_requestable, bool no_centry, const char *object_name); static dispatch_t match_static_resource(int slots, lListElem *req_cplx, lListElem *src_cplx, dstring *reason, bool allow_non_requestable); static int 
resource_cmp(u_long32 relop, double req, double src_dl); static bool job_is_forced_centry_missing(const sge_assignment_t *a, const lListElem *queue_or_host); static void clear_resource_tags(lList *resources, u_long32 max_tag); static dispatch_t find_best_result(dispatch_t r1, dispatch_t r2); /* ---- helpers for load computation ---------------------------------------------------------- */ static lListElem *load_locate_elem(lList *load_list, lListElem *global_consumable, lListElem *host_consumable, lListElem *queue_consumable, const char *limit); #ifdef SGE_PQS_API static int sge_call_pe_qsort(sge_assignment_t *a, const char *qsort_args); #endif static int load_check_alarm(char *reason, const char *name, const char *load_value, const char *limit_value, u_long32 relop, u_long32 type, lListElem *hep, lListElem *hlep, double lc_host, double lc_global, const lList *load_adjustments, int load_is_value); static int load_np_value_adjustment(const char* name, lListElem *hep, double *load_correction); /* ---- Implementation ------------------------------------------------------------------------- */ void assignment_init(sge_assignment_t *a, lListElem *job, lListElem *ja_task, bool is_load_adj) { if (job != NULL) { a->job = job; a->user = lGetString(job, JB_owner); a->group = lGetString(job, JB_group); a->project = lGetString(job, JB_project); a->job_id = lGetUlong(job, JB_job_number); a->is_soft = job_has_soft_requests(job); } if (is_load_adj) { a->load_adjustments = sconf_get_job_load_adjustments(); } if (ja_task != NULL) { a->ja_task = ja_task; a->ja_task_id = lGetUlong(ja_task, JAT_task_number); } } void assignment_copy(sge_assignment_t *dst, sge_assignment_t *src, bool move_gdil) { if (dst == NULL || src == NULL) { return; } if (dst->load_adjustments != NULL) { lFreeList(&dst->load_adjustments); } if (move_gdil) { lFreeList(&(dst->gdil)); lFreeList(&(dst->limit_list)); lFreeList(&(dst->skip_cqueue_list)); lFreeList(&(dst->skip_host_list)); } memcpy(dst, src, 
sizeof(sge_assignment_t)); if (src->load_adjustments != NULL) { dst->load_adjustments = lCopyList("cpy_load_adj", src->load_adjustments); } if (!move_gdil) dst->gdil = dst->limit_list = dst->skip_cqueue_list = dst->skip_host_list = NULL; else src->gdil = src->limit_list = src->skip_cqueue_list = src->skip_host_list = NULL; } void assignment_release(sge_assignment_t *a) { lFreeList(&(a->load_adjustments)); lFreeList(&(a->gdil)); lFreeList(&(a->limit_list)); lFreeList(&(a->skip_cqueue_list)); lFreeList(&(a->skip_host_list)); } void assignment_clear_cache(sge_assignment_t *a) { lFreeList(&(a->limit_list)); lFreeList(&(a->skip_cqueue_list)); lFreeList(&(a->skip_host_list)); } static dispatch_t find_best_result(dispatch_t r1, dispatch_t r2) { DENTER(BASIS_LAYER, "find_best_result"); if (r1 == DISPATCH_NEVER || r2 == DISPATCH_NEVER) { DRETURN(DISPATCH_NEVER); } else if (r1 == DISPATCH_OK || r2 == DISPATCH_OK) { DRETURN(DISPATCH_OK); } else if (r1 == DISPATCH_NOT_AT_TIME || r2 == DISPATCH_NOT_AT_TIME) { DRETURN(DISPATCH_NOT_AT_TIME); } else if (r1 == DISPATCH_NEVER_JOB || r2 == DISPATCH_NEVER_JOB) { DRETURN(DISPATCH_NEVER_JOB); } else if (r1 == DISPATCH_NEVER_CAT || r2 == DISPATCH_NEVER_CAT) { DRETURN(DISPATCH_NEVER_CAT); } else if (r1 == DISPATCH_MISSING_ATTR || r2 == DISPATCH_MISSING_ATTR ) { DRETURN(DISPATCH_MISSING_ATTR); } CRITICAL((SGE_EVENT, MSG_JOBMATCHINGUNEXPECTEDRESULT)); DRETURN(DISPATCH_NEVER); } static bool is_not_better(sge_assignment_t *a, u_long32 viola_best, u_long32 tt_best, u_long32 viola_this, u_long32 tt_this) { /* earlier start time has higher preference than lower soft violations */ if (a->is_reservation && tt_this >= tt_best) return true; if (!a->is_reservation && viola_this >= viola_best) return true; return false; } /****** scheduler/sge_select_parallel_environment() **************************** * NAME * sge_select_parallel_environment() -- Decide about a PE assignment * * SYNOPSIS * static dispatch_t 
sge_select_parallel_environment(sge_assignment_t *best, lList * *pe_list) * * FUNCTION * When users use wildcard PE request such as -pe 'mpi8_*' * more than a single parallel environment can match the wildcard expression. * In case of 'now' assignments the PE that gives us the largest assignment * is selected. When scheduling a reservation we search for the earliest * assignment for each PE and then choose that one that finally gets us the * maximum number of slots. * * IMPORTANT * The scheduler info messages are not cached. They are added globaly and have * to be added for each job in the category. When the messages are updated * this has to be changed. * * INPUTS * sge_assignment_t *best - herein we keep all important in/out information * lList *pe_list - the list of all parallel environments (PE_Type) * * RESULT * dispatch_t - 0 ok got an assignment * 1 no assignment at the specified time (???) * -1 assignment will never be possible for all jobs of that category * -2 assignment will never be possible for that particular job * * NOTES * MT-NOTE: sge_select_parallel_environment() is not MT safe *******************************************************************************/ dispatch_t sge_select_parallel_environment(sge_assignment_t *best, lList *pe_list) { int matched_pe_count = 0; lListElem *pe, *queue; const char *pe_request, *pe_name; dispatch_t result, best_result = DISPATCH_NEVER_CAT; int old_logging = 0; DENTER(TOP_LAYER, "sge_select_parallel_environment"); pe_request = lGetString(best->job, JB_pe); DPRINTF(("handling parallel job "sge_u32"."sge_u32"\n", best->job_id, best->ja_task_id)); /* initialize all taggs */ for_each(queue, best->queue_list) { lSetUlong(queue, QU_tagged4schedule, 2); } if (best->is_reservation) { /* reservation scheduling */ if (!best->is_advance_reservation) { old_logging = schedd_mes_get_logging(); schedd_mes_set_logging(0); sconf_set_mes_schedd_info(false); } for_each(pe, pe_list) { int available_slots = 0; if (!pe_is_matching(pe, 
pe_request)) { continue; } matched_pe_count++; pe_name = lGetString(pe, PE_name); if (best->gdil == NULL) { /* first pe run */ best->pe = pe; best->pe_name = pe_name; /* determine earliest start time with that PE */ result = parallel_reservation_max_time_slots(best, &available_slots); if (result != DISPATCH_OK) { schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots); best_result = find_best_result(best_result, result); continue; } DPRINTF(("### first ### reservation in PE \"%s\" at "sge_u32" with %d soft violations\n", pe_name, best->start, best->soft_violations)); } else { /* test with all other pes */ sge_assignment_t tmp = SGE_ASSIGNMENT_INIT; assignment_copy(&tmp, best, false); tmp.pe = pe; tmp.pe_name = pe_name; /* try to find earlier assignment again with minimum slot amount */ tmp.slots = 0; result = parallel_reservation_max_time_slots(&tmp, &available_slots); if (result != DISPATCH_OK) { schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots); best_result = find_best_result(best_result, result); assignment_release(&tmp); continue; } if (tmp.start < best->start || (tmp.start == best->start && tmp.soft_violations < best->soft_violations)) { assignment_copy(best, &tmp, true); DPRINTF(("### better ### reservation in PE \"%s\" at "sge_u32" with %d soft violations\n", pe_name, best->start, best->soft_violations)); } assignment_release(&tmp); } } } else { u_long32 ar_id; /* now assignments */ result = match_static_advance_reservation(best); if (result != DISPATCH_OK) { DRETURN(result); } if ((ar_id = lGetUlong(best->job, JB_ar)) != 0) { lListElem *ar = lGetElemUlong(best->ar_list, AR_id, ar_id); pe = lGetElemStr(pe_list, PE_name, lGetString(ar, AR_granted_pe)); if (pe == NULL) { DPRINTF(("Critical Error, ar references non existing PE\n")); } else { int available_slots = 0; matched_pe_count++; best->pe = pe; 
best->pe_name = lGetString(pe, PE_name); best_result = parallel_maximize_slots_pe(best, &available_slots); if (best_result != DISPATCH_OK) { schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, best->pe_name, available_slots); } DPRINTF(("### AR ### assignment in PE \"%s\" with %d soft violations and %d available slots\n", best->pe_name, best->soft_violations, available_slots)); } } else { for_each(pe, pe_list) { if (!pe_is_matching(pe, pe_request)) { continue; } pe_name = lGetString(pe, PE_name); matched_pe_count++; if (best->gdil == NULL) { int available_slots = 0; best->pe = pe; best->pe_name = pe_name; result = parallel_maximize_slots_pe(best, &available_slots); if (result != DISPATCH_OK) { schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots); best_result = find_best_result(best_result, result); continue; } DPRINTF(("### first ### assignment in PE \"%s\" with %d soft violations\n", pe_name, best->soft_violations)); } else { int available_slots = 0; sge_assignment_t tmp = SGE_ASSIGNMENT_INIT; assignment_copy(&tmp, best, false); tmp.pe = pe; tmp.pe_name = pe_name; result = parallel_maximize_slots_pe(&tmp, &available_slots); if (result != DISPATCH_OK) { assignment_release(&tmp); schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots); best_result = find_best_result(best_result, result); continue; } if ((tmp.slots > best->slots) || (tmp.start == best->start && tmp.soft_violations < best->soft_violations)) { assignment_copy(best, &tmp, true); DPRINTF(("### better ### assignment in PE \"%s\" with %d soft violations\n", pe_name, best->soft_violations)); } assignment_release(&tmp); } } } } if (matched_pe_count == 0) { schedd_mes_add(best->monitor_alpp, best->monitor_next_run, best->job_id, SCHEDD_INFO_NOPEMATCH_ ); best_result = DISPATCH_NEVER_CAT; } else if 
(best->is_reservation && best->gdil) { int available_slots = 0; result = parallel_maximize_slots_pe(best, &available_slots); if (result != DISPATCH_OK) { /* ... should never happen */ best_result = DISPATCH_NEVER_CAT; } } if (best->gdil) { best_result = DISPATCH_OK; } switch (best_result) { case DISPATCH_OK: DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns PE %s %d slots at "sge_u32")\n", best->job_id, best->ja_task_id, best->pe_name, best->slots, best->start)); break; case DISPATCH_NOT_AT_TIME: DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns \n", best->job_id, best->ja_task_id)); break; case DISPATCH_NEVER_CAT: DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns \n", best->job_id, best->ja_task_id)); break; case DISPATCH_NEVER_JOB: DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns \n", best->job_id, best->ja_task_id)); break; case DISPATCH_MISSING_ATTR: default: DPRINTF(("!!!!!!!! SELECT PE("sge_u32"."sge_u32") returns unexpected %d\n", best->job_id, best->ja_task_id, best_result)); break; } if (best->is_reservation && !best->is_advance_reservation) { sconf_set_mes_schedd_info(true); schedd_mes_set_logging(old_logging); } /* clean up */ clean_up_parallel_job(best); DRETURN(best_result); } /****** sge_select_queue/clean_up_parallel_job() ******************************* * NAME * clean_up_parallel_job() -- removes tages * * SYNOPSIS * static void clean_up_parallel_job(sge_assignment_t *a) * * FUNCTION * duing pe job dispatch are man queues and hosts taged. This * function removes the tages. 
* * INPUTS * sge_assignment_t *a - the resource structure * * * * NOTES * MT-NOTE: clean_up_parallel_job() is not MT safe * *******************************************************************************/ static void clean_up_parallel_job(sge_assignment_t *a) { qinstance_list_set_tag(a->queue_list, 0); } /****** scheduler/parallel_reservation_max_time_slots() ***************************************** * NAME * parallel_reservation_max_time_slots() -- Search earliest possible assignment * * SYNOPSIS * static dispatch_t parallel_reservation_max_time_slots(sge_assignment_t *best) * * FUNCTION * The earliest possible assignment is searched for a job assuming a * particular parallel environment be used with a particular slot * number. If the slot number passed is 0 we start with the minimum * possible slot number for that job. The search starts with the * latest queue end time if DISPATCH_TIME_QUEUE_END was specified * rather than a real time value. * * INPUTS * sge_assignment_t *best - herein we keep all important in/out information * * RESULT * dispatch_t - 0 ok got an assignment * 1 no assignment at the specified time (???) 
*                 -1 assignment will never be possible for all jobs of that category
*                 -2 assignment will never be possible for that particular job
*
*  NOTES
*     MT-NOTE: parallel_reservation_max_time_slots() is not MT safe
*******************************************************************************/
static dispatch_t
parallel_reservation_max_time_slots(sge_assignment_t *best, int *available_slots)
{
   u_long32 try_time, start_at;
   sge_assignment_t probe = SGE_ASSIGNMENT_INIT;
   dispatch_t result = DISPATCH_NEVER_CAT;
   sge_qeti_t *qeti = NULL;
   bool first_pass = true;
   int saved_logging = 0;
   category_use_t use_category;

   DENTER(TOP_LAYER, "parallel_reservation_max_time_slots");

   /* assemble job category information */
   fill_category_use_t(best, &use_category, best->pe_name);

   qeti = sge_qeti_allocate(best);
   if (qeti == NULL) {
      ERROR((SGE_EVENT, "could not allocate qeti object needed reservation "
            "scheduling of parallel job "sge_U32CFormat, sge_u32c(best->job_id)));
      DRETURN(DISPATCH_NEVER_CAT);
   }

   assignment_copy(&probe, best, false);
   if (best->slots == 0) {
      /* no slot amount given: begin with the first id of the requested range */
      probe.slots = range_list_get_first_id(lGetList(best->job, JB_pe_range), NULL);
   }

   if (best->start != DISPATCH_TIME_QUEUE_END) {
      /* the first iteration will be done using best->start
         further ones will use earlier times */
      start_at = best->start;
      sge_qeti_next_before(qeti, best->start);
   } else {
      start_at = sge_qeti_first(qeti);
      if (start_at == 0) {
         /* we need at least one reservation run */
         start_at = best->now;
      }
   }

   saved_logging = schedd_mes_get_logging(); /* store logging mode */

   try_time = start_at;
   while (try_time != 0) {
      DPRINTF(("SELECT PE TIME(%s, "sge_u32") tries at "sge_u32"\n",
               best->pe_name, best->job_id, try_time));
      probe.start = try_time;

      /* every run after the first one is an additional run: we have already
         at least one possible match, all additional scheduling information
         is not important, since we can start the job */
      if (!first_pass) {
         use_category.mod_category = false;
         schedd_mes_set_logging(0);
         sconf_set_mes_schedd_info(false);
      }
      first_pass = false;

      result = parallel_assignment(&probe, &use_category, available_slots);
      assignment_clear_cache(&probe);

      if (result != DISPATCH_OK) {
         DPRINTF(("SELECT PE TIME: no earlier assignment at "sge_u32"\n", try_time));
         break;
      }

      if (probe.gdil) {
         DPRINTF(("SELECT PE TIME: earlier assignment at "sge_u32"\n", try_time));
      }
      /* hand the result over to 'best' and empty the probe for the next run */
      assignment_copy(best, &probe, true);
      assignment_release(&probe);

      try_time = sge_qeti_next(qeti);
   }

   schedd_mes_set_logging(saved_logging); /* restore logging mode */
   sconf_set_mes_schedd_info(true);

   sge_qeti_release(&qeti);
   assignment_release(&probe);

   /* any assignment stored in 'best' counts as success */
   if (best->gdil) {
      result = DISPATCH_OK;
   }

   if (result == DISPATCH_OK) {
      DPRINTF(("SELECT PE TIME(%s, %d) returns "sge_u32"\n",
            best->pe_name, best->slots, best->start));
   } else if (result == DISPATCH_NEVER_CAT) {
      DPRINTF(("SELECT PE TIME(%s, %d) returns \n",
            best->pe_name, best->slots));
   } else if (result == DISPATCH_NEVER_JOB) {
      DPRINTF(("SELECT PE TIME(%s, %d) returns \n",
            best->pe_name, best->slots));
   } else {
      DPRINTF(("!!!!!!!! SELECT PE TIME(%s, %d) returns unexpected %d\n",
            best->pe_name, best->slots, result));
   }

   DRETURN(result);
}

/****** scheduler/parallel_maximize_slots_pe() *****************************************
*  NAME
*     parallel_maximize_slots_pe() -- Maximize number of slots for an assignment
*
*  SYNOPSIS
*     static dispatch_t parallel_maximize_slots_pe(sge_assignment_t *best,
*                                                  int *available_slots)
*
*  FUNCTION
*     The largest possible slot amount is searched for a job assuming a
*     particular parallel environment be used at a particular start time.
*     If the slot number passed is 0 we start with the minimum
*     possible slot number for that job.
*
*     To search most efficiently for the right slot value, three search
*     strategies are implemented:
*      - binary search
*      - least slot value first
*      - highest slot value first
*
*     To be able to use binary search all possible slot values are stored in
*     one array. The slot values in this array are sorted ascending. After the
*     right slot value was found, it is very easy to compute the best strategy
*     from the result. For each strategy it will compute how many iterations
*     would have been needed to compute the correct result. These steps will
*     be stored for the next run and used to figure out the best algorithm.
*     To ensure that we can adapt to rapid changes and also ignore spikes we
*     are using the running average algorithm in a 80-20 setting. This means
*     that the algorithm will need 4 (max 5) iterations to adapt to a new
*     scenario.
*
*     Further enhancements:
*     It might be a good idea to store the derived values with the job categories
*     and allow to find the best strategy per category.
* * INPUTS * sge_assignment_t *best - herein we keep all important in/out information * lList *host_list - a list of all available hosts * lList *queue_list - a list of all available queues * lList *centry_list - a list of all available complex attributes * lList *acl_list - a list of all access lists * * RESULT * int - 0 ok got an assignment (maybe without maximizing it) * 1 no assignment at the specified time * -1 assignment will never be possible for all jobs of that category * -2 assignment will never be possible for that particular job * * NOTES * MT-NOTE: parallel_maximize_slots_pe() is MT safe as long as the provided * lists are owned be the caller * * SEE ALSO: * sconf_best_pe_alg * sconf_update_pe_alg * add_pe_slots_to_category * *******************************************************************************/ static dispatch_t parallel_maximize_slots_pe(sge_assignment_t *best, int *available_slots) { int min_slots, max_slots; int max_pe_slots; int first, last; lList *pe_range; lListElem *pe; sge_assignment_t tmp = SGE_ASSIGNMENT_INIT; dispatch_t result = DISPATCH_NEVER_CAT; const char *pe_name = best->pe_name; bool is_first = true; int old_logging = 0; category_use_t use_category; u_long32 max_slotsp = 0; int current = 0; int match_current = 0; int runs = 0; schedd_pe_algorithm alg = sconf_best_pe_alg(); DENTER(TOP_LAYER, "parallel_maximize_slots_pe"); if (best == NULL || (pe_range=lGetList(best->job, JB_pe_range)) == NULL || (pe=best->pe) == NULL) { DRETURN(DISPATCH_NEVER_CAT); } /* assemble job category information */ fill_category_use_t(best, &use_category, pe_name); first = range_list_get_first_id(pe_range, NULL); last = range_list_get_last_id(pe_range, NULL); max_pe_slots = lGetUlong(pe, PE_slots); if (best->slots) { min_slots = best->slots; } else { min_slots = first; } if (best->gdil && best->slots == max_pe_slots) { /* already found maximum */ DRETURN(DISPATCH_OK); } DPRINTF(("MAXIMIZE SLOT: FIRST %d LAST %d MAX SLOT %d\n", first, last, 
max_pe_slots)); /* this limits max range number RANGE_INFINITY (i.e. -pe pe 1-) to reasonable number */ max_slots = MIN(last, max_pe_slots); DPRINTF(("MAXIMIZE SLOT FOR "sge_u32" using \"%s\" FROM %d TO %d\n", best->job_id, pe_name, min_slots, max_slots)); old_logging = schedd_mes_get_logging(); /* store logging mode */ if ((max_slots < min_slots) || (min_slots <= 0)) { ERROR((SGE_EVENT, "invalid pe job range setting for job "sge_u32"\n", best->job_id)); DRETURN(DISPATCH_NEVER_CAT); } /* --- prepare the posible slots for the binary search */ max_slotsp = (max_slots - min_slots+1); if (!add_pe_slots_to_category(&use_category, &max_slotsp, pe, min_slots, max_slots, pe_range)) { ERROR((SGE_EVENT, MSG_SGETEXT_NOMEM)); DRETURN(DISPATCH_NEVER_CAT); } if (max_slotsp == 0) { DPRINTF(("no slots in PE %s available for job "sge_u32"\n", pe_name, best->job_id)); DRETURN(DISPATCH_NEVER_CAT); } assignment_copy(&tmp, best, false); /* --- work on the different slot ranges and try to find the best one --- */ if (alg == SCHEDD_PE_BINARY) { int min = 0; int max = max_slotsp-1; do { runs++; current = (min + max) / 2; if (is_first) { /* first run, collect information */ is_first = false; } else { /* this is an additional run, we have already at least one posible match */ use_category.mod_category = false; schedd_mes_set_logging(0); sconf_set_mes_schedd_info(false); } /* we try that slot amount */ tmp.slots = use_category.posible_pe_slots[current]; result = parallel_assignment(&tmp, &use_category, available_slots); assignment_clear_cache(&tmp); if (result == DISPATCH_OK) { assignment_copy(best, &tmp, true); assignment_release(&tmp); match_current = current; min = current + 1; } else { max = current - 1; } } while (min <= max && max != -1); } else { /* compute how many runs the bin search might have needed */ /* This will not give us the correct answer in all cases, but it is close enough. 
And on average it is corrent :-) */ int end = max_slotsp; for ( runs=1; runs < end; runs++) { end = end - runs; } runs--; if (alg == SCHEDD_PE_LOW_FIRST) { for (current = 0; current < max_slotsp; current++) { if (is_first) { /* first run, collect information */ is_first = false; } else { /* this is an additional run, we have already at least one posible match */ use_category.mod_category = false; schedd_mes_set_logging(0); sconf_set_mes_schedd_info(false); } /* we try that slot amount */ tmp.slots = use_category.posible_pe_slots[current]; result = parallel_assignment(&tmp, &use_category, available_slots); assignment_clear_cache(&tmp); if (result != DISPATCH_OK) { /* we have mismatch, stop */ break; /* the other runs will not work */ } match_current = current; assignment_copy(best, &tmp, true); assignment_release(&tmp); } } else { /* optimistic search */ for (current = max_slotsp-1; current >= 0; current--) { if (is_first) { /* first run, collect information */ is_first = false; } else { /* this is an additional run, we have already at least one posible match */ use_category.mod_category = false; schedd_mes_set_logging(0); sconf_set_mes_schedd_info(false); } /* we try that slot amount */ tmp.slots = use_category.posible_pe_slots[current]; result = parallel_assignment(&tmp, &use_category, available_slots); assignment_clear_cache(&tmp); if (result == DISPATCH_OK) { /* we have a match, stop */ assignment_copy(best, &tmp, true); /* all other runs will also match */ assignment_release(&tmp); match_current = current; break; } } /* end for */ } } /* end for */ sconf_update_pe_alg(runs, match_current, max_slotsp); /* --- we are done --- */ if (!use_category.is_pe_slots_rev) { FREE(use_category.posible_pe_slots); } assignment_release(&tmp); schedd_mes_set_logging(old_logging); /* restore logging mode */ sconf_set_mes_schedd_info(true); if (best->gdil) { result = DISPATCH_OK; } switch (result) { case DISPATCH_OK: if (!best->is_reservation) { sconf_inc_pe_jobs(); } 
DPRINTF(("MAXIMIZE SLOT(%s, %d) returns \n", pe_name, best->slots)); break; case DISPATCH_NOT_AT_TIME: DPRINTF(("MAXIMIZE SLOT(%s, %d) returns \n", pe_name, best->slots)); break; case DISPATCH_NEVER_CAT: DPRINTF(("MAXIMIZE SLOT(%s, %d) returns \n", pe_name, best->slots)); break; case DISPATCH_NEVER_JOB: DPRINTF(("MAXIMIZE SLOT(%s, %d) returns \n", pe_name, best->slots)); break; default: DPRINTF(("!!!!!!!! MAXIMIZE SLOT(%d, %d) returns unexpected %d\n", best->slots, (int)best->start, result)); break; } DRETURN(result); } /****** sge_select_queue/sge_select_queue() ************************************ * NAME * sge_select_queue() -- checks whether a job matches a given queue or host * * SYNOPSIS * int sge_select_queue(lList *requested_attr, lListElem *queue, lListElem * *host, lList *exechost_list, lList *centry_list, bool * allow_non_requestable, int slots) * * FUNCTION * Takes the requested attributes from a job and checks if it matches the given * host or queue. One and only one should be specified. If both, the function * assumes, that the queue belongs to the given host. * * INPUTS * lList *requested_attr - list of requested attributes * lListElem *queue - current queue or null if host is set * lListElem *host - current host or null if queue is set * lList *exechost_list - list of all hosts in the system * lList *centry_list - system wide attribut config list * bool allow_non_requestable - allow non requestable? * int slots - number of requested slots * lList *queue_user_list - list of users or null * lList *acl_list - acl_list or null * lListElem *job - job or null * * RESULT * int - 1, if okay, QU_tag will be set if a queue is selected * 0, if not okay * * NOTES * The caller is responsible for cleaning tags. * * No range is used. For serial jobs we will need a call for hard and one * for soft requests. For parallel jobs we will call this function for each * -l request. Because of in serial jobs requests can be simply added. 
*     In Parallel jobs each -l requests a different set of queues.
*
*     Queue access: when acl_list is given, every entry of queue_user_list
*     (a user name, or "@name" for a group) is tested against the queue.
*     The queue is refused if at least one entry was tested and none of
*     them has access. An empty queue_user_list leaves access untested.
*
*******************************************************************************/
bool sge_select_queue(lList *requested_attr, lListElem *queue, lListElem *host, 
                      lList *exechost_list, lList *centry_list, bool allow_non_requestable, 
                      int slots, lList *queue_user_list, lList *acl_list, lListElem *job) 
{
   dispatch_t ret;
   lList *load_attr = NULL;
   lList *config_attr = NULL;
   lList *actual_attr = NULL; 
   lListElem *global = NULL;
   lListElem *qu = NULL;
   lList *projects;
   const char *project;
   sge_assignment_t a = SGE_ASSIGNMENT_INIT;
   double lc_factor = 0; /* scaling for load correction */ 
   u_long32 ulc_factor; 
   /* the start time is of no interest here, this is just a dummy setting */
   u_long32 start_time = DISPATCH_TIME_NOW; 

   DENTER(TOP_LAYER, "sge_select_queue");

   clear_resource_tags(requested_attr, MAX_TAG);

   assignment_init(&a, NULL, NULL, false);
   a.centry_list      = centry_list;
   a.host_list        = exechost_list;

   if (acl_list != NULL) {
      /*
       * Check whether one of the given users/groups has access to the queue.
       * The access flag has to start out as "denied": starting with "granted"
       * and OR-ing the results made the "no access" branch unreachable.
       */
      int q_access = 0;
      bool users_tested = false;

      DPRINTF(("testing queue access lists\n"));
      for_each(qu, queue_user_list) {
         const char *name = lGetString(qu, ST_name);

         DPRINTF(("-----> checking queue user: %s\n", name ));
         users_tested = true;
         /* a leading '@' marks a group name, anything else is a user name */
         q_access |= (name[0]=='@')?
                     sge_has_access(NULL, &name[1], queue, acl_list): 
                     sge_has_access(name, NULL, queue, acl_list); 
      }
      /* without any entry in queue_user_list there is nothing to deny */
      if (users_tested && q_access == 0) {
         DPRINTF(("no access\n"));
         assignment_release(&a);
         DRETURN(false);
      } else {
         DPRINTF(("ok\n"));
      }
   }

   if (job != NULL) {
      /* check if job can run in queue based on project */
      DPRINTF(("testing job projects lists\n"));
      if ( (project = lGetString(job, JB_project)) ) { 
         if ((!(projects = lGetList(queue, QU_projects)))) {
            DPRINTF(("no access because queue has no project\n"));
            assignment_release(&a);
            DRETURN(false);
         }
         if ((!prj_list_locate(projects, project))) {
            DPRINTF(("no access because project not contained in queues project list"));
            assignment_release(&a);
            DRETURN(false);
         }
         DPRINTF(("ok\n"));

         /* check if job can run in queue based on excluded projects */
         DPRINTF(("testing job xprojects lists\n"));
         if ((projects = lGetList(queue, QU_xprojects))) {
            if (((project = lGetString(job, JB_project)) &&
                 prj_list_locate(projects, project))) {
               DPRINTF(("no access\n"));
               assignment_release(&a);
               DRETURN(false);
            }
         }
         DPRINTF(("ok\n"));
      }

      /* 
       * is queue contained in hard queue list ?
       * Both the cluster queue name and the queue instance name are accepted.
       */
      DPRINTF(("queue contained in jobs hard queue list?\n"));
      if (lGetList(job, JB_hard_queue_list)) {
         lList *qref_list = lGetList(job, JB_hard_queue_list);
         const char *qname = NULL;
         const char *qinstance_name = NULL;

         qname = lGetString(queue, QU_qname);
         qinstance_name = lGetString(queue, QU_full_name);
         if ((lGetElemStr(qref_list, QR_name, qname) != NULL) || 
             (lGetElemStr(qref_list, QR_name, qinstance_name) != NULL)) {
            DPRINTF(("ok"));
         } else {
            /* cast: the job number is an u_long32 but printed with %d */
            DPRINTF(("denied because queue \"%s\" is not contained in the hard "
                     "queue list (-q) that was requested by job %d\n",
                     qname, (int)lGetUlong(job, JB_job_number)));
            assignment_release(&a);
            DRETURN(false);
         }
      }
   }

   /* global */
   global = host_list_locate(exechost_list, SGE_GLOBAL_NAME);
   load_attr = lGetList(global, EH_load_list); 
   config_attr = lGetList(global, EH_consumable_config_list);
   actual_attr = lGetList(global, EH_resource_utilization);

   /* is there a multiplier for load correction (may be not in qstat, qmon etc) */
   if (lGetPosViaElem(global, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
      if ((ulc_factor=lGetUlong(global, EH_load_correction_factor)))
         lc_factor = ((double)ulc_factor)/100;
   } 

   ret = rc_time_by_slots(&a, requested_attr, load_attr, config_attr, actual_attr, NULL, allow_non_requestable, 
                          NULL, slots, DOMINANT_LAYER_HOST, lc_factor, GLOBAL_TAG, &start_time, SGE_GLOBAL_NAME);

   /* host */
   if(ret == DISPATCH_OK || ret == DISPATCH_MISSING_ATTR){
      if(host == NULL) {
         host = host_list_locate(exechost_list, lGetHost(queue, QU_qhostname));
      }   
      load_attr = lGetList(host, EH_load_list); 
      config_attr = lGetList(host, EH_consumable_config_list);
      actual_attr = lGetList(host, EH_resource_utilization);

      /*
       * NOTE(review): lc_factor keeps the value taken from the global host
       * when the host itself carries no load correction factor - confirm
       * that this is intended.
       */
      if (lGetPosViaElem(host, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
         if ((ulc_factor=lGetUlong(host, EH_load_correction_factor)))
            lc_factor = ((double)ulc_factor)/100;
      }

      ret = rc_time_by_slots(&a, requested_attr, load_attr, config_attr, actual_attr, NULL, allow_non_requestable, 
                             NULL, slots, DOMINANT_LAYER_HOST, lc_factor, HOST_TAG, &start_time, lGetHost(host, EH_name));

      /* queue */
      if((ret == DISPATCH_OK || ret == DISPATCH_MISSING_ATTR) && queue){
         config_attr = lGetList(queue, QU_consumable_config_list);
         actual_attr = lGetList(queue, QU_resource_utilization);
   
         ret = rc_time_by_slots(&a, requested_attr, NULL, config_attr, actual_attr, queue, allow_non_requestable, 
                                NULL, slots, DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG, &start_time, lGetString(queue, QU_full_name));
      }
   }

   assignment_release(&a);
   
   DRETURN((ret == DISPATCH_OK) ? true : false);
}


/****** sge_select_queue/rc_time_by_slots() **********************************
*  NAME
*     rc_time_by_slots() -- checks whether all resource requests on one level
*                           are fulfilled
*
*  SYNOPSIS
*     static int rc_time_by_slots(lList *requested, lList *load_attr, lList 
*     *config_attr, lList *actual_attr, lList *centry_list, lListElem *queue, 
*     bool allow_non_requestable, char *reason, int reason_size, int slots, 
*     u_long32 layer, double lc_factor, u_long32 tag) 
*
*  FUNCTION
*     Checks, whether all requests, default requests and implicit requests on
*     this level are fulfilled.
*
*     With reservation scheduling the earliest start time due to resources of the 
*     resource container is the maximum of the earliest start times for all 
*     resources comprised by the resource container that requested by a job.
*
*  INPUTS
*     lList *requested          - list of attribute requests 
*     lList *load_attr          - list of load attributes or null on queue level 
*     lList *config_attr        - list of user defined attributes 
*     lList *actual_attr        - usage of all consumables (RUE_Type)
*     lList *centry_list        - system wide attribute config. list (CE_Type)
*     lListElem *queue          - current queue or NULL on global/host level
*     bool allow_non_requestable - allow none requestabales? 
*     dstring *reason           - receives the reason text when a request fails
*     int slots                 - number of slots the job is looking for
*     u_long32 layer            - current layer flag 
*     double lc_factor          - load correction factor 
*     u_long32 tag              - current layer tag 
*     u_long32 *start_time      - in/out argument for start time
*     const char *object_name   - name of the object used for monitoring purposes
*
*  RESULT
*     dispatch_t - DISPATCH_OK           all requests of this level are fulfilled
*                  DISPATCH_MISSING_ATTR at least one explicit request was not
*                                        found (only below queue level)
*                  DISPATCH_NOT_AT_TIME  an explicit request will match later-on
*                  DISPATCH_NEVER_CAT    an explicit request will never match
*                  The implicit "slots" request and the default requests hand
*                  back the result of ri_time_by_slots() if it is not
*                  DISPATCH_OK.
*
*  NOTES
*     MT-NOTES: is not thread safe. The implicit slots request is kept in a
*               function static element that is created on first use.
*
*  Important:
*     we have some special behavior, when slots is set to -1: the implicit
*     "slots" request and the default requests are skipped and the explicit
*     requests are matched for one slot.
*******************************************************************************/
static dispatch_t
rc_time_by_slots(const sge_assignment_t *a, lList *requested, lList *load_attr, lList *config_attr,
                 lList *actual_attr, lListElem *queue, bool allow_non_requestable, 
                 dstring *reason, int slots, u_long32 layer, double lc_factor, u_long32 tag,
                 u_long32 *start_time, const char *object_name) 
{
   static lListElem *implicit_slots_request = NULL;
   lListElem *attr;
   u_long32 latest_time = DISPATCH_TIME_NOW;
   u_long32 tmp_start;
   dispatch_t ret;
   bool is_not_found = false;

   DENTER(TOP_LAYER, "rc_time_by_slots");

   /* drop all tags up to queue level before this level is matched */
   clear_resource_tags(requested, QUEUE_TAG); 

   /* ensure availability of implicit slot request */
   if (!implicit_slots_request) {
      implicit_slots_request = lCreateElem(CE_Type);
      lSetString(implicit_slots_request, CE_name, SGE_ATTR_SLOTS);
      lSetString(implicit_slots_request, CE_stringval, "1");
      lSetDouble(implicit_slots_request, CE_doubleval, 1);
   }

   /* match number of free slots */
   if (slots != -1) {
      /* each request is matched with a private copy of the start time */
      tmp_start = *start_time;
      ret = ri_time_by_slots(a, implicit_slots_request, load_attr, config_attr, actual_attr, queue,  
                       reason, allow_non_requestable, slots, layer, lc_factor, &tmp_start, object_name);

      /* latest_time is only tracked when the caller asked for DISPATCH_TIME_QUEUE_END */
      if (ret == DISPATCH_OK && *start_time == DISPATCH_TIME_QUEUE_END) {
         DPRINTF(("%s: \"slot\" request delays start time from "sge_U32CFormat
           " to "sge_U32CFormat"\n", object_name, latest_time, MAX(latest_time, tmp_start)));
         latest_time = MAX(latest_time, tmp_start);
      }

      /* we don't care if slots are not specified, except at queue level */
      if (ret == DISPATCH_MISSING_ATTR && tag != QUEUE_TAG) {
         ret = DISPATCH_OK;
      }
      if (ret != DISPATCH_OK) {
         DRETURN(ret);
      }

      /* ensure all default requests are fulfilled */
      if (!allow_non_requestable) {
         lListElem *attr;   /* shadows the outer attr; iterates RUE_Type elements */
         dispatch_t ff;
         const char *name;
         double dval=0.0;
         u_long32 valtype;

         for_each (attr, actual_attr) {
            name = lGetString(attr, RUE_name);
            /* slots were already matched by the implicit request above */
            if (!strcmp(name, "slots")) {
               continue;
            }

            /* consumable && used in this global/host/queue && not requested */
            if (!is_requested(requested, name)) {
               /* NOTE(review): the lookup result is used without a NULL check -
                  presumably every utilization entry has a complex entry; verify */
               lListElem *default_request = lGetElemStr(a->centry_list, CE_name, name);
               const char *def_req = lGetString(default_request, CE_default);
               valtype = lGetUlong(default_request, CE_valtype);
               parse_ulong_val(&dval, NULL, valtype, def_req, NULL, 0);

               /* ignore default request if the value is 0 */
               if ((def_req != NULL && dval != 0.0) ||
                   lGetUlong(default_request, CE_relop) == CMPLXEXCL_OP) {
                  dstring tmp_reason;
                  char tmp_reason_buf[2048];

                  sge_dstring_init(&tmp_reason, tmp_reason_buf, sizeof(tmp_reason_buf));

                  /* build the default request (this writes into the centry_list element) */
                  lSetString(default_request, CE_stringval, def_req);
                  lSetDouble(default_request, CE_doubleval, dval);

                  tmp_start = *start_time;
                  ff = ri_time_by_slots(a, default_request, load_attr, config_attr, actual_attr, 
                        queue, &tmp_reason, true, slots, layer, lc_factor, &tmp_start, object_name);

                  if (ff != DISPATCH_OK) {
                     /* useless to continue in these cases */
                     sge_dstring_append(reason, MSG_SCHEDD_FORDEFAULTREQUEST);
                     sge_dstring_append_dstring(reason, &tmp_reason);
                     DRETURN(ff);
                  }

                  if (*start_time == DISPATCH_TIME_QUEUE_END) {
                     DPRINTF(("%s: default request \"%s\" delays start time from "sge_U32CFormat 
                           " to "sge_U32CFormat"\n", object_name, name, latest_time, MAX(latest_time, tmp_start)));
                     latest_time = MAX(latest_time, tmp_start);
                  }
               } 
            } 
         }/* end for*/
      }
   } else {
      /* special case -1: no implicit/default requests, match explicit ones for one slot */
      slots = 1;
   }
 
   /* explicit requests */
   for_each(attr, requested) {
      const char *attr_name = lGetString(attr, CE_name);

      tmp_start = *start_time;
      switch (ri_time_by_slots(a, attr,load_attr, config_attr, actual_attr, queue, 
               reason, allow_non_requestable, slots, layer, lc_factor, &tmp_start, object_name)) {
         
         case DISPATCH_NEVER_CAT : /* will never match */ 
                  DRETURN(DISPATCH_NEVER_CAT);

         case DISPATCH_OK : /* a match was found */
               if (*start_time == DISPATCH_TIME_QUEUE_END) {
                  DPRINTF(("%s: explicit request \"%s\" delays start time from "sge_U32CFormat 
                           "to "sge_U32CFormat"\n", object_name, attr_name, latest_time, 
                           MAX(latest_time, tmp_start)));
                  latest_time = MAX(latest_time, tmp_start);
               }
               /* remember the highest level that fulfilled this request; never tag with RQS_TAG */
               if (lGetUlong(attr, CE_tagged) < tag && tag != RQS_TAG) {
                  lSetUlong(attr, CE_tagged, tag);
               }
            break;

         case DISPATCH_NOT_AT_TIME : /* will match later-on */
                  DPRINTF(("%s: request for %s will match later-on\n", object_name, attr_name));
                  DRETURN(DISPATCH_NOT_AT_TIME);

         case DISPATCH_MISSING_ATTR : /* the requested element does not exist */
            /* at queue level a request that no level has tagged is unknown */
            if (tag == QUEUE_TAG && lGetUlong(attr, CE_tagged) == NO_TAG) {
               sge_dstring_sprintf(reason, MSG_SCHEDD_JOBREQUESTSUNKOWNRESOURCE_S, attr_name);
               DRETURN(DISPATCH_NEVER_CAT);
            }

            /* below queue level this is only reported, not treated as failure */
            if (tag != QUEUE_TAG) {
               is_not_found = true;
            }

            break;
         default: /* error */
            break;
      }
   }

   /* hand the latest start time of all matched requests back to the caller */
   if (*start_time == DISPATCH_TIME_QUEUE_END) {
      *start_time = latest_time;
   }

   if (is_not_found) {
      DRETURN(DISPATCH_MISSING_ATTR);
   } 

   DRETURN(DISPATCH_OK);
}

/*
 * match_static_resource() -- compares a request against a complex value.
 *
 * Returns DISPATCH_NEVER_CAT if src_cplx is marked as not requestable while
 * allow_non_requestable is false, or if compare_complexes() reports no match;
 * in both cases an explanation is appended to reason. Returns DISPATCH_OK
 * otherwise.
 */
static dispatch_t 
match_static_resource(int slots, lListElem *req_cplx, lListElem *src_cplx, dstring *reason, 
                      bool allow_non_requestable)
{
   int match;
   dispatch_t ret = DISPATCH_OK;
   char availability_text[2048];   /* filled by compare_complexes() */

   DENTER(TOP_LAYER, "match_static_resource");

   /* check whether attrib is requestable */
   if (!allow_non_requestable && lGetUlong(src_cplx, CE_requestable) == REQU_NO) {
      sge_dstring_append(reason, MSG_SCHEDD_JOBREQUESTSNONREQUESTABLERESOURCE);
      sge_dstring_append(reason, lGetString(src_cplx, CE_name));
      sge_dstring_append(reason, "\"");
      DRETURN(DISPATCH_NEVER_CAT);
   }

   match = compare_complexes(slots, req_cplx, src_cplx, availability_text, false, false);

   if (!match) {
      sge_dstring_append(reason, MSG_SCHEDD_ITOFFERSONLY);
      sge_dstring_append(reason, availability_text);
      ret = DISPATCH_NEVER_CAT;
   }

   DRETURN(ret);
}

/****** sge_select_queue/clear_resource_tags() *********************************
*  NAME
*     clear_resource_tags() -- removes the tags from a resource request. 
*
*  SYNOPSIS
*     static void clear_resource_tags(lList *resources, u_long32 max_tag) 
*
*  FUNCTION
*     Removes the tags from the given resource list. A tag is only removed
*     if it is smaller or equal to the given tag value. The tag value "MAX_TAG" results
*     in removing all existing tags, or the value "HOST_TAG" removes queue and host
*     tags but keeps the global tags.
*
*  INPUTS
*     lList *resources - list of job requests. 
*     u_long32 max_tag - max tag element 
*
*******************************************************************************/
static void clear_resource_tags(lList *resources, u_long32 max_tag) 
{
   lListElem *attr=NULL;

   for_each(attr, resources){
      /* reset every tag that does not exceed max_tag */
      if(lGetUlong(attr, CE_tagged) <= max_tag)
         lSetUlong(attr, CE_tagged, NO_TAG);
   }
}


/****** sge_select_queue/sge_queue_match_static() ************************
*  NAME
*     sge_queue_match_static() -- Do matching that depends not on time.
*
*  SYNOPSIS
*     static int sge_queue_match_static(lListElem *queue, lListElem *job, 
*     const lListElem *pe, const lListElem *ckpt, lList *centry_list, lList 
*     *host_list, lList *acl_list) 
*
*  FUNCTION
*     Checks if a job fits on a queue or not. All checks that depend on the 
*     current load and resource situation must get handled outside. 
*     The queue also gets tagged in QU_tagged4schedule to indicate whether it
*     is specified using -masterq queue_list.
* * INPUTS * lListElem *queue - The queue we're matching * lListElem *job - The job * const lListElem *pe - The PE object * const lListElem *ckpt - The ckpt object * lList *centry_list - The centry list * lList *acl_list - The ACL list * * RESULT * dispatch_t - DISPATCH_OK, ok * DISPATCH_NEVER_CAT, assignment will never be possible for all jobs of that category * * NOTES *******************************************************************************/ dispatch_t sge_queue_match_static(const sge_assignment_t *a, lListElem *queue) { u_long32 ar_id; lList *projects; lListElem *ar_ep; const lList *hard_queue_list, *master_hard_queue_list; const char *qinstance_name = lGetString(queue, QU_full_name); bool could_be_master = false; DENTER(TOP_LAYER, "sge_queue_match_static"); /* check if queue was reserved for AR job */ ar_id = lGetUlong(a->job, JB_ar); ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id); if (ar_ep != NULL) { DPRINTF(("searching for queue %s\n", qinstance_name)); if (lGetSubStr(ar_ep, QU_full_name, qinstance_name, AR_reserved_queues) == NULL) { schedd_mes_add_global(a->monitor_alpp, a->monitor_next_run, SCHEDD_INFO_QNOTARRESERVED_SI, qinstance_name, ar_id); DRETURN(DISPATCH_NEVER_CAT); } } else { /* this is not advance reservation job, we have to drop queues in orphaned state */ if (lGetUlong(queue, QU_state) == QI_ORPHANED) { schedd_mes_add_global(a->monitor_alpp, a->monitor_next_run, SCHEDD_INFO_QUEUENOTAVAIL_, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } /* check if job owner has access rights to the queue */ if (!sge_has_access(a->user, a->group, queue, a->acl_list)) { DPRINTF(("Job %d has no permission for queue %s\n", (int)a->job_id, qinstance_name)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, "queue", qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } /* check if job can run in queue based on project */ if ((projects = lGetList(queue, QU_projects))) { if (!a->project) { 
schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPRJ_S, "queue", qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } if ((!prj_list_locate(projects, a->project))) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASINCORRECTPRJ_SSS, a->project, "queue", qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } /* check if job can run in queue based on excluded projects */ if ((projects = lGetList(queue, QU_xprojects))) { if (a->project && prj_list_locate(projects, a->project)) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_EXCLPRJ_SSS, a->project, "queue", qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } hard_queue_list = lGetList(a->job, JB_hard_queue_list); master_hard_queue_list = lGetList(a->job, JB_master_hard_queue_list); if (hard_queue_list || master_hard_queue_list) { if (!centry_list_are_queues_requestable(a->centry_list)) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_QUEUENOTREQUESTABLE_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } /* * is this queue a candidate for being the master queue? */ if (master_hard_queue_list) { if (qref_list_cq_rejected(master_hard_queue_list, lGetString(queue, QU_qname), lGetHost(queue, QU_qhostname), a->hgrp_list)) { DPRINTF(("Queue \"%s\" is not contained in the master hard " "queue list (-masterq) that was requested by job %d\n", qinstance_name, (int) a->job_id)); lSetUlong(queue, QU_tagged4schedule, 0); } else { could_be_master = true; } } /* * is queue contained in hard queue list ? 
*/ if (hard_queue_list) { if ((could_be_master == false) && qref_list_cq_rejected(hard_queue_list, lGetString(queue, QU_qname), lGetHost(queue, QU_qhostname), a->hgrp_list)) { DPRINTF(("Queue \"%s\" is not contained in the hard " "queue list (-q) that was requested by job %d\n", qinstance_name, (int) a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } /* ** different checks for different job types: */ if (a->pe) { /* parallel job */ if (!qinstance_is_parallel_queue(queue)) { DPRINTF(("Queue \"%s\" is not a parallel queue as requested by " "job %d\n", qinstance_name, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTPARALLELQUEUE_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } /* * check if the requested PE is named in the PE reference list of Queue */ if (!qinstance_is_pe_referenced(queue, a->pe)) { DPRINTF(("Queue "SFQ" does not reference PE "SFQ"\n", qinstance_name, lGetString(a->pe, PE_name))); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINQUEUELSTOFPE_SS, qinstance_name, lGetString(a->pe, PE_name)); DRETURN(DISPATCH_NEVER_CAT); } } if (a->ckpt) { /* ckpt job */ /* is it a ckpt queue ? 
*/ if (!qinstance_is_checkpointing_queue(queue)) { DPRINTF(("Queue \"%s\" is not a checkpointing queue as requested by " "job %d\n", qinstance_name, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTACKPTQUEUE_SS, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } /* * check if the requested CKPT is named in the CKPT ref list of Queue */ if (!qinstance_is_ckpt_referenced(queue, a->ckpt)) { DPRINTF(("Queue \"%s\" does not reference checkpointing object "SFQ "\n", qinstance_name, lGetString(a->ckpt, CK_name))); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINQUEUELSTOFCKPT_SS, qinstance_name, lGetString(a->ckpt, CK_name)); DRETURN(DISPATCH_NEVER_CAT); } } /* to be activated as soon as immediate jobs are available */ if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(a->job, JB_type))) { if (!qinstance_is_interactive_queue(queue)) { DPRINTF(("Queue \"%s\" is not an interactive queue as requested by " "job %d\n", qinstance_name, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_QUEUENOTINTERACTIVE_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } if (!a->pe && !a->ckpt && !JOB_TYPE_IS_IMMEDIATE(lGetUlong(a->job, JB_type))) { /* serial (batch) job */ /* is it a batch or transfer queue */ if (!qinstance_is_batch_queue(queue)) { DPRINTF(("Queue \"%s\" is not a batch queue as " "requested by job %d\n", qinstance_name, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTASERIALQUEUE_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } } /* RD: I don't understand this condition */ if (a->ckpt && !a->pe && !JOB_TYPE_IS_IMMEDIATE(lGetUlong(a->job, JB_type)) && qinstance_is_parallel_queue(queue) && !qinstance_is_batch_queue(queue)) { DPRINTF(("Queue \"%s\" is not a serial queue as " "requested by job %d\n", qinstance_name, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, 
SCHEDD_INFO_NOTPARALLELJOB_S, qinstance_name); DRETURN(DISPATCH_NEVER_CAT); } if (job_is_forced_centry_missing(a, queue)) { DRETURN(DISPATCH_NEVER_CAT); } DRETURN(DISPATCH_OK); } static bool job_is_forced_centry_missing(const sge_assignment_t *a, const lListElem *queue_or_host) { bool ret = false; lListElem *centry; DENTER(TOP_LAYER, "job_is_forced_centry_missing"); if (a->job != NULL && a->centry_list != NULL && queue_or_host != NULL) { lList *res_list = lGetList(a->job, JB_hard_resource_list); bool is_qinstance = object_has_type(queue_or_host, QU_Type); /* Optimization: Have a forced_centry_list in the assignment structure * and only iterate over this list. */ for_each(centry, a->centry_list) { const char *name = lGetString(centry, CE_name); bool is_forced = lGetUlong(centry, CE_requestable) == REQU_FORCED ? true : false; if (!is_forced || is_requested(res_list, name)) { /* if requested or not forced we are always fine */ continue; } if (is_qinstance) { if (qinstance_is_centry_a_complex_value(queue_or_host, centry)) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTREQFORCEDRES_SS, name, lGetString(queue_or_host, QU_full_name)); ret = true; break; } } else { if (host_is_centry_a_complex_value(queue_or_host, centry)) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOFORCEDRES_SS, name, lGetHost(queue_or_host, EH_name)); ret = true; break; } } } } DRETURN(ret); } /****** sge_select_queue/compute_soft_violations() ******************************** * NAME * compute_soft_violations() -- counts the violations in the request for a given host or queue * * SYNOPSIS * static int compute_soft_violations(lListElem *queue, int violation, lListElem *job,lList *load_attr, lList *config_attr, * lList *actual_attr, lList *centry_list, u_long32 layer, double lc_factor, u_long32 tag) * * FUNCTION * this function checks if the current resources can satisfy the requests. 
The resources come from the global host, a
*     given host or the queue. The function returns the number of violations.
*
*  INPUTS
*     const sge_assignment_t *a - job info structure
*     lListElem *queue     - should only be set, when one using this method on queue level 
*     int violation        - the number of previous violations. This is needed to get a correct result on queue level. 
*     lList *load_attr     - the load attributs, only when used on hosts or global 
*     lList *config_attr   - a list of custom attributes  (CE_Type)
*     lList *actual_attr   - a list of custom consumables, they contain the current usage of these attributes (RUE_Type)
*     u_long32 layer       - the curent layer flag 
*     double lc_factor     - should be set, when load correction has to be done. 
*     u_long32 tag         - the current layer tag. (GLOGAL_TAG, HOST_TAG, QUEUE_TAG) 
*
*  RESULT
*     static int - the number of violations ( = (prev. violations) + (new violations in this run)). 
*                  If a queue is given the number is also stored in its
*                  QU_soft_violation field.
*
*******************************************************************************/
static int 
compute_soft_violations(const sge_assignment_t *a, lListElem *queue, int violation, lList *load_attr, lList *config_attr,
                    lList *actual_attr, u_long32 layer, double lc_factor, u_long32 tag) 
{
   char reason_buf[1024 + 1];
   dstring reason;
   lList *soft_requests;
   lList *soft_queues;
   lListElem *request;
   const char *queue_name = NULL;
   const char *object_name;
   u_long32 start_time = DISPATCH_TIME_NOW; 
   unsigned int soft_violation = violation;

   DENTER(TOP_LAYER, "compute_soft_violations");

   sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));

   soft_requests = lGetList(a->job, JB_soft_resource_list);
   clear_resource_tags(soft_requests, tag);

   if (queue != NULL) {
      queue_name = lGetString(queue, QU_full_name);
   }
   object_name = (queue_name != NULL) ? queue_name : "no queue";

   /* count number of soft violations for _one_ slot of this job */
   for_each(request, soft_requests) {
      dispatch_t result = ri_time_by_slots(a, request, load_attr, config_attr, actual_attr, queue,
                                           &reason, false, 1, layer, lc_factor, &start_time,
                                           object_name);

      if (result == DISPATCH_NEVER_CAT) {
         /* no match */
         soft_violation++;
      } else if (result == DISPATCH_MISSING_ATTR || result == DISPATCH_NOT_AT_TIME) {
         /* element not found: only a violation if no level has tagged it up to the queue */
         if (tag == QUEUE_TAG && lGetUlong(request, CE_tagged) == NO_TAG) {
            soft_violation++;
         }
      } else if (lGetUlong(request, CE_tagged) < tag) {
         /* everything is fine */
         lSetUlong(request, CE_tagged, tag);
      }
   }

   /* the remaining work is only done on queue level */
   if (queue == NULL) {
      DRETURN(soft_violation);
   }

   DPRINTF(("queue %s does not fulfill soft %d requests (first: %s)\n", 
            queue_name, soft_violation, reason_buf));

   /* 
    * check whether queue fulfills soft queue request of the job (-q) 
    */
   soft_queues = lGetList(a->job, JB_soft_queue_list);
   if (soft_queues != NULL) {
      lListElem *qref;

      for_each (qref, soft_queues) {
         if (qref_cq_rejected(lGetString(qref, QR_name), lGetString(queue, QU_qname), 
                              lGetHost(queue, QU_qhostname), a->hgrp_list)) {
            DPRINTF(("Queue \"%s\" is not contained in the soft "
                     "queue list (-q) that was requested by job %d\n",
                     queue_name, (int) a->job_id));
            soft_violation++;
         }
      }
   }

   /* store number of soft violations in queue */
   lSetUlong(queue, QU_soft_violation, soft_violation);

   DRETURN(soft_violation);
}

/****** sge_select_queue/sge_host_match_static() ********************************
*  NAME
*     sge_host_match_static() -- Static test whether job fits to host
*
*  SYNOPSIS
*     static int sge_host_match_static(lListElem *job, lListElem *ja_task, 
*     lListElem *host, lList *centry_list, lList *acl_list) 
*
*  FUNCTION
*
*  INPUTS
*     lListElem *job     - ??? 
*     lListElem *ja_task - ??? 
*     lListElem *host    - ??? 
*     lList *centry_list - ??? 
*     lList *acl_list    - ??? 
* * RESULT * int - 0 ok * -1 assignment will never be possible for all jobs of that category * -2 assignment will never be possible for that particular job *******************************************************************************/ dispatch_t sge_host_match_static(const sge_assignment_t *a, lListElem *host) { lList *projects; const char *eh_name; DENTER(TOP_LAYER, "sge_host_match_static"); if (!host) { DRETURN(DISPATCH_OK); } eh_name = lGetHost(host, EH_name); /* check if job owner has access rights to the host */ if (!sge_has_access_(a->user, a->group, lGetList(host, EH_acl), lGetList(host, EH_xacl), a->acl_list)) { DPRINTF(("Job %d has no permission for host %s\n", (int)a->job_id, eh_name)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, "host", eh_name); DRETURN(DISPATCH_NEVER_CAT); } /* check if job can run on host based on required projects */ if ((projects = lGetList(host, EH_prj))) { if (!a->project) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPRJ_S, "host", eh_name); DRETURN(DISPATCH_NEVER_CAT); } if ((!prj_list_locate(projects, a->project))) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASINCORRECTPRJ_SSS, a->project, "host", eh_name); DRETURN(DISPATCH_NEVER_CAT); } } /* check if job can run on host based on excluded projects */ if ((projects = lGetList(host, EH_xprj))) { if (a->project && prj_list_locate(projects, a->project)) { schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_EXCLPRJ_SSS, a->project, "host", eh_name); DRETURN(DISPATCH_NEVER_CAT); } } if (job_is_forced_centry_missing(a, host)) { DRETURN(DISPATCH_NEVER_CAT); } /* RU: */ /* ** check if job can run on host based on the list of jids/taskids ** contained in the reschedule_unknown-list */ if (a->ja_task) { lListElem *ruep; lList *rulp; u_long32 task_id; task_id = lGetUlong(a->ja_task, JAT_task_number); rulp = lGetList(host, 
EH_reschedule_unknown_list); for_each(ruep, rulp) { if (lGetUlong(ruep, RU_job_number) == a->job_id && lGetUlong(ruep, RU_task_number) == task_id) { DPRINTF(("RU: Job "sge_u32"."sge_u32" Host "SFN"\n", a->job_id, task_id, eh_name)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_CLEANUPNECESSARY_S, eh_name); DRETURN(DISPATCH_NEVER_JOB); } } } DRETURN(DISPATCH_OK); } /****** sge_select_queue/is_requested() **************************************** * NAME * is_requested() -- Returns true if specified resource is requested. * * SYNOPSIS * bool is_requested(lList *req, const char *attr) * * FUNCTION * Returns true if specified resource is requested. Both long name * and shortcut name are checked. * * INPUTS * lList *req - The request list (CE_Type) * const char *attr - The resource name. * * RESULT * bool - true if requested, otherwise false * * NOTES * MT-NOTE: is_requested() is MT safe *******************************************************************************/ bool is_requested(lList *req, const char *attr) { if (lGetElemStr(req, CE_name, attr) || lGetElemStr(req, CE_shortcut , attr)) { return true; } return false; } static int load_check_alarm(char *reason, const char *name, const char *load_value, const char *limit_value, u_long32 relop, u_long32 type, lListElem *hep, lListElem *hlep, double lc_host, double lc_global, const lList *load_adjustments, int load_is_value) { lListElem *job_load; double limit, load; int match; #define STR_LC_DIAGNOSIS 1024 char lc_diagnosis1[STR_LC_DIAGNOSIS], lc_diagnosis2[STR_LC_DIAGNOSIS]; DENTER(TOP_LAYER, "load_check_alarm"); switch (type) { case TYPE_INT: case TYPE_TIM: case TYPE_MEM: case TYPE_BOO: case TYPE_DOUBLE: if (!parse_ulong_val(&load, NULL, type, load_value, NULL, 0)) { if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDINVALIDLOAD_SS, load_value, name); DRETURN(1); } if (!parse_ulong_val(&limit, NULL, type, limit_value, NULL, 0)) { if (reason) sprintf(reason, 
MSG_SCHEDD_WHYEXCEEDINVALIDTHRESHOLD_SS, name, limit_value); DRETURN(1); } if (load_is_value) { /* we got no load - this is just the complex value */ sge_strlcpy(lc_diagnosis2, MSG_SCHEDD_LCDIAGNOLOAD, STR_LC_DIAGNOSIS); } else if (((hlep && lc_host) || lc_global) && (job_load = lGetElemStr(load_adjustments, CE_name, name))) { /* load correction */ const char *load_correction_str; double load_correction; load_correction_str = lGetString(job_load, CE_stringval); if (!parse_ulong_val(&load_correction, NULL, type, load_correction_str, NULL, 0)) { if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDINVALIDLOADADJUST_SS, name, load_correction_str); DRETURN(1); } if (hlep) { int nproc; load_correction *= lc_host; if ((nproc = load_np_value_adjustment(name, hep, &load_correction)) > 0) { sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGHOSTNP_SFI, load_correction_str, lc_host, nproc); } else { sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGHOST_SF, load_correction_str, lc_host); } } else { load_correction *= lc_global; sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGGLOBAL_SF, load_correction_str, lc_global); } /* it depends on relop in complex config whether load_correction is pos/neg */ switch (relop) { case CMPLXGE_OP: case CMPLXGT_OP: load += load_correction; sprintf(lc_diagnosis2, MSG_SCHEDD_LCDIAGPOSITIVE_SS, load_value, lc_diagnosis1); break; case CMPLXNE_OP: case CMPLXEQ_OP: case CMPLXLT_OP: case CMPLXLE_OP: default: load -= load_correction; sprintf(lc_diagnosis2, MSG_SCHEDD_LCDIAGNEGATIVE_SS, load_value, lc_diagnosis1); break; } } else { sge_strlcpy(lc_diagnosis2, MSG_SCHEDD_LCDIAGNONE, STR_LC_DIAGNOSIS); } /* is threshold exceeded ? 
*/ if (resource_cmp(relop, load, limit)) { if (reason) { if (type == TYPE_BOO){ sprintf(reason, MSG_SCHEDD_WHYEXCEEDBOOLVALUE_SSSSS, name, load?MSG_TRUE:MSG_FALSE, lc_diagnosis2, map_op2str(relop), limit_value); } else { sprintf(reason, MSG_SCHEDD_WHYEXCEEDFLOATVALUE_SFSSS, name, load, lc_diagnosis2, map_op2str(relop), limit_value); } } DRETURN(1); } break; case TYPE_STR: case TYPE_CSTR: case TYPE_HOST: case TYPE_RESTR: match = string_base_cmp(type, limit_value, load_value); if (!match) { if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDSTRINGVALUE_SSSS, name, load_value, map_op2str(relop), limit_value); DRETURN(1); } break; default: if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDCOMPLEXTYPE_S, name); DRETURN(1); } DRETURN(0); } /****** sge_select_queue/load_np_value_adjustment() **************************** * NAME * load_np_value_adjustment() -- adjusts np load values for the number of processors * * SYNOPSIS * static int load_np_value_adjustment(const char* name, lListElem *hep, * double *load_correction) * * FUNCTION * Tests the load value name for "np_*". If this pattern is found, it will * retrieve the number of processors and adjusts the load_correction accordingly. * If the pattern is not found, it does nothing and returns 0 for number of processors. 
* * INPUTS * const char* name - load value name * lListElem *hep - host object * double *load_correction - current load_correction for further corrections * * RESULT * static int - number of processors, or 0 if it was called on a none np load value * * NOTES * MT-NOTE: load_np_value_adjustment() is MT safe * *******************************************************************************/ static int load_np_value_adjustment(const char* name, lListElem *hep, double *load_correction) { int nproc = 1; if (!strncmp(name, "np_", 3)) { lListElem *ep_nproc; if ((ep_nproc = lGetSubStr(hep, HL_name, LOAD_ATTR_NUM_PROC, EH_load_list))) { const char* cp = lGetString(ep_nproc, HL_value); if (cp) { nproc = atoi(cp); if (nproc > 1) { *load_correction /= nproc; } } } } else { nproc = 0; } return nproc; } static int resource_cmp(u_long32 relop, double req, double src_dl) { int match; switch(relop) { case CMPLXEQ_OP : match = ( req==src_dl); break; case CMPLXLE_OP : match = ( req<=src_dl); break; case CMPLXLT_OP : match = ( reqsrc_dl); break; case CMPLXGE_OP : match = ( req>=src_dl); break; case CMPLXNE_OP : match = ( req!=src_dl); break; default: match = 0; /* default -> no match */ } return match; } /* ---------------------------------------- sge_load_alarm() checks given threshold of the queue; centry_list and exechost_list get used therefore returns boolean: 1 yes, the threshold is exceeded 0 no */ int sge_load_alarm(char *reason, lListElem *qep, lList *threshold, const lList *exechost_list, const lList *centry_list, const lList *load_adjustments, bool is_check_consumable) { lListElem *hep, *global_hep, *tep; u_long32 ulc_factor; const char *load_value = NULL; const char *limit_value = NULL; double lc_host = 0, lc_global = 0; int load_is_value = 0; DENTER(TOP_LAYER, "sge_load_alarm"); if (!threshold) { /* no threshold -> no alarm */ DRETURN(0); } hep = host_list_locate(exechost_list, lGetHost(qep, QU_qhostname)); if(!hep) { if (reason) sprintf(reason, 
MSG_SCHEDD_WHYEXCEEDNOHOST_S, lGetHost(qep, QU_qhostname)); /* no host for queue -> ERROR */ DRETURN(1); } if ((lGetPosViaElem(hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) && (ulc_factor=lGetUlong(hep, EH_load_correction_factor))) { lc_host = ((double)ulc_factor)/100; } if ((global_hep = host_list_locate(exechost_list, SGE_GLOBAL_NAME)) != NULL) { if ((lGetPosViaElem(global_hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) && (ulc_factor=lGetUlong(global_hep, EH_load_correction_factor))) lc_global = ((double)ulc_factor)/100; } for_each (tep, threshold) { lListElem *hlep = NULL, *glep = NULL, *queue_ep = NULL, *cep = NULL; bool need_free_cep = false; const char *name; u_long32 relop, type; name = lGetString(tep, CE_name); /* complex attriute definition */ if (!(cep = centry_list_locate(centry_list, name))) { if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDNOCOMPLEX_S, name); DRETURN(1); } if (!is_check_consumable && lGetUlong(cep, CE_consumable) != CONSUMABLE_NO) { continue; } if (hep != NULL) { hlep = lGetSubStr(hep, HL_name, name, EH_load_list); } if (lGetUlong(cep, CE_consumable) == CONSUMABLE_NO) { if (hlep != NULL) { load_value = lGetString(hlep, HL_value); load_is_value = 0; } else if ((global_hep != NULL) && ((glep = lGetSubStr(global_hep, HL_name, name, EH_load_list)) != NULL)) { load_value = lGetString(glep, HL_value); load_is_value = 0; } else { queue_ep = lGetSubStr(qep, CE_name, name, QU_consumable_config_list); if (queue_ep != NULL) { load_value = lGetString(queue_ep, CE_stringval); load_is_value = 1; } else { if (reason) { sprintf(reason, MSG_SCHEDD_NOVALUEFORATTR_S, name); } DRETURN(1); } } } else { /* load thesholds... 
*/ if ((cep = get_attribute_by_name(global_hep, hep, qep, name, centry_list, DISPATCH_TIME_NOW, 0)) == NULL ) { if (reason) sprintf(reason, MSG_SCHEDD_WHYEXCEEDNOCOMPLEX_S, name); DRETURN(1); } need_free_cep = true; load_value = lGetString(cep, CE_pj_stringval); load_is_value = (lGetUlong(cep, CE_pj_dominant) & DOMINANT_TYPE_MASK) != DOMINANT_TYPE_CLOAD; } relop = lGetUlong(cep, CE_relop); limit_value = lGetString(tep, CE_stringval); type = lGetUlong(cep, CE_valtype); if (load_check_alarm(reason, name, load_value, limit_value, relop, type, hep, hlep, lc_host, lc_global, load_adjustments, load_is_value)) { if (need_free_cep) { lFreeElem(&cep); } DRETURN(1); } if (need_free_cep) { lFreeElem(&cep); } } DRETURN(0); } /* ---------------------------------------- sge_load_alarm_reasons() checks given threshold of the queue; centry_list and exechost_list get used therefore fills and returns string buffer containing reasons for alarm states */ char *sge_load_alarm_reason(lListElem *qep, lList *threshold, const lList *exechost_list, const lList *centry_list, char *reason, int reason_size, const char *threshold_type) { DENTER(TOP_LAYER, "sge_load_alarm_reason"); *reason = 0; /* no threshold -> no alarm */ if (threshold != NULL) { lList *rlp = NULL; lListElem *tep; bool first = true; /* get actual complex values for queue */ queue_complexes2scheduler(&rlp, qep, exechost_list, centry_list); /* check all thresholds */ for_each (tep, threshold) { const char *name; /* complex attrib name */ lListElem *cep; /* actual complex attribute */ char dom_str[5]; /* dominance as string */ u_long32 dom_val; /* dominance as u_long */ char buffer[MAX_STRING_SIZE]; /* buffer for one line */ const char *load_value; /* actual load value */ const char *limit_value; /* limit defined by threshold */ name = lGetString(tep, CE_name); if ( first == true ) { first = false; } else { strncat(reason, "\n\t", reason_size); } /* find actual complex attribute */ if ((cep = lGetElemStr(rlp, CE_name, name)) == 
NULL) { /* no complex attribute for threshold -> ERROR */ if (qinstance_state_is_unknown(qep)) { snprintf(buffer, MAX_STRING_SIZE, MSG_QINSTANCE_VALUEMISSINGMASTERDOWN_S, name); } else { snprintf(buffer, MAX_STRING_SIZE, MSG_SCHEDD_NOCOMPLEXATTRIBUTEFORTHRESHOLD_S, name); } strncat(reason, buffer, reason_size); continue; } limit_value = lGetString(tep, CE_stringval); if (!(lGetUlong(cep, CE_pj_dominant) & DOMINANT_TYPE_VALUE)) { dom_val = lGetUlong(cep, CE_pj_dominant); load_value = lGetString(cep, CE_pj_stringval); } else { dom_val = lGetUlong(cep, CE_dominant); load_value = lGetString(cep, CE_stringval); } monitor_dominance(dom_str, dom_val); snprintf(buffer, MAX_STRING_SIZE, "alarm %s:%s=%s %s-threshold=%s", dom_str, name, load_value, threshold_type, limit_value ); strncat(reason, buffer, reason_size); } lFreeList(&rlp); } DRETURN(reason); } /* ---------------------------------------- sge_split_queue_load() splits the incoming queue list (1st arg) into an unloaded and overloaded (2nd arg) list according to the load values contained in the execution host list (3rd arg) and with respect to the definitions in the complex list (4th arg). 
returns: 0 successful -1 errors in functions called by sge_split_queue_load */ int sge_split_queue_load( bool monitor_next_run, lList **unloaded, /* QU_Type */ lList **overloaded, /* QU_Type */ lList *exechost_list, /* EH_Type */ lList *centry_list, /* CE_Type */ const lList *load_adjustments, /* CE_Type */ lList *granted, /* JG_Type */ bool is_consumable_load_alarm, /* is true, when the consumable evaluation set a load alarm */ bool is_comprehensive, /* do the load evaluation comprehensive (include consumables) */ u_long32 ttype ) { lList *thresholds; int nverified = 0; char reason[2048]; DENTER(TOP_LAYER, "sge_split_queue_load"); /* a job has been dispatched recently, but load correction is not in use at all */ if (granted && !load_adjustments && !is_consumable_load_alarm) { DRETURN(0); } if (!granted || load_adjustments) { lListElem *qep, *next_qep; next_qep = lFirst(*unloaded); while ((qep = next_qep)) { bool remove_queue = false; next_qep = lNext(qep); /* do not verify load alarm if a job has been dispatched recently but not to the host where this queue resides */ if (lGetUlong(qep, QU_tagged4schedule) == 1) { /* this queue is already tagged for removing */ remove_queue = true; lSetUlong(qep, QU_tagged4schedule, 0); } else if (!granted || (granted && (sconf_get_global_load_correction() || lGetElemHost(granted, JG_qhostname, lGetHost(qep, QU_qhostname))))) { thresholds = lGetList(qep, ttype); nverified++; if (sge_load_alarm(reason, qep, thresholds, exechost_list, centry_list, load_adjustments, is_comprehensive) != 0) { remove_queue = true; if (ttype==QU_suspend_thresholds) { DPRINTF(("queue %s tagged to be in suspend alarm: %s\n", lGetString(qep, QU_full_name), reason)); schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUEINALARM_SS, lGetString(qep, QU_full_name), reason); } else { DPRINTF(("queue %s tagged to be overloaded: %s\n", lGetString(qep, QU_full_name), reason)); schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUEOVERLOADED_SS, 
lGetString(qep, QU_full_name), reason); } } } if (remove_queue) { if (overloaded != NULL) { lDechainElem(*unloaded, qep); if (*overloaded == NULL) { *overloaded = lCreateListHash("", lGetListDescr(*unloaded), false); } lAppendElem(*overloaded, qep); } else { lRemoveElem(*unloaded, &qep); } } } } DPRINTF(("verified threshold of %d queues\n", nverified)); DRETURN(0); } /****** sge_select_queue/sge_split_queue_slots_free() ************************** * NAME * sge_split_queue_slots_free() -- ??? * * SYNOPSIS * int sge_split_queue_slots_free(lList **free, lList **full) * * FUNCTION * Split queue list into queues with at least one slots and queues with * less than one free slot. The list optioally returned in full gets the * QNOSLOTS queue instance state set. * * INPUTS * lList **free - Input queue instance list and return free slots. * lList **full - If non-NULL the full queue instances get returned here. * * RESULT * int - 0 success * -1 error *******************************************************************************/ int sge_split_queue_slots_free(bool monitor_next_run, lList **free, lList **full) { lList *full_queues = NULL; lListElem *this = NULL; lListElem *next = NULL; DENTER(TOP_LAYER, "sge_split_queue_nslots_free"); if (free == NULL) { DRETURN(-1); } for (this=lFirst(*free); ((next=lNext(this))), this ; this = next) { if (qinstance_slots_used(this) >= (int)lGetUlong(this, QU_job_slots)) { this = lDechainElem(*free, this); if (!qinstance_state_is_full(this)) { schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUEFULL_, lGetString(this, QU_full_name)); qinstance_state_set_full(this, true); if (full_queues == NULL) { full_queues = lCreateListHash("full one", lGetListDescr(*free), false); } lAppendElem(full_queues, this); } else if (full != NULL) { if (*full == NULL) { *full = lCreateList("full one", lGetListDescr(*free)); } lAppendElem(*full, this); } else { lFreeElem(&this); } } } /* dump out the -tsm log and add the new queues to the disabled queue 
list */ if (full_queues) { schedd_log_list(NULL, monitor_next_run, MSG_SCHEDD_LOGLIST_QUEUESFULLANDDROPPED, full_queues, QU_full_name); if (full != NULL) { if (*full == NULL) { *full = full_queues; } else { lAddList(*full, &full_queues); } } else { lFreeList(&full_queues); } } DRETURN(0); } /* ---------------------------------------- sge_split_suspended() splits the incoming queue list (1st arg) into non suspended queues and suspended queues (2nd arg) returns: 0 successful -1 error */ int sge_split_suspended( bool monitor_next_run, lList **queue_list, /* QU_Type */ lList **suspended /* QU_Type */ ) { lCondition *where; int ret; lList *lp = NULL; DENTER(TOP_LAYER, "sge_split_suspended"); if (!queue_list) { DRETURN(-1); } /* split queues */ where = lWhere("%T(!(%I m= %u) && !(%I m= %u) && !(%I m= %u) && !(%I m= %u))", lGetListDescr(*queue_list), QU_state, QI_SUSPENDED, QU_state, QI_CAL_SUSPENDED, QU_state, QI_CAL_DISABLED, QU_state, QI_SUSPENDED_ON_SUBORDINATE); ret = lSplit(queue_list, &lp, "full queues", where); lFreeWhere(&where); if (lp != NULL) { lListElem* mes_queue; for_each(mes_queue, lp) { if (!qinstance_state_is_manual_suspended(mes_queue)) { qinstance_state_set_manual_suspended(mes_queue, true); schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUESUSP_, lGetString(mes_queue, QU_full_name)); } } schedd_log_list(NULL, monitor_next_run, MSG_SCHEDD_LOGLIST_QUEUESSUSPENDEDANDDROPPED, lp, QU_full_name); if (*suspended == NULL) { *suspended = lp; } else { lAddList(*suspended, &lp); } } DRETURN(ret); } /* ---------------------------------------- sge_split_cal_disabled() splits the incoming queue list (1st arg) into non disabled queues and cal_disabled queues (2nd arg) lList **queue_list, QU_Type lList **disabled QU_Type returns: 0 successful -1 errors in functions called by sge_split_queue_load */ int sge_split_cal_disabled(bool monitor_next_run, lList **queue_list, lList **disabled) { lCondition *where; int ret; lList *lp = NULL; DENTER(TOP_LAYER, 
"sge_split_cal_disabled"); if (!queue_list) { DRETURN(-1); } /* split queues */ where = lWhere("%T(!(%I m= %u))", lGetListDescr(*queue_list), QU_state, QI_CAL_DISABLED); ret = lSplit(queue_list, &lp, "full queues", where); lFreeWhere(&where); if (lp != NULL) { lListElem* mes_queue; for_each(mes_queue, lp) { schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUEDISABLED_, lGetString(mes_queue, QU_full_name)); } schedd_log_list(NULL, monitor_next_run, MSG_SCHEDD_LOGLIST_QUEUESDISABLEDANDDROPPED, lp, QU_full_name); if (*disabled == NULL) { *disabled = lp; } else { lAddList(*disabled, &lp); } } DRETURN(ret); } /* ---------------------------------------- sge_split_disabled() splits the incoming queue list (1st arg) into non disabled queues and disabled queues (2nd arg) lList **queue_list, QU_Type lList **disabled QU_Type returns: 0 successful -1 errors in functions called by sge_split_queue_load */ int sge_split_disabled(bool monitor_next_run, lList **queue_list, lList **disabled) { lCondition *where; int ret; lList *lp = NULL; DENTER(TOP_LAYER, "sge_split_disabled"); if (!queue_list) { DRETURN(-1); } /* split queues */ where = lWhere("%T(!(%I m= %u) && !(%I m= %u))", lGetListDescr(*queue_list), QU_state, QI_DISABLED, QU_state, QI_CAL_DISABLED); ret = lSplit(queue_list, &lp, "full queues", where); lFreeWhere(&where); if (lp != NULL) { lListElem* mes_queue; for_each(mes_queue, lp) { schedd_mes_add_global(NULL, monitor_next_run, SCHEDD_INFO_QUEUEDISABLED_, lGetString(mes_queue, QU_full_name)); } schedd_log_list(NULL, monitor_next_run, MSG_SCHEDD_LOGLIST_QUEUESDISABLEDANDDROPPED, lp, QU_full_name); if (*disabled == NULL) { *disabled = lp; } else { lAddList(*disabled, &lp); } } DRETURN(ret); } /****** sge_select_queue/pe_cq_rejected() ************************************** * NAME * pe_cq_rejected() -- Check, if -pe pe_name rejects cluster queue * * SYNOPSIS * static bool pe_cq_rejected(const char *pe_name, const lListElem *cq) * * FUNCTION * Match a jobs -pe 
'pe_name' with pe_list cluster queue configuration.
*     True is returned if the parallel environment has no access.
*
*  INPUTS
*     const char *pe_name - the pe request of a job (no wildcard), or NULL
*     const lListElem *cq - cluster queue (CQ_Type)
*
*  RESULT
*     static bool - True, if rejected. A NULL pe_name is never rejected.
*
*  NOTES
*     MT-NOTE: pe_cq_rejected() is MT safe
*******************************************************************************/
static bool pe_cq_rejected(const char *pe_name, const lListElem *cq)
{
   const lListElem *profile;
   bool referenced = false;

   DENTER(TOP_LAYER, "pe_cq_rejected");

   /* nothing requested -> nothing to reject */
   if (!pe_name) {
      DRETURN(false);
   }

   /* one pe_list profile naming the PE is enough to accept the cluster queue */
   for_each (profile, lGetList(cq, CQ_pe_list)) {
      if (lGetSubStr(profile, ST_name, pe_name, ASTRLIST_value)) {
         referenced = true;
         break;
      }
   }

   DRETURN(!referenced);
}

/****** sge_select_queue/project_cq_rejected() *********************************
*  NAME
*     project_cq_rejected() -- Check, if -P project rejects cluster queue
*
*  SYNOPSIS
*     static bool project_cq_rejected(const char *project, const lListElem *cq)
*
*  FUNCTION
*     Match a jobs -P 'project' with project/xproject cluster queue configuration.
*     True is returned if the project has no access.
* * INPUTS * const char *project - the project of a job or NULL * const lListElem *cq - cluster queue (CQ_Type) * * RESULT * static bool - True, if rejected * * NOTES * MT-NOTE: project_cq_rejected() is MT safe *******************************************************************************/ static bool project_cq_rejected(const char *project, const lListElem *cq) { const lList *projects; const lListElem *alist; bool rejected; DENTER(TOP_LAYER, "project_cq_rejected"); if (!project) { /* without project: rejected, if each "project" profile does contain project references */ for_each (alist, lGetList(cq, CQ_projects)) { if (!lGetList(alist, APRJLIST_value)) { DRETURN(false); } } DRETURN(true); } /* with project: rejected, if project is excluded by each "xproject" profile */ rejected = true; for_each (alist, lGetList(cq, CQ_xprojects)) { projects = lGetList(alist, APRJLIST_value); if (!projects || !prj_list_locate(projects, project)) { rejected = false; break; } } if (rejected) { DRETURN(true); } /* with project: rejected, if project is not included with each "project" profile */ rejected = true; for_each (alist, lGetList(cq, CQ_projects)) { projects = lGetList(alist, APRJLIST_value); if (!projects || prj_list_locate(projects, project)) { rejected = false; break; } } if (rejected) { DRETURN(true); } DRETURN(false); } /****** sge_select_queue/interactive_cq_rejected() ***************************** * NAME * interactive_cq_rejected() -- Check, if -now yes rejects cluster queue * * SYNOPSIS * static bool interactive_cq_rejected(const lListElem *cq) * * FUNCTION * Returns true if -now yes jobs can not be run in cluster queue * * INPUTS * const lListElem *cq - cluster queue (CQ_Type) * * RESULT * static bool - True, if rejected * * NOTES * MT-NOTE: interactive_cq_rejected() is MT safe *******************************************************************************/ static bool interactive_cq_rejected(const lListElem *cq) { const lListElem *alist; bool rejected; 
DENTER(TOP_LAYER, "interactive_cq_rejected"); rejected = true; for_each (alist, lGetList(cq, CQ_qtype)) { if ((lGetUlong(alist, AQTLIST_value) & IQ)) { rejected = false; break; } } DRETURN(rejected); } /****** sge_select_queue/access_cq_rejected() ********************************** * NAME * access_cq_rejected() -- Check, if cluster queue rejects user/project * * SYNOPSIS * static bool access_cq_rejected(const char *user, const char *group, const * lList *acl_list, const lListElem *cq) * * FUNCTION * ??? * * INPUTS * const char *user - Username * const char *group - Groupname * const lList *acl_list - List of access list definitions * const lListElem *cq - Cluster queue * * RESULT * static bool - True, if rejected * * NOTES * MT-NOTE: access_cq_rejected() is MT safe *******************************************************************************/ static bool access_cq_rejected(const char *user, const char *group, const lList *acl_list, const lListElem *cq) { const lListElem *alist; bool rejected; DENTER(TOP_LAYER, "access_cq_rejected"); /* rejected, if user/group is excluded by each "xacl" profile */ rejected = true; for_each (alist, lGetList(cq, CQ_xacl)) { if (!sge_contained_in_access_list_(user, group, lGetList(alist, AUSRLIST_value), acl_list)) { rejected = false; break; } } if (rejected) { DRETURN(true); } /* rejected, if user/group is not included in any "acl" profile */ rejected = true; for_each (alist, lGetList(cq, CQ_acl)) { const lList *t = lGetList(alist, AUSRLIST_value); if (!t || sge_contained_in_access_list_(user, group, t, acl_list)) { rejected = false; break; } } DRETURN(rejected); } /****** sge_select_queue/cqueue_match_static() ********************************* * NAME * cqueue_match_static() -- Does cluster queue match the job? * * SYNOPSIS * static dispatch_t cqueue_match_static(const char *cqname, * sge_assignment_t *a) * * FUNCTION * The function tries to find reasons (-q, -l and -P) why the * entire cluster is not suited for the job. 
* * INPUTS * const char *cqname - Cluster queue name * sge_assignment_t *a - ??? * * RESULT * static dispatch_t - Returns DISPATCH_OK or DISPATCH_NEVER_CAT * * NOTES * MT-NOTE: cqueue_match_static() is MT safe *******************************************************************************/ dispatch_t cqueue_match_static(const char *cqname, sge_assignment_t *a) { const lList *hard_resource_list, *master_hard_queue_list; const lListElem *cq; const char *project, *pe_name; u_long32 ar_id; DENTER(TOP_LAYER, "cqueue_match_static"); /* detect if entire cluster queue ruled out due to -q */ if (qref_list_cq_rejected(lGetList(a->job, JB_hard_queue_list), cqname, NULL, NULL) && (!a->pe_name || !(master_hard_queue_list = lGetList(a->job, JB_master_hard_queue_list)) || qref_list_cq_rejected(master_hard_queue_list, cqname, NULL, NULL))) { DPRINTF(("Cluster Queue \"%s\" is not contained in the hard queue list (-q) that " "was requested by job %d\n", cqname, (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S, cqname); DRETURN(DISPATCH_NEVER_CAT); } /* check if cqueue was reserved for AR job */ ar_id = lGetUlong(a->job, JB_ar); if (ar_id != 0) { lListElem *ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id); if (ar_ep != NULL) { lListElem *gep = NULL; for_each(gep, lGetList(ar_ep, AR_granted_slots)) { char *ar_cqueue = cqueue_get_name_from_qinstance(lGetString(gep, JG_qname)); if (strcmp(cqname, ar_cqueue) == 0) { /* found queue */ FREE(ar_cqueue); break; } FREE(ar_cqueue); } if (gep == NULL) { DPRINTF(("Cluster Queue \"%s\" was not reserved by advance reservation %d\n", cqname, ar_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_QINOTARRESERVED_SI, cqname, ar_id); DRETURN(DISPATCH_NEVER_CAT); } } else { /* should never happen */ DRETURN(DISPATCH_NEVER_CAT); } } cq = lGetElemStr(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), CQ_name, cqname); /* detect if entire cluster queue ruled out 
due to -l */ if ((hard_resource_list = lGetList(a->job, JB_hard_resource_list))) { dstring unsatisfied = DSTRING_INIT; if (request_cq_rejected(hard_resource_list, cq, a->centry_list, (!a->pe_name || a->slots == 1)?true:false, &unsatisfied)) { DPRINTF(("Cluster Queue \"%s\" can not fulfill resource request (-l %s) that " "was requested by job %d\n", cqname, sge_dstring_get_string(&unsatisfied), (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, sge_dstring_get_string(&unsatisfied), cqname, "of cluster queue"); sge_dstring_free(&unsatisfied); DRETURN(DISPATCH_NEVER_CAT); } } /* detect if entire cluster queue ruled out due to -P */ project = a->project; if (project_cq_rejected(project, cq)) { DPRINTF(("Cluster queue \"%s\" does not work for -P %s job %d\n", cqname, project?project:"", (int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPRJ_S, "cluster queue", cqname); DRETURN(DISPATCH_NEVER_CAT); } /* detect if entire cluster queue ruled out due to user_list/xuser_lists */ if (access_cq_rejected(a->user, a->group, a->acl_list, cq)) { DPRINTF(("Job %d has no permission for cluster queue %s\n", (int)a->job_id, cqname)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, "cluster queue", cqname); DRETURN(DISPATCH_NEVER_CAT); } /* detect if entire cluster queue ruled out due to -pe */ if (a->pe && (pe_name=a->pe_name) && pe_cq_rejected(pe_name, cq)) { DPRINTF(("Cluster queue "SFQ" does not reference PE "SFQ"\n", cqname, pe_name)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINQUEUELSTOFPE_SS, cqname, pe_name); DRETURN(DISPATCH_NEVER_CAT); } /* detect if entire cluster queue ruled out due to -I y aka -now yes */ if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(a->job, JB_type)) && interactive_cq_rejected(cq)) { DPRINTF(("Queue \"%s\" is not an interactive queue as requested by job %d\n", cqname, 
(int)a->job_id)); schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_QUEUENOTINTERACTIVE_S, cqname); DRETURN(DISPATCH_NEVER_CAT); } /* Optimization: Walk through forced_centry_list and reject cq if possible */ DRETURN(DISPATCH_OK); } /****** sge_select_queue/sequential_tag_queues_suitable4job() ************** * NAME * sequential_tag_queues_suitable4job() -- ??? * * SYNOPSIS * * FUNCTION * The start time of a queue is always returned using the QU_available_at * field. * * The overall behaviour of this function is somewhat dependent on the * value that gets passed to assignment->start and whether soft requests * were specified with the job: * * (1) In case of now assignemnts (DISPATCH_TIME_NOW) only the first queue * suitable for jobs without soft requests is tagged. When soft requests * are specified all queues must be verified and tagged in order to find * the queue that fits best. * * (2) In case of reservation assignments (DISPATCH_TIME_QUEUE_END) the earliest * time is searched when the resources of global/host/queue are sufficient * for the job. The time-wise iteration is then done for each single resources * instance. * * Actually there are cases when iterating through all queues were not * needed: (a) if there was a global limitation search could stop once * a queue is found that causes no further delay (b) if job has * a soft request search could stop once a queue is found with minimum (=0) * soft violations. 
*
*  INPUTS
*     sge_assignment_t *assignment - job info structure
*
*  RESULT
*     dispatch_t - 0 ok got an assignment
*                    start time(s) and slots are tagged
*                  1 no assignment at the specified time
*                 -1 assignment will never be possible for all jobs of that category
*                 -2 assignment will never be possible for that particular job
*
*  NOTES
*     The value returned is the best per-queue result collected via
*     find_best_result(); it starts as DISPATCH_NEVER_CAT and is set to
*     DISPATCH_OK as soon as one queue got tagged.
*     MT-NOTE: sequential_tag_queues_suitable4job() is not MT safe
*******************************************************************************/
static dispatch_t sequential_tag_queues_suitable4job(sge_assignment_t *a)
{
   /* skip lists taken from the job category cache (stay NULL without category) */
   lList *skip_host_list = NULL;
   lList *skip_queue_list = NULL;

   /* cluster queues / hosts that already passed the static checks in this call */
   lList *unclear_cqueue_list = NULL;
   lList *unclear_host_list = NULL;
   dstring rule_name = DSTRING_INIT;
   dstring rue_string = DSTRING_INIT;
   dstring limit_name = DSTRING_INIT;
   u_long32 ar_id = lGetUlong(a->job, JB_ar);
   lListElem *ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id);
   category_use_t use_category;
   bool got_solution = false;
   u_long32 tt_best = U_LONG32_MAX;
   u_long32 violations_best = U_LONG32_MAX;
   dispatch_t result;
   u_long32 tt_global = a->start;
   dispatch_t best_queue_result = DISPATCH_NEVER_CAT;
   int global_violations = 0;
   int queue_violations = 0;
   lListElem *qep;
   /* best queue found so far when soft requests are in use */
   lListElem *best_qep = NULL;
   u_long32 best_qep_violations = U_LONG32_MAX;
   u_long32 best_qep_tt = U_LONG32_MAX;

   DENTER(TOP_LAYER, "sequential_tag_queues_suitable4job");

   /* assemble job category information */
   fill_category_use_t(a, &use_category, "NONE");

   /* restore job messages from previous dispatch runs of jobs of the same category */
   if (use_category.use_category) {
      schedd_mes_set_tmp_list(use_category.cache, CCT_job_messages, a->job_id);
      skip_host_list = lGetList(use_category.cache, CCT_ignore_hosts);
      skip_queue_list = lGetList(use_category.cache, CCT_ignore_queues);
   }

   /* global level: either the advance reservation check or the global
      resource check; DISPATCH_MISSING_ATTR from the latter is tolerated */
   if (ar_ep != NULL) {
      result = match_static_advance_reservation(a);
      if (result != DISPATCH_OK) {
         DRETURN(result);
      }
   } else {
      if (a->pi)
         a->pi->seq_global++;
      result = sequential_global_time(&tt_global, a, &global_violations);
      if (result != DISPATCH_OK && result != DISPATCH_MISSING_ATTR) {
         DRETURN(result);
      }
   }

   for_each(qep, a->queue_list) {
      u_long32 tt_host = a->start;
      u_long32 tt_queue = a->start;
      const char *eh_name;
      const char *qname, *cqname;
      u_long32 tt_rqs = 0;
      bool is_global;
      lListElem *hep;

      qname = lGetString(qep, QU_full_name);
      cqname = lGetString(qep, QU_qname);
      eh_name = lGetHost(qep, QU_qhostname);

      /* untag this queue */
      lSetUlong(qep, QU_tag, 0);

      /* try to foreclose the cluster queue */
      if (lGetElemStr(a->skip_cqueue_list, CTI_name, cqname)) {
         DPRINTF(("skip cluster queue %s\n", cqname));
         continue;
      }
      if (lGetElemStr(a->skip_host_list, CTI_name, eh_name)) {
         DPRINTF(("rqs skip host %s\n", eh_name));
         continue;
      }
      if (skip_host_list && lGetElemStr(skip_host_list, CTI_name, eh_name)){
         DPRINTF(("job category skip host %s\n", eh_name));
         continue;
      }
      if (skip_queue_list && lGetElemStr(skip_queue_list, CTI_name, qname)){
         DPRINTF(("job category skip queue %s\n", qname));
         continue;
      }

      if (!ar_ep) {
         /* resource quota matching */
         if ((result = rqs_by_slots(a, cqname, eh_name, &tt_rqs, &is_global, &rue_string, &limit_name, &rule_name, got_solution?tt_best:U_LONG32_MAX)) != DISPATCH_OK) {
            best_queue_result = find_best_result(result, best_queue_result);
            if (is_global == false) {
               /* NOTE(review): the format string has no conversion for the
                  eh_name argument, and the text mentions a GLOBAL limit on
                  the non-global path - confirm the intended message */
               DPRINTF(("no match due to GLOBAL RQS\n", eh_name));
               continue;
            }
            break; /* hit a global limit */
         }
         /* stop early if the quota start time alone is already no
            improvement over the best solution found so far */
         if (got_solution && is_not_better(a, violations_best, tt_best, 0, tt_rqs)) {
            DPRINTF(("CUT TREE: Due to RQS for \"%s\"\n", qname));
            if (is_global == false)
               continue;
            break;
         }
      }

      /* static cqueue matching (done once per cluster queue) */
      if (!lGetElemStr(unclear_cqueue_list, CTI_name, cqname)) {
         if (a->pi) {
            a->pi->seq_cqstat++;
         }
         if (cqueue_match_static(cqname, a) != DISPATCH_OK) {
            lAddElemStr(&(a->skip_cqueue_list), CTI_name, cqname, CTI_Type);
            best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
            continue;
         }
         lAddElemStr(&unclear_cqueue_list, CTI_name, cqname, CTI_Type);
      }

      /* static host matching */
      if (!(hep = lGetElemHost(a->host_list, EH_name, eh_name))) {
         ERROR((SGE_EVENT, MSG_SCHEDD_UNKNOWN_HOST_SS, qname, eh_name));
         if (skip_queue_list != NULL) {
            lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
         }
         continue;
      }

      /* the host checks below are done once per host */
      if (!lGetElemStr(unclear_host_list, CTI_name, eh_name)) {
         if (a->pi) {
            a->pi->seq_hstat++;
         }
         if (qref_list_eh_rejected(lGetList(a->job, JB_hard_queue_list), eh_name, a->hgrp_list)) {
            schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S, eh_name);
            DPRINTF(("Host \"%s\" is not contained in the hard queue list (-q) that " "was requested by job %d\n", eh_name, (int)a->job_id));
            if (skip_host_list) {
               lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type);
            } else {
               lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
            }
            best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
            continue;
         }
         if ((result=sge_host_match_static(a, hep))) {
            /* NOTE(review): both branches are identical; the else branch
               presumably was meant to use the category skip_host_list as
               the other call sites do - confirm before changing */
            if (result == DISPATCH_NEVER_JOB || !skip_host_list) {
               lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
            } else {
               lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
            }
            best_queue_result = find_best_result(result, best_queue_result);
            continue;
         }
         lAddElemStr(&unclear_host_list, CTI_name, eh_name, CTI_Type);
      }

      /* static queue matching */
      if (a->pi) {
         a->pi->seq_qstat++;
      }
      if (sge_queue_match_static(a, qep) != DISPATCH_OK) {
         if (skip_queue_list)
            lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
         best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
         continue;
      }

      /* ar matching */
      if (ar_ep != NULL) {
         lListElem *ar_queue;
         lListElem *rep;
         dstring reason = DSTRING_INIT;

         /* NOTE(review): ar_queue is not checked for NULL before it is
            used below - confirm it cannot be NULL here */
         ar_queue = lGetSubStr(ar_ep, QU_full_name, qname, AR_reserved_queues);

         /*
          * We are only interested in the static complexes on queue/host level.
          * These are tagged in this function. The result doesn't matter
          *
          * NOTE(review): if the loop body never assigns 'result' (no hard
          * request, or only consumables), 'result' keeps the value it had
          * before the loop - confirm this is intended.
          */
         for_each(rep, lGetList(a->job, JB_hard_resource_list)) {
            const char *attrname = lGetString(rep, CE_name);
            /* NOTE(review): cplx_el is used without a NULL check */
            lListElem *cplx_el = lGetElemStr(a->centry_list, CE_name, attrname);

            if (lGetUlong(cplx_el, CE_consumable) != CONSUMABLE_NO) {
               continue;
            }
            sge_dstring_clear(&reason);
            /* from here on cplx_el refers to an element that is released
               with lFreeElem() below */
            cplx_el = get_attribute_by_name(a->gep, hep, qep, attrname, a->centry_list, a->start, a->duration);
            if (cplx_el == NULL) {
               result = DISPATCH_MISSING_ATTR;
               sge_dstring_sprintf(&reason, MSG_ATTRIB_MISSINGATTRIBUTEXINCOMPLEXES_S, attrname);
               break;
            }
            result = match_static_resource(1, rep, cplx_el, &reason, false);
            lFreeElem(&cplx_el);
            if (result != DISPATCH_OK) {
               break;
            }
            lSetUlong(rep, CE_tagged, HOST_TAG);
         }

         if (result != DISPATCH_OK) {
            char buff[1024 + 1];

            centry_list_append_to_string(lGetList(a->job, JB_hard_resource_list), buff, sizeof(buff) - 1);
            /* strip a trailing newline */
            if (*buff && (buff[strlen(buff) - 1] == '\n')) {
               buff[strlen(buff) - 1] = 0;
            }
            schedd_mes_add(a->monitor_alpp, a->monitor_next_run, a->job_id, SCHEDD_INFO_CANNOTRUNATHOST_SSS, buff, lGetHost(hep, EH_name), sge_dstring_get_string(&reason));
         } else {
            result = sequential_queue_time(&tt_queue, a, &queue_violations, ar_queue);
            if (result != DISPATCH_OK) {
               DPRINTF(("ar queue %s returned %d\n", qname, result));
               if (skip_queue_list) {
                  lAddElemStr(&skip_queue_list, CTI_name, lGetString(ar_queue, QU_full_name), CTI_Type);
               }
               best_queue_result = find_best_result(result, best_queue_result);
               /* NOTE(review): this continue bypasses sge_dstring_free(&reason) */
               continue;
            }
            tt_global = tt_host = tt_queue;
         }
         sge_dstring_free(&reason);
      } else {
         queue_violations = global_violations;

         /* dynamic host matching */
         if (a->pi)
            a->pi->seq_hdyn++;
         result = sequential_host_time(&tt_host, a, &queue_violations, hep);
         if (result != DISPATCH_OK && result != DISPATCH_MISSING_ATTR) {
            if (skip_host_list)
               lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type);
            else
               lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
            DPRINTF(("host %s returned %d\n", eh_name, result));
            best_queue_result = find_best_result(result, best_queue_result);
            continue;
         }
         if (got_solution && is_not_better(a, violations_best, tt_best, queue_violations, tt_host)) {
            lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
            DPRINTF(("CUT TREE: Due to HOST for \"%s\"\n", qname));
            continue;
         }

         /* dynamic queue matching */
         if (a->pi)
            a->pi->seq_qdyn++;
         result = sequential_queue_time(&tt_queue, a, &queue_violations, qep);
         if (result != DISPATCH_OK) {
            DPRINTF(("queue %s returned %d\n", qname, result));
            if (skip_queue_list) {
               lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
            }
            best_queue_result = find_best_result(result, best_queue_result);
            continue;
         }
         if (got_solution && is_not_better(a, violations_best, tt_best, queue_violations, tt_queue)) {
            DPRINTF(("CUT TREE: Due to QUEUE for \"%s\"\n", qname));
            continue;
         }
      }

      if (result == DISPATCH_OK) {
         u_long32 this_tt = 0, this_violations = 0;

         lSetUlong(qep, QU_tag, 1);

         /* tag number of slots per queue and time when it will be available */
         if (a->start == DISPATCH_TIME_QUEUE_END) {
            DPRINTF((" global "sge_u32" host "sge_u32" queue "sge_u32" rqs "sge_u32"\n", tt_global, tt_host, tt_queue, tt_rqs));
            /* the queue becomes available at the latest of the four start times */
            lSetUlong(qep, QU_available_at, this_tt = MAX(tt_queue, MAX(tt_host, MAX(tt_rqs, tt_global))));
         }
         if (a->is_soft) {
            this_violations = lGetUlong(qep, QU_soft_violation);
         }
         DPRINTF((" set Q: %s number=1 when="sge_u32" violations="sge_u32"\n", qname, this_tt, this_violations));
         best_queue_result = DISPATCH_OK;

         if (!a->is_reservation) {
            if (!a->is_soft || this_violations == 0) {
               /* we found our queue */
               if (best_qep != NULL) {
                  lSetUlong(best_qep, QU_tag, 0);
               }
               break;
            }
         } else {
            /* further search is pointless, if reservation depends on a global consumable */
            /* earlier start time has higher preference than lower soft violations */
            if (ar_ep == NULL && (tt_global >= MAX(MAX(tt_queue, tt_host), tt_rqs)) && !a->is_soft) {
               /* we found our queue */
               if (best_qep != NULL) {
                  lSetUlong(best_qep, QU_tag, 0);
               }
               break;
            }
         }

         /* with soft requests only the best queue seen so far keeps its tag;
            NOTE(review): best_qep is still NULL when the first candidate is
            recorded, so lSetUlong() gets a NULL element there - confirm
            that this is harmless */
         if (a->is_soft) {
            if (!a->is_reservation) {
               /* check if this queue is better than our last best queue */
               if (this_violations < best_qep_violations) {
                  lSetUlong(best_qep, QU_tag, 0);
                  best_qep = qep;
                  best_qep_violations = this_violations;
               } else {
                  /* reset QU_tag because another queue is better */
                  lSetUlong(qep, QU_tag, 0);
               }
            } else {
               /* earlier start time has higher preference than lower soft violations */
               if (this_tt < best_qep_tt || (this_tt == best_qep_tt && this_violations < best_qep_violations)) {
                  lSetUlong(best_qep, QU_tag, 0);
                  best_qep = qep;
                  best_qep_tt = this_tt;
                  best_qep_violations = this_violations;
               } else {
                  lSetUlong(qep, QU_tag, 0);
               }
            }
         }
         /* remember this solution for the CUT TREE checks of later queues */
         got_solution = true;
         violations_best = this_violations;
         tt_best = this_tt;
      } else {
         DPRINTF(("queue %s reported %d\n", qname, result));
         if (skip_queue_list) {
            lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
         }
         best_queue_result = find_best_result(result, best_queue_result);
      }
   }

   lFreeList(&unclear_cqueue_list);
   lFreeList(&unclear_host_list);

   /* cache so far generated messages with the job category */
   if (use_category.use_category) {
      lList *temp = schedd_mes_get_tmp_list();
      if (temp){
         lSetList(use_category.cache, CCT_job_messages, lCopyList(NULL, temp));
      }
   }

   sge_dstring_free(&rue_string);
   sge_dstring_free(&limit_name);
   sge_dstring_free(&rule_name);

   DRETURN(best_queue_result);
}

/****** sge_select_queue/add_pe_slots_to_category() ****************************
*  NAME
*     add_pe_slots_to_category() -- defines an array of valid slot values
*
*  SYNOPSIS
*     static bool add_pe_slots_to_category(category_use_t *use_category,
*     u_long32 *max_slotsp, lListElem *pe, int min_slots, int max_slots, lList
*     *pe_range)
*
*  FUNCTION
*     In case of pe ranges this function allocates memory and fills it with
*     valid pe slot values. If a category is set, it stores them in the category
*     for further jobs.
*
*  INPUTS
*     category_use_t *use_category - category caching structure, must not be NULL
*     u_long32 *max_slotsp         - number of different slot settings
*     lListElem *pe                - pe, must not be NULL
*     int min_slots                - min slot setting (pe range)
*     int max_slots                - max slot setting (pe range)
*     lList *pe_range              - pe range, must not be NULL
*
*  RESULT
*     static bool - true, if successful
*
*  NOTES
*     MT-NOTE: add_pe_slots_to_category() is MT safe
*
*******************************************************************************/
static bool
add_pe_slots_to_category(category_use_t *use_category, u_long32 *max_slotsp, lListElem *pe,
                         int min_slots, int max_slots, lList *pe_range)
{
   int candidate;

   /* a slot table stored with the category cache by an earlier call is picked up */
   if (use_category->cache != NULL) {
      use_category->posible_pe_slots = lGetRef(use_category->cache, CCT_pe_job_slots);
   }

   /* table is already there: only the number of entries is still needed */
   if (use_category->posible_pe_slots != NULL) {
      use_category->is_pe_slots_rev = true;
      *max_slotsp = lGetUlong(use_category->cache, CCT_pe_job_slot_count);
      return true;
   }

   /* --- build the table of possible slot amounts for the binary search */
   *max_slotsp = 0;
   for (candidate = min_slots; candidate <= max_slots; candidate++) {
      /*
       * leave out amounts that conflict with the allocation rule as well as
       * amounts that are not part of the PE range requested by the job
       */
      if (sge_pe_slots_per_host(pe, candidate) == 0 ||
          range_list_is_id_within(pe_range, candidate) == 0) {
         continue;
      }

      /* the array is allocated on the first hit, sized for the whole min..max span */
      if (use_category->posible_pe_slots == NULL) {
         use_category->posible_pe_slots = malloc((max_slots - min_slots + 1) * sizeof(u_long32));
         if (use_category->posible_pe_slots == NULL) {
            return false;
         }
      }

      use_category->posible_pe_slots[*max_slotsp] = candidate;
      (*max_slotsp)++;
   }

   if (use_category->cache == NULL) {
      use_category->is_pe_slots_rev = false;
   } else {
      /* keep table and entry count with the category cache for further jobs */
      lSetRef(use_category->cache, CCT_pe_job_slots, use_category->posible_pe_slots);
      lSetUlong(use_category->cache, CCT_pe_job_slot_count, *max_slotsp);
      use_category->is_pe_slots_rev = true;
   }

   return true;
}

/****** sge_select_queue/fill_category_use_t() **************
*  NAME
*     fill_category_use_t() -- fills the
category_use_t structure. * * SYNOPSIS * void fill_category_use_t(sge_assignment_t *a, category_use_t * *use_category, const char *pe_name) * * FUNCTION * If a cache structure for the given PE does not exist, it * will generate the neccissary data structures. * * INPUTS * sge_assignment_t *a - job info structure (in) * category_use_t *use_category - category info structure (out) * const char* pe_name - the current pe name or "NONE" * * NOTES * MT-NOTE: fill_category_use_t() is MT safe *******************************************************************************/ static void fill_category_use_t(const sge_assignment_t *a, category_use_t *use_category, const char *pe_name) { lListElem *job = a->job; DENTER(TOP_LAYER, "fill_category_use_t"); use_category->category = lGetRef(job, JB_category); if (use_category->category != NULL) { use_category->cache = lGetElemStr(lGetList(use_category->category, CT_cache), CCT_pe_name, pe_name); if (use_category->cache == NULL) { use_category->cache = lCreateElem(CCT_Type); lSetString(use_category->cache, CCT_pe_name, pe_name); lSetList(use_category->cache, CCT_ignore_queues, lCreateList("", CTI_Type)); lSetList(use_category->cache, CCT_ignore_hosts, lCreateList("", CTI_Type)); lSetList(use_category->cache, CCT_job_messages, lCreateList("", MES_Type)); if (lGetList(use_category->category, CT_cache) == NULL) { lSetList(use_category->category, CT_cache, lCreateList("pe_cache", CCT_Type)); } lAppendElem(lGetList(use_category->category, CT_cache), use_category->cache); } use_category->mod_category = true; use_category->use_category = ((a->start == DISPATCH_TIME_NOW) && lGetUlong(use_category->category, CT_refcount) > MIN_JOBS_IN_CATEGORY) ? 
true : false; } else { use_category->cache = NULL; use_category->mod_category = false; use_category->use_category = false; } use_category->posible_pe_slots = NULL; use_category->is_pe_slots_rev = false; DRETURN_VOID; } static int get_soft_violations(sge_assignment_t *a, lListElem *hep, lListElem *qep) { int violations; violations = compute_soft_violations(a, NULL, 0, lGetList(a->gep, EH_load_list), lGetList(a->gep, EH_consumable_config_list), lGetList(a->gep, EH_resource_utilization), DOMINANT_LAYER_GLOBAL, 0, GLOBAL_TAG); violations = compute_soft_violations(a, NULL, violations, lGetList(hep, EH_load_list), lGetList(hep, EH_consumable_config_list), lGetList(hep, EH_resource_utilization), DOMINANT_LAYER_HOST, 0, HOST_TAG); violations = compute_soft_violations(a, qep, violations, NULL, lGetList(qep, QU_consumable_config_list), lGetList(qep, QU_resource_utilization), DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG); return violations; } /****** sge_select_queue/parallel_tag_queues_suitable4job() ********* * NAME * parallel_tag_queues_suitable4job() -- Tag queues/hosts for * a comprehensive/parallel assignment * * SYNOPSIS * static int parallel_tag_queues_suitable4job(sge_assignment_t * *assignment) * * FUNCTION * We tag the amount of available slots for that job at global, host and * queue level under consideration of all constraints of the job. We also * mark those queues that are suitable as a master queue as possible master * queues and count the number of violations of the job's soft request. * The method below is named comprehensive since it does the tagging game * for the whole parallel job and under consideration of all available * resources that could help to suffice the jobs request. This is necessary * to prevent consumable resource limited at host/global level multiple * times. * * While tagging we also set queues QU_host_seq_no based on the sort * order of each host. Assumption is the host list passed is sorted * according the load forumla. 
* * INPUTS * sge_assignment_t *assignment - ??? * category_use_t use_category - information on how to use the job category * * RESULT * static dispatch_t - 0 ok got an assignment * 1 no assignment at the specified time * -1 assignment will never be possible for all jobs of that category * -2 assignment will never be possible for that particular job * * NOTES * MT-NOTE: parallel_tag_queues_suitable4job() is not MT safe *******************************************************************************/ static dispatch_t parallel_tag_queues_suitable4job(sge_assignment_t *a, category_use_t *use_category, int *available_slots) { lListElem *job = a->job; bool have_masterq_request = (lGetList(job, JB_master_hard_queue_list) != NULL) ? true : false; bool have_hard_queue_request = (lGetList(job, JB_hard_queue_list) != NULL) ? true : false; int accu_host_slots, accu_host_slots_qend; bool have_master_host, have_master_host_qend, suited_as_master_host; lListElem *hep; dispatch_t best_result = DISPATCH_NEVER_CAT; int gslots = a->slots; int gslots_qend = 0; int allocation_rule, minslots; dstring rule_name = DSTRING_INIT; dstring rue_name = DSTRING_INIT; dstring limit_name = DSTRING_INIT; lListElem *gdil_ep; DENTER(TOP_LAYER, "parallel_tag_queues_suitable4job"); clean_up_parallel_job(a); if (use_category->use_category) { schedd_mes_set_tmp_list(use_category->cache, CCT_job_messages, a->job_id); } if (lGetUlong(a->job, JB_ar) == 0) { if (a->pi) a->pi->par_global++; parallel_global_slots(a, &gslots, &gslots_qend); } if (gslots < a->slots) { best_result = (gslots_qend < a->slots) ? 
DISPATCH_NEVER_CAT : DISPATCH_NOT_AT_TIME; *available_slots = gslots_qend; if (best_result == DISPATCH_NOT_AT_TIME) { DPRINTF(("GLOBAL will get us %d slots (%d)\n", gslots, gslots_qend)); } else { DPRINTF(("GLOBAL will get us %d slots (%d)\n", gslots, gslots_qend)); } } else { lList *skip_host_list = NULL; lList *unclear_cqueue_list = NULL; bool done; int last_accu_host_slots, last_accu_host_slots_qend; if (use_category->use_category) { skip_host_list = lGetList(use_category->cache, CCT_ignore_hosts); } accu_host_slots = accu_host_slots_qend = 0; have_master_host = have_master_host_qend = false; allocation_rule = sge_pe_slots_per_host(a->pe, a->slots); minslots = ALLOC_RULE_IS_BALANCED(allocation_rule)?allocation_rule:1; { u_long32 host_seq_no = 0; const char *eh_name; lListElem *qep; /* non-eligable queues may have no impact on host preference */ for_each (qep, a->queue_list) { const char *cqname = lGetString(qep, QU_qname); /* try to foreclose the cluster queue */ if (lGetElemStr(a->skip_cqueue_list, CTI_name, cqname)) { lSetUlong(qep, QU_tag, 0); DPRINTF(("(1) skip cluster queue %s\n", cqname)); continue; } if (!lGetElemStr(unclear_cqueue_list, CTI_name, cqname)) { if (a->pi) a->pi->par_cqstat++; if (cqueue_match_static(cqname, a) != DISPATCH_OK) { lAddElemStr(&(a->skip_cqueue_list), CTI_name, cqname, CTI_Type); /* tag QI as unsuited */ lSetUlong(qep, QU_tag, 0); DPRINTF(("add cluster queue %s to skip list\n", cqname)); continue; } DPRINTF(("cqueue %s is not rejected\n", cqname)); lAddElemStr(&unclear_cqueue_list, CTI_name, cqname, CTI_Type); } lSetUlong(qep, QU_tag, 1); } if (a->is_soft) { a->soft_violations = 0; for_each (qep, a->queue_list) { lSetUlong(qep, QU_soft_violation, 0); if (lGetUlong(qep, QU_tag) == 0 || !(hep = host_list_locate(a->host_list, lGetHost(qep, QU_qhostname)))) { continue; } get_soft_violations(a, hep, qep); DPRINTF(("EVALUATE: %s soft violations %d\n", lGetString(qep, QU_full_name), lGetUlong(qep, QU_soft_violation))); } if 
(sconf_get_queue_sort_method() == QSM_LOAD) { lPSortList(a->queue_list, "%I- %I+ %I+ %I+", QU_tag, QU_soft_violation, QU_host_seq_no, QU_seq_no); } else { lPSortList(a->queue_list, "%I- %I+ %I+ %I+", QU_tag, QU_soft_violation, QU_seq_no, QU_host_seq_no); } } else { int last_dispatch_type = sconf_get_last_dispatch_type(); if (last_dispatch_type == DISPATCH_TYPE_PE_SOFT_REQ || last_dispatch_type == DISPATCH_TYPE_NONE || sconf_get_host_order_changed()) { if (sconf_get_queue_sort_method() == QSM_LOAD) { lPSortList(a->queue_list, "%I+ %I+", QU_host_seq_no, QU_seq_no); } else { lPSortList(a->queue_list, "%I+ %I+", QU_seq_no, QU_host_seq_no); } } } for_each (hep, a->host_list) { lSetUlong(hep, EH_seq_no, -1); } for_each (qep, a->queue_list) { DPRINTF(("AFTER SORT: %s soft violations %d\n", lGetString(qep, QU_full_name), lGetUlong(qep, QU_soft_violation))); if (lGetUlong(qep, QU_tag) == 0) { continue; } eh_name = lGetHost(qep, QU_qhostname); if (!(hep = host_list_locate(a->host_list, eh_name))) continue; if (lGetUlong(hep, EH_seq_no) == -1) lSetUlong(hep, EH_seq_no, host_seq_no++); } lPSortList(a->host_list, "%I+", EH_seq_no); } done = false; do { last_accu_host_slots = accu_host_slots; last_accu_host_slots_qend = accu_host_slots_qend; for_each (hep, a->host_list) { int hslots = 0, hslots_qend = 0; const char *eh_name = lGetHost(hep, EH_name); if (!strcasecmp(eh_name, SGE_GLOBAL_NAME) || !strcasecmp(eh_name, SGE_TEMPLATE_NAME)) { continue; } /* this host does not work for this category, skip it */ if (skip_host_list && lGetElemStr(skip_host_list, CTI_name, eh_name)){ continue; } if (lGetElemStr(a->skip_host_list, CTI_name, eh_name)){ continue; } /* * do not perform expensive checks for this host if there * is not at least one free queue residing at this host: * see if there are queues which are not disbaled/suspended/calender; * which have at least one free slot, which are not unknown, several alarms */ if (lGetElemHost(a->queue_list, QU_qhostname, eh_name)) { const void 
*iter = NULL; parallel_tag_hosts_queues(a, hep, &hslots, &hslots_qend, &suited_as_master_host, use_category, &unclear_cqueue_list); if (hslots >= minslots) { /* Now RQ limit debitation can be performed for each queue instance based on QU_tag. While considering allocation_rule and master queue demand we try to get as much from each queue as needed, but not more than 'maxslots'. When we are through all queue instances and got not even 'minslots' the slots must be undebited. */ bool got_master_queue = have_master_host; lListElem *qep; int rqs_hslots = 0, maxslots, slots, slots_qend; /* still broken for cases where round robin requires multiple rounds */ if (ALLOC_RULE_IS_BALANCED(allocation_rule)) { maxslots = allocation_rule; } else if (allocation_rule == ALLOC_RULE_FILLUP) { maxslots = MIN(a->slots - accu_host_slots, hslots); } else {/* ALLOC_RULE_ROUNDROBIN */ maxslots = 1; } /* debit on RQS limits */ for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep; qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) { const char *qname = lGetString(qep, QU_full_name); if (lGetUlong(qep, QU_tag) == 0) continue; DPRINTF(("tagged: %d maxslots: %d rqs_hslots: %d\n", (int)lGetUlong(qep, QU_tag), maxslots, rqs_hslots)); DPRINTF(("SLOT HARVESTING: %s soft violations: %d master: %d\n", lGetString(qep, QU_full_name), (int)lGetUlong(qep, QU_soft_violation), (int)lGetUlong(qep, QU_tagged4schedule))); /* how much is still needed */ slots = MIN(lGetUlong(qep, QU_tag), maxslots - rqs_hslots); if (!have_master_host && !got_master_queue) { if (lGetUlong(qep, QU_tagged4schedule) < 2) { /* care for slave tasks assignments of -masterq jobs we need at least one slot on the masterq, thus we reduce by one slot if we run out of slots */ if (accu_host_slots + rqs_hslots + slots == a->slots) { slots--; } } else { /* care for master tasks assignments of -masterq jobs */ if (have_masterq_request && have_hard_queue_request) { /* if the masterq request is not 
contained in the hard queue request we need to allocate only one slot for the master task */ if (qref_list_cq_rejected(lGetList(a->job, JB_hard_queue_list), lGetString(qep, QU_qname), lGetHost(qep, QU_qhostname), a->hgrp_list)) { slots = MIN(slots, 1); } } } } slots_qend = 0; if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) { DPRINTF(("trying to debit %d slots in queue "SFQ"\n", slots, qname)); parallel_check_and_debit_rqs_slots(a, eh_name, lGetString(qep, QU_qname), &slots, &slots_qend, &rule_name, &rue_name, &limit_name); DPRINTF(("could debiting %d slots in queue "SFQ"\n", slots, qname)); } if (slots > 0) { /* add gdil element for this queue */ if (!(gdil_ep=lGetElemStr(a->gdil, JG_qname, qname))) { gdil_ep = lAddElemStr(&(a->gdil), JG_qname, qname, JG_Type); lSetUlong(gdil_ep, JG_qversion, lGetUlong(qep, QU_version)); lSetHost(gdil_ep, JG_qhostname, eh_name); lSetUlong(gdil_ep, JG_slots, slots); /* master queue must be at first position */ if (!have_master_host && lGetUlong(qep, QU_tagged4schedule) == 2) { lDechainElem(a->gdil, gdil_ep); lInsertElem(a->gdil, NULL, gdil_ep); got_master_queue = true; } } else { lAddUlong(gdil_ep, JG_slots, slots); } if (a->is_soft) a->soft_violations += slots * lGetUlong(qep, QU_soft_violation); } lSetUlong(qep, QU_tag, slots); rqs_hslots += slots; if (rqs_hslots == maxslots) break; } if (rqs_hslots < minslots) { DPRINTF(("reverting debitation since "SFQ" gets us only %d slots while min. 
%d are needed\n", eh_name, rqs_hslots, minslots)); /* must revert all RQS debitations */ for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep; qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) { slots = lGetUlong(qep, QU_tag); if (slots != 0) { if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) { parallel_revert_rqs_slot_debitation(a, eh_name, lGetString(qep, QU_qname), slots, 0, &rule_name, &rue_name, &limit_name); } if ((gdil_ep=lGetElemStr(a->gdil, JG_qname, lGetString(qep, QU_full_name)))) { if (lGetUlong(gdil_ep, JG_slots) - slots != 0) { lSetUlong(gdil_ep, JG_slots, lGetUlong(gdil_ep, JG_slots) - slots); } else { lDelElemStr(&(a->gdil), JG_qname, lGetString(qep, QU_full_name)); } } a->soft_violations -= slots * lGetUlong(qep, QU_soft_violation); lSetUlong(qep, QU_tag, 0); } } hslots = 0; } else { if (got_master_queue) have_master_host = true; accu_host_slots += rqs_hslots; } } if (hslots_qend >= minslots && a->care_reservation && !a->is_reservation) { lListElem *qep; bool got_master_queue_qend = have_master_host_qend; int rqs_hslots = 0, maxslots, slots, slots_qend; /* still broken for cases where round robin requires multiple rounds */ if (ALLOC_RULE_IS_BALANCED(allocation_rule)) { maxslots = allocation_rule; } else if (allocation_rule == ALLOC_RULE_FILLUP) { maxslots = MIN(a->slots - accu_host_slots_qend, hslots_qend); } else { /* ALLOC_RULE_ROUNDROBIN */ maxslots = 1; } /* debit on RQS limits */ for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep; qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) { if (lGetUlong(qep, QU_tag_qend) == 0) continue; slots = 0; slots_qend = MIN(lGetUlong(qep, QU_tag_qend), maxslots - rqs_hslots); if (!have_master_host_qend && !got_master_queue_qend) { if (lGetUlong(qep, QU_tagged4schedule) == 0) { /* care for slave tasks assignments of -masterq jobs we need at least one slot on the masterq, thus we reduce by one slot if we run 
out of slots */ if (accu_host_slots_qend + rqs_hslots + slots_qend == a->slots) { slots_qend--; } } else { /* care for master tasks assignments of -masterq jobs */ if (have_masterq_request && have_hard_queue_request) { /* if the masterq request is not contained in the hard queue request we need to allocate only one slot for the master task */ if (qref_list_cq_rejected(lGetList(a->job, JB_hard_queue_list), lGetString(qep, QU_qname), lGetHost(qep, QU_qhostname), a->hgrp_list)) { slots_qend = MIN(slots_qend, 1); } } } } if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) { DPRINTF(("trying to debit %d slots_qend in queue "SFQ"\n", slots_qend, lGetString(qep, QU_full_name))); parallel_check_and_debit_rqs_slots(a, eh_name, lGetString(qep, QU_qname), &slots, &slots_qend, &rule_name, &rue_name, &limit_name); DPRINTF(("could debiting %d slots_qend in queue "SFQ"\n", slots_qend, lGetString(qep, QU_full_name))); } if (slots_qend > 0) { if (!have_master_host_qend && lGetUlong(qep, QU_tagged4schedule) >= 1) { got_master_queue_qend = true; } } lSetUlong(qep, QU_tag_qend, slots_qend); rqs_hslots += slots_qend; if (rqs_hslots == maxslots) break; } if (rqs_hslots < minslots) { DPRINTF(("reverting debitation since "SFQ" gets us only %d slots while min. 
%d are needed\n", eh_name, rqs_hslots, minslots)); /* must revert all RQS debitations */ for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep; qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) { slots_qend = lGetUlong(qep, QU_tag_qend); if (slots_qend != 0) { if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) parallel_revert_rqs_slot_debitation(a, eh_name, lGetString(qep, QU_qname), 0, slots_qend, &rule_name, &rue_name, &limit_name); lSetUlong(qep, QU_tag_qend, 0); } } hslots_qend = 0; } else { if (got_master_queue_qend) have_master_host_qend = true; accu_host_slots_qend += rqs_hslots; } } } /* mark host as not suitable */ /* at least when accu_host_slots and accu_host_slots_qend < a->slots */ /* and only, if modify category is set */ if (skip_host_list && use_category->mod_category ) { if (hslots < minslots && hslots_qend < minslots) { lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type); } } /* exit is possible when we (a) got the full slot amount or (b) got full slot_qend amount and know full slot amount is not achievable anymore */ if ((accu_host_slots >= a->slots && have_master_host) && (!a->care_reservation || a->is_reservation || (accu_host_slots_qend >= a->slots && have_master_host_qend))) { DPRINTF(("WE ARE THROUGH!\n")); done = true; break; } /* no need to collect further when slots_qend are complete */ if (a->care_reservation && accu_host_slots_qend >= a->slots && have_master_host_qend) a->care_reservation = false; } /* for each host */ } while (allocation_rule == ALLOC_RULE_ROUNDROBIN && !done && (last_accu_host_slots != accu_host_slots || last_accu_host_slots_qend != accu_host_slots_qend)); lFreeList(&unclear_cqueue_list); if (accu_host_slots >= a->slots && have_master_host) { /* stop looking for smaller slot amounts */ DPRINTF(("--------------> BINGO %d slots at specified time <--------------\n", a->slots)); best_result = DISPATCH_OK; } else if (accu_host_slots_qend >= a->slots && 
have_master_host_qend) { DPRINTF(("--------------> %d slots later <--------------\n", a->slots)); best_result = DISPATCH_NOT_AT_TIME; } else { DPRINTF(("--------------> NEVER ONLY %d but %d needed <--------------\n", accu_host_slots, a->slots)); best_result = DISPATCH_NEVER_CAT; } /* could be interesting for future diagnosis that unveils what resources the job could get as of now we got only negative diagnosis information i.e. reasons why/where no resources are/available */ #if 0 if (best_result != DISPATCH_OK) { dstring sched_info = DSTRING_INIT; for_each (gdil_ep, a->gdil) { sge_dstring_sprintf(&sched_info, " HOST: %s QUEUE: %s SLOTS %d", lGetHost(gdil_ep, JG_qhostname), lGetString(gdil_ep, JG_qname), (int)lGetUlong(gdil_ep, JG_slots)); schedd_log(sge_dstring_get_string(&sched_info)); } sge_dstring_free(&sched_info); } #endif *available_slots = accu_host_slots; if (rmon_condition(xaybzc, INFOPRINT)) { switch (best_result) { case DISPATCH_OK: DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns "sge_u32"\n", a->slots, a->start)); break; case DISPATCH_NOT_AT_TIME: DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns \n", a->slots)); break; case DISPATCH_NEVER_CAT: DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns \n", a->slots)); break; case DISPATCH_NEVER_JOB: DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns \n", a->slots)); break; default: DPRINTF(("!!!!!!!! 
COMPREHSENSIVE ASSIGNMENT(%d) returns unexpected %d\n", best_result)); break; } } } sge_dstring_free(&rule_name); sge_dstring_free(&rue_name); sge_dstring_free(&limit_name); if (use_category->use_category) { lList *temp = schedd_mes_get_tmp_list(); if (temp){ lSetList(use_category->cache, CCT_job_messages, lCopyList(NULL, temp)); } } if (best_result != DISPATCH_OK) { lFreeList(&(a->gdil)); } DRETURN(best_result); } /****** sge_select_queue/parallel_host_slots() ****************************** * NAME * parallel_host_slots() -- Return host slots available at time period * * SYNOPSIS * FUNCTION * The maximum amount available at the host for the specified time period * is determined. * * * INPUTS * * RESULT *******************************************************************************/ static dispatch_t parallel_host_slots(sge_assignment_t *a, int *slots, int *slots_qend, lListElem *hep, bool allow_non_requestable) { int hslots = 0, hslots_qend = 0; const char *eh_name; dispatch_t result = DISPATCH_NEVER_CAT; lList *hard_requests = lGetList(a->job, JB_hard_resource_list); lList *load_list = lGetList(hep, EH_load_list); lList *config_attr = lGetList(hep, EH_consumable_config_list); lList *actual_attr = lGetList(hep, EH_resource_utilization); double lc_factor = 0; DENTER(TOP_LAYER, "parallel_host_slots"); eh_name = lGetHost(hep, EH_name); clear_resource_tags(hard_requests, HOST_TAG); if (a->pi) a->pi->par_hstat++; if (!(qref_list_eh_rejected(lGetList(a->job, JB_hard_queue_list), eh_name, a->hgrp_list) && qref_list_eh_rejected(lGetList(a->job, JB_master_hard_queue_list), eh_name, a->hgrp_list)) && sge_host_match_static(a, hep) == DISPATCH_OK) { /* cause load be raised artificially to reflect load correction when checking job requests */ if (lGetPosViaElem(hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) { u_long32 ulc_factor; if ((ulc_factor=lGetUlong(hep, EH_load_correction_factor))) { lc_factor = ((double)ulc_factor)/100; } } if (a->pi) a->pi->par_hdyn++; result = 
parallel_rc_slots_by_time(a, hard_requests, &hslots, &hslots_qend, config_attr, actual_attr, load_list, false, NULL, DOMINANT_LAYER_HOST, lc_factor, HOST_TAG, false, eh_name, false); if (result == DISPATCH_NOT_AT_TIME) { const void *queue_iterator = NULL; lListElem *next_queue, *qep; for (next_queue = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &queue_iterator); (qep = next_queue); next_queue = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &queue_iterator)) { lSetUlong(qep, QU_tagged4schedule, 1); } result = DISPATCH_OK; } if (hslots > 0) { const void *iterator = NULL; lListElem *gdil; int t_max = parallel_max_host_slots(a, hep); if (t_maxgdil, JG_qhostname, eh_name, &iterator); gdil != NULL; gdil = lGetElemHostNext(a->gdil, JG_qhostname, eh_name, &iterator)) { hslots -= lGetUlong(gdil, JG_slots); } } } *slots = hslots; *slots_qend = hslots_qend; if (result == DISPATCH_OK) { DPRINTF(("\tparallel_host_slots(%s) returns %d/%d\n", eh_name, hslots, hslots_qend)); } else { DPRINTF(("\tparallel_host_slots(%s) returns \n", eh_name)); } DRETURN(result); } /****** sge_select_queue/parallel_tag_hosts_queues() ********************************** * NAME * parallel_tag_hosts_queues() -- Determine host slots and tag queue(s) accordingly * * SYNOPSIS * * FUNCTION * For a particular job the maximum number of slots that could be served * at that host is determined in accordance with the allocation rule and * returned. The time of the assignment can be either DISPATCH_TIME_NOW * or a specific time, but never DISPATCH_TIME_QUEUE_END. * * In those cases when the allocation rule allows more than one slot be * served per host it is necessary to also consider per queue possibly * specified load thresholds. This is because load is global/per host * concept while load thresholds are a queue attribute. * * In those cases when the allocation rule gives us neither a fixed amount * of slots required nor an upper limit for the number per host slots (i.e. 
*     $fill_up and $round_robin) we must iterate through all slot numbers from
*     1 to the maximum number of slots "total_slots" and check with each slot
*     amount whether we can get it or not. Iteration stops when we can't get
*     more slots the host based on the queue limitations and load thresholds.
*
*     As long as only one single queue at the host is eligible for the job the
*     it is sufficient to check with each iteration whether the corresponding
*     number of slots can be served by the host and it's queue or not. The
*     really sick case however is when multiple queues are eligable for a host:
*     Here we have to determine in each iteration step also the maximum number
*     of slots each queue could get us by doing a per queue iteration from the
*     1 up to the maximum number of slots we're testing. The optimization in
*     effect here is to check always only if we could get more slots than with
*     the former per host slot amount iteration.
*
*  INPUTS
*     sge_assignment_t *a          -
*     lListElem *hep               - current host
*     lListElem *global            - global host
*     int *slots                   - out: # free slots
*     int *slots_qend              - out: # free slots in the far far future
*     int global_soft_violations   - # of global soft violations
*     bool *master_host            - out: if true, found a master host
*     category_use_t *use_category - int/out : how to use the job category
*
*  RESULT
*     static dispatch_t -  0 ok got an assignment
*                          1 no assignment at the specified time
*                         -1 assignment will never be possible for all jobs of that category
*                         -2 assignment will never be possible for that particular job
*
*  NOTES
*     MT-NOTE: parallel_tag_hosts_queues() is not MT safe
*******************************************************************************/
/*
 * NOTE(review): the header above is out of sync with the signature below.
 * 'global' and 'global_soft_violations' are not parameters, while
 * 'unclear_cqueue_list' is accepted (but not referenced in the body) and
 * not listed. Whatever the per-queue checks report, the function itself
 * always returns DISPATCH_OK; its results are *slots, *slots_qend,
 * *master_host and the QU_tag / QU_tag_qend values it writes to the queue
 * instances residing at the host.
 */
static dispatch_t
parallel_tag_hosts_queues(sge_assignment_t *a, lListElem *hep, int *slots, int *slots_qend,
                          bool *master_host, category_use_t *use_category,
                          lList **unclear_cqueue_list)
{
   bool suited_as_master_host = false;
   int min_host_slots = 1;
   int max_host_slots = a->slots;
   int accu_queue_slots, accu_queue_slots_qend;
   int qslots, qslots_qend;
   int hslots = 0;
   int hslots_qend = 0;
   const char *cqname, *qname, *eh_name = lGetHost(hep, EH_name);
   lListElem *qep, *next_queue;
   dispatch_t result = DISPATCH_OK;
   const void *queue_iterator = NULL;
   int allocation_rule;

   DENTER(TOP_LAYER, "parallel_tag_hosts_queues");

   allocation_rule = sge_pe_slots_per_host(a->pe, a->slots);

   /* a balanced allocation rule fixes lower and upper per host limit to the same amount */
   if (ALLOC_RULE_IS_BALANCED(allocation_rule)) {
      min_host_slots = max_host_slots = allocation_rule;
   }

   if (lGetUlong(a->job, JB_ar) == 0) {
      /* job is not bound to an AR (JB_ar is 0): host slots come from
         parallel_host_slots(), the dispatch result of that call is not evaluated */
      parallel_host_slots(a, &hslots, &hslots_qend, hep, false);
   } else {
      lList *config_attr = lGetList(hep, EH_consumable_config_list);
      lList *actual_attr = lGetList(hep, EH_resource_utilization);
      lList *load_attr = lGetList(hep, EH_load_list);
      lList *hard_resource_list = lGetList(a->job, JB_hard_resource_list);
      lListElem *rep;
      dstring reason = DSTRING_INIT;

      clear_resource_tags(hard_resource_list, HOST_TAG);

      /* tag those non-consumable hard requests that match at this host;
         a request that does not match is simply left untagged */
      for_each(rep, hard_resource_list) {
         const char *attrname = lGetString(rep, CE_name);
         /* NOTE(review): no NULL check before cplx_el is read below - presumably
            every requested attribute is known in a->centry_list; confirm */
         lListElem *cplx_el = lGetElemStr(a->centry_list, CE_name, attrname);

         if (lGetUlong(cplx_el, CE_consumable) != CONSUMABLE_NO) {
            continue;
         }
         sge_dstring_clear(&reason);
         /* cplx_el is reused for the element handed out by get_attribute(),
            which is released again with lFreeElem() below */
         cplx_el = get_attribute(attrname, config_attr, actual_attr, load_attr, a->centry_list, NULL,
                                 DOMINANT_LAYER_HOST, 0, &reason, false, DISPATCH_TIME_NOW, 0);
         if (cplx_el != NULL) {
            if (match_static_resource(1, rep, cplx_el, &reason, false) == DISPATCH_OK) {
               lSetUlong(rep, CE_tagged, HOST_TAG);
            }
            lFreeElem(&cplx_el);
         }
      }
      sge_dstring_free(&reason);

      {
         lListElem *gdil_ep;
         const void *iterator = NULL;
         /* NOTE(review): 'ar' is used without a NULL check - presumably the AR
            referenced by the job always exists in a->ar_list; confirm */
         lListElem *ar = ar_list_locate(a->ar_list, lGetUlong(a->job, JB_ar));
         lList *granted_list = lGetList(ar, AR_granted_slots);

         /* host slots are the sum of the slots granted to the AR at this host */
         for (gdil_ep = lGetElemHostFirst(granted_list, JG_qhostname, eh_name, &iterator);
              gdil_ep != NULL;
              gdil_ep = lGetElemHostNext(granted_list, JG_qhostname, eh_name, &iterator)) {
            hslots += lGetUlong(gdil_ep, JG_slots);
         }
         hslots_qend = hslots;
      }
   }

   DPRINTF(("HOST %s itself (and queue threshold) will get us %d slots (%d later) ... "
            "we need %d\n", eh_name, hslots, hslots_qend, min_host_slots));

   /* never report more than the upper per host limit */
   hslots = MIN(hslots, max_host_slots);
   hslots_qend = MIN(hslots_qend, max_host_slots);

   /* queues are only looked at if the host itself yields enough slots now or later */
   if (hslots >= min_host_slots || hslots_qend >= min_host_slots) {
      lList *skip_queue_list = NULL;
      if (use_category->use_category) {
         skip_queue_list = lGetList(use_category->cache, CCT_ignore_queues);
      }

      accu_queue_slots = accu_queue_slots_qend = 0;

      /* visit all queue instances residing at this host */
      for (next_queue = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &queue_iterator);
           (qep = next_queue);
           next_queue = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &queue_iterator)) {

         qname = lGetString(qep, QU_full_name);
         cqname = lGetString(qep, QU_qname);

         if (lGetElemStr(a->skip_cqueue_list, CTI_name, cqname)) {
            /* QU_tag is already 0 */
            lSetUlong(qep, QU_tag_qend, 0);
            DPRINTF(("skipped due to cluster queue %s\n", cqname));
            continue;
         }

         /* queue instance is on the category's ignore list;
            QU_tag and QU_tag_qend are left as they are in this case */
         if (skip_queue_list && lGetElemStr(skip_queue_list, CTI_name, qname)){
            DPRINTF(("skipped due to queue instance %s\n", qname));
            continue;
         }

         DPRINTF(("checking queue %s because cqueue %s is not rejected\n", qname, cqname));
         result = parallel_queue_slots(a, qep, &qslots, &qslots_qend, false);

         if (result == DISPATCH_OK && (qslots > 0 || qslots_qend > 0)) {

            /* could this host be a master host */
            if (!suited_as_master_host && lGetUlong(qep, QU_tagged4schedule)) {
               DPRINTF(("HOST %s can be master host because of queue %s\n", eh_name, qname));
               suited_as_master_host = true;
            }

            DPRINTF(("QUEUE %s TIME: %d + %d -> %d QEND: %d + %d -> %d (%d soft violations)\n",
                     qname, accu_queue_slots, qslots, accu_queue_slots + qslots,
                     accu_queue_slots_qend, qslots_qend, accu_queue_slots_qend + qslots_qend,
                     (int)lGetUlong(qep, QU_soft_violation)));

            /* remember per queue amounts in the tags and sum them up for the host */
            accu_queue_slots += qslots;
            accu_queue_slots_qend += qslots_qend;
            lSetUlong(qep, QU_tag, qslots);
            lSetUlong(qep, QU_tag_qend, qslots_qend);
         } else {
            /* queue gets us nothing: untag it and add it to the ignore list
               of the category, if there is one (QU_tag_qend is not reset here) */
            lSetUlong(qep, QU_tag, 0);
            if (skip_queue_list) {
               lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
            }
            DPRINTF(("HOST(1.5) %s will get us nothing\n", eh_name));
         }

      } /* for each queue of the host */

      /* the host cannot serve more slots than its queues do in sum */
      hslots = MIN(accu_queue_slots, hslots);
      hslots_qend = MIN(accu_queue_slots_qend, hslots_qend);

      DPRINTF(("HOST %s and it's queues will get us %d slots (%d later) ... we need %d\n",
               eh_name, hslots, hslots_qend, min_host_slots));
   }

   *slots = hslots;
   *slots_qend = hslots_qend;
   *master_host = suited_as_master_host;

   /* 'result' of the last queue check is not passed on to the caller */
   DRETURN(DISPATCH_OK);
}

/*
 * Determine maximum number of host_slots as limited
 * by queue load thresholds. The maximum only considers
 * thresholds with load adjustments
 *
 * for each queue Q at this host {
 *    for each threshold T of a queue {
 *       if compare operator > or >=
 *          avail(Q, T) = job, JB_hard_resource_list);

   DENTER(TOP_LAYER, "parallel_max_host_slots");

   for (next_queue = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &queue_iterator);
        (qep = next_queue);
        next_queue = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &queue_iterator)) {
      lList *load_thresholds = lGetList(qep, QU_load_thresholds);
      avail_q = INT_MAX;

      for_each(tr, load_thresholds) {
         double load, threshold, adjustment;
         const char *name = lGetString(tr, CE_name);

         if ((cep = centry_list_locate(a->centry_list, name)) == NULL) {
            continue;
         }

         if (lGetUlong(cep, CE_consumable) == CONSUMABLE_NO) { /* work on the load values */
            if ((lc = lGetElemStr(a->load_adjustments, CE_name, name)) == NULL) {
               continue;
            }
            if ((lv=lGetSubStr(host, HL_name, name, EH_load_list)) != NULL) {
               load_value = lGetString(lv, HL_value);
            } else if ((lv = lGetSubStr(a->gep, HL_name, name, EH_load_list)) != NULL) {
               load_value = lGetString(lv, HL_value);
            } else {
               fv = lGetSubStr(qep, CE_name, name, QU_consumable_config_list);
               load_value = lGetString(fv, CE_stringval);
            }
            adj_value = lGetString(lc, CE_stringval);
            is_np_adjustment = true;
            adj_value = lGetString(lc, CE_stringval);
         } else { /* work on a consumable */
            if ((lc = lGetElemStr(requests, CE_name, name)) != NULL) {
               adj_value = lGetString(lc, CE_stringval);
            } else { /* is default value */
adj_value = lGetString(cep, CE_default); } if ((centry = get_attribute_by_name(a->gep, host, qep, name, a->centry_list, a->start, a->duration)) == NULL) { /* no load value, no assigned consumable to queue, host, or global */ DPRINTF(("the consumable "SFN" used in queue "SFN" as load threshold has no instance at queue, host or global level\n", name, lGetString(qep, QU_full_name))); DRETURN(0); } load_value = lGetString(centry, CE_pj_stringval); } limit_value = lGetString(tr, CE_stringval); type = lGetUlong(cep, CE_valtype); /* get the needed values. If the load value is not a number, ignore it */ switch (type) { case TYPE_INT: case TYPE_TIM: case TYPE_MEM: case TYPE_BOO: case TYPE_DOUBLE: if (!parse_ulong_val(&load, NULL, type, load_value, NULL, 0) || !parse_ulong_val(&threshold, NULL, type, limit_value, NULL, 0) || !parse_ulong_val(&adjustment, NULL, type, adj_value, NULL, 0)) { lFreeElem(¢ry); continue; } break; default: lFreeElem(¢ry); continue; } /* the string in load_value is not needed anymore, we can free the element */ lFreeElem(¢ry); /* a adjustment of NULL is ignored here. We can dispatch unlimited jobs based on the load value */ if (adjustment == 0) { continue; } switch (lGetUlong(cep, CE_relop)) { case CMPLXEQ_OP : case CMPLXNE_OP : continue; /* we cannot compute a usefull range */ case CMPLXLT_OP : case CMPLXLE_OP : adjustment *= -1; break; } if (is_np_adjustment) { load_np_value_adjustment(name, host, &adjustment); } /* load alarm is defined in a way, that a host can go with one used slot into alarm and not that the dispatching prevents the alarm state. For this behavior we have to add 1 to the available slots. 
However, this is not true for consumables as load threshold*/ avail = (threshold - load)/adjustment; if (lGetUlong(cep, CE_consumable) == CONSUMABLE_NO) { avail++; } avail_q = MIN(avail, avail_q); } avail_h = MAX(avail_h, avail_q); } DRETURN(avail_h); } /****** sge_select_queue/sge_sequential_assignment() *************************** * NAME * sge_sequential_assignment() -- Make an assignment for a sequential job. * * SYNOPSIS * int sge_sequential_assignment(sge_assignment_t *assignment) * * FUNCTION * For sequential job assignments all the earliest job start time * is determined with each queue instance and the earliest one gets * chosen. Secondary criterion for queue selection minimizing jobs * soft requests. * * The overall behaviour of this function is somewhat dependent on the * value that gets passed to assignment->start and whether soft requests * were specified with the job: * * (1) In case of now assignemnts (DISPATCH_TIME_NOW) only the first queue * suitable for jobs without soft requests is tagged. When soft requests * are specified all queues must be verified and tagged in order to find * the queue that fits best. On success the start time is set * * (2) In case of queue end assignments (DISPATCH_TIME_QUEUE_END) * * * INPUTS * sge_assignment_t *assignment - ??? 
* * RESULT * int - 0 ok got an assignment + time (DISPATCH_TIME_NOW and DISPATCH_TIME_QUEUE_END) * 1 no assignment at the specified time * -1 assignment will never be possible for all jobs of that category * -2 assignment will never be possible for that particular job * * NOTES * MT-NOTE: sge_sequential_assignment() is not MT safe *******************************************************************************/ dispatch_t sge_sequential_assignment(sge_assignment_t *a) { dispatch_t result; int old_logging = 0; DENTER(TOP_LAYER, "sge_sequential_assignment"); if (a == NULL) { DRETURN(DISPATCH_NEVER_CAT); } if (a->is_reservation && !a->is_advance_reservation){ /* turn off messages for reservation scheduling */ old_logging = schedd_mes_get_logging(); schedd_mes_set_logging(0); sconf_set_mes_schedd_info(false); } sequential_update_host_order(a->host_list, a->queue_list); if (sconf_get_qs_state() != QS_STATE_EMPTY) { /*------------------------------------------------------------------ * There is no need to sort the queues after each dispatch in * case: * * 1. The last dispatch run did not resort the queue list * currently only DISPATCH_TYPE_PE_SOFT_REQ needs to sort * the list based on the soft violations * 2. The hosts sort order has not changed since last dispatch. * Load correction or consumables in the load formula can * change the order of the hosts. We detect changings in the * host order by comparing the host sequence number with the * sequence number from previous run. 
* ------------------------------------------------------------------*/ int last_dispatch_type = sconf_get_last_dispatch_type(); if (last_dispatch_type == DISPATCH_TYPE_PE_SOFT_REQ || last_dispatch_type == DISPATCH_TYPE_NONE || sconf_get_host_order_changed()) { DPRINTF(("SORTING HOSTS!\n")); if (sconf_get_queue_sort_method() == QSM_LOAD) { lPSortList(a->queue_list, "%I+ %I+", QU_host_seq_no, QU_seq_no); } else { lPSortList(a->queue_list, "%I+ %I+", QU_seq_no, QU_host_seq_no); } } if (a->is_soft) { sconf_set_last_dispatch_type(DISPATCH_TYPE_FAST_SOFT_REQ); } else { sconf_set_last_dispatch_type(DISPATCH_TYPE_FAST); } } result = sequential_tag_queues_suitable4job(a); if (result == DISPATCH_OK) { lListElem *qep; u_long32 job_start_time = U_LONG32_MAX; u_long32 min_soft_violations = U_LONG32_MAX; lListElem *best_queue = NULL; if (a->is_reservation) { for_each (qep, a->queue_list) { if (lGetUlong(qep, QU_tag) != 0) { u_long32 temp_job_start_time = lGetUlong(qep, QU_available_at); DPRINTF((" Q: %s "sge_u32" "sge_u32" (jst: "sge_u32")\n", lGetString(qep, QU_full_name), lGetUlong(qep, QU_tag), temp_job_start_time, job_start_time)); /* earlier start time has higher preference than lower soft violations */ if ((job_start_time > temp_job_start_time) || (a->is_soft && min_soft_violations > lGetUlong(qep, QU_soft_violation) && job_start_time == temp_job_start_time) ) { best_queue = qep; job_start_time = temp_job_start_time; min_soft_violations = lGetUlong(qep, QU_soft_violation); } } } if (best_queue) { DPRINTF(("earliest queue \"%s\" at "sge_u32"\n", lGetString(best_queue, QU_full_name), job_start_time)); } else { DPRINTF(("no earliest queue found!\n")); } } else { for_each (qep, a->queue_list) { if (lGetUlong(qep, QU_tag)) { job_start_time = lGetUlong(qep, QU_available_at); best_queue = qep; break; } } } if (!best_queue) { DRETURN(DISPATCH_NEVER_CAT); /* should never happen */ } { lListElem *gdil_ep; lList *gdil = NULL; const char *qname = lGetString(best_queue, QU_full_name); 
const char *eh_name = lGetHost(best_queue, QU_qhostname); DPRINTF((sge_u32": 1 slot in queue %s user %s %s for "sge_u32"\n", a->job_id, qname, a->user? a->user:"", !a->is_reservation?"scheduled":"reserved", job_start_time)); gdil_ep = lAddElemStr(&gdil, JG_qname, qname, JG_Type); lSetUlong(gdil_ep, JG_qversion, lGetUlong(best_queue, QU_version)); lSetHost(gdil_ep, JG_qhostname, eh_name); lSetUlong(gdil_ep, JG_slots, 1); if (!a->is_reservation) { sconf_inc_fast_jobs(); } lFreeList(&(a->gdil)); a->gdil = gdil; a->slots = 1; if (a->start == DISPATCH_TIME_QUEUE_END) { a->start = job_start_time; } } } switch (result) { case DISPATCH_OK: DPRINTF(("SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns