summaryrefslogtreecommitdiff
path: root/server/server.c
diff options
context:
space:
mode:
authorGravatar John Kohl <jtkohl@mit.edu>1987-07-14 13:19:21 +0000
committerGravatar John Kohl <jtkohl@mit.edu>1987-07-14 13:19:21 +0000
commit01672618db420942aafb3e138008722a9ef4b92b (patch)
tree335c30a95a651b7df527a3a0b4388153ef3ef12f /server/server.c
parentc2ed740e88eac8a1991ce98878fd66eb9376bf56 (diff)
braindump and retransmission support
Diffstat (limited to 'server/server.c')
-rw-r--r--server/server.c285
1 files changed, 245 insertions, 40 deletions
diff --git a/server/server.c b/server/server.c
index 594b195..da25d0f 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,30 +56,41 @@ static char rcsid_server_s_c[] = "$Header$";
* int auth;
* struct sockaddr_in *who;
*
+ * ZServerDesc_t *server_which_server(who)
+ * struct sockaddr_in *who;
+ *
*/
-static void server_hello(), server_flush(), admin_handle(), setup_server();
-static void hello_respond(), hello_input(), send_msg();
+static void server_hello(), server_flush(), admin_dispatch(), setup_server();
+static void hello_respond(), srv_responded(), send_msg(), srv_alive();
+static void srv_nack_cancel(), srv_rexmit(), srv_nack_release();
static Code_t server_register();
static struct in_addr *get_server_addrs();
-static ZServerDesc_t *which_server();
+ZNotAcked_t *srv_nacklist; /* not acked list for server-server
+ packets */
ZServerDesc_t *otherservers; /* points to an array of the known
servers */
int nservers; /* number of other servers */
int me_server_idx; /* # of my entry in the array */
+#define ADJUST (1) /* adjust timeout on hello input */
+#define DONT_ADJUST (0) /* don't adjust timeout */
+
/* parameters controlling the transitions of the FSM's--patchable with adb */
int timo_up = TIMO_UP;
int timo_tardy = TIMO_TARDY;
int timo_dead = TIMO_DEAD;
+int srv_rexmit_secs = REXMIT_SECS;
+
/*
* Initialize the array of servers. The `limbo' server goes in the first
* slot (otherservers[0]).
* Contact Hesiod to find all the other servers, allocate space for the
* structure, initialize them all to SERV_DEAD with expired timeouts.
+ * Set up a list header for server_forward retransmits.
*/
void
@@ -123,14 +134,12 @@ server_init()
xfree(hes_addrs);
if (me_server_idx == -1) {
- ZServerDesc_t *temp;
syslog(LOG_WARNING, "I'm a renegade server!");
- temp = (ZServerDesc_t *)realloc((caddr_t) otherservers, (unsigned) (++nservers * sizeof(ZServerDesc_t)));
- if (!temp) {
+ otherservers = (ZServerDesc_t *)realloc((caddr_t) otherservers, (unsigned) (++nservers * sizeof(ZServerDesc_t)));
+ if (!otherservers) {
syslog(LOG_CRIT, "renegade realloc");
abort();
}
- otherservers = temp;
setup_server(&otherservers[nservers - 1], &my_addr);
/* we are up. */
otherservers[nservers - 1].zs_state = SERV_UP;
@@ -149,8 +158,26 @@ server_init()
}
me_server_idx = nservers - 1;
}
+ if (!(srv_nacklist = (ZNotAcked_t *) xmalloc(sizeof(ZNotAcked_t)))) {
+ /* unrecoverable */
+ syslog(LOG_CRIT, "srv_nacklist malloc");
+ abort();
+ }
+ bzero((caddr_t) srv_nacklist, sizeof(ZNotAcked_t));
+ srv_nacklist->q_forw = srv_nacklist->q_back = srv_nacklist;
}
+#ifdef DEBUG
+/* note: these must match the order given in zephyr.h */
+static char *
+srv_states[] = {
+ "SERV_UP",
+ "SERV_TARDY",
+ "SERV_DEAD",
+ "SERV_STARTING"
+};
+#endif DEBUG
+
/*
* A server timout has expired. If enough hello's have been unanswered,
* change state and act accordingly. Send a "hello" and reset the timer,
@@ -198,8 +225,8 @@ ZServerDesc_t *which;
}
/* now he's either TARDY, STARTING, or DEAD
We send a "hello," which increments the counter */
- zdbug((LOG_DEBUG, "srv %s is %d",inet_ntoa(which->zs_addr.sin_addr),
- (int) which->zs_state));
+ zdbug((LOG_DEBUG, "srv %s is %s",inet_ntoa(which->zs_addr.sin_addr),
+ srv_states[(int) which->zs_state]));
server_hello(which, auth);
/* reschedule the timer */
which->zs_timer = timer_set_rel(which->zs_timeout, server_timo,
@@ -222,17 +249,24 @@ struct sockaddr_in *who;
zdbug((LOG_DEBUG, "server_dispatch"));
+ if (notice->z_kind == SERVACK) {
+ srv_nack_cancel(notice, who);
+ srv_responded(notice, who);
+ return;
+ }
/* XXX set up a who for the real origin */
bzero((caddr_t) &newwho, sizeof(newwho));
newwho.sin_family = AF_INET;
newwho.sin_addr.s_addr = notice->z_sender_addr.s_addr;
newwho.sin_port = notice->z_port;
- server = which_server(who);
+ server = server_which_server(who);
- if (class_is_admin(notice))
- admin_handle(notice, auth, who, server);
- else if (class_is_control(notice))
+ if (class_is_admin(notice)) {
+ /* admins don't get acked, else we get a packet loop */
+ admin_dispatch(notice, auth, who, server);
+ return;
+ } else if (class_is_control(notice))
control_dispatch(notice, auth, &newwho, server);
else if (class_is_ulogin(notice))
ulogin_dispatch(notice, auth, &newwho, server);
@@ -241,6 +275,9 @@ struct sockaddr_in *who;
else
/* shouldn't come from another server */
syslog(LOG_WARNING, "srv_disp: pkt cls %s",notice->z_class);
+
+ /* acknowledge it */
+ ack(notice, who);
return;
}
@@ -293,8 +330,9 @@ struct sockaddr_in *who;
otherservers[nservers].zs_state = SERV_STARTING;
otherservers[nservers].zs_timeout = timo_tardy;
nservers++;
- get_brain_dump(who);
-
+ zdbug((LOG_DEBUG, "srv %s is %s",
+ inet_ntoa(otherservers[nservers].zs_addr.sin_addr),
+ srv_states[(int) otherservers[nservers].zs_state]));
return(0);
}
@@ -329,6 +367,7 @@ register ZServerDesc_t *which;
{
register ZHostList_t *hst;
+ zdbug((LOG_DEBUG, "server_flush"));
if (!which->zs_hosts) /* no data to flush */
return;
@@ -338,7 +377,7 @@ register ZServerDesc_t *which;
/* for each host, flush all data */
hostm_flush(hst, which);
}
-
+ srv_nack_release(which);
}
/*
@@ -362,26 +401,21 @@ int auth;
/*ARGSUSED*/
static void
-admin_handle(notice, auth, who, server)
+admin_dispatch(notice, auth, who, server)
ZNotice_t *notice;
int auth;
struct sockaddr_in *who;
ZServerDesc_t *server;
{
register char *opcode = notice->z_opcode;
- register ZHostList_t *host, *hishost;;
+ register ZHostList_t *host, *hishost;
zdbug((LOG_DEBUG, "ADMIN received"));
if (!strcmp(opcode, ADMIN_HELLO)) {
- hello_respond(who);
-#ifdef notdef
- if ((him = which_server(who))) {
- /* XXX take a hint here that the other guy is up */
- }
-#endif notdef
+ hello_respond(who, ADJUST);
} else if (!strcmp(opcode, ADMIN_IMHERE)) {
- hello_input(notice, auth, who);
+ srv_responded(notice, who);
} else if (!strcmp(opcode, ADMIN_SHUTDOWN)) {
zdbug((LOG_DEBUG, "server shutdown"));
/* we need to transfer all of its hosts to limbo */
@@ -393,8 +427,16 @@ ZServerDesc_t *server;
/* hostm transfer remque's the host and
attaches it to the new server */
hostm_transfer(host, limbo_server);
-
+ server->zs_state = SERV_DEAD;
+ server->zs_timeout = timo_dead;
+ /* don't worry about the timer, it will
+ be set appropriately on the next send */
+ zdbug((LOG_DEBUG, "srv %s is %s",
+ inet_ntoa(server->zs_addr.sin_addr),
+ srv_states[(int) server->zs_state]));
}
+ } else if (!strcmp(opcode, ADMIN_BDUMP)) {
+ get_brain_dump(notice, auth, who, server);
} else
syslog(LOG_WARNING, "ADMIN unknown opcode %s",opcode);
return;
@@ -424,7 +466,7 @@ ZServerDesc_t *server;
syslog(LOG_INFO, "new server %s, %d",
inet_ntoa(who->sin_addr),
ntohs(who->sin_port));
- hello_respond(who);
+ hello_respond(who, DONT_ADJUST);
}
return;
}
@@ -508,25 +550,52 @@ struct in_addr *addr;
*/
static void
-hello_respond(who)
+hello_respond(who, adj)
struct sockaddr_in *who;
+int adj;
{
+ register ZServerDesc_t *which;
send_msg(who, ADMIN_IMHERE, 0);
+ if (adj != ADJUST)
+ return;
+
+ /* If we think he's down, schedule an immediate HELLO. */
+
+ if (!(which = server_which_server(who)))
+ return;
+
+ switch (which->zs_state) {
+ case SERV_DEAD:
+ /* he said hello, we thought he was dead.
+ reschedule his hello for now. */
+ timer_reset(which->zs_timer);
+ which->zs_timer = timer_set_rel(0L, server_timo,
+ (caddr_t) which);
+ break;
+ case SERV_STARTING:
+ case SERV_TARDY:
+ case SERV_UP:
+ default:
+ break;
+ }
return;
-}
+}
/*
* return the server descriptor for server at who
*/
-static ZServerDesc_t *
-which_server(who)
+ZServerDesc_t *
+server_which_server(who)
struct sockaddr_in *who;
{
register ZServerDesc_t *server;
register int i;
+ if (who->sin_port != sock_sin.sin_port)
+ return(NULLZSDT);
+
/* don't check limbo */
for (server = &otherservers[1], i = 1; i < nservers; i++, server++) {
if (server->zs_addr.sin_addr.s_addr == who->sin_addr.s_addr)
@@ -536,16 +605,16 @@ struct sockaddr_in *who;
}
/*
- * We received a response to a hello packet. Adjust server state
+ * We received a response to a hello packet or an ack. Adjust server state
* appropriately.
*/
static void
-hello_input(notice, auth, who)
+srv_responded(notice, who)
ZNotice_t *notice;
-int auth;
struct sockaddr_in *who;
{
- register ZServerDesc_t *which = which_server(who);
+ register ZServerDesc_t *which = server_which_server(who);
+ int retval;
if (!which) {
syslog(LOG_ERR, "hello input from non-server?!");
@@ -559,9 +628,12 @@ struct sockaddr_in *who;
which->zs_state = SERV_STARTING;
case SERV_STARTING:
- /* XXX here we negotiate and set up a braindump */
-
- /* fall through */
+ /* here we negotiate and set up a braindump */
+ if (!bdump_socket) {
+ /* XXX offer it to the other server */
+ offer_brain_dump(who);
+ }
+ break;
case SERV_TARDY:
which->zs_state = SERV_UP;
case SERV_UP:
@@ -573,6 +645,8 @@ struct sockaddr_in *who;
(caddr_t) which);
break;
}
+ zdbug((LOG_DEBUG, "srv %s is %s",inet_ntoa(which->zs_addr.sin_addr),
+ srv_states[(int) which->zs_state]));
return;
}
@@ -650,9 +724,10 @@ int auth;
struct sockaddr_in *who;
{
register int i;
- ZPacket_t pack;
+ caddr_t pack;
int packlen;
Code_t retval;
+ register ZNotAcked_t *nacked;
zdbug((LOG_DEBUG, "srv_forw"));
@@ -660,22 +735,152 @@ struct sockaddr_in *who;
for (i = 1; i < nservers; i++) {
if (i == me_server_idx) /* don't xmit to myself */
continue;
+ if (otherservers[i].zs_state == SERV_DEAD)
+ continue;
+
+ if (!(pack = (caddr_t) xmalloc(sizeof(ZPacket_t)))) {
+ syslog(LOG_ERR, "srv_forw malloc");
+ continue; /* DON'T put on nack list */
+ }
- packlen = sizeof(pack);
+ packlen = sizeof(ZPacket_t);
if ((retval = ZFormatRawNotice(notice, pack, packlen, &packlen)) != ZERR_NONE) {
syslog(LOG_WARNING, "srv_fwd format: %s",
error_message(retval));
+ xfree(pack);
continue;
}
if ((retval = ZSetDestAddr(&otherservers[i].zs_addr)) != ZERR_NONE) {
syslog(LOG_WARNING, "srv_fwd set addr: %s",
error_message(retval));
+ xfree(pack);
continue;
}
if ((retval = ZSendPacket(pack, packlen)) != ZERR_NONE) {
syslog(LOG_WARNING, "srv_fwd xmit: %s", error_message(retval));
+ xfree(pack);
continue;
}
+ /* now we've sent it, mark it as not ack'ed */
+
+ if (!(nacked = (ZNotAcked_t *)xmalloc(sizeof(ZNotAcked_t)))) {
+ /* no space: just punt */
+ syslog(LOG_ERR, "srv_forw nack malloc");
+ xfree(pack);
+ continue;
+ }
+
+ nacked->na_rexmits = 0;
+ nacked->na_packet = pack;
+ nacked->na_srv_idx = i;
+ nacked->na_packsz = packlen;
+ nacked->na_uid = notice->z_uid;
+ nacked->q_forw = nacked->q_back = nacked;
+ nacked->na_abstimo = 0;
+
+ /* set a timer to retransmit */
+ nacked->na_timer = timer_set_rel(srv_rexmit_secs,
+ srv_rexmit,
+ (caddr_t) nacked);
+ /* chain in */
+ xinsque(nacked, srv_nacklist);
}
return;
}
+
+/*
+ * a server has acknowledged a message we sent to him; remove it from
+ * server unacked queue
+ */
+
+static void
+srv_nack_cancel(notice, who)
+register ZNotice_t *notice;
+struct sockaddr_in *who;
+{
+ register ZServerDesc_t *which = server_which_server(who);
+ register ZNotAcked_t *nacked;
+
+ if (!which) {
+ syslog(LOG_ERR, "non-server ack?");
+ return;
+ }
+ for (nacked = srv_nacklist->q_forw;
+ nacked != srv_nacklist;
+ nacked = nacked->q_forw)
+ if (&otherservers[nacked->na_srv_idx] == which)
+ if (ZCompareUID((caddr_t) &nacked->na_uid,
+ (caddr_t) &notice->z_uid)) {
+ timer_reset(nacked->na_timer);
+ xfree(nacked->na_packet);
+ xremque(nacked);
+ xfree(nacked);
+ return;
+ }
+ zdbug((LOG_DEBUG, "srv_nack not found"));
+ return;
+}
+
+/*
+ * retransmit a message to another server
+ */
+
+static void
+srv_rexmit(nackpacket)
+register ZNotAcked_t *nackpacket;
+{
+ Code_t retval;
+ /* retransmit the packet */
+
+ zdbug((LOG_DEBUG,"srv_rexmit to %s/%d",
+ inet_ntoa(otherservers[nackpacket->na_srv_idx].zs_addr.sin_addr),
+ ntohs(otherservers[nackpacket->na_srv_idx].zs_addr.sin_port)));
+
+ if ((retval = ZSetDestAddr(&otherservers[nackpacket->na_srv_idx].zs_addr))
+ != ZERR_NONE) {
+ syslog(LOG_WARNING, "srv_rexmit set addr: %s",
+ error_message(retval));
+ goto requeue;
+
+ }
+ if ((retval = ZSendPacket(nackpacket->na_packet,
+ nackpacket->na_packsz)) != ZERR_NONE)
+ syslog(LOG_WARNING, "srv_rexmit xmit: %s", error_message(retval));
+
+requeue:
+ /* reset the timer */
+ nackpacket->na_timer = timer_set_rel(srv_rexmit_secs,
+ srv_rexmit,
+ (caddr_t) nackpacket);
+ return;
+}
+
+/*
+ * Clean up the not-yet-acked queue and release anything destined
+ * to the server.
+ */
+
+static void
+srv_nack_release(server)
+ZServerDesc_t *server;
+{
+ register ZNotAcked_t *nacked, *nack2;
+
+ /* search the not-yet-acked list for anything destined to him, and
+ flush it. */
+ for (nacked = nacklist->q_forw;
+ nacked != nacklist;)
+ if (&otherservers[nacked->na_srv_idx] == server) {
+ /* go back, since remque will change things */
+ nack2 = nacked->q_back;
+ timer_reset(nacked->na_timer);
+ xremque(nacked);
+ xfree(nacked->na_packet);
+ xfree(nacked);
+ /* now that the remque adjusted the linked list,
+ we go forward again */
+ nacked = nack2->q_forw;
+ } else
+ nacked = nacked->q_forw;
+ return;
+}