/*****************************************************************************\
 *  salloc.c - Request a Slurm job allocation and
 *             launch a user-specified command.
 *****************************************************************************
 *  Copyright (C) 2006-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Christopher J. Morrone
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/

#include "config.h"

#include <grp.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/param.h>
#include <sys/resource.h>	/* for struct rlimit */
#include <sys/types.h>
#include <sys/wait.h>
#include <termios.h>
#include <time.h>
#include <unistd.h>

#include "slurm/slurm.h"

#include "src/common/cli_filter.h"
#include "src/common/cpu_frequency.h"
#include "src/common/env.h"
#include "src/common/node_select.h"
#include "src/common/plugstack.h"
#include "src/common/proc_args.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_rlimits_info.h"
#include "src/common/slurm_time.h"
#include "src/common/uid.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"

#include "src/salloc/salloc.h"
#include "src/salloc/opt.h"

#ifndef __USE_XOPEN_EXTENDED
extern pid_t getpgid(pid_t pid);
#endif

#define MAX_RETRIES	10
#define POLL_SLEEP	3	/* retry interval in seconds */

char **command_argv;
int command_argc;
pid_t command_pid = -1;
uint64_t debug_flags = 0;
char *work_dir = NULL;

static int is_interactive;

enum possible_allocation_states allocation_state = NOT_GRANTED;
pthread_cond_t allocation_state_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t allocation_state_lock = PTHREAD_MUTEX_INITIALIZER;

static bool allocation_interrupted = false;
static bool allocation_revoked = false;
static bool exit_flag = false;
static bool is_het_job = false;
static bool suspend_flag = false;
static uint32_t my_job_id = 0;
static time_t last_timeout = 0;
static struct termios saved_tty_attributes;
static int het_job_limit = 0;
static bool _cli_filter_post_submit_run = false;

static void _exit_on_signal(int signo);
static int  _fill_job_desc_from_opts(job_desc_msg_t *desc);
static pid_t _fork_command(char **command);
static void _forward_signal(int signo);
static void _job_complete_handler(srun_job_complete_msg_t *msg);
static void _job_suspend_handler(suspend_msg_t *msg);
static void _match_job_name(job_desc_msg_t *desc_last, List job_req_list);
static void _node_fail_handler(srun_node_fail_msg_t *msg);
static void _pending_callback(uint32_t job_id);
static void _ping_handler(srun_ping_msg_t *msg);
static int  _proc_alloc(resource_allocation_response_msg_t *alloc);
static void _ring_terminal_bell(void);
static int  _set_cluster_name(void *x, void *arg);
static void _set_exit_code(void);
static void _set_rlimits(char **env);
static void _set_spank_env(void);
static void _set_submit_dir_env(void);
static void _signal_while_allocating(int signo);
static void _timeout_handler(srun_timeout_msg_t *msg);
static void _user_msg_handler(srun_user_msg_t *msg);
static int  _wait_nodes_ready(resource_allocation_response_msg_t *alloc);
static void _salloc_cli_filter_post_submit(uint32_t jobid, uint32_t stepid);

bool salloc_shutdown = false;

/* Signals that are considered terminal before resource allocation.
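 * main() installs _signal_while_allocating() for each of them before the
 * blocking allocation call.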
 */
int sig_array[] = { SIGHUP, SIGINT, SIGQUIT, SIGPIPE, SIGTERM,
		    SIGUSR1, SIGUSR2, 0 };

static void _reset_input_mode(void)
{
	/* SIGTTOU needs to be blocked per the POSIX spec:
	 * http://pubs.opengroup.org/onlinepubs/009695399/functions/tcsetattr.html
	 */
	int sig_block[] = { SIGTTOU, SIGTTIN, 0 };
	xsignal_block(sig_block);
	tcsetattr(STDIN_FILENO, TCSANOW, &saved_tty_attributes);
	/* If salloc was run as interactive, with job control, reset the
	 * foreground process group of the terminal to the process group of
	 * the parent pid before exiting */
	if (is_interactive)
		tcsetpgrp(STDIN_FILENO, getpgid(getppid()));
}

static int _set_cluster_name(void *x, void *arg)
{
	job_desc_msg_t *desc = (job_desc_msg_t *) x;
	desc->origin_cluster = xstrdup(slurmctld_conf.cluster_name);
	return 0;
}

int main(int argc, char **argv)
{
	static bool env_cache_set = false;
	log_options_t logopt = LOG_OPTS_STDERR_ONLY;
	job_desc_msg_t *desc = NULL, *first_job = NULL;
	List job_req_list = NULL, job_resp_list = NULL;
	resource_allocation_response_msg_t *alloc = NULL;
	time_t before, after;
	allocation_msg_thread_t *msg_thr = NULL;
	char **env = NULL, *cluster_name;
	int status = 0;
	int retries = 0;
	pid_t pid = getpid();
	pid_t tpgid = 0;
	pid_t rc_pid = 0;
	int i, j, rc = 0;
	uint32_t num_tasks = 0;
	bool het_job_fini = false;
	int het_job_argc, het_job_inx, het_job_argc_off;
	char **het_job_argv;
	static char *msg = "Slurm job queue full, sleeping and retrying.";
	slurm_allocation_callbacks_t callbacks;
	ListIterator iter_req, iter_resp;

	slurm_conf_init(NULL);
	debug_flags = slurm_get_debug_flags();
	log_init(xbasename(argv[0]), logopt, 0, NULL);
	_set_exit_code();

	if (spank_init_allocator() < 0) {
		error("Failed to initialize plugin stack");
		exit(error_exit);
	}

	/* Be sure to call spank_fini when salloc exits */
	if (atexit((void (*) (void)) spank_fini) < 0)
		error("Failed to register atexit handler for plugins: %m");

	het_job_argc = argc;
	het_job_argv = argv;
	for (het_job_inx = 0; !het_job_fini; het_job_inx++) {
		het_job_argc_off = -1;
		if (initialize_and_process_args(het_job_argc, het_job_argv,
						&het_job_argc_off,
						het_job_inx) < 0) {
			error("salloc parameter parsing");
			exit(error_exit);
		}
		if ((het_job_argc_off >= 0) &&
		    (het_job_argc_off < het_job_argc) &&
		    !xstrcmp(het_job_argv[het_job_argc_off], ":")) {
			/* het_job_argv[0] moves from "salloc" to ":" */
			het_job_argc -= het_job_argc_off;
			het_job_argv += het_job_argc_off;
		} else
			het_job_fini = true;

		/* reinit log with new verbosity (if changed by command line) */
		if (opt.verbose || opt.quiet) {
			logopt.stderr_level += opt.verbose;
			logopt.stderr_level -= opt.quiet;
			logopt.prefix_level = 1;
			log_alter(logopt, 0, NULL);
		}

		if (spank_init_post_opt() < 0) {
			error("Plugin stack post-option processing failed");
			exit(error_exit);
		}
		_set_spank_env();
		if (het_job_inx == 0)
			_set_submit_dir_env();
		if (opt.chdir && chdir(opt.chdir)) {
			error("chdir(%s): %m", opt.chdir);
			exit(error_exit);
		}

		if ((opt.get_user_env_time >= 0) && !env_cache_set) {
			bool no_env_cache = false;
			char *sched_params;
			char *user;

			env_cache_set = true;
			if (!(user = uid_to_string_or_null(opt.uid))) {
				error("Invalid user id %u: %m",
				      (uint32_t) opt.uid);
				exit(error_exit);
			}

			sched_params = slurm_get_sched_params();
			if (xstrcasestr(sched_params, "no_env_cache"))
				no_env_cache = true;
			xfree(sched_params);

			env = env_array_user_default(user,
						     opt.get_user_env_time,
						     opt.get_user_env_mode,
						     no_env_cache);
			xfree(user);
			if (env == NULL)
				exit(error_exit);  /* error already logged */
			_set_rlimits(env);
		}

		if (desc && !job_req_list) {
			job_req_list = list_create(NULL);
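			/* Queue the descriptor built on the previous pass;
			 * a fresh one is allocated for this component below */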
			list_append(job_req_list, desc);
		}
		desc = xmalloc(sizeof(job_desc_msg_t));
		slurm_init_job_desc_msg(desc);
		if (_fill_job_desc_from_opts(desc) == -1)
			exit(error_exit);
		if (het_job_inx || !het_job_fini)
			set_env_from_opts(&opt, &env, het_job_inx);
		else
			set_env_from_opts(&opt, &env, -1);
		if (job_req_list)
			list_append(job_req_list, desc);
		if (!first_job)
			first_job = desc;
	}
	het_job_limit = het_job_inx;
	if (!desc) {
		fatal("%s: desc is NULL", __func__);
		exit(error_exit);	/* error already logged */
	}
	_match_job_name(desc, job_req_list);

	/*
	 * Job control for interactive salloc sessions: only if ...
	 *
	 * a) input is from a terminal (stdin has valid termios attributes),
	 * b) controlling terminal exists (non-negative tpgid),
	 * c) salloc is not run in allocation-only (--no-shell) mode,
	 * NOTE: d and e below are configuration dependent
	 * d) salloc runs in its own process group (true in interactive
	 *    shells that support job control),
	 * e) salloc has been configured at compile-time to support background
	 *    execution and is not currently in the background process group.
	 */
	if (tcgetattr(STDIN_FILENO, &saved_tty_attributes) < 0) {
		/*
		 * Test existence of controlling terminal (tpgid > 0)
		 * after first making sure stdin is not redirected.
		 */
	} else if ((tpgid = tcgetpgrp(STDIN_FILENO)) < 0) {
		if (!saopt.no_shell) {
			error("no controlling terminal: please set --no-shell");
			exit(error_exit);
		}
#ifdef SALLOC_RUN_FOREGROUND
	} else if ((!saopt.no_shell) && (pid == getpgrp())) {
		if (tpgid == pid)
			is_interactive = true;
		while (tcgetpgrp(STDIN_FILENO) != pid) {
			if (!is_interactive) {
				error("Waiting for program to be placed in "
				      "the foreground");
				is_interactive = true;
			}
			killpg(pid, SIGTTIN);
		}
	}
#else
	} else if ((!saopt.no_shell) &&
		   (getpgrp() == tcgetpgrp(STDIN_FILENO))) {
		is_interactive = true;
	}
#endif
	/*
	 * Reset saved tty attributes at exit, in case a child
	 * process died before properly resetting terminal.
	 */
	if (is_interactive)
		atexit(_reset_input_mode);

	if (opt.gid != getgid()) {
		if (setgid(opt.gid) < 0) {
			error("setgid: %m");
			exit(error_exit);
		}
	}

	/* If can run on multiple clusters find the earliest run time
	 * and run it there */
	if (opt.clusters) {
		if (job_req_list) {
			rc = slurmdb_get_first_het_job_cluster(job_req_list,
					opt.clusters, &working_cluster_rec);
		} else {
			rc = slurmdb_get_first_avail_cluster(desc,
					opt.clusters, &working_cluster_rec);
		}
		if (rc != SLURM_SUCCESS) {
			print_db_notok(opt.clusters, 0);
			exit(error_exit);
		}
	}

	if (job_req_list)
		(void) list_for_each(job_req_list, _set_cluster_name, NULL);
	else
		desc->origin_cluster = xstrdup(slurmctld_conf.cluster_name);

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.job_suspend = _job_suspend_handler;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;

	/*
	 * Create message thread to handle pings and such from slurmctld.
	 * salloc --no-shell jobs aren't interactive, so they won't respond to
	 * srun_ping(), so we don't want to kill it after InactiveLimit seconds.
	 * Not creating this thread will leave other_port == 0, and will
	 * prevent slurmctld from killing the salloc --no-shell job.
	 */
	if (!saopt.no_shell)
		msg_thr = slurm_allocation_msg_thr_create(
					&first_job->other_port, &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately.
	 */
	for (i = 0; sig_array[i]; i++)
		xsignal(sig_array[i], _signal_while_allocating);

	before = time(NULL);
	while (true) {
		if (job_req_list) {
			is_het_job = true;
			job_resp_list = slurm_allocate_het_job_blocking(
						job_req_list, opt.immediate,
						_pending_callback);
			if (job_resp_list)
				break;
		} else {
			alloc = slurm_allocate_resources_blocking(
						desc, opt.immediate,
						_pending_callback);
			if (alloc)
				break;
		}
		if (((errno != ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) &&
		     (errno != EAGAIN)) || (retries >= MAX_RETRIES))
			break;
		if (retries == 0)
			error("%s", msg);
		else
			debug("%s", msg);
		sleep(++retries);
	}

	/* If the requested uid is different than ours, become that uid */
	if (getuid() != opt.uid) {
		/* drop extended groups before changing uid/gid */
		if ((setgroups(0, NULL) < 0)) {
			error("setgroups: %m");
			exit(error_exit);
		}
		if (setuid(opt.uid) < 0) {
			error("setuid: %m");
			exit(error_exit);
		}
	}

	if (!alloc && !job_resp_list) {
		if (allocation_interrupted) {
			/* cancelled by signal */
			info("Job aborted due to signal");
		} else if (errno == EINTR) {
			error("Interrupted by signal. Allocation request rescinded.");
		} else if (opt.immediate &&
			   ((errno == ETIMEDOUT) ||
			    (errno == ESLURM_NOT_TOP_PRIORITY) ||
			    (errno == ESLURM_NODES_BUSY))) {
			error("Unable to allocate resources: %m");
			error_exit = immediate_exit;
		} else {
			error("Job submit/allocate failed: %m");
		}
		if (msg_thr)
			slurm_allocation_msg_thr_destroy(msg_thr);
		exit(error_exit);
	} else if (job_resp_list && !allocation_interrupted) {
		/* Allocation granted to heterogeneous job */
		i = 0;
		iter_resp = list_iterator_create(job_resp_list);
		while ((alloc = list_next(iter_resp))) {
			if (i == 0) {
				my_job_id = alloc->job_id;
				info("Granted job allocation %u", my_job_id);
			}
			if (debug_flags & DEBUG_FLAG_HETJOB) {
				info("Hetjob ID %u+%u (%u) on nodes %s",
				     my_job_id, i, alloc->job_id,
				     alloc->node_list);
			}
			i++;
			if (_proc_alloc(alloc) != SLURM_SUCCESS) {
				list_iterator_destroy(iter_resp);
				goto relinquish;
			}
		}
		list_iterator_destroy(iter_resp);
	} else if (!allocation_interrupted) {
		/* Allocation granted to regular job */
		my_job_id = alloc->job_id;
		print_multi_line_string(alloc->job_submit_user_msg,
					-1, LOG_LEVEL_INFO);
		info("Granted job allocation %u", my_job_id);

		if (_proc_alloc(alloc) != SLURM_SUCCESS)
			goto relinquish;
	}

	_salloc_cli_filter_post_submit(my_job_id, NO_VAL);

	after = time(NULL);
	if ((saopt.bell == BELL_ALWAYS) ||
	    ((saopt.bell == BELL_AFTER_DELAY) &&
	     ((after - before) > DEFAULT_BELL_DELAY))) {
		_ring_terminal_bell();
	}
	if (saopt.no_shell)
		exit(0);
	if (allocation_interrupted) {
		/* salloc process received a signal after
		 * slurm_allocate_resources_blocking returned with the
		 * allocation, but before the new signal handlers were
		 * registered.
		 */
		goto relinquish;
	}

	/*
	 * Set environment variables
	 */
	if (job_resp_list) {
		bool num_tasks_always_set = true;

		i = list_count(job_req_list);
		j = list_count(job_resp_list);
		if (i != j) {
			error("Job component count mismatch (%d submitted, %d responses)",
			      i, j);
			goto relinquish;
		}
		/* Continue support for old hetjob terminology
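		 * by exporting the component count under both the old and
		 * new names.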
		 */
		env_array_append_fmt(&env, "SLURM_PACK_SIZE", "%d", i);
		env_array_append_fmt(&env, "SLURM_HET_SIZE", "%d", i);

		i = 0;
		iter_req = list_iterator_create(job_req_list);
		iter_resp = list_iterator_create(job_resp_list);
		while ((desc = list_next(iter_req))) {
			alloc = list_next(iter_resp);
			if (alloc && desc &&
			    (desc->bitflags & JOB_NTASKS_SET)) {
				if (desc->ntasks_per_node != NO_VAL16)
					desc->num_tasks = alloc->node_cnt *
						desc->ntasks_per_node;
				else if (alloc->node_cnt > desc->num_tasks)
					desc->num_tasks = alloc->node_cnt;
			}
			if ((desc->num_tasks != NO_VAL) &&
			    num_tasks_always_set)
				num_tasks += desc->num_tasks;
			else {
				num_tasks = 0;
				num_tasks_always_set = false;
			}
			if (env_array_for_job(&env, alloc, desc, i++) !=
			    SLURM_SUCCESS)
				goto relinquish;
		}
		list_iterator_destroy(iter_resp);
		list_iterator_destroy(iter_req);
	} else {
		if (alloc && desc && (desc->bitflags & JOB_NTASKS_SET)) {
			if (desc->ntasks_per_node != NO_VAL16)
				desc->num_tasks = alloc->node_cnt *
					desc->ntasks_per_node;
			else if (alloc->node_cnt > desc->num_tasks)
				desc->num_tasks = alloc->node_cnt;
		}
		if (desc->num_tasks != NO_VAL)
			num_tasks += desc->num_tasks;
		if (env_array_for_job(&env, alloc, desc, -1) != SLURM_SUCCESS)
			goto relinquish;
	}
	if (num_tasks) {
		env_array_append_fmt(&env, "SLURM_NTASKS", "%d", num_tasks);
		env_array_append_fmt(&env, "SLURM_NPROCS", "%d", num_tasks);
	}
	if (working_cluster_rec && working_cluster_rec->name) {
		env_array_append_fmt(&env, "SLURM_CLUSTER_NAME", "%s",
				     working_cluster_rec->name);
	} else if ((cluster_name = slurm_get_cluster_name())) {
		env_array_append_fmt(&env, "SLURM_CLUSTER_NAME", "%s",
				     cluster_name);
		xfree(cluster_name);
	}

	env_array_set_environment(env);
	env_array_free(env);

	slurm_mutex_lock(&allocation_state_lock);
	if (allocation_state == REVOKED) {
		error("Allocation was revoked for job %u before command could be run",
		      my_job_id);
		slurm_cond_broadcast(&allocation_state_cond);
		slurm_mutex_unlock(&allocation_state_lock);
		if (slurm_complete_job(my_job_id, status) != 0) {
			error("Unable to clean up allocation for job %u: %m",
			      my_job_id);
		}
		return 1;
	}
	allocation_state = GRANTED;
	slurm_cond_broadcast(&allocation_state_cond);
	slurm_mutex_unlock(&allocation_state_lock);

	/* Ensure that salloc has initial terminal foreground control. */
	if (is_interactive) {
		/*
		 * Ignore remaining job-control signals (other than those in
		 * sig_array, which at this state act like SIG_IGN).
		 */
		xsignal(SIGTSTP, SIG_IGN);
		xsignal(SIGTTIN, SIG_IGN);
		xsignal(SIGTTOU, SIG_IGN);

		pid = getpid();
		setpgid(pid, pid);

		tcsetpgrp(STDIN_FILENO, pid);
	}

	slurm_mutex_lock(&allocation_state_lock);
	if (suspend_flag)
		slurm_cond_wait(&allocation_state_cond,
				&allocation_state_lock);
	command_pid = _fork_command(command_argv);
	slurm_cond_broadcast(&allocation_state_cond);
	slurm_mutex_unlock(&allocation_state_lock);

	/*
	 * Wait for command to exit, OR for waitpid to be interrupted by a
	 * signal. Either way, we are going to release the allocation next.
	 */
	if (command_pid > 0) {
		setpgid(command_pid, command_pid);
		if (is_interactive)
			tcsetpgrp(STDIN_FILENO, command_pid);

		/* NOTE: Do not process signals in separate pthread.
		 * The signal will cause waitpid() to exit immediately.
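		 * The SIGHUP handler installed below runs in this thread, so
		 * the waitpid() loop can test exit_flag once it has been
		 * interrupted.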
		 */
		xsignal(SIGHUP, _exit_on_signal);

		/* Use WUNTRACED to treat stopped children like terminated ones */
		do {
			rc_pid = waitpid(command_pid, &status, WUNTRACED);
		} while (WIFSTOPPED(status) ||
			 ((rc_pid == -1) && (!exit_flag)));
		if ((rc_pid == -1) && (errno != EINTR))
			error("waitpid for %s failed: %m", command_argv[0]);
	}

	if (is_interactive)
		tcsetpgrp(STDIN_FILENO, pid);

	/*
	 * Relinquish the job allocation (if not already revoked).
	 */
relinquish:
	slurm_mutex_lock(&allocation_state_lock);
	if (allocation_state != REVOKED) {
		slurm_mutex_unlock(&allocation_state_lock);

		info("Relinquishing job allocation %u", my_job_id);
		if ((slurm_complete_job(my_job_id, status) != 0) &&
		    (slurm_get_errno() != ESLURM_ALREADY_DONE))
			error("Unable to clean up job allocation %u: %m",
			      my_job_id);
		slurm_mutex_lock(&allocation_state_lock);
		allocation_state = REVOKED;
	}
	slurm_cond_broadcast(&allocation_state_cond);
	slurm_mutex_unlock(&allocation_state_lock);

	slurm_free_resource_allocation_response_msg(alloc);
	if (msg_thr)
		slurm_allocation_msg_thr_destroy(msg_thr);

	/*
	 * Figure out what return code we should use. If the user's command
	 * exited normally, return the user's return code.
	 */
	rc = 1;
	if (rc_pid != -1) {
		if (WIFEXITED(status)) {
			rc = WEXITSTATUS(status);
		} else if (WIFSTOPPED(status)) {
			/* Terminate stopped child process */
			_forward_signal(SIGKILL);
		} else if (WIFSIGNALED(status)) {
			verbose("Command \"%s\" was terminated by signal %d",
				command_argv[0], WTERMSIG(status));
			/* If we got one of these signals, return a normal
			 * exit code, since the signal was most likely sent
			 * by the user */
			switch (WTERMSIG(status)) {
			case SIGHUP:
			case SIGINT:
			case SIGQUIT:
			case SIGKILL:
				rc = 0;
				break;
			default:
				break;
			}
		}
	}

#ifdef MEMORY_LEAK_DEBUG
	slurm_select_fini();
	slurm_reset_all_options(&opt, false);
	slurm_auth_fini();
	slurm_conf_destroy();
	log_fini();
#endif /* MEMORY_LEAK_DEBUG */

	return rc;
}

/* Initial processing of resource allocation response, including waiting for
 * compute nodes to be ready for use.
 * Returns SLURM_SUCCESS on success. */
static int _proc_alloc(resource_allocation_response_msg_t *alloc)
{
	static int elem = 0;

	if ((elem++ == 0) && alloc->working_cluster_rec) {
		slurm_setup_remote_working_cluster(alloc);

		/* Set env so that subsequent srun commands find the right
		 * cluster */
		setenvf(NULL, "SLURM_WORKING_CLUSTER", "%s:%s:%d:%d:%d",
			working_cluster_rec->name,
			working_cluster_rec->control_host,
			working_cluster_rec->control_port,
			working_cluster_rec->rpc_version,
			select_get_plugin_id());
	}

	if (!_wait_nodes_ready(alloc)) {
		if (!allocation_interrupted)
			error("Something is wrong with the boot of the nodes.");
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;
}

/* Copy job name from last component to all hetjob components unless
 * explicitly set. The default value comes from _salloc_default_command()
 * and is "sh".
 */
static void _match_job_name(job_desc_msg_t *desc_last, List job_req_list)
{
	ListIterator iter;
	job_desc_msg_t *desc = NULL;
	char *name;

	if (!desc_last)
		return;
	if (!desc_last->name && command_argv[0])
		desc_last->name = xstrdup(xbasename(command_argv[0]));
	name = desc_last->name;

	if (!job_req_list)
		return;

	iter = list_iterator_create(job_req_list);
	while ((desc = list_next(iter)))
		if (!desc->name)
			desc->name = xstrdup(name);
	list_iterator_destroy(iter);
}

static void _set_exit_code(void)
{
	int i;
	char *val;

	if ((val = getenv("SLURM_EXIT_ERROR"))) {
		i = atoi(val);
		if (i == 0)
			error("SLURM_EXIT_ERROR has zero value");
		else
			error_exit = i;
	}

	if ((val = getenv("SLURM_EXIT_IMMEDIATE"))) {
		i = atoi(val);
		if (i == 0)
			error("SLURM_EXIT_IMMEDIATE has zero value");
		else
			immediate_exit = i;
	}
}

/* Propagate SPANK environment via SLURM_SPANK_ environment variables */
static void _set_spank_env(void)
{
	int i;

	for (i = 0; i < opt.spank_job_env_size; i++) {
		if (setenvfs("SLURM_SPANK_%s", opt.spank_job_env[i]) < 0) {
			error("unable to set %s in environment",
			      opt.spank_job_env[i]);
		}
	}
}

/* Set SLURM_SUBMIT_DIR and SLURM_SUBMIT_HOST environment variables based on
 * the current state */
static void _set_submit_dir_env(void)
{
	char host[256];

	work_dir = xmalloc(MAXPATHLEN + 1);
	if ((getcwd(work_dir, MAXPATHLEN)) == NULL)
		error("getcwd failed: %m");
	else if (setenvf(NULL, "SLURM_SUBMIT_DIR", "%s", work_dir) < 0)
		error("unable to set SLURM_SUBMIT_DIR in environment");

	if ((gethostname(host, sizeof(host))))
		error("gethostname failed: %m");
	else if (setenvf(NULL, "SLURM_SUBMIT_HOST", "%s", host) < 0)
		error("unable to set SLURM_SUBMIT_HOST in environment");
}

/* Returns 0 on success, -1 on failure */
static int _fill_job_desc_from_opts(job_desc_msg_t *desc)
{
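	/* Translate command line options into the job request. Fields not
	 * set here keep the defaults from slurm_init_job_desc_msg(). */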
	desc->contiguous = opt.contiguous ? 1 : 0;
	if (opt.core_spec != NO_VAL16)
		desc->core_spec = opt.core_spec;
	desc->extra = xstrdup(opt.extra);
	desc->features = xstrdup(opt.constraint);
	desc->cluster_features = xstrdup(opt.c_constraint);
	if (opt.immediate == 1)
		desc->immediate = 1;
	desc->name = xstrdup(opt.job_name);
	desc->reservation = xstrdup(opt.reservation);
	desc->profile = opt.profile;
	desc->wckey = xstrdup(opt.wckey);

	desc->x11 = opt.x11;
	if (desc->x11) {
		desc->x11_magic_cookie = xstrdup(opt.x11_magic_cookie);
		desc->x11_target = xstrdup(opt.x11_target);
		desc->x11_target_port = opt.x11_target_port;
	}

	desc->cpu_freq_min = opt.cpu_freq_min;
	desc->cpu_freq_max = opt.cpu_freq_max;
	desc->cpu_freq_gov = opt.cpu_freq_gov;

	if (opt.req_switch >= 0)
		desc->req_switch = opt.req_switch;
	if (opt.wait4switch >= 0)
		desc->wait4switch = opt.wait4switch;

	desc->req_nodes = xstrdup(opt.nodelist);
	desc->exc_nodes = xstrdup(opt.exclude);
	desc->partition = xstrdup(opt.partition);

	if (opt.nodes_set) {
		desc->min_nodes = opt.min_nodes;
		if (opt.max_nodes)
			desc->max_nodes = opt.max_nodes;
	} else if (opt.ntasks_set && (opt.ntasks == 0))
		desc->min_nodes = 0;

	desc->user_id = opt.uid;
	desc->group_id = opt.gid;

	if (opt.dependency)
		desc->dependency = xstrdup(opt.dependency);

	if (opt.mem_bind)
		desc->mem_bind = xstrdup(opt.mem_bind);
	if (opt.mem_bind_type)
		desc->mem_bind_type = opt.mem_bind_type;
	if (opt.plane_size != NO_VAL)
		desc->plane_size = opt.plane_size;
	desc->task_dist = opt.distribution;

	if (opt.licenses)
		desc->licenses = xstrdup(opt.licenses);
	desc->network = xstrdup(opt.network);
	if (opt.nice != NO_VAL)
		desc->nice = NICE_OFFSET + opt.nice;
	if (opt.priority)
		desc->priority = opt.priority;
	desc->mail_type = opt.mail_type;
	if (opt.mail_user)
		desc->mail_user = xstrdup(opt.mail_user);
	if (opt.begin)
		desc->begin_time = opt.begin;
	if (opt.deadline)
		desc->deadline = opt.deadline;
	if (opt.burst_buffer)
		desc->burst_buffer = xstrdup(opt.burst_buffer);
	if (opt.account)
		desc->account = xstrdup(opt.account);
	if (opt.acctg_freq)
		desc->acctg_freq = xstrdup(opt.acctg_freq);
	if (opt.comment)
		desc->comment = xstrdup(opt.comment);
	if (opt.qos)
		desc->qos = xstrdup(opt.qos);

	if (opt.chdir)
		desc->work_dir = xstrdup(opt.chdir);
	else if (work_dir)
		desc->work_dir = xstrdup(work_dir);

	if (opt.hold)
		desc->priority = 0;
	if (opt.reboot)
		desc->reboot = 1;

	/* job constraints */
	if (opt.pn_min_cpus > -1)
		desc->pn_min_cpus = opt.pn_min_cpus;
	if (opt.pn_min_memory != NO_VAL64)
		desc->pn_min_memory = opt.pn_min_memory;
	else if (opt.mem_per_cpu != NO_VAL64)
		desc->pn_min_memory = opt.mem_per_cpu | MEM_PER_CPU;
	if (opt.pn_min_tmp_disk != NO_VAL64)
		desc->pn_min_tmp_disk = opt.pn_min_tmp_disk;
	if (opt.overcommit) {
		desc->min_cpus = opt.min_nodes;
		desc->overcommit = opt.overcommit;
	} else if (opt.cpus_set)
		desc->min_cpus = opt.ntasks * opt.cpus_per_task;
	else
		desc->min_cpus = opt.ntasks;
	if (opt.ntasks_set)
		desc->num_tasks = opt.ntasks;
	if (opt.cpus_set)
		desc->cpus_per_task = opt.cpus_per_task;
	if (opt.ntasks_per_node)
		desc->ntasks_per_node = opt.ntasks_per_node;
	if (opt.ntasks_per_socket > -1)
		desc->ntasks_per_socket = opt.ntasks_per_socket;
	if (opt.ntasks_per_core > -1)
		desc->ntasks_per_core = opt.ntasks_per_core;

	/* node constraints */
	if (opt.sockets_per_node != NO_VAL)
		desc->sockets_per_node = opt.sockets_per_node;
	if (opt.cores_per_socket != NO_VAL)
		desc->cores_per_socket = opt.cores_per_socket;
	if (opt.threads_per_core != NO_VAL)
		desc->threads_per_core = opt.threads_per_core;

	if (opt.no_kill)
		desc->kill_on_node_fail = 0;
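	/* time limits, warning signal and remaining job options */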
	if (opt.time_limit != NO_VAL)
		desc->time_limit = opt.time_limit;
	if (opt.time_min != NO_VAL)
		desc->time_min = opt.time_min;
	if (opt.job_flags)
		desc->bitflags |= opt.job_flags;
	desc->shared = opt.shared;

	desc->wait_all_nodes = saopt.wait_all_nodes;
	if (opt.warn_flags)
		desc->warn_flags = opt.warn_flags;
	if (opt.warn_signal)
		desc->warn_signal = opt.warn_signal;
	if (opt.warn_time)
		desc->warn_time = opt.warn_time;

	if (opt.spank_job_env_size) {
		/* NOTE: Not copying array, but shared memory */
		desc->spank_job_env = opt.spank_job_env;
		desc->spank_job_env_size = opt.spank_job_env_size;
	}

	desc->power_flags = opt.power;
	if (opt.mcs_label)
		desc->mcs_label = xstrdup(opt.mcs_label);
	if (opt.delay_boot != NO_VAL)
		desc->delay_boot = opt.delay_boot;
	if (opt.cpus_set)
		desc->bitflags |= JOB_CPUS_SET;
	if (opt.ntasks_set)
		desc->bitflags |= JOB_NTASKS_SET;

	desc->clusters = xstrdup(opt.clusters);

	if (opt.cpus_per_gpu)
		xstrfmtcat(desc->cpus_per_tres, "gpu:%d", opt.cpus_per_gpu);
	desc->tres_bind = xstrdup(opt.tres_bind);
	desc->tres_freq = xstrdup(opt.tres_freq);
	xfmt_tres(&desc->tres_per_job, "gpu", opt.gpus);
	xfmt_tres(&desc->tres_per_node, "gpu", opt.gpus_per_node);
	if (opt.gres) {
		if (desc->tres_per_node)
			xstrfmtcat(desc->tres_per_node, ",%s", opt.gres);
		else
			desc->tres_per_node = xstrdup(opt.gres);
	}
	xfmt_tres(&desc->tres_per_socket, "gpu", opt.gpus_per_socket);
	xfmt_tres(&desc->tres_per_task, "gpu", opt.gpus_per_task);
	if (opt.mem_per_gpu != NO_VAL64)
		xstrfmtcat(desc->mem_per_tres, "gpu:%"PRIu64, opt.mem_per_gpu);

	return 0;
}

static void _ring_terminal_bell(void)
{
	if (isatty(STDOUT_FILENO)) {
		fprintf(stdout, "\a");
		fflush(stdout);
	}
}

/* Returns the pid of the forked command, or < 0 on error */
static pid_t _fork_command(char **command)
{
	pid_t pid;

	pid = fork();
	if (pid < 0) {
		error("%s: fork failed: %m", __func__);
	} else if (pid == 0) {
		/* child */
		char *cwd = opt.chdir ? opt.chdir : work_dir;
		char *cpath = search_path(cwd, command[0], true, X_OK, true);

		xassert(cwd);

		if (!cpath) {
			error("%s: Unable to find command \"%s\"",
			      __func__, command[0]);
			exit(error_exit);
		}

		setpgid(getpid(), 0);

		/*
		 * Reset job control signals.
		 * Suspend (TSTP) is not restored (ignored, as in the parent):
		 * shells with job-control override this and look after their
		 * processes.
		 * Suspending single commands is more complex and would require
		 * adding full shell-like job control to salloc.
		 */
		xsignal(SIGINT, SIG_DFL);
		xsignal(SIGQUIT, SIG_DFL);
		xsignal(SIGTTIN, SIG_DFL);
		xsignal(SIGTTOU, SIG_DFL);

		execvp(cpath, command);

		/* should only get here if execvp failed */
		error("%s: Unable to exec command \"%s\": %m",
		      __func__, cpath);
		xfree(cpath);
		exit(error_exit);
	}
	/* parent returns */
	return pid;
}

static void _pending_callback(uint32_t job_id)
{
	info("Pending job allocation %u", job_id);
	my_job_id = job_id;

	/* call cli_filter post_submit here so it runs while allocating */
	_salloc_cli_filter_post_submit(my_job_id, NO_VAL);
}

/*
 * Run cli_filter post_submit on all option structures.
 * Convenience function since this may need to run in two places;
 * a static bool prevents multiple executions.
 */
static void _salloc_cli_filter_post_submit(uint32_t jobid, uint32_t stepid)
{
	int idx = 0;

	if (_cli_filter_post_submit_run)
		return;

	for (idx = 0; idx < het_job_limit; idx++)
		cli_filter_plugin_post_submit(idx, jobid, stepid);
	_cli_filter_post_submit_run = true;
}

static void _exit_on_signal(int signo)
{
	_forward_signal(signo);
	exit_flag = true;
}

static void _forward_signal(int signo)
{
	if (command_pid > 0)
		killpg(command_pid, signo);
}

static void _signal_while_allocating(int signo)
{
	allocation_interrupted = true;
	if (my_job_id != 0) {
		slurm_complete_job(my_job_id, NO_VAL);
	}
}

/* This typically signifies the job was cancelled by scancel */
static void _job_complete_handler(srun_job_complete_msg_t *comp)
{
	if (my_job_id && (my_job_id != comp->job_id)) {
		error("Ignoring job_complete for job %u because our job ID is %u",
		      comp->job_id, my_job_id);
		return;
	}

	if (comp->step_id == NO_VAL) {
		slurm_mutex_lock(&allocation_state_lock);
		if (allocation_state != REVOKED) {
			/* If the allocation_state is already REVOKED, then
			 * no need to print this message. We probably
			 * relinquished the allocation ourselves. */
			if (last_timeout && (last_timeout < time(NULL))) {
				info("Job %u has exceeded its time limit and "
				     "its allocation has been revoked.",
				     comp->job_id);
			} else {
				info("Job allocation %u has been revoked.",
				     comp->job_id);
				allocation_revoked = true;
			}
		}
		allocation_state = REVOKED;
		slurm_cond_broadcast(&allocation_state_cond);
		slurm_mutex_unlock(&allocation_state_lock);

		/*
		 * Clean up child process: only if the forked process has not
		 * yet changed state (waitpid returning 0).
		 */
		if ((command_pid > -1) &&
		    (waitpid(command_pid, NULL, WNOHANG) == 0)) {
			int signal = 0;

			if (is_interactive) {
				pid_t tpgid = tcgetpgrp(STDIN_FILENO);
				/*
				 * This happens if the command forks further
				 * subprocesses, e.g. a user shell (since we
				 * are ignoring TSTP, the process must have
				 * originated from salloc). Notify foreground
				 * process about pending termination.
				 */
				if (tpgid != command_pid &&
				    tpgid != getpgrp())
					killpg(tpgid, SIGHUP);
			}

			if (saopt.kill_command_signal)
				signal = saopt.kill_command_signal;
#ifdef SALLOC_KILL_CMD
			else if (is_interactive)
				signal = SIGHUP;
			else
				signal = SIGTERM;
#endif
			if (signal) {
				verbose("Sending signal %d to command \"%s\", pid %d",
					signal, command_argv[0], command_pid);
				if (suspend_flag)
					_forward_signal(SIGCONT);
				_forward_signal(signal);
			}
		}
	} else {
		verbose("Job step %u.%u is finished.",
			comp->job_id, comp->step_id);
	}
}

static void _job_suspend_handler(suspend_msg_t *msg)
{
	if (msg->op == SUSPEND_JOB) {
		verbose("job has been suspended");
	} else if (msg->op == RESUME_JOB) {
		verbose("job has been resumed");
	}
}

/*
 * Job has been notified of its approaching time limit.
 * Job will be killed shortly after timeout.
 * This RPC can arrive multiple times with the same or updated timeouts.
 * FIXME: We may want to signal the job or perform other action for this.
 * FIXME: How much lead time do we want for this message? Some jobs may
 * require tens of minutes to gracefully terminate.
 */
static void _timeout_handler(srun_timeout_msg_t *msg)
{
	if (msg->timeout != last_timeout) {
		last_timeout = msg->timeout;
		verbose("Job allocation time limit to be reached at %s",
			slurm_ctime2(&msg->timeout));
	}
}

static void _user_msg_handler(srun_user_msg_t *msg)
{
	info("%s", msg->msg);
}

static void _ping_handler(srun_ping_msg_t *msg)
{
	/* the API will respond, so there really is nothing to do here */
}

static void _node_fail_handler(srun_node_fail_msg_t *msg)
{
	error("Node failure on %s", msg->nodelist);
}

static void _set_rlimits(char **env)
{
	slurm_rlimits_info_t *rli;
	char env_name[32] = "SLURM_RLIMIT_";
	char *env_value, *p;
	struct rlimit r;
	rlim_t env_num;
	int header_len = sizeof("SLURM_RLIMIT_");

	for (rli = get_slurm_rlimits_info(); rli->name; rli++) {
		if (rli->propagate_flag != PROPAGATE_RLIMITS)
			continue;
		if ((header_len + strlen(rli->name)) >= sizeof(env_name)) {
			error("%s: env_name(%s) too long",
			      __func__, env_name);
			continue;
		}
		strcpy(&env_name[header_len - 1], rli->name);
		env_value = getenvp(env, env_name);
		if (env_value == NULL)
			continue;
		unsetenvp(env, env_name);
		if (getrlimit(rli->resource, &r) < 0) {
			error("getrlimit(%s): %m", env_name + 6);
			continue;
		}
		env_num = strtol(env_value, &p, 10);
		if (p && (p[0] != '\0')) {
			error("Invalid environment %s value %s",
			      env_name, env_value);
			continue;
		}
		if (r.rlim_cur == env_num)
			continue;
		r.rlim_cur = (rlim_t) env_num;
		if (setrlimit(rli->resource, &r) < 0) {
			error("setrlimit(%s): %m", env_name + 6);
			continue;
		}
	}
}

/* Returns 1 if the job and its nodes are ready for the job to begin,
 * 0 otherwise */
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc)
{
	int is_ready = 0, i, rc;
	int cur_delay = 0;
	int suspend_time, resume_time, max_delay;
	bool job_killed = false;

	suspend_time = slurm_get_suspend_timeout();
	resume_time = slurm_get_resume_timeout();
	if (suspend_time || resume_time) {
		max_delay = suspend_time + resume_time;
		max_delay *= 5;		/* Allow for ResumeRate support */
	} else {
		max_delay = 300;	/* Wait up to 5 min for PrologSlurmctld */
	}

	if (alloc->alias_list && !xstrcmp(alloc->alias_list, "TBD"))
		saopt.wait_all_nodes = 1;   /* Wait for boot & addresses */
	if (saopt.wait_all_nodes == NO_VAL16)
		saopt.wait_all_nodes = 0;

	for (i = 0; (cur_delay < max_delay); i++) {
		if (i) {
			if (i == 1)
				info("Waiting for resource configuration");
			else
				debug("still waiting");
			sleep(POLL_SLEEP);
			cur_delay += POLL_SLEEP;
		}

		rc = slurm_job_node_ready(alloc->job_id);
		if (rc == READY_JOB_FATAL)
			break;				/* fatal error */
		if ((rc == READY_JOB_ERROR) || (rc == EAGAIN))
			continue;			/* retry */
		if ((rc & READY_JOB_STATE) == 0) {	/* job killed */
			job_killed = true;
			break;
		}
		if ((rc & READY_JOB_STATE) &&
		    ((rc & READY_NODE_STATE) || !saopt.wait_all_nodes)) {
			is_ready = 1;
			break;
		}
		if (allocation_interrupted || allocation_revoked)
			break;
	}
	if (is_ready) {
		resource_allocation_response_msg_t *resp;
		char *tmp_str;
		if (i > 0)
			info("Nodes %s are ready for job", alloc->node_list);
		if (alloc->alias_list && !xstrcmp(alloc->alias_list, "TBD") &&
		    (slurm_allocation_lookup(alloc->job_id, &resp) ==
		     SLURM_SUCCESS)) {
			tmp_str = alloc->alias_list;
			alloc->alias_list = resp->alias_list;
			resp->alias_list = tmp_str;
			slurm_free_resource_allocation_response_msg(resp);
		}
	} else if (!allocation_interrupted) {
		if (job_killed) {
			error("Job allocation %u has been revoked",
			      alloc->job_id);
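			/* Mark the allocation as interrupted so _proc_alloc()
			 * does not also report a node boot problem. */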
			allocation_interrupted = true;
		} else
			error("Nodes %s are still not ready",
			      alloc->node_list);
	} else	/* allocation_interrupted or slurmctld not responding */
		is_ready = 0;

	return is_ready;
}