/*****************************************************************************\
* slurm_persist_conn.h - Definitions for communicating over a persistent
* connection within Slurm.
******************************************************************************
* Copyright (C) 2016 SchedMD LLC
* Written by Danny Auble da@schedmd.com, et. al.
*
* This file is part of Slurm, a resource management program.
* For details, see .
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include
#include
#if HAVE_SYS_PRCTL_H
#include
#endif
#include "slurm/slurm_errno.h"
#include "src/common/fd.h"
#include "src/common/macros.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/slurmdbd_defs.h"
#include "src/common/slurmdbd_pack.h"
#include "src/common/xsignal.h"
#include "slurm_persist_conn.h"
#define MAX_THREAD_COUNT 100
/*
* Maximum message size. Messages larger than this value (in bytes)
* will not be received.
*/
#define MAX_MSG_SIZE (16*1024*1024)
typedef struct {
void *arg;
slurm_persist_conn_t *conn;
int thread_loc;
pthread_t thread_id;
} persist_service_conn_t;
static persist_service_conn_t *persist_service_conn[MAX_THREAD_COUNT];
static int thread_count = 0;
static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t thread_count_cond = PTHREAD_COND_INITIALIZER;
static int shutdown_time = 0;
/* Return time in msec since "start time" */
static int _tot_wait (struct timeval *start_time)
{
struct timeval end_time;
int msec_delay;
gettimeofday(&end_time, NULL);
msec_delay = (end_time.tv_sec - start_time->tv_sec ) * 1000;
msec_delay += ((end_time.tv_usec - start_time->tv_usec + 500) / 1000);
return msec_delay;
}
/* close and fd and replace it with a -1 */
static void _close_fd(int *fd)
{
if (*fd && *fd >= 0) {
close(*fd);
*fd = -1;
}
}
/* Return true if communication failure should be logged. Only log failures
* every 10 minutes to avoid filling logs */
static bool _comm_fail_log(slurm_persist_conn_t *persist_conn)
{
time_t now = time(NULL);
time_t old = now - 600; /* Log failures once every 10 mins */
if (persist_conn->comm_fail_time < old) {
persist_conn->comm_fail_time = now;
return true;
}
return false;
}
/* static void _reopen_persist_conn(slurm_persist_conn_t *persist_conn) */
/* { */
/* xassert(persist_conn); */
/* _close_fd(&persist_conn->fd); */
/* slurm_persist_conn_open(persist_conn); */
/* } */
/* Wait until a file is readable,
* RET false if can not be read */
static bool _conn_readable(slurm_persist_conn_t *persist_conn)
{
struct pollfd ufds;
int rc, time_left;
xassert(persist_conn->shutdown);
ufds.fd = persist_conn->fd;
ufds.events = POLLIN;
while (!(*persist_conn->shutdown)) {
if (persist_conn->timeout) {
struct timeval tstart;
gettimeofday(&tstart, NULL);
time_left = persist_conn->timeout - _tot_wait(&tstart);
} else
time_left = -1;
rc = poll(&ufds, 1, time_left);
if (*persist_conn->shutdown)
break;
if (rc == -1) {
if ((errno == EINTR) || (errno == EAGAIN)) {
debug3("%s: retrying poll for fd %d: %m",
__func__, persist_conn->fd);
continue;
}
error("%s: poll error for fd %d: %m",
__func__, persist_conn->fd);
return false;
}
if (rc == 0) {
debug("%s: poll for fd %d timeout after %d msecs of total wait %d msecs.",
__func__, persist_conn->fd, time_left,
persist_conn->timeout);
return false;
}
if ((ufds.revents & POLLHUP) &&
((ufds.revents & POLLIN) == 0)) {
debug2("%s: persistent connection for fd %d closed",
__func__, persist_conn->fd);
return false;
}
if (ufds.revents & POLLNVAL) {
error("%s: persistent connection for fd %d is invalid",
__func__, persist_conn->fd);
return false;
}
if (ufds.revents & POLLERR) {
error("%s: persistent connection for fd %d experienced an error",
__func__, persist_conn->fd);
return false;
}
if ((ufds.revents & POLLIN) == 0) {
error("%s: persistent connection for fd %d missing POLLIN flag with revents 0x%"PRIx64,
__func__, persist_conn->fd, (uint64_t) ufds.revents);
return false;
}
if (ufds.revents == POLLIN) {
errno = 0;
return true;
}
fatal_abort("%s: poll returned unexpected revents: 0x%"PRIx64,
__func__, (uint64_t) ufds.revents);
}
debug("%s: shutdown request detected for fd %d",
__func__, persist_conn->fd);
return false;
}
static void _destroy_persist_service(persist_service_conn_t *persist_service)
{
if (persist_service) {
slurm_persist_conn_destroy(persist_service->conn);
xfree(persist_service);
}
}
static void _sig_handler(int signal)
{
}
static void _persist_free_msg_members(slurm_persist_conn_t *persist_conn,
persist_msg_t *persist_msg)
{
if (persist_conn->flags & PERSIST_FLAG_DBD)
slurmdbd_free_msg(persist_msg);
else
slurm_free_msg_data(persist_msg->msg_type, persist_msg->data);
}
static int _process_service_connection(
slurm_persist_conn_t *persist_conn, void *arg)
{
uint32_t nw_size = 0, msg_size = 0, uid = NO_VAL;
char *msg_char = NULL;
ssize_t msg_read = 0, offset = 0;
bool first = true, fini = false;
Buf buffer = NULL;
int rc = SLURM_SUCCESS;
xassert(persist_conn->callback_proc);
xassert(persist_conn->shutdown);
debug2("Opened connection %d from %s", persist_conn->fd,
persist_conn->rem_host);
if (persist_conn->flags & PERSIST_FLAG_ALREADY_INITED)
first = false;
while (!(*persist_conn->shutdown) && !fini) {
if (!_conn_readable(persist_conn))
break; /* problem with this socket */
msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
if (msg_read == 0) /* EOF */
break;
if (msg_read != sizeof(nw_size)) {
error("Could not read msg_size from "
"connection %d(%s) uid(%d)",
persist_conn->fd, persist_conn->rem_host, uid);
break;
}
msg_size = ntohl(nw_size);
if ((msg_size < 2) || (msg_size > MAX_MSG_SIZE)) {
error("Invalid msg_size (%u) from "
"connection %d(%s) uid(%d)",
msg_size, persist_conn->fd,
persist_conn->rem_host, uid);
break;
}
msg_char = xmalloc(msg_size);
offset = 0;
while (msg_size > offset) {
if (!_conn_readable(persist_conn))
break; /* problem with this socket */
msg_read = read(persist_conn->fd, (msg_char + offset),
(msg_size - offset));
if (msg_read <= 0) {
error("read(%d): %m", persist_conn->fd);
break;
}
offset += msg_read;
}
if (msg_size == offset) {
persist_msg_t msg;
rc = slurm_persist_conn_process_msg(
persist_conn, &msg,
msg_char, msg_size,
&buffer, first);
if (rc == SLURM_SUCCESS) {
rc = (persist_conn->callback_proc)(
arg, &msg, &buffer, &uid);
_persist_free_msg_members(persist_conn, &msg);
if (rc != SLURM_SUCCESS &&
rc != ACCOUNTING_FIRST_REG &&
rc != ACCOUNTING_TRES_CHANGE_DB &&
rc != ACCOUNTING_NODES_CHANGE_DB) {
error("Processing last message from "
"connection %d(%s) uid(%d)",
persist_conn->fd,
persist_conn->rem_host, uid);
if (rc == ESLURM_ACCESS_DENIED ||
rc == SLURM_PROTOCOL_VERSION_ERROR)
fini = true;
}
}
first = false;
} else {
buffer = slurm_persist_make_rc_msg(
persist_conn, SLURM_ERROR, "Bad offset", 0);
fini = true;
}
xfree(msg_char);
if (buffer) {
if (slurm_persist_send_msg(persist_conn, buffer)
!= SLURM_SUCCESS) {
/* This is only an issue on persistent
* connections, and really isn't that big of a
* deal as the slurmctld will just send the
* message again. */
if (persist_conn->rem_port)
debug("Problem sending response to "
"connection %d(%s) uid(%d)",
persist_conn->fd,
persist_conn->rem_host, uid);
fini = true;
}
free_buf(buffer);
}
}
debug2("Closed connection %d uid(%d)", persist_conn->fd, uid);
return rc;
}
static void *_service_connection(void *arg)
{
persist_service_conn_t *service_conn = arg;
xassert(service_conn);
xassert(service_conn->conn);
#if HAVE_SYS_PRCTL_H
char *name = xstrdup_printf("p-%s",
service_conn->conn->cluster_name);
if (prctl(PR_SET_NAME, name, NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__, name);
}
xfree(name);
#endif
service_conn->thread_id = pthread_self();
_process_service_connection(service_conn->conn, service_conn->arg);
if (service_conn->conn->callback_fini)
(service_conn->conn->callback_fini)(service_conn->arg);
else
debug("Persist connection from cluster %s has disconnected",
service_conn->conn->cluster_name);
/* service_conn is freed inside here */
slurm_persist_conn_free_thread_loc(service_conn->thread_loc);
// xfree(service_conn);
/* In order to avoid zombie threads, detach the thread now before
* exiting. slurm_persist_conn_recv_server_fini() will not try to join
* the thread because slurm_persist_conn_free_thread_loc() will have
* free'd the connection. If their are threads at shutdown, the join
* will happen before the detach so recv_fini() will wait until the
* thread is done.
*
* pthread_join man page:
* Failure to join with a thread that is joinable (i.e., one that is not
* detached), produces a "zombie thread". Avoid doing this, since each
* zombie thread consumes some system resources, and when enough zombie
* threads have accumulated, it will no longer be possible to create new
* threads (or processes).
*/
pthread_detach(pthread_self());
return NULL;
}
extern void slurm_persist_conn_recv_server_init(void)
{
int sigarray[] = {SIGUSR1, 0};
shutdown_time = 0;
(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
(void) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Prepare to catch SIGUSR1 to interrupt accept().
* This signal is generated by the slurmdbd signal
* handler thread upon receipt of SIGABRT, SIGINT,
* or SIGTERM. That thread does all processing of
* all signals. */
xsignal(SIGUSR1, _sig_handler);
xsignal_unblock(sigarray);
}
extern void slurm_persist_conn_recv_server_fini(void)
{
int i;
shutdown_time = time(NULL);
slurm_mutex_lock(&thread_count_lock);
for (i=0; ithread_id)
pthread_kill(persist_service_conn[i]->thread_id,
SIGUSR1);
}
/* It is faster to signal then wait since the threads would end serially
* instead of parallel if you did it all in one loop.
*/
for (i=0; ithread_id) {
pthread_t thread_id =
persist_service_conn[i]->thread_id;
/* Let go of lock in case the persistent connection
* thread is cleaning itself up.
* slurm_persist_conn_free_thread_loc() may be trying to
* remove itself but could be waiting on the
* thread_count mutex which this has locked. */
slurm_mutex_unlock(&thread_count_lock);
pthread_join(thread_id, NULL);
slurm_mutex_lock(&thread_count_lock);
}
_destroy_persist_service(persist_service_conn[i]);
persist_service_conn[i] = NULL;
}
slurm_mutex_unlock(&thread_count_lock);
}
extern void slurm_persist_conn_recv_thread_init(slurm_persist_conn_t *persist_conn,
int thread_loc, void *arg)
{
persist_service_conn_t *service_conn;
if (thread_loc < 0)
thread_loc = slurm_persist_conn_wait_for_thread_loc();
if (thread_loc < 0)
return;
service_conn = xmalloc(sizeof(persist_service_conn_t));
slurm_mutex_lock(&thread_count_lock);
persist_service_conn[thread_loc] = service_conn;
slurm_mutex_unlock(&thread_count_lock);
service_conn->arg = arg;
service_conn->conn = persist_conn;
service_conn->thread_loc = thread_loc;
persist_conn->timeout = 0; /* If this isn't zero we won't wait forever
like we want to.
*/
//_service_connection(service_conn);
slurm_thread_create(&persist_service_conn[thread_loc]->thread_id,
_service_connection, service_conn);
}
/* Increment thread_count and don't return until its value is no larger
* than MAX_THREAD_COUNT,
* RET index of free index in persist_service_conn or -1 to exit */
extern int slurm_persist_conn_wait_for_thread_loc(void)
{
bool print_it = true;
int i, rc = -1;
slurm_mutex_lock(&thread_count_lock);
while (1) {
if (shutdown_time)
break;
if (thread_count < MAX_THREAD_COUNT) {
thread_count++;
for (i=0; i 2) {
verbose("thread_count over "
"limit (%d), waiting",
thread_count);
last_print_time = now;
}
print_it = false;
}
slurm_cond_wait(&thread_count_cond, &thread_count_lock);
}
}
slurm_mutex_unlock(&thread_count_lock);
return rc;
}
/* my_tid IN - Thread ID of spawned thread, 0 if no thread spawned */
extern void slurm_persist_conn_free_thread_loc(int thread_loc)
{
/* we will handle this in the fini */
if (shutdown_time)
return;
slurm_mutex_lock(&thread_count_lock);
if (thread_count > 0)
thread_count--;
else
error("thread_count underflow");
_destroy_persist_service(persist_service_conn[thread_loc]);
persist_service_conn[thread_loc] = NULL;
slurm_cond_broadcast(&thread_count_cond);
slurm_mutex_unlock(&thread_count_lock);
}
extern int slurm_persist_conn_open_without_init(
slurm_persist_conn_t *persist_conn)
{
slurm_addr_t addr;
xassert(persist_conn);
xassert(persist_conn->rem_host);
xassert(persist_conn->rem_port);
xassert(persist_conn->cluster_name);
if (persist_conn->fd > 0)
_close_fd(&persist_conn->fd);
else
persist_conn->fd = -1;
if (!persist_conn->inited)
persist_conn->inited = true;
if (!persist_conn->version) {
/* Set to MIN_PROTOCOL so that a higher version controller can
* talk to a lower protocol version controller. When talking to
* the DBD, the protocol version should be set to the current
* protocol version prior to calling this. */
persist_conn->version = SLURM_MIN_PROTOCOL_VERSION;
}
if (persist_conn->timeout < 0)
persist_conn->timeout = slurm_get_msg_timeout() * 1000;
slurm_set_addr_char(&addr, persist_conn->rem_port,
persist_conn->rem_host);
if ((persist_conn->fd = slurm_open_msg_conn(&addr)) < 0) {
if (_comm_fail_log(persist_conn)) {
char *s = xstrdup_printf("%s: failed to open persistent connection to %s:%d: %m",
__func__,
persist_conn->rem_host,
persist_conn->rem_port);
if (persist_conn->flags & PERSIST_FLAG_SUPPRESS_ERR)
debug2("%s", s);
else
error("%s", s);
xfree(s);
}
return SLURM_ERROR;
}
fd_set_nonblocking(persist_conn->fd);
fd_set_close_on_exec(persist_conn->fd);
return SLURM_SUCCESS;
}
/* Open a persistent socket connection
* IN/OUT - persistent connection needing rem_host and rem_port filled in.
* Returned completely filled in.
* Returns SLURM_SUCCESS on success or SLURM_ERROR on failure */
extern int slurm_persist_conn_open(slurm_persist_conn_t *persist_conn)
{
int rc = SLURM_ERROR;
slurm_msg_t req_msg;
persist_init_req_msg_t req;
persist_rc_msg_t *resp = NULL;
if (slurm_persist_conn_open_without_init(persist_conn) != SLURM_SUCCESS)
return rc;
slurm_msg_t_init(&req_msg);
/* Always send the lowest protocol since we don't know what version the
* other side is running yet.
*/
req_msg.protocol_version = persist_conn->version;
req_msg.msg_type = REQUEST_PERSIST_INIT;
req_msg.flags |= SLURM_GLOBAL_AUTH_KEY;
if (persist_conn->flags & PERSIST_FLAG_DBD)
req_msg.flags |= SLURMDBD_CONNECTION;
memset(&req, 0, sizeof(persist_init_req_msg_t));
req.cluster_name = persist_conn->cluster_name;
req.persist_type = persist_conn->persist_type;
req.port = persist_conn->my_port;
req.version = SLURM_PROTOCOL_VERSION;
req_msg.data = &req;
if (slurm_send_node_msg(persist_conn->fd, &req_msg) < 0) {
error("%s: failed to send persistent connection init message to %s:%d",
__func__, persist_conn->rem_host, persist_conn->rem_port);
_close_fd(&persist_conn->fd);
} else {
Buf buffer = slurm_persist_recv_msg(persist_conn);
persist_msg_t msg;
slurm_persist_conn_t persist_conn_tmp;
if (!buffer) {
if (_comm_fail_log(persist_conn)) {
error("%s: No response to persist_init",
__func__);
}
_close_fd(&persist_conn->fd);
goto end_it;
}
memset(&msg, 0, sizeof(persist_msg_t));
memcpy(&persist_conn_tmp, persist_conn,
sizeof(slurm_persist_conn_t));
/* The first unpack is done the same way for dbd or normal
* communication . */
persist_conn_tmp.flags &= (~PERSIST_FLAG_DBD);
rc = slurm_persist_msg_unpack(&persist_conn_tmp, &msg, buffer);
free_buf(buffer);
resp = (persist_rc_msg_t *)msg.data;
if (resp && (rc == SLURM_SUCCESS)) {
rc = resp->rc;
persist_conn->version = resp->ret_info;
persist_conn->flags |= resp->flags;
}
if (rc != SLURM_SUCCESS) {
if (resp) {
error("%s: Something happened with the receiving/processing of the persistent connection init message to %s:%d: %s",
__func__, persist_conn->rem_host,
persist_conn->rem_port, resp->comment);
} else {
error("%s: Failed to unpack persistent connection init resp message from %s:%d",
__func__,
persist_conn->rem_host,
persist_conn->rem_port);
}
_close_fd(&persist_conn->fd);
}
}
end_it:
slurm_persist_free_rc_msg(resp);
return rc;
}
extern void slurm_persist_conn_close(slurm_persist_conn_t *persist_conn)
{
if (!persist_conn)
return;
_close_fd(&persist_conn->fd);
}
extern int slurm_persist_conn_reopen(slurm_persist_conn_t *persist_conn,
bool with_init)
{
slurm_persist_conn_close(persist_conn);
if (with_init)
return slurm_persist_conn_open(persist_conn);
else
return slurm_persist_conn_open_without_init(persist_conn);
}
/* Close the persistent connection */
extern void slurm_persist_conn_members_destroy(
slurm_persist_conn_t *persist_conn)
{
if (!persist_conn)
return;
persist_conn->inited = false;
slurm_persist_conn_close(persist_conn);
if (persist_conn->auth_cred) {
g_slurm_auth_destroy(persist_conn->auth_cred);
persist_conn->auth_cred = NULL;
}
xfree(persist_conn->cluster_name);
xfree(persist_conn->rem_host);
}
/* Close the persistent connection */
extern void slurm_persist_conn_destroy(slurm_persist_conn_t *persist_conn)
{
if (!persist_conn)
return;
slurm_persist_conn_members_destroy(persist_conn);
xfree(persist_conn);
}
extern int slurm_persist_conn_process_msg(slurm_persist_conn_t *persist_conn,
persist_msg_t *persist_msg,
char *msg_char, uint32_t msg_size,
Buf *out_buffer, bool first)
{
int rc;
Buf recv_buffer = NULL;
char *comment = NULL;
/* puts msg_char into buffer struct */
recv_buffer = create_buf(msg_char, msg_size);
memset(persist_msg, 0, sizeof(persist_msg_t));
rc = slurm_persist_msg_unpack(persist_conn, persist_msg, recv_buffer);
xfer_buf_data(recv_buffer); /* delete in_buffer struct
* without xfree of msg_char
* (done later in this
* function). */
if (rc != SLURM_SUCCESS) {
comment = xstrdup_printf("Failed to unpack %s message",
slurmdbd_msg_type_2_str(
persist_msg->msg_type, true));
error("CONN:%u %s", persist_conn->fd, comment);
*out_buffer = slurm_persist_make_rc_msg(
persist_conn, rc, comment, persist_msg->msg_type);
xfree(comment);
} else if (first &&
(persist_msg->msg_type != REQUEST_PERSIST_INIT)) {
comment = "Initial RPC not REQUEST_PERSIST_INIT";
error("CONN:%u %s type (%d)",
persist_conn->fd, comment, persist_msg->msg_type);
rc = EINVAL;
*out_buffer = slurm_persist_make_rc_msg(
persist_conn, rc, comment,
REQUEST_PERSIST_INIT);
} else if (!first &&
(persist_msg->msg_type == REQUEST_PERSIST_INIT)) {
comment = "REQUEST_PERSIST_INIT sent after connection established";
error("CONN:%u %s", persist_conn->fd, comment);
rc = EINVAL;
*out_buffer = slurm_persist_make_rc_msg(
persist_conn, rc, comment, REQUEST_PERSIST_INIT);
}
return rc;
}
/* Wait until a file is writeable,
* RET 1 if file can be written now,
* 0 if can not be written to within 5 seconds
* -1 if file has been closed POLLHUP
*/
extern int slurm_persist_conn_writeable(slurm_persist_conn_t *persist_conn)
{
struct pollfd ufds;
int write_timeout = 5000;
int rc, time_left;
struct timeval tstart;
char temp[2];
xassert(persist_conn->shutdown);
if (persist_conn->fd < 0)
return -1;
ufds.fd = persist_conn->fd;
ufds.events = POLLOUT;
gettimeofday(&tstart, NULL);
while ((*persist_conn->shutdown) == 0) {
time_left = write_timeout - _tot_wait(&tstart);
rc = poll(&ufds, 1, time_left);
if (rc == -1) {
if ((errno == EINTR) || (errno == EAGAIN))
continue;
error("%s: poll error: %m", __func__);
return -1;
}
if (rc == 0)
return 0;
/*
* Check here to make sure the socket really is there.
* If not then exit out and notify the conn. This
* is here since a write doesn't always tell you the
* socket is gone, but getting 0 back from a
* nonblocking read means just that.
*/
if (ufds.revents & POLLHUP ||
(recv(persist_conn->fd, &temp, 1, 0) == 0)) {
debug2("%s: persistent connection %d is closed for writes",
__func__, persist_conn->fd);
if (persist_conn->trigger_callbacks.dbd_fail)
(persist_conn->trigger_callbacks.dbd_fail)();
return -1;
}
if (ufds.revents & POLLNVAL) {
error("%s: persistent connection %d is invalid",
__func__, persist_conn->fd);
return 0;
}
if (ufds.revents & POLLERR) {
if (_comm_fail_log(persist_conn)) {
if (fd_get_socket_error(persist_conn->fd, &errno))
error("%s: unable to get error for persistent connection %d: %m",
__func__, persist_conn->fd);
else
error("%s: persistent connection %d experienced an error: %m",
__func__, persist_conn->fd);
}
if (persist_conn->trigger_callbacks.dbd_fail)
(persist_conn->trigger_callbacks.dbd_fail)();
return 0;
}
if ((ufds.revents & POLLOUT) == 0) {
error("%s: persistent connection %d events %d",
__func__, persist_conn->fd, ufds.revents);
return 0;
}
/* revents == POLLOUT */
errno = 0;
return 1;
}
return 0;
}
extern int slurm_persist_send_msg(
slurm_persist_conn_t *persist_conn, Buf buffer)
{
uint32_t msg_size, nw_size;
char *msg;
ssize_t msg_wrote;
int rc, retry_cnt = 0;
xassert(persist_conn);
if (persist_conn->fd < 0)
return EAGAIN;
if (!buffer)
return SLURM_ERROR;
rc = slurm_persist_conn_writeable(persist_conn);
if (rc == -1) {
re_open:
/* if errno is ACCESS_DENIED do not try to reopen to
connection just return that */
if (errno == ESLURM_ACCESS_DENIED)
return ESLURM_ACCESS_DENIED;
if (retry_cnt++ > 3)
return SLURM_COMMUNICATIONS_SEND_ERROR;
if (persist_conn->flags & PERSIST_FLAG_RECONNECT) {
slurm_persist_conn_reopen(persist_conn, true);
rc = slurm_persist_conn_writeable(persist_conn);
} else
return SLURM_ERROR;
}
if (rc < 1)
return EAGAIN;
msg_size = get_buf_offset(buffer);
nw_size = htonl(msg_size);
msg_wrote = write(persist_conn->fd, &nw_size, sizeof(nw_size));
if (msg_wrote != sizeof(nw_size))
return EAGAIN;
msg = get_buf_data(buffer);
while (msg_size > 0) {
rc = slurm_persist_conn_writeable(persist_conn);
if (rc == -1)
goto re_open;
if (rc < 1)
return EAGAIN;
msg_wrote = write(persist_conn->fd, msg, msg_size);
if (msg_wrote <= 0)
return EAGAIN;
msg += msg_wrote;
msg_size -= msg_wrote;
}
return SLURM_SUCCESS;
}
extern Buf slurm_persist_recv_msg(slurm_persist_conn_t *persist_conn)
{
uint32_t msg_size, nw_size;
char *msg;
ssize_t msg_read, offset;
Buf buffer;
xassert(persist_conn);
if (persist_conn->fd < 0)
return NULL;
if (!_conn_readable(persist_conn))
goto endit;
msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
if (msg_read != sizeof(nw_size))
goto endit;
msg_size = ntohl(nw_size);
/* We don't error check for an upper limit here
* since size could possibly be massive */
if (msg_size < 2) {
error("Persistent Conn: Invalid msg_size (%u)", msg_size);
goto endit;
}
msg = xmalloc(msg_size);
offset = 0;
while (msg_size > offset) {
if (!_conn_readable(persist_conn))
break; /* problem with this socket */
msg_read = read(persist_conn->fd, (msg + offset),
(msg_size - offset));
if (msg_read <= 0) {
error("Persistent Conn: read: %m");
break;
}
offset += msg_read;
}
if (msg_size != offset) {
if (!(*persist_conn->shutdown)) {
error("Persistent Conn: only read %zd of %d bytes",
offset, msg_size);
} /* else in shutdown mode */
xfree(msg);
goto endit;
}
buffer = create_buf(msg, msg_size);
return buffer;
endit:
/* Close it since we abandoned it. If the connection does still exist
* on the other end we can't rely on it after this point since we didn't
* listen long enough for this response.
*/
if (!(*persist_conn->shutdown) &&
persist_conn->flags & PERSIST_FLAG_RECONNECT)
slurm_persist_conn_reopen(persist_conn, true);
return NULL;
}
extern Buf slurm_persist_msg_pack(slurm_persist_conn_t *persist_conn,
persist_msg_t *req_msg)
{
Buf buffer;
xassert(persist_conn);
if (persist_conn->flags & PERSIST_FLAG_DBD)
buffer = pack_slurmdbd_msg(req_msg, persist_conn->version);
else {
slurm_msg_t msg;
slurm_msg_t_init(&msg);
msg.data = req_msg->data;
msg.data_size = req_msg->data_size;
msg.msg_type = req_msg->msg_type;
msg.protocol_version = persist_conn->version;
buffer = init_buf(BUF_SIZE);
pack16(req_msg->msg_type, buffer);
if (pack_msg(&msg, buffer) != SLURM_SUCCESS) {
free_buf(buffer);
return NULL;
}
}
return buffer;
}
extern int slurm_persist_msg_unpack(slurm_persist_conn_t *persist_conn,
persist_msg_t *resp_msg, Buf buffer)
{
int rc;
xassert(persist_conn);
xassert(resp_msg);
if (persist_conn->flags & PERSIST_FLAG_DBD) {
rc = unpack_slurmdbd_msg(resp_msg,
persist_conn->version,
buffer);
} else {
slurm_msg_t msg;
slurm_msg_t_init(&msg);
msg.protocol_version = persist_conn->version;
safe_unpack16(&msg.msg_type, buffer);
rc = unpack_msg(&msg, buffer);
resp_msg->msg_type = msg.msg_type;
resp_msg->data = msg.data;
}
/* Here we transfer the auth_cred to the persist_conn just in case in the
* future we need to use it in some way to verify things for messages
* that don't have on that will follow on the connection.
*/
if (resp_msg->msg_type == REQUEST_PERSIST_INIT) {
slurm_msg_t *msg = resp_msg->data;
if (persist_conn->auth_cred)
g_slurm_auth_destroy(persist_conn->auth_cred);
persist_conn->auth_cred = msg->auth_cred;
msg->auth_cred = NULL;
}
return rc;
unpack_error:
return SLURM_ERROR;
}
extern void slurm_persist_pack_init_req_msg(
persist_init_req_msg_t *msg, Buf buffer)
{
/* always send version field first for backwards compatibility */
pack16(msg->version, buffer);
if (msg->version >= SLURM_MIN_PROTOCOL_VERSION) {
packstr(msg->cluster_name, buffer);
pack16(msg->persist_type, buffer);
pack16(msg->port, buffer);
} else {
error("%s: invalid protocol version %u",
__func__, msg->version);
}
}
extern int slurm_persist_unpack_init_req_msg(
persist_init_req_msg_t **msg, Buf buffer)
{
uint32_t tmp32;
persist_init_req_msg_t *msg_ptr =
xmalloc(sizeof(persist_init_req_msg_t));
*msg = msg_ptr;
safe_unpack16(&msg_ptr->version, buffer);
if (msg_ptr->version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpackstr_xmalloc(&msg_ptr->cluster_name, &tmp32, buffer);
safe_unpack16(&msg_ptr->persist_type, buffer);
safe_unpack16(&msg_ptr->port, buffer);
} else {
error("%s: invalid protocol_version %u",
__func__, msg_ptr->version);
goto unpack_error;
}
return SLURM_SUCCESS;
unpack_error:
slurm_persist_free_init_req_msg(msg_ptr);
*msg = NULL;
return SLURM_ERROR;
}
extern void slurm_persist_free_init_req_msg(persist_init_req_msg_t *msg)
{
if (msg) {
xfree(msg->cluster_name);
xfree(msg);
}
}
extern void slurm_persist_pack_rc_msg(
persist_rc_msg_t *msg, Buf buffer, uint16_t protocol_version)
{
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
packstr(msg->comment, buffer);
pack16(msg->flags, buffer);
pack32(msg->rc, buffer);
pack16(msg->ret_info, buffer);
} else {
error("%s: invalid protocol version %u",
__func__, protocol_version);
}
}
extern int slurm_persist_unpack_rc_msg(
persist_rc_msg_t **msg, Buf buffer, uint16_t protocol_version)
{
uint32_t uint32_tmp;
persist_rc_msg_t *msg_ptr = xmalloc(sizeof(persist_rc_msg_t));
*msg = msg_ptr;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpackstr_xmalloc(&msg_ptr->comment, &uint32_tmp, buffer);
safe_unpack16(&msg_ptr->flags, buffer);
safe_unpack32(&msg_ptr->rc, buffer);
safe_unpack16(&msg_ptr->ret_info, buffer);
} else {
error("%s: invalid protocol_version %u",
__func__, protocol_version);
goto unpack_error;
}
return SLURM_SUCCESS;
unpack_error:
slurm_persist_free_rc_msg(msg_ptr);
*msg = NULL;
return SLURM_ERROR;
}
extern void slurm_persist_free_rc_msg(persist_rc_msg_t *msg)
{
if (msg) {
xfree(msg->comment);
xfree(msg);
}
}
extern Buf slurm_persist_make_rc_msg(slurm_persist_conn_t *persist_conn,
uint32_t rc, char *comment,
uint16_t ret_info)
{
persist_rc_msg_t msg;
persist_msg_t resp;
memset(&msg, 0, sizeof(persist_rc_msg_t));
memset(&resp, 0, sizeof(persist_msg_t));
msg.rc = rc;
msg.comment = comment;
msg.ret_info = ret_info;
resp.msg_type = PERSIST_RC;
resp.data = &msg;
return slurm_persist_msg_pack(persist_conn, &resp);
}
extern Buf slurm_persist_make_rc_msg_flags(slurm_persist_conn_t *persist_conn,
uint32_t rc, char *comment,
uint16_t flags,
uint16_t ret_info)
{
persist_rc_msg_t msg;
persist_msg_t resp;
memset(&msg, 0, sizeof(persist_rc_msg_t));
memset(&resp, 0, sizeof(persist_msg_t));
msg.rc = rc;
msg.flags = flags;
msg.comment = comment;
msg.ret_info = ret_info;
resp.msg_type = PERSIST_RC;
resp.data = &msg;
return slurm_persist_msg_pack(persist_conn, &resp);
}