/*****************************************************************************\
* dist_tasks.c - Assign task count for each resource.
*****************************************************************************
* Copyright (C) 2018-2019 SchedMD LLC
* Derived in large part from select/cons_res plugin
*
* This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "cons_common.h"
#include "dist_tasks.h"
#include "src/common/xstring.h"
/* Max boards supported for best-fit across boards */
/* Larger board configurations may require new algorithm */
/* for acceptable performance */
#define MAX_BOARDS 8
/* Combination counts
* comb_counts[n-1][k-1] = number of combinations of
* k items from a set of n items
*
 * Formula is n! / (k! * (n-k)!)
*/
static uint32_t comb_counts[MAX_BOARDS][MAX_BOARDS] =
{{1,0,0,0,0,0,0,0},
{2,1,0,0,0,0,0,0},
{3,3,1,0,0,0,0,0},
{4,6,4,1,0,0,0,0},
{5,10,10,5,1,0,0,0},
{6,15,20,15,6,1,0,0},
{7,21,35,35,21,7,1,0},
{8,28,56,70,56,28,8,1}};
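/*
 * Example: comb_counts[3][1] == 6, the number of ways to choose k=2 boards
 * from n=4 boards (4!/(2! * 2!) = 6).
 */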
static int *sockets_core_cnt = NULL;
/*
* Generate all combinations of k integers from the
* set of integers 0 to n-1.
* Return combinations in comb_list.
*
* Example: For k = 2 and n = 4, there are six
* combinations:
* {0,1},{0,2},{0,3},{1,2},{1,3},{2,3}
*
*/
static void _gen_combs(int *comb_list, int n, int k)
{
int i, b;
int *comb = xmalloc(k * sizeof(int));
/* Setup comb for the initial combination */
for (i = 0; i < k; i++)
comb[i] = i;
b = 0;
/* Generate all the other combinations */
while (1) {
for (i = 0; i < k; i++) {
comb_list[b + i] = comb[i];
}
b += k;
i = k - 1;
++comb[i];
		/*
		 * Propagate the "carry" toward comb[0]; stop at index 0 so the
		 * loop never steps below the start of comb[] (the termination
		 * test on comb[0] below handles the last combination).
		 */
		while ((i > 0) && (comb[i] >= n - k + 1 + i)) {
--i;
++comb[i];
}
if (comb[0] > n - k)
break; /* No more combinations */
for (i = i + 1; i < k; ++i)
comb[i] = comb[i - 1] + 1;
}
xfree(comb);
}
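/*
 * Usage sketch (hypothetical values): enumerating the C(4,2) = 6 board pairs
 * from the example above. The caller sizes comb_list as
 * (comb_counts[n-1][k-1] * k) ints and combination m then occupies
 * comb_list[m*k] .. comb_list[m*k + k - 1], which is how
 * _block_sync_core_bitmap() indexes board_combs[] below:
 *
 *	int combs[6 * 2];
 *	_gen_combs(combs, 4, 2);
 *	// combs = {0,1, 0,2, 0,3, 1,2, 1,3, 2,3}
 */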
/* qsort compare function for ascending int list */
static int _cmp_int_ascend(const void *a, const void *b)
{
return (*(int*)a - *(int*)b);
}
/* qsort compare function for descending int list */
static int _cmp_int_descend(const void *a, const void *b)
{
return (*(int*)b - *(int*)a);
}
/* qsort compare function for board combination socket list
* NOTE: sockets_core_cnt is a global symbol in this module */
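/* It is called from _block_sync_core_bitmap(), which fills sockets_core_cnt[]
 * for the node being processed and then qsort()s slices of socket_list so
 * that sockets with the most available cores come first */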
static int _cmp_sock(const void *a, const void *b)
{
return (sockets_core_cnt[*(int*)b] - sockets_core_cnt[*(int*)a]);
}
/* Enable detailed logging of cr_dist() node and core bitmaps */
static inline void _log_select_maps(char *loc, job_record_t *job_ptr)
{
job_resources_t *job_res = job_ptr->job_resrcs;
char tmp[100];
int i;
if (!(select_debug_flags & DEBUG_FLAG_SELECT_TYPE))
return;
info("%s: %s %pJ", __func__, loc, job_ptr);
if (job_res->node_bitmap) {
bit_fmt(tmp, sizeof(tmp), job_res->node_bitmap);
info(" node_bitmap:%s", tmp);
}
if (job_res->core_bitmap) {
bit_fmt(tmp, sizeof(tmp), job_res->core_bitmap);
info(" core_bitmap:%s", tmp);
}
if (job_res->cpus) {
for (i = 0; i < job_res->nhosts; i++) {
info(" avail_cpus[%d]:%u", i,
job_res->cpus[i]);
}
}
if (job_res->tasks_per_node) {
for (i = 0; i < job_res->nhosts; i++) {
info(" tasks_per_node[%d]:%u", i,
job_res->tasks_per_node[i]);
}
}
}
/* Remove any specialized cores from those allocated to the job */
static void _clear_spec_cores(job_record_t *job_ptr,
bitstr_t **core_array)
{
int i, i_first, i_last;
int first_core, last_core;
int alloc_node = -1, alloc_core = -1, c;
job_resources_t *job_res = job_ptr->job_resrcs;
multi_core_data_t *mc_ptr = NULL;
bitstr_t *use_core_array = NULL;
if (job_ptr->details && job_ptr->details->mc_ptr)
mc_ptr = job_ptr->details->mc_ptr;
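	/*
	 * Start from a fully set core_bitmap, then clear every core that is
	 * not present in core_array (i.e. the specialized cores), rebuilding
	 * job_res->cpus[] as the number of usable threads per core kept.
	 */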
bit_set_all(job_res->core_bitmap);
i_first = bit_ffs(job_res->node_bitmap);
if (i_first != -1)
i_last = bit_fls(job_res->node_bitmap);
else
i_last = -2;
for (i = i_first; i <= i_last; i++) {
if (!bit_test(job_res->node_bitmap, i))
continue;
job_res->cpus[++alloc_node] = 0;
if (is_cons_tres) {
first_core = 0;
last_core = select_node_record[i].tot_cores;
use_core_array = core_array[i];
} else {
first_core = cr_get_coremap_offset(i);
last_core = cr_get_coremap_offset(i + 1);
use_core_array = *core_array;
}
for (c = first_core; c < last_core; c++) {
alloc_core++;
if (bit_test(use_core_array, c)) {
uint16_t tpc = select_node_record[i].vpus;
if (mc_ptr &&
(mc_ptr->threads_per_core != NO_VAL16) &&
(mc_ptr->threads_per_core < tpc))
tpc = mc_ptr->threads_per_core;
job_res->cpus[alloc_node] += tpc;
} else {
bit_clear(job_res->core_bitmap, alloc_core);
}
}
}
}
/* CPUs already selected for jobs, just distribute the tasks */
static int _set_task_dist(job_record_t *job_ptr)
{
uint32_t n, i, tid = 0, maxtasks;
uint16_t *avail_cpus;
job_resources_t *job_res = job_ptr->job_resrcs;
bool log_over_subscribe = true;
char *err_msg = NULL;
int plane_size = 1;
if (!is_cons_tres
|| !job_ptr->tres_per_task) /* Task layout for GRES not required */
return SLURM_SUCCESS;
if (!job_res)
err_msg = "job_res is NULL";
else if (!job_res->cpus)
err_msg = "job_res->cpus is NULL";
else if (!job_res->nhosts)
err_msg = "job_res->nhosts is zero";
if (err_msg) {
error("%s: %s: Invalid allocation for %pJ: %s",
plugin_type, __func__, job_ptr, err_msg);
return SLURM_ERROR;
}
if ((job_ptr->details->task_dist & SLURM_DIST_STATE_BASE) ==
SLURM_DIST_PLANE) {
if (job_ptr->details->mc_ptr)
plane_size = job_ptr->details->mc_ptr->plane_size;
if (plane_size <= 0) {
error("%s: %s: invalid plane_size", plugin_type, __func__);
return SLURM_ERROR;
}
}
i = job_res->nhosts * sizeof(uint16_t);
avail_cpus = xmalloc(i);
memcpy(avail_cpus, job_res->cpus, i);
job_res->tasks_per_node = xmalloc(i);
maxtasks = job_res->ncpus;
	/* ncpus is already set to the number of tasks if overcommit is used */
if (!job_ptr->details->overcommit &&
(job_ptr->details->cpus_per_task > 1)) {
if (job_ptr->details->ntasks_per_node == 0) {
maxtasks = maxtasks / job_ptr->details->cpus_per_task;
} else {
maxtasks = job_ptr->details->ntasks_per_node *
job_res->nhosts;
}
}
	/*
	 * Safeguard for the case where the user requested fewer CPUs than
	 * cpus_per_task (or did not specify a CPU count), which would leave
	 * maxtasks at zero.
	 */
if (!maxtasks) {
error("%s: %s: changing task count from 0 to 1 for %pJ",
plugin_type, __func__, job_ptr);
maxtasks = 1;
}
if (job_ptr->details->cpus_per_task == 0)
job_ptr->details->cpus_per_task = 1;
	/* First put one task on each node */
for (n = 0; n < job_res->nhosts; n++) {
tid++;
job_res->tasks_per_node[n] = 1;
if (job_ptr->details->cpus_per_task > avail_cpus[n]) {
if (!job_ptr->details->overcommit) {
error("%s: %s: avail_cpus underflow on node %d for %pJ",
plugin_type, __func__, n, job_ptr);
}
avail_cpus[n] = 0;
} else {
avail_cpus[n] -= job_ptr->details->cpus_per_task;
}
}
	/* Distribute remaining tasks per plane size */
while (maxtasks > tid) {
uint32_t last_tid = tid;
for (n = 0; n < job_res->nhosts; n++) {
if (job_ptr->details->cpus_per_task > avail_cpus[n])
continue;
i = MAX(job_res->tasks_per_node[n] % plane_size, 1);
i = MIN(i,
				avail_cpus[n] / job_ptr->details->cpus_per_task);
i = MIN(i, maxtasks - tid);
job_res->tasks_per_node[n] += i;
tid += i;
avail_cpus[n] -= (i * job_ptr->details->cpus_per_task);
}
if (last_tid == tid)
break;
}
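	/*
	 * Rough walk-through of the two passes above (hypothetical numbers,
	 * assuming ample CPUs on every node): maxtasks = 7 across 3 nodes
	 * with plane_size = 2 and cpus_per_task = 1 gives tasks_per_node of
	 * {1,1,1} after the first pass, then {2,2,2}, and finally {3,2,2}
	 * once tid reaches maxtasks.
	 */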
/* If more tasks than resources, distribute them evenly */
if (!job_ptr->details->overcommit)
log_over_subscribe = true;
while (maxtasks > tid) {
		if (log_over_subscribe) {
			/*
			 * This *should* never happen because maxtasks should
			 * never be greater than the total number of available
			 * CPUs; log it once per job.
			 */
			error("%s: %s: oversubscribe for %pJ",
			      plugin_type, __func__, job_ptr);
			log_over_subscribe = false;
		}
for (n = 0; n < job_res->nhosts; n++) {
i = MIN(plane_size, maxtasks - tid);
job_res->tasks_per_node[n] += i;
tid += i;
}
}
xfree(avail_cpus);
return SLURM_SUCCESS;
}
/* distribute blocks (planes) of tasks cyclically */
static int _compute_plane_dist(job_record_t *job_ptr,
uint32_t *gres_task_limit)
{
bool over_subscribe = false;
uint32_t n, i, p, tid, maxtasks, l;
uint16_t *avail_cpus, plane_size = 1;
job_resources_t *job_res = job_ptr->job_resrcs;
bool log_over_subscribe = true;
bool test_tres_tasks = true;
if (!job_res || !job_res->cpus || !job_res->nhosts) {
error("%s: %s: invalid allocation for %pJ",
plugin_type, __func__, job_ptr);
return SLURM_ERROR;
}
maxtasks = job_res->ncpus;
avail_cpus = job_res->cpus;
if (job_ptr->details->cpus_per_task > 1)
maxtasks = maxtasks / job_ptr->details->cpus_per_task;
if (job_ptr->details->mc_ptr)
plane_size = job_ptr->details->mc_ptr->plane_size;
if (plane_size <= 0) {
error("%s: %s: invalid plane_size", plugin_type, __func__);
return SLURM_ERROR;
}
if (!is_cons_tres)
test_tres_tasks = false;
job_res->cpus = xcalloc(job_res->nhosts, sizeof(uint16_t));
job_res->tasks_per_node = xcalloc(job_res->nhosts, sizeof(uint16_t));
if (job_ptr->details->overcommit)
log_over_subscribe = false;
for (tid = 0, i = 0; (tid < maxtasks); i++) { /* cycle counter */
bool space_remaining = false;
if (over_subscribe && log_over_subscribe) {
/*
* 'over_subscribe' is a relief valve that guards
* against an infinite loop, and it *should* never
* come into play because maxtasks should never be
* greater than the total number of available CPUs
*/
error("%s: %s: oversubscribe for %pJ",
plugin_type, __func__, job_ptr);
			log_over_subscribe = false; /* Log once per job */
}
for (n = 0; ((n < job_res->nhosts) && (tid < maxtasks)); n++) {
bool more_tres_tasks = false;
for (p = 0; p < plane_size && (tid < maxtasks); p++) {
if (is_cons_tres && test_tres_tasks &&
!dist_tasks_tres_tasks_avail(
gres_task_limit, job_res, n))
continue;
more_tres_tasks = true;
if ((job_res->cpus[n] < avail_cpus[n]) ||
over_subscribe) {
tid++;
job_res->tasks_per_node[n]++;
for (l = 0;
					     l < job_ptr->details->cpus_per_task;
l++) {
if (job_res->cpus[n] <
avail_cpus[n])
job_res->cpus[n]++;
}
}
}
if (!more_tres_tasks)
test_tres_tasks = false;
if (job_res->cpus[n] < avail_cpus[n])
space_remaining = true;
}
if (!space_remaining)
over_subscribe = true;
}
xfree(avail_cpus);
return SLURM_SUCCESS;
}
/*
* sync up core bitmap arrays with job_resources_t struct using a best-fit
* approach on the available resources on each node
*
* "Best-fit" means:
* 1st priority: Use smallest number of boards with sufficient
* available resources
* 2nd priority: Use smallest number of sockets with sufficient
* available resources
* 3rd priority: Use board combination with the smallest number
* of available resources
* 4th priority: Use higher-numbered boards/sockets/cores first
*
* The job_resources_t struct can include threads based upon configuration
*/
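/*
 * Illustration (hypothetical): on a 2-board, 4-socket node where board 0
 * offers {4,1} free cores per socket and board 1 offers {3,3}, a request for
 * 5 cores can be met by either board alone (b_min = 1) and needs two sockets
 * either way, so board 0 wins the tie-break on the smallest number of
 * available cores (5 vs 6).
 */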
static void _block_sync_core_bitmap(job_record_t *job_ptr,
const uint16_t cr_type)
{
uint32_t c, s, i, j, n, b, z, size, csize, core_cnt;
uint16_t cpus, num_bits, vpus = 1;
uint16_t cpus_per_task = job_ptr->details->cpus_per_task;
job_resources_t *job_res = job_ptr->job_resrcs;
bool alloc_cores = false, alloc_sockets = false;
uint16_t ntasks_per_core = 0xffff;
int tmp_cpt = 0;
int count, core_min, b_min, elig, s_min, comb_idx, sock_idx;
int elig_idx, comb_brd_idx, sock_list_idx, comb_min, board_num;
int sock_per_comb;
int *boards_core_cnt;
int *sort_brds_core_cnt;
int *board_combs;
int *socket_list;
int *elig_brd_combs;
int *elig_core_cnt;
bool *sockets_used;
uint16_t boards_nb;
uint16_t nboards_nb;
uint16_t sockets_nb;
uint16_t ncores_nb;
uint16_t nsockets_nb;
uint16_t sock_per_brd;
uint16_t req_cores,best_fit_cores = 0;
uint32_t best_fit_location = 0;
uint64_t ncomb_brd;
bool sufficient, best_fit_sufficient;
if (!job_res)
return;
if (!job_res->core_bitmap) {
error("%s: %s: core_bitmap for %pJ is NULL",
plugin_type, __func__, job_ptr);
return;
}
if (bit_ffs(job_res->core_bitmap) == -1) {
error("%s: %s: core_bitmap for %pJ has no bits set",
plugin_type, __func__, job_ptr);
return;
}
if (cr_type & CR_SOCKET)
alloc_sockets = true;
else if (cr_type & CR_CORE)
alloc_cores = true;
if (job_ptr->details->mc_ptr) {
multi_core_data_t *mc_ptr = job_ptr->details->mc_ptr;
if ((mc_ptr->ntasks_per_core != INFINITE16) &&
(mc_ptr->ntasks_per_core)) {
ntasks_per_core = mc_ptr->ntasks_per_core;
}
}
size = bit_size(job_res->node_bitmap);
csize = bit_size(job_res->core_bitmap);
sockets_nb = select_node_record[0].sockets;
sockets_core_cnt = xcalloc(sockets_nb, sizeof(int));
sockets_used = xcalloc(sockets_nb, sizeof(bool));
boards_nb = select_node_record[0].boards;
boards_core_cnt = xcalloc(boards_nb, sizeof(int));
sort_brds_core_cnt = xcalloc(boards_nb, sizeof(int));
for (c = 0, i = 0, n = 0; n < size; n++) {
if (!bit_test(job_res->node_bitmap, n))
continue;
core_cnt = 0;
ncores_nb = select_node_record[n].cores;
nsockets_nb = select_node_record[n].sockets;
nboards_nb = select_node_record[n].boards;
num_bits = nsockets_nb * ncores_nb;
if ((c + num_bits) > csize) {
error("%s: %s index error", plugin_type, __func__);
break;
}
cpus = job_res->cpus[i];
vpus = common_cpus_per_core(job_ptr->details, n);
/* compute still required cores on the node */
req_cores = cpus / vpus;
if (cpus % vpus)
req_cores++;
/*
* figure out core cnt if task requires more than one core and
* tasks_per_core is 1
*/
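		/*
		 * e.g. (hypothetical numbers): with ntasks_per_core=1,
		 * cpus_per_task=3 and vpus=2, cores_per_task below is 2, so
		 * an allocation of 6 CPUs (2 tasks) needs req_cores = 4.
		 */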
if ((ntasks_per_core == 1) &&
(cpus_per_task > vpus)) {
/* how many cores a task will consume */
int cores_per_task = (cpus_per_task + vpus - 1) / vpus;
int tasks = cpus / cpus_per_task;
req_cores = tasks * cores_per_task;
}
if (nboards_nb > MAX_BOARDS) {
debug3("%s: %s: node[%u]: exceeds max boards; "
"doing best-fit across sockets only",
plugin_type, __func__, n);
nboards_nb = 1;
}
if (nsockets_nb > sockets_nb) {
sockets_nb = nsockets_nb;
xrecalloc(sockets_core_cnt, sockets_nb, sizeof(int));
xrecalloc(sockets_used, sockets_nb, sizeof(bool));
}
if (nboards_nb > boards_nb) {
boards_nb = nboards_nb;
xrecalloc(boards_core_cnt, boards_nb, sizeof(int));
xrecalloc(sort_brds_core_cnt, boards_nb, sizeof(int));
}
/* Count available cores on each socket and board */
if (nsockets_nb >= nboards_nb) {
sock_per_brd = nsockets_nb / nboards_nb;
} else {
error("%s: %s: Node socket count lower than board count (%u < %u), %pJ node %s",
plugin_type, __func__, nsockets_nb, nboards_nb,
job_ptr, node_record_table_ptr[n].name);
sock_per_brd = 1;
}
for (b = 0; b < nboards_nb; b++) {
boards_core_cnt[b] = 0;
sort_brds_core_cnt[b] = 0;
}
for (s = 0; s < nsockets_nb; s++) {
sockets_core_cnt[s] = 0;
sockets_used[s] = false;
b = s / sock_per_brd;
for (j = c + (s * ncores_nb);
j < c + ((s+1) * ncores_nb); j++) {
if (bit_test(job_res->core_bitmap, j)) {
sockets_core_cnt[s]++;
boards_core_cnt[b]++;
sort_brds_core_cnt[b]++;
}
}
}
/* Sort boards in descending order of available core count */
qsort(sort_brds_core_cnt, nboards_nb, sizeof(int),
_cmp_int_descend);
/*
* Determine minimum number of boards required for the
* allocation (b_min)
*/
count = 0;
for (b = 0; b < nboards_nb; b++) {
count += sort_brds_core_cnt[b];
if (count >= req_cores)
break;
}
b_min = b + 1;
if (b_min > nboards_nb) {
char core_str[64];
bit_fmt(core_str, 64, job_res->core_bitmap);
error("%s: b_min > nboards_nb (%d > %u) node:%s core_bitmap:%s",
__func__, b_min, nboards_nb,
node_record_table_ptr[n].name, core_str);
break;
}
sock_per_comb = b_min * sock_per_brd;
/* Allocate space for list of board combinations */
ncomb_brd = comb_counts[nboards_nb-1][b_min-1];
board_combs = xcalloc(ncomb_brd * b_min, sizeof(int));
/* Generate all combinations of b_min boards on the node */
_gen_combs(board_combs, nboards_nb, b_min);
/*
* Determine which combinations have enough available cores
* for the allocation (eligible board combinations)
*/
elig_brd_combs = xcalloc(ncomb_brd, sizeof(int));
elig_core_cnt = xcalloc(ncomb_brd, sizeof(int));
elig = 0;
for (comb_idx = 0; comb_idx < ncomb_brd; comb_idx++) {
count = 0;
for (comb_brd_idx = 0; comb_brd_idx < b_min;
comb_brd_idx++) {
board_num = board_combs[(comb_idx * b_min)
+ comb_brd_idx];
count += boards_core_cnt[board_num];
}
if (count >= req_cores) {
elig_brd_combs[elig] = comb_idx;
elig_core_cnt[elig] = count;
elig++;
}
}
/*
* Allocate space for list of sockets for each eligible board
* combination
*/
socket_list = xcalloc(elig * sock_per_comb, sizeof(int));
/*
* Generate sorted list of sockets for each eligible board
* combination, and find combination with minimum number
* of sockets and minimum number of CPUs required for the
* allocation
*/
s_min = sock_per_comb;
comb_min = 0;
core_min = sock_per_comb * ncores_nb;
for (elig_idx = 0; elig_idx < elig; elig_idx++) {
comb_idx = elig_brd_combs[elig_idx];
for (comb_brd_idx = 0; comb_brd_idx < b_min;
comb_brd_idx++) {
board_num = board_combs[(comb_idx * b_min)
+ comb_brd_idx];
sock_list_idx = (elig_idx * sock_per_comb) +
(comb_brd_idx * sock_per_brd);
for (sock_idx = 0; sock_idx < sock_per_brd;
sock_idx++) {
socket_list[sock_list_idx + sock_idx]
= (board_num * sock_per_brd)
+ sock_idx;
}
}
/*
* Sort this socket list in descending order of
* available core count
*/
qsort(&socket_list[elig_idx*sock_per_comb],
sock_per_comb, sizeof (int), _cmp_sock);
/*
* Determine minimum number of sockets required for
* the allocation from this socket list
*/
count = 0;
for (b = 0; b < sock_per_comb; b++) {
sock_idx =
socket_list[(int)((elig_idx *
sock_per_comb) + b)];
count += sockets_core_cnt[sock_idx];
if (count >= req_cores)
break;
}
b++;
/*
* Use board combination with minimum number
* of required sockets and minimum number of CPUs
*/
if ((b < s_min) ||
((b == s_min) &&
(elig_core_cnt[elig_idx] <= core_min))) {
s_min = b;
comb_min = elig_idx;
core_min = elig_core_cnt[elig_idx];
}
}
if (select_debug_flags & DEBUG_FLAG_SELECT_TYPE) {
info("%s: %s: node[%u]: required CPUs:%u min req boards:%u,",
plugin_type, __func__, n, cpus, b_min);
info("%s: %s: node[%u]: min req sockets:%u min avail cores:%u",
plugin_type, __func__, n, s_min, core_min);
}
/*
* Re-sort socket list for best-fit board combination in
* ascending order of socket number
*/
qsort(&socket_list[comb_min * sock_per_comb], sock_per_comb,
sizeof (int), _cmp_int_ascend);
xfree(board_combs);
xfree(elig_brd_combs);
xfree(elig_core_cnt);
/*
* select cores from the sockets of the best-fit board
* combination using a best-fit approach
*/
tmp_cpt = cpus_per_task;
while (cpus > 0) {
best_fit_cores = 0;
best_fit_sufficient = false;
/* search for the socket with best fit */
for (z = 0; z < sock_per_comb; z++) {
s = socket_list[(comb_min*sock_per_comb)+z];
sufficient = sockets_core_cnt[s] >= req_cores;
if ((best_fit_cores == 0) ||
(sufficient && !best_fit_sufficient ) ||
(sufficient &&
(sockets_core_cnt[s] < best_fit_cores)) ||
(!sufficient &&
(sockets_core_cnt[s] > best_fit_cores))) {
best_fit_cores = sockets_core_cnt[s];
best_fit_location = s;
best_fit_sufficient = sufficient;
}
}
/* check that we have found a usable socket */
if (best_fit_cores == 0)
break;
j = best_fit_location;
if (sock_per_brd)
j /= sock_per_brd;
if (select_debug_flags & DEBUG_FLAG_SELECT_TYPE) {
info("%s: %s: using node[%u]: board[%u]: socket[%u]: %u cores available",
plugin_type, __func__, n, j,
best_fit_location,
sockets_core_cnt[best_fit_location]);
}
sockets_used[best_fit_location] = true;
for (j = (c + (best_fit_location * ncores_nb));
j < (c + ((best_fit_location + 1) * ncores_nb));
j++ ) {
/*
* if no more CPUs to select
* release remaining cores unless
* we are allocating whole sockets
*/
if (cpus == 0) {
if (alloc_sockets) {
bit_set(job_res->core_bitmap,
j);
core_cnt++;
} else {
bit_clear(job_res->core_bitmap,
j);
}
continue;
}
/*
* remove cores from socket count and
* cpus count using hyperthreading requirement
*/
if (bit_test(job_res->core_bitmap, j)) {
sockets_core_cnt[best_fit_location]--;
core_cnt++;
if (cpus < vpus)
cpus = 0;
else if ((ntasks_per_core == 1) &&
(cpus_per_task > vpus)) {
int used = MIN(tmp_cpt, vpus);
cpus -= used;
if (tmp_cpt <= used)
tmp_cpt = cpus_per_task;
else
tmp_cpt -= used;
} else {
cpus -= vpus;
}
} else if (alloc_sockets) {
/*
* If the core is not used, add it
* anyway if allocating whole sockets
*/
bit_set(job_res->core_bitmap, j);
core_cnt++;
}
}
/* loop again if more CPUs required */
if (cpus > 0)
continue;
/* release remaining cores of the unused sockets */
for (s = 0; s < nsockets_nb; s++) {
if (sockets_used[s])
continue;
bit_nclear(job_res->core_bitmap,
c + (s * ncores_nb),
c + ((s + 1) * ncores_nb) - 1);
}
}
xfree(socket_list);
if (cpus > 0) {
/*
* CPUs count should NEVER be greater than the number
* of set bits in the core bitmap for a given node
*/
error("%s: %s: CPUs computation error",
plugin_type, __func__);
break;
}
/* adjust cpus count of the current node */
if ((alloc_cores || alloc_sockets) &&
(select_node_record[n].vpus >= 1)) {
job_res->cpus[i] = core_cnt *
select_node_record[n].vpus;
}
i++;
/* move c to the next node in core_bitmap */
c += num_bits;
}
xfree(boards_core_cnt);
xfree(sort_brds_core_cnt);
xfree(sockets_core_cnt);
xfree(sockets_used);
}
/*
* Sync up the core_bitmap with the CPU array using cyclic distribution
*
* The CPU array contains the distribution of CPUs, which can include
* virtual CPUs (hyperthreads)
*/
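/*
 * Illustration (hypothetical): assuming cpus_per_task = 1, no
 * ntasks_per_socket limit, 2 sockets with 4 free cores each, vpus = 1 and
 * job_res->cpus[i] = 6, the loop below keeps cores in the order s0c0, s1c0,
 * s0c1, s1c1, s0c2, s1c2 and then clears the one core left on each socket
 * (unless whole sockets are being allocated).
 */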
static int _cyclic_sync_core_bitmap(job_record_t *job_ptr,
const uint16_t cr_type, bool preempt_mode)
{
uint32_t c, i, j, k, s, n;
uint32_t *sock_start, *sock_end, size, csize, core_cnt;
uint16_t cps = 0, cpus, vpus, sockets, sock_size, orig_cpu_cnt;
job_resources_t *job_res = job_ptr->job_resrcs;
bitstr_t *core_map;
bool *sock_used, *sock_avoid;
bool alloc_cores = false, alloc_sockets = false;
uint16_t ntasks_per_socket = 0xffff;
uint16_t ntasks_per_core = 0xffff;
int error_code = SLURM_SUCCESS;
int tmp_cpt = 0; /* cpus_per_task */
if ((job_res == NULL) || (job_res->core_bitmap == NULL) ||
(job_ptr->details == NULL))
return error_code;
if (cr_type & CR_SOCKET)
alloc_sockets = true;
else if (cr_type & CR_CORE)
alloc_cores = true;
core_map = job_res->core_bitmap;
if (job_ptr->details->mc_ptr) {
multi_core_data_t *mc_ptr = job_ptr->details->mc_ptr;
if ((mc_ptr->ntasks_per_core != INFINITE16) &&
(mc_ptr->ntasks_per_core)) {
ntasks_per_core = mc_ptr->ntasks_per_core;
}
if (mc_ptr->ntasks_per_socket)
ntasks_per_socket = mc_ptr->ntasks_per_socket;
}
sock_size = select_node_record[0].sockets;
sock_avoid = xcalloc(sock_size, sizeof(bool));
sock_start = xcalloc(sock_size, sizeof(uint32_t));
sock_end = xcalloc(sock_size, sizeof(uint32_t));
sock_used = xcalloc(sock_size, sizeof(bool));
size = bit_size(job_res->node_bitmap);
csize = bit_size(core_map);
for (c = 0, i = 0, n = 0; n < size; n++) {
if (bit_test(job_res->node_bitmap, n) == 0)
continue;
sockets = select_node_record[n].sockets;
cps = select_node_record[n].cores;
vpus = common_cpus_per_core(job_ptr->details, n);
if (select_debug_flags & DEBUG_FLAG_SELECT_TYPE) {
info("%s: %s: %pJ node %s vpus %u cpus %u",
plugin_type, __func__, job_ptr,
select_node_record[n].node_ptr->name,
vpus, job_res->cpus[i]);
}
if ((c + (sockets * cps)) > csize) {
error("%s: %s: index error", plugin_type, __func__);
break;
}
if (sockets > sock_size) {
sock_size = sockets;
xrecalloc(sock_avoid, sock_size, sizeof(bool));
xrecalloc(sock_start, sock_size, sizeof(uint32_t));
xrecalloc(sock_end, sock_size, sizeof(uint32_t));
xrecalloc(sock_used, sock_size, sizeof(bool));
}
for (s = 0; s < sockets; s++) {
sock_start[s] = c + (s * cps);
sock_end[s] = sock_start[s] + cps;
sock_avoid[s] = false;
sock_used[s] = false;
}
core_cnt = 0;
cpus = job_res->cpus[i];
if (ntasks_per_socket != 0xffff) {
int x_cpus, cpus_per_socket;
uint32_t total_cpus = 0;
uint32_t *cpus_cnt;
cpus_per_socket = ntasks_per_socket *
job_ptr->details->cpus_per_task;
cpus_cnt = xmalloc(sizeof(uint32_t) * sockets);
for (s = 0; s < sockets; s++) {
for (j = sock_start[s]; j < sock_end[s]; j++) {
if (bit_test(core_map, j))
cpus_cnt[s] += vpus;
}
total_cpus += cpus_cnt[s];
}
for (s = 0; s < sockets && total_cpus > cpus; s++) {
if (cpus_cnt[s] > cpus_per_socket) {
x_cpus = cpus_cnt[s] - cpus_per_socket;
cpus_cnt[s] = cpus_per_socket;
total_cpus -= x_cpus;
}
}
for (s = 0; s < sockets && total_cpus > cpus; s++) {
if ((cpus_cnt[s] <= cpus_per_socket) &&
(total_cpus - cpus_cnt[s] >= cpus)) {
sock_avoid[s] = true;
total_cpus -= cpus_cnt[s];
}
}
xfree(cpus_cnt);
} else if (job_ptr->details->cpus_per_task > 1) {
			/* Try to pack all CPUs of each task on one socket. */
uint32_t *cpus_cnt, cpus_per_task;
cpus_per_task = job_ptr->details->cpus_per_task;
cpus_cnt = xmalloc(sizeof(uint32_t) * sockets);
for (s = 0; s < sockets; s++) {
for (j = sock_start[s]; j < sock_end[s]; j++) {
if (bit_test(core_map, j))
cpus_cnt[s] += vpus;
}
cpus_cnt[s] -= (cpus_cnt[s] % cpus_per_task);
}
tmp_cpt = cpus_per_task;
for (s = 0; ((s < sockets) && (cpus > 0)); s++) {
while ((sock_start[s] < sock_end[s]) &&
(cpus_cnt[s] > 0) && (cpus > 0)) {
if (bit_test(core_map, sock_start[s])) {
int used;
sock_used[s] = true;
core_cnt++;
if ((ntasks_per_core == 1) &&
(cpus_per_task > vpus)) {
used = MIN(tmp_cpt,
vpus);
if (tmp_cpt <= used)
tmp_cpt = cpus_per_task;
else
tmp_cpt -= used;
} else
used = vpus;
if (cpus_cnt[s] < vpus)
cpus_cnt[s] = 0;
else
cpus_cnt[s] -= used;
if (cpus < vpus)
cpus = 0;
else
cpus -= used;
}
sock_start[s]++;
}
}
xfree(cpus_cnt);
}
orig_cpu_cnt = cpus;
while (cpus > 0) {
uint16_t prev_cpus = cpus;
for (s = 0; s < sockets && cpus > 0; s++) {
if (sock_avoid[s])
continue;
while (sock_start[s] < sock_end[s]) {
if (bit_test(core_map, sock_start[s])) {
sock_used[s] = true;
core_cnt++;
break;
} else
sock_start[s]++;
}
if (sock_start[s] == sock_end[s])
/* this socket is unusable */
continue;
if (cpus < vpus)
cpus = 0;
else
cpus -= vpus;
sock_start[s]++;
}
if (prev_cpus != cpus)
continue;
/* FIXME: verify this isn't needed for cons_res */
if (is_cons_tres && job_ptr->details->overcommit) {
/* We've got all the CPUs that we need */
break;
}
if (!preempt_mode) {
/* we're stuck! */
char *core_str = NULL, *sock_str = NULL, *sep;
for (j = 0, k = c; j < (cps * sockets);
j++, k++) {
if (!bit_test(core_map, k))
continue;
if (core_str)
sep = ",";
else
sep = "";
xstrfmtcat(core_str, "%s%d", sep, j);
}
if (!core_str)
core_str = xstrdup("NONE");
for (s = 0; s < sockets; s++) {
if (!sock_avoid[s])
continue;
if (sock_str)
sep = ",";
else
sep = "";
xstrfmtcat(sock_str, "%s%d", sep, s);
}
if (!sock_str)
sock_str = xstrdup("NONE");
job_ptr->priority = 0;
job_ptr->state_reason = WAIT_HELD;
error("%s: %s: sync loop not progressing, holding %pJ, "
"tried to use %u CPUs on node %s core_map:%s avoided_sockets:%s vpus:%u",
plugin_type, __func__, job_ptr,
orig_cpu_cnt,
select_node_record[n].node_ptr->name,
core_str, sock_str, vpus);
xfree(core_str);
xfree(sock_str);
}
error_code = SLURM_ERROR;
goto fini;
}
/*
* clear the rest of the cores in each socket
* FIXME: do we need min_core/min_socket checks here?
*/
for (s = 0; s < sockets; s++) {
if (sock_start[s] == sock_end[s])
continue;
if (!alloc_sockets || !sock_used[s]) {
bit_nclear(core_map, sock_start[s],
sock_end[s]-1);
}
if ((select_node_record[n].vpus >= 1) &&
(alloc_sockets || alloc_cores) && sock_used[s]) {
for (j = sock_start[s]; j < sock_end[s]; j++) {
/* Mark all cores as used */
if (alloc_sockets)
bit_set(core_map, j);
if (bit_test(core_map, j))
core_cnt++;
}
}
}
if ((alloc_cores || alloc_sockets) &&
(select_node_record[n].vpus >= 1)) {
job_res->cpus[i] = core_cnt *
select_node_record[n].vpus;
}
i++;
/* advance 'c' to the beginning of the next node */
c += sockets * cps;
}
fini: xfree(sock_avoid);
xfree(sock_start);
xfree(sock_end);
xfree(sock_used);
return error_code;
}
/*
* To effectively deal with heterogeneous nodes, we fake a cyclic
* distribution to figure out how many cores are needed on each node.
*
* This routine is a slightly modified "version" of the routine
* _task_layout_block in src/common/dist_tasks.c. We do not need to
* assign tasks to job->hostid[] and job->tids[][] at this point so
* the core allocation is the same for cyclic and block.
*
* For the consumable resources support we need to determine what
* "node/Core/thread"-tuplets will be allocated for a given job.
* In the past we assumed that we only allocated one task per PU
* (processing unit, the lowest allocatable logical processor,
* core or thread depending upon configuration) and didn't allow
* the use of overcommit. We have changed this philosophy and are now
* allowing people to overcommit their resources and expect the system
* administrator to enable the task/affinity plug-in which will then
* bind all of a job's tasks to its allocated resources thereby
* avoiding interference between co-allocated running jobs.
*
* In the consumable resources environment we need to determine the
* layout schema within slurmctld.
*
* We have a core_bitmap of all available cores. All we're doing here
* is removing cores that are not needed based on the task count, and
* the choice of cores to remove is based on the distribution:
* - "cyclic" removes cores "evenly", starting from the last socket,
* - "block" removes cores from the "last" socket(s)
* - "plane" removes cores "in chunks"
*
* IN job_ptr - job to be allocated resources
* IN cr_type - allocation type (sockets, cores, etc.)
* IN preempt_mode - true if testing with simulated preempted jobs
* IN core_array - system-wide bitmap of cores originally available to
* the job, only used to identify specialized cores
* IN gres_task_limit - array of task limits based upon job GRES specification,
* offset based upon bits set in job_ptr->job_resrcs->node_bitmap
*/
extern int dist_tasks(job_record_t *job_ptr, const uint16_t cr_type,
bool preempt_mode, bitstr_t **core_array,
uint32_t *gres_task_limit)
{
int error_code;
bool one_task_per_node = false;
xassert(*cons_common_callbacks.dist_tasks_compute_c_b);
/*
* Zero size jobs are supported for the creation and deletion of
* persistent burst buffers.
*/
if (job_ptr->details->min_nodes == 0)
return SLURM_SUCCESS;
if (job_ptr->details->core_spec != NO_VAL16) {
/*
* The job has been allocated all non-specialized cores.
* Just set the task distribution for tres_per_task support.
*/
error_code = _set_task_dist(job_ptr);
if (error_code != SLURM_SUCCESS)
return error_code;
return SLURM_SUCCESS;
}
if ((job_ptr->job_resrcs->node_req == NODE_CR_RESERVED) ||
(job_ptr->details->whole_node == 1)) {
/*
* The job has been allocated an EXCLUSIVE set of nodes,
* so it gets all of the bits in the core_array except for
* specialized cores. Set the task distribution for
* tres_per_task support.
*/
_clear_spec_cores(job_ptr, core_array);
error_code = _set_task_dist(job_ptr);
if (error_code != SLURM_SUCCESS)
return error_code;
return SLURM_SUCCESS;
}
if (job_ptr->details->overcommit && (job_ptr->tres_per_task == 0))
one_task_per_node = true;
_log_select_maps("cr_dist/start", job_ptr);
if (((job_ptr->details->task_dist & SLURM_DIST_STATE_BASE) ==
SLURM_DIST_PLANE) && !one_task_per_node) {
/* Perform plane distribution on the job_resources_t struct */
error_code = _compute_plane_dist(job_ptr, gres_task_limit);
if (error_code != SLURM_SUCCESS)
return error_code;
} else {
/* Perform cyclic distribution on the job_resources_t struct */
error_code = (*cons_common_callbacks.dist_tasks_compute_c_b)(
job_ptr, gres_task_limit);
if (error_code != SLURM_SUCCESS)
return error_code;
}
_log_select_maps("cr_dist/middle", job_ptr);
/*
* now sync up the core_bitmap with the job_resources_t struct
* based on the given distribution AND resource setting
*/
if (!(cr_type & CR_CORE) && !(cr_type & CR_SOCKET)) {
_block_sync_core_bitmap(job_ptr, cr_type);
return SLURM_SUCCESS;
}
	/*
	 * If SelectTypeParameters requests a block distribution for cores by
	 * default, use that distribution whenever no particular core
	 * distribution has been specified.
	 * Note: cyclic core distribution, which is the default, is handled by
	 * the next code block.
	 */
if (slurmctld_conf.select_type_param & CR_CORE_DEFAULT_DIST_BLOCK) {
switch (job_ptr->details->task_dist & SLURM_DIST_NODEMASK) {
case SLURM_DIST_ARBITRARY:
case SLURM_DIST_BLOCK:
case SLURM_DIST_CYCLIC:
case SLURM_DIST_UNKNOWN:
_block_sync_core_bitmap(job_ptr, cr_type);
return SLURM_SUCCESS;
}
}
/*
* Determine the number of logical processors per node needed
* for this job. Make sure below matches the layouts in
* lllp_distribution in plugins/task/affinity/dist_task.c (FIXME)
*/
switch (job_ptr->details->task_dist & SLURM_DIST_NODESOCKMASK) {
case SLURM_DIST_BLOCK_BLOCK:
case SLURM_DIST_CYCLIC_BLOCK:
case SLURM_DIST_PLANE:
_block_sync_core_bitmap(job_ptr, cr_type);
break;
case SLURM_DIST_ARBITRARY:
case SLURM_DIST_BLOCK:
case SLURM_DIST_CYCLIC:
case SLURM_DIST_BLOCK_CYCLIC:
case SLURM_DIST_CYCLIC_CYCLIC:
case SLURM_DIST_BLOCK_CFULL:
case SLURM_DIST_CYCLIC_CFULL:
case SLURM_DIST_UNKNOWN:
error_code = _cyclic_sync_core_bitmap(job_ptr, cr_type,
preempt_mode);
break;
default:
error("%s: %s: invalid task_dist entry",
plugin_type, __func__);
return SLURM_ERROR;
}
_log_select_maps("cr_dist/fini", job_ptr);
return error_code;
}
/* Return true if more tasks can be allocated for this job on this node */
extern bool dist_tasks_tres_tasks_avail(uint32_t *gres_task_limit,
job_resources_t *job_res,
uint32_t node_offset)
{
if (!gres_task_limit || !job_res)
return true;
if (gres_task_limit[node_offset] > job_res->tasks_per_node[node_offset])
return true;
return false;
}