/*****************************************************************************\
* job_container_cncu.c - Define job container management functions for
* Cray systems
*****************************************************************************
* Copyright (C) 2013 SchedMD LLC
* Written by Morris Jette, SchedMD
*
* This file is part of Slurm, a resource management program.
 * For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#ifdef HAVE_NATIVE_CRAY
#include <job.h>	/* Cray's job module component */
#endif
#include "slurm/slurm_errno.h"
#include "src/common/slurm_xlator.h"
#include "src/slurmd/common/proctrack.h"
#define ADD_FLAGS 0
#define CREATE_FLAGS 0
#define DELETE_FLAGS 0
#define JOB_BUF_SIZE 128
/*
* These variables are required by the generic plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it.
*
* plugin_name - a string giving a human-readable description of the
* plugin. There is no maximum length, but the symbol must refer to
* a valid string.
*
* plugin_type - a string suggesting the type of the plugin or its
* applicability to a particular form of data or method of data handling.
* If the low-level plugin API is used, the contents of this string are
* unimportant and may be anything. Slurm uses the higher-level plugin
* interface which requires this string to be of the form
*
 *	<application>/<method>
 *
 * where <application> is a description of the intended application of
 * the plugin (e.g., "task" for task control) and <method> is a description
* of how this plugin satisfies that application. Slurm will only load
* a task plugin if the plugin_type string has a prefix of "task/".
*
* plugin_version - an unsigned 32-bit integer containing the Slurm version
* (major.minor.micro combined into a single number).
*/
const char plugin_name[] = "job_container cncu plugin";
const char plugin_type[] = "job_container/cncu";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
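
/* Job IDs with an active job container on this node (zero marks an unused
 * slot); persisted under state_dir and protected by context_lock */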
static uint32_t *job_id_array = NULL;
static uint32_t job_id_count = 0;
static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
static char *state_dir = NULL;
static uint64_t debug_flags = 0;
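
/* Write job_id_array to a "job_container_state" file in dir_name so active
 * job containers can be recovered after a slurmd restart */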
static int _save_state(char *dir_name)
{
char *file_name;
int ret = SLURM_SUCCESS;
int state_fd;
if (!dir_name) {
error("job_container state directory is NULL");
return SLURM_ERROR;
}
file_name = xstrdup_printf("%s/job_container_state", dir_name);
(void) unlink(file_name);
state_fd = creat(file_name, 0600);
if (state_fd < 0) {
error("Can't save state, error creating file %s %m",
file_name);
ret = SLURM_ERROR;
} else {
char *buf = (char *) job_id_array;
size_t len = job_id_count * sizeof(uint32_t);
while (1) {
int wrote = write(state_fd, buf, len);
if ((wrote < 0) && (errno == EINTR))
continue;
if (wrote == 0)
break;
if (wrote < 0) {
error("Can't save job_container state: %m");
ret = SLURM_ERROR;
break;
}
buf += wrote;
len -= wrote;
}
close(state_fd);
}
xfree(file_name);
return ret;
}
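
/* Read the "job_container_state" file from dir_name and rebuild
 * job_id_array/job_id_count; a missing file is not treated as an error */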
static int _restore_state(char *dir_name)
{
char *data = NULL, *file_name = NULL;
int error_code = SLURM_SUCCESS;
int state_fd, data_allocated = 0, data_read = 0, data_offset = 0;
if (!dir_name) {
error("job_container state directory is NULL");
return SLURM_ERROR;
}
file_name = xstrdup_printf("%s/job_container_state", dir_name);
state_fd = open (file_name, O_RDONLY);
if (state_fd >= 0) {
data_allocated = JOB_BUF_SIZE;
data = xmalloc(data_allocated);
while (1) {
data_read = read(state_fd, data + data_offset,
JOB_BUF_SIZE);
if ((data_read < 0) && (errno == EINTR))
continue;
if (data_read < 0) {
error ("Read error on %s, %m", file_name);
error_code = SLURM_ERROR;
break;
} else if (data_read == 0)
break;
data_offset += data_read;
data_allocated += data_read;
xrealloc(data, data_allocated);
}
close(state_fd);
} else {
error("No %s file for %s state recovery",
file_name, plugin_type);
xfree(file_name);
return SLURM_SUCCESS;
}
xfree(file_name);
if (error_code == SLURM_SUCCESS) {
job_id_array = (uint32_t *) data;
job_id_count = data_offset / sizeof(uint32_t);
}
return error_code;
}
#ifdef HAVE_NATIVE_CRAY
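/* Log usage statistics (job, file and IPC object counts) for the Cray
 * compute node reservation identified by resv_id */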
static void _stat_reservation(char *type, rid_t resv_id)
{
struct job_resv_stat buf;
DEF_TIMERS;
START_TIMER;
if (job_stat_reservation(resv_id, &buf)) {
error("%s: stat(%"PRIu64"): %m", plugin_type, resv_id);
} else {
info("%s: %s/stat(%"PRIu64"): flags=%d "
"num_jobs=%d num_files=%d num_ipc_objs=%d",
plugin_type, type, resv_id, buf.flags, buf.num_jobs,
buf.num_files, buf.num_ipc_objs);
}
END_TIMER;
if (debug_flags & DEBUG_FLAG_TIME_CRAY)
INFO_LINE("call took: %s", TIME_STR);
}
#endif
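
/* Refresh the cached DebugFlags when slurmd is reconfigured */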
extern void container_p_reconfig(void)
{
debug_flags = slurm_get_debug_flags();
}
/*
* init() is called when the plugin is loaded, before any other functions
* are called. Put global initialization here.
*/
extern int init(void)
{
debug_flags = slurm_get_debug_flags();
if (debug_flags & DEBUG_FLAG_JOB_CONT)
info("%s loaded", plugin_name);
else
debug("%s loaded", plugin_name);
return SLURM_SUCCESS;
}
/*
* fini() is called when the plugin is removed. Clear any allocated
* storage here.
*/
extern int fini(void)
{
slurm_mutex_lock(&context_lock);
xfree(state_dir);
xfree(job_id_array);
job_id_count = 0;
slurm_mutex_unlock(&context_lock);
return SLURM_SUCCESS;
}
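
/* Restore job container state from dir_name; if recover is false, purge any
 * previously recorded job containers instead */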
extern int container_p_restore(char *dir_name, bool recover)
{
int i;
slurm_mutex_lock(&context_lock);
xfree(state_dir);
state_dir = xstrdup(dir_name);
_restore_state(state_dir);
for (i = 0; i < job_id_count; i++) {
if (job_id_array[i] == 0)
continue;
if (debug_flags & DEBUG_FLAG_JOB_CONT)
info("%s: %s job(%u)",
plugin_type,
recover ? "recovered" : "purging",
job_id_array[i]);
if (!recover)
job_id_array[i] = 0;
}
slurm_mutex_unlock(&context_lock);
return SLURM_SUCCESS;
}
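
/* Create a job container for job_id: record the ID in the local state file
 * and, on native Cray systems, create the matching job reservation */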
extern int container_p_create(uint32_t job_id)
{
#ifdef HAVE_NATIVE_CRAY
rid_t resv_id = job_id;
int rc;
#endif
int i, empty = -1, found = -1;
DEF_TIMERS;
START_TIMER;
if (debug_flags & DEBUG_FLAG_JOB_CONT)
info("%s: creating(%u)", plugin_type, job_id);
slurm_mutex_lock(&context_lock);
for (i = 0; i < job_id_count; i++) {
if (job_id_array[i] == 0) {
empty = i;
} else if (job_id_array[i] == job_id) {
found = i;
break;
}
}
if (found == -1) {
if (empty == -1) {
empty = job_id_count;
job_id_count += 4;
job_id_array = xrealloc(job_id_array,
sizeof(uint32_t)*job_id_count);
}
job_id_array[empty] = job_id;
_save_state(state_dir);
}
slurm_mutex_unlock(&context_lock);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
} else {
END_TIMER3("container_p_create: saving state took", 3000000);
}
#ifdef HAVE_NATIVE_CRAY
START_TIMER;
rc = job_create_reservation(resv_id, CREATE_FLAGS);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
} else
END_TIMER3("container_p_create: job_create_reservation took",
3000000);
if ((rc == 0) || (errno == EEXIST)) {
if ((found == -1) && (rc != 0) && (errno == EEXIST)) {
error("%s: create(%u): Reservation already exists",
plugin_type, job_id);
}
if (debug_flags & DEBUG_FLAG_JOB_CONT)
_stat_reservation("create", resv_id);
return SLURM_SUCCESS;
}
error("%s: create(%u): %m", plugin_type, job_id);
return SLURM_ERROR;
#else
return SLURM_SUCCESS;
#endif
}
/* Add proctrack container (PAGG) to a job container */
extern int container_p_add_cont(uint32_t job_id, uint64_t cont_id)
{
#ifdef HAVE_NATIVE_CRAY
jid_t cjob_id = cont_id;
rid_t resv_id = job_id;
int rc;
DEF_TIMERS;
#endif
if (debug_flags & DEBUG_FLAG_JOB_CONT) {
info("%s: adding cont(%u.%"PRIu64")",
plugin_type, job_id, cont_id);
}
#ifdef HAVE_NATIVE_CRAY
START_TIMER;
rc = job_attach_reservation(cjob_id, resv_id, ADD_FLAGS);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
} else
END_TIMER3("container_p_add_cont: job_attach_reservation took",
3000000);
if ((rc != 0) && (errno == ENOENT)) { /* Log and retry */
if (debug_flags & DEBUG_FLAG_JOB_CONT)
info("%s: add(%u.%"PRIu64"): No reservation found, "
"no big deal, this is probably the first time "
"this was called. We will just create a new one.",
plugin_type, job_id, cont_id);
START_TIMER;
		/* Create the missing reservation, then retry the attach.
		 * Any create failure will surface through the attach result. */
		(void) job_create_reservation(resv_id, CREATE_FLAGS);
		rc = job_attach_reservation(cjob_id, resv_id, ADD_FLAGS);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
} else
END_TIMER3("container_p_add_cont: "
"job_(create&attach)_reservation took",
3000000);
}
if ((rc == 0) || (errno == EBUSY)) {
if (rc) {
/* EBUSY - job ID already attached to a reservation
* Duplicate adds can be generated by prolog/epilog */
debug2("%s: add(%u.%"PRIu64"): %m",
plugin_type, job_id, cont_id);
} else if (debug_flags & DEBUG_FLAG_JOB_CONT)
_stat_reservation("add", resv_id);
return SLURM_SUCCESS;
}
error("%s: add(%u.%"PRIu64"): %m", plugin_type, job_id, cont_id);
return SLURM_ERROR;
#else
return SLURM_SUCCESS;
#endif
}
/* Add a process to a job container, create the proctrack container to add */
extern int container_p_join(uint32_t job_id, uid_t uid)
{
stepd_step_rec_t job;
int rc;
pid_t pid = getpid();
DEF_TIMERS;
START_TIMER;
if (debug_flags & DEBUG_FLAG_JOB_CONT) {
info("%s: adding pid(%u.%u)",
plugin_type, job_id, (uint32_t) pid);
}
memset(&job, 0, sizeof(stepd_step_rec_t));
job.jmgr_pid = pid;
job.uid = uid;
/*
* container_g_join() is called only from forked processes, set the
* proctrack_forked global bool to inform proctrack/cray_aries we are
* forked.
*/
proctrack_forked = true;
if (proctrack_g_create(&job) != SLURM_SUCCESS) {
error("%s: proctrack_g_create job(%u)", plugin_type,job_id);
return SLURM_ERROR;
}
proctrack_g_add(&job, pid);
rc = container_p_add_cont(job_id, job.cont_id);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
}
return rc;
}
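
/* Delete the job container for job_id: clear it from the local state file
 * and, on native Cray systems, end the job reservation; a reservation that
 * is missing or already ending is not treated as a fatal error */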
extern int container_p_delete(uint32_t job_id)
{
#ifdef HAVE_NATIVE_CRAY
rid_t resv_id = job_id;
DEF_TIMERS;
int rc;
#endif
int i, found = -1;
bool job_id_change = false;
if (debug_flags & DEBUG_FLAG_JOB_CONT)
info("%s: deleting(%u)", plugin_type, job_id);
slurm_mutex_lock(&context_lock);
for (i = 0; i < job_id_count; i++) {
if (job_id_array[i] == job_id) {
job_id_array[i] = 0;
job_id_change = true;
found = i;
}
}
if (found == -1)
info("%s: no job for delete(%u)", plugin_type, job_id);
if (job_id_change)
_save_state(state_dir);
slurm_mutex_unlock(&context_lock);
#ifdef HAVE_NATIVE_CRAY
START_TIMER;
rc = job_end_reservation(resv_id, DELETE_FLAGS);
if (debug_flags & DEBUG_FLAG_TIME_CRAY) {
END_TIMER;
INFO_LINE("call took: %s", TIME_STR);
} else
END_TIMER3("container_p_delete: job_end_reservation took",
3000000);
if (rc == 0)
return SLURM_SUCCESS;
if ((errno == ENOENT) || (errno == EINPROGRESS) || (errno == EALREADY))
return SLURM_SUCCESS; /* Not fatal error */
error("%s: delete(%u): %m", plugin_type, job_id);
return SLURM_ERROR;
#else
return SLURM_SUCCESS;
#endif
}