summaryrefslogtreecommitdiff
path: root/server/server.c
diff options
context:
space:
mode:
authorGravatar John Kohl <jtkohl@mit.edu>1988-02-05 09:36:05 +0000
committerGravatar John Kohl <jtkohl@mit.edu>1988-02-05 09:36:05 +0000
commit84b74a4065424a964a381ca687e30ea02cd847a1 (patch)
treec5688e73fffd2b3bf91610e2992ee97f9359653b /server/server.c
parent494b01f2d5dd9ed087b0bf5657f998cb3224734f (diff)
concurrency support
Diffstat (limited to 'server/server.c')
-rw-r--r--server/server.c339
1 files changed, 264 insertions, 75 deletions
diff --git a/server/server.c b/server/server.c
index d6e1969..5c3a6d9 100644
--- a/server/server.c
+++ b/server/server.c
@@ -63,11 +63,12 @@ static char rcsid_server_s_c[] = "$Header$";
*
*/
-static void server_hello(), server_flush(), admin_dispatch(), setup_server();
+static void server_hello(), server_flush(), setup_server();
static void hello_respond(), srv_responded(), send_msg(), send_msg_list();
static void srv_alive(), srv_nack_cancel(), srv_rexmit(), srv_nack_release();
-static void recover_clt(), kill_clt(), server_lost();
-static void send_stats();
+static void server_lost();
+static void send_stats(), server_queue(), server_forw_reliable();
+static Code_t admin_dispatch(), recover_clt(), kill_clt();
#ifdef notdef
static Code_t server_register();
@@ -135,6 +136,8 @@ server_init()
setup_server(otherservers, &limbo_addr);
timer_reset(otherservers[0].zs_timer);
otherservers[0].zs_timer = (timer) NULL;
+ otherservers[0].zs_update_queue = NULLZSPT;
+ otherservers[0].zs_dumping = 0;
for (serv_addr = server_addrs, i = 1; i < nservers; serv_addr++, i++) {
setup_server(&otherservers[i], serv_addr);
@@ -144,6 +147,8 @@ server_init()
otherservers[i].zs_state = SERV_UP;
timer_reset(otherservers[i].zs_timer);
otherservers[i].zs_timer = (timer) NULL;
+ otherservers[i].zs_update_queue = NULLZSPT;
+ otherservers[i].zs_dumping = 0;
zdbug((LOG_DEBUG,"found myself"));
}
}
@@ -254,7 +259,7 @@ ZServerDesc_t *which;
*/
/*ARGSUSED*/
-void
+Code_t
server_dispatch(notice, auth, who)
ZNotice_t *notice;
int auth;
@@ -262,13 +267,14 @@ struct sockaddr_in *who;
{
ZServerDesc_t *server;
struct sockaddr_in newwho;
+ Code_t status;
zdbug((LOG_DEBUG, "server_dispatch"));
if (notice->z_kind == SERVACK) {
srv_nack_cancel(notice, who);
srv_responded(who);
- return;
+ return(ZERR_NONE);
}
/* XXX set up a who for the real origin */
bzero((caddr_t) &newwho, sizeof(newwho));
@@ -278,23 +284,29 @@ struct sockaddr_in *who;
server = server_which_server(who);
+ /* we can dispatch to routines safely here, since they will
+ return ZSRV_REQUEUE if appropriate. We bounce this back
+ to the caller, and the caller will re-queue the message
+ for us to process later. */
+
if (class_is_admin(notice)) {
/* admins don't get acked, else we get a packet loop */
- admin_dispatch(notice, auth, who, server);
- return;
+ /* will return requeue if bdump request and dumping */
+ return(admin_dispatch(notice, auth, who, server));
} else if (class_is_control(notice))
- control_dispatch(notice, auth, &newwho, server);
+ status = control_dispatch(notice, auth, &newwho, server);
else if (class_is_ulogin(notice))
- ulogin_dispatch(notice, auth, &newwho, server);
+ status = ulogin_dispatch(notice, auth, &newwho, server);
else if (class_is_ulocate(notice))
- ulocate_dispatch(notice, auth, &newwho, server);
- else
+ status = ulocate_dispatch(notice, auth, &newwho, server);
+ else {
/* shouldn't come from another server */
syslog(LOG_WARNING, "srv_disp: pkt cls %s",notice->z_class);
-
- /* acknowledge it */
- ack(notice, who);
- return;
+ status = ZERR_NONE; /* XXX */
+ }
+ if (status != ZSRV_REQUEUE)
+ ack(notice, who); /* acknowledge it if processed */
+ return(status);
}
#ifdef notdef
@@ -346,6 +358,9 @@ struct sockaddr_in *who;
setup_server(&otherservers[nservers], &who->sin_addr);
otherservers[nservers].zs_state = SERV_STARTING;
otherservers[nservers].zs_timeout = timo_tardy;
+ otherservers[nservers].zs_update_queue = NULLZSPT;
+ otherservers[nservers].zs_dumping = 0;
+
nservers++;
zdbug((LOG_DEBUG, "srv %s is %s",
inet_ntoa(otherservers[nservers].zs_addr.sin_addr),
@@ -489,7 +504,7 @@ ZClient_t *client;
* A client has died. remove it
*/
-static void
+static Code_t
kill_clt(notice)
ZNotice_t *notice;
{
@@ -499,42 +514,54 @@ ZNotice_t *notice;
zdbug((LOG_DEBUG, "kill_clt"));
if (extract_addr(notice, &who) != ZERR_NONE)
- return;
+ return(ZERR_NONE); /* XXX */
if (!(host = hostm_find_host(&who.sin_addr))) {
syslog(LOG_WARNING, "no host kill_clt");
- return;
+ return(ZERR_NONE); /* XXX */
}
+ if (host->zh_locked)
+ return(ZSRV_REQUEUE);
if (!(client = client_which_client(&who, notice))) {
syslog(LOG_WARNING, "no clt kill_clt");
- return;
+ return(ZERR_NONE); /* XXX */
}
/* remove the locations, too */
client_deregister(client, host, 1);
- return;
+ return(ZERR_NONE);
}
/*
* Another server asked us to initiate recovery protocol with the hostmanager
*/
-static void
-recover_clt(notice)
+static Code_t
+recover_clt(notice, server)
register ZNotice_t *notice;
+ZServerDesc_t *server;
{
struct sockaddr_in who;
ZClient_t *client;
ZHostList_t *host;
+ Code_t status;
- if (extract_addr(notice, &who) != ZERR_NONE)
- return;
+ if ((status = extract_addr(notice, &who)) != ZERR_NONE)
+ return(status);
if (!(host = hostm_find_host(&who.sin_addr))) {
- syslog(LOG_WARNING, "recover_clt h not found");
- return;
+ syslog(LOG_WARNING, "recover_clt h not found, from %s",
+ inet_ntoa(server->zs_addr.sin_addr));
+ syslog(LOG_WARNING, "%s", inet_ntoa(who.sin_addr));
+ return(ZERR_NONE); /* XXX */
}
+ if (host->zh_locked)
+ return(ZSRV_REQUEUE);
if (!(client = client_which_client(&who, notice))) {
- syslog(LOG_WARNING, "recover_clt not found");
- return;
+ syslog(LOG_WARNING, "recover_clt not found, from %s",
+ inet_ntoa(server->zs_addr.sin_addr));
+ syslog(LOG_WARNING, "%s/%d",inet_ntoa(who.sin_addr),
+ ntohs(who.sin_port));
+ return(ZERR_NONE); /* XXX */
}
hostm_losing(client, host);
+ return(ZERR_NONE);
}
/*
@@ -608,7 +635,7 @@ int auth;
*/
/*ARGSUSED*/
-static void
+static Code_t
admin_dispatch(notice, auth, who, server)
ZNotice_t *notice;
int auth;
@@ -616,6 +643,7 @@ struct sockaddr_in *who;
ZServerDesc_t *server;
{
register char *opcode = notice->z_opcode;
+ Code_t status = ZERR_NONE;
zdbug((LOG_DEBUG, "ADMIN received"));
@@ -637,15 +665,20 @@ ZServerDesc_t *server;
srv_states[(int) server->zs_state]));
}
} else if (!strcmp(opcode, ADMIN_BDUMP)) {
+#ifdef CONCURRENT
+ if (bdumping)
+ return(ZSRV_REQUEUE);
+#endif CONCURRENT
bdump_get(notice, auth, who, server);
} else if (!strcmp(opcode, ADMIN_LOST_CLT)) {
- recover_clt(notice);
+ status = recover_clt(notice, server);
} else if (!strcmp(opcode, ADMIN_KILL_CLT)) {
- kill_clt(notice);
- ack(notice, who);
+ status = kill_clt(notice);
+ if (status == ZERR_NONE)
+ ack(notice, who);
} else
syslog(LOG_WARNING, "ADMIN unknown opcode %s",opcode);
- return;
+ return(status);
}
/*
@@ -665,6 +698,7 @@ ZServerDesc_t *server;
/* hostm transfer remque's the host and
attaches it to the new server */
hostm_transfer(host, limbo_server);
+ srv_nack_release(server);
}
/*
@@ -674,7 +708,7 @@ ZServerDesc_t *server;
*/
/*ARGSUSED*/
-void
+Code_t
server_adispatch(notice, auth, who, server)
ZNotice_t *notice;
int auth;
@@ -688,7 +722,7 @@ ZServerDesc_t *server;
if (!strcmp(notice->z_opcode, ADMIN_STATUS)) {
/* status packet */
send_stats(who);
- return;
+ return(ZERR_NONE);
}
#ifdef notdef
syslog(LOG_INFO, "disp: new server?");
@@ -704,7 +738,7 @@ ZServerDesc_t *server;
syslog(LOG_INFO, "srv_adisp: server attempt from %s",
inet_ntoa(who->sin_addr));
#endif /* notdef */
- return;
+ return(ZERR_NONE);
}
static void
@@ -744,9 +778,10 @@ struct sockaddr_in *who;
num_resp = NUM_FIXED;
/* start at 1 and ignore limbo */
for (i = 1; i < nservers ; i++) {
- (void) sprintf(buf, "%s/%s",
+ (void) sprintf(buf, "%s/%s%s",
inet_ntoa(otherservers[i].zs_addr.sin_addr),
- srv_states[(int) otherservers[i].zs_state]);
+ srv_states[(int) otherservers[i].zs_state],
+ otherservers[i].zs_dumping ? " (DUMPING)" : "");
responses[num_resp++] = strsave(buf);
}
@@ -906,6 +941,10 @@ struct in_addr *addr;
}
host->q_forw = host->q_back = host;
server->zs_hosts = host;
+
+ server->zs_update_queue = NULLZSPT;
+ server->zs_dumping = 0;
+
return;
}
@@ -1150,19 +1189,17 @@ struct sockaddr_in *who;
caddr_t pack;
int packlen;
Code_t retval;
- register ZNotAcked_t *nacked;
- if (bdumping) {
- zdbug((LOG_DEBUG,"bdumping, won't srv_forw"));
- return;
- }
zdbug((LOG_DEBUG, "srv_forw"));
/* don't send to limbo */
for (i = 1; i < nservers; i++) {
if (i == me_server_idx) /* don't xmit to myself */
continue;
- if (otherservers[i].zs_state == SERV_DEAD)
+ if (otherservers[i].zs_state == SERV_DEAD &&
+ otherservers[i].zs_dumping == 0)
+ /* if we are dumping to him, we want to
+ queue it, even if he's dead */
continue;
if (!(pack = (caddr_t) xmalloc(sizeof(ZPacket_t)))) {
@@ -1177,45 +1214,93 @@ struct sockaddr_in *who;
xfree(pack);
continue;
}
- if ((retval = ZSetDestAddr(&otherservers[i].zs_addr)) != ZERR_NONE) {
- syslog(LOG_WARNING, "srv_fwd set addr: %s",
- error_message(retval));
- xfree(pack);
- continue;
- }
- if ((retval = ZSendPacket(pack, packlen)) != ZERR_NONE) {
- syslog(LOG_WARNING, "srv_fwd xmit: %s", error_message(retval));
- xfree(pack);
- continue;
- }
- /* now we've sent it, mark it as not ack'ed */
-
- if (!(nacked = (ZNotAcked_t *)xmalloc(sizeof(ZNotAcked_t)))) {
- /* no space: just punt */
- syslog(LOG_ERR, "srv_forw nack malloc");
- xfree(pack);
- continue;
+ if (otherservers[i].zs_dumping) {
+ server_queue(&(otherservers[i]), packlen, pack,
+ auth, who);
+ return;
}
+ server_forw_reliable(&otherservers[i],pack, packlen, notice);
+ }
+ return;
+}
- nacked->na_rexmits = 0;
- nacked->na_packet = pack;
- nacked->na_srv_idx = i;
- nacked->na_packsz = packlen;
- nacked->na_uid = notice->z_uid;
- nacked->q_forw = nacked->q_back = nacked;
- nacked->na_abstimo = 0;
+static void
+server_forw_reliable(server, pack, packlen, notice)
+ZServerDesc_t *server;
+caddr_t pack;
+int packlen;
+ZNotice_t *notice;
+{
+ Code_t retval;
+ register ZNotAcked_t *nacked;
- /* set a timer to retransmit */
- nacked->na_timer = timer_set_rel(srv_rexmit_secs,
- srv_rexmit,
- (caddr_t) nacked);
- /* chain in */
- xinsque(nacked, srv_nacklist);
+ if ((retval = ZSetDestAddr(&server->zs_addr)) != ZERR_NONE) {
+ syslog(LOG_WARNING, "srv_fwd_rel set addr: %s",
+ error_message(retval));
+ xfree(pack);
+ return;
+ }
+ if ((retval = ZSendPacket(pack, packlen)) != ZERR_NONE) {
+ syslog(LOG_WARNING, "srv_fwd xmit: %s", error_message(retval));
+ xfree(pack);
+ return;
+ }
+ /* now we've sent it, mark it as not ack'ed */
+
+ if (!(nacked = (ZNotAcked_t *)xmalloc(sizeof(ZNotAcked_t)))) {
+ /* no space: just punt */
+ syslog(LOG_ERR, "srv_forw nack malloc");
+ xfree(pack);
+ return;
}
+
+ nacked->na_rexmits = 0;
+ nacked->na_packet = pack;
+ nacked->na_srv_idx = server - otherservers;
+ nacked->na_packsz = packlen;
+ nacked->na_uid = notice->z_uid;
+ nacked->q_forw = nacked->q_back = nacked;
+ nacked->na_abstimo = 0;
+
+ /* set a timer to retransmit */
+ nacked->na_timer = timer_set_rel(srv_rexmit_secs,
+ srv_rexmit,
+ (caddr_t) nacked);
+ /* chain in */
+ xinsque(nacked, srv_nacklist);
return;
}
/*
+ * send the queued message for the server.
+ */
+
+void
+server_send_queue(server)
+ZServerDesc_t *server;
+{
+ register ZSrvPending_t *pending;
+ ZNotice_t notice;
+ Code_t status;
+
+ while(server->zs_update_queue) {
+ pending = server_dequeue(server);
+ if (status = ZParseNotice(pending->pend_packet,
+ pending->pend_len,
+ &notice)) {
+ syslog(LOG_ERR,
+ "ssq bad notice parse (%s): %s",
+ inet_ntoa(pending->pend_who.sin_addr),
+ error_message(status));
+ } else {
+ server_forw_reliable(server, pending->pend_packet,
+ pending->pend_len, &notice);
+ xfree(pending);
+ /* ACK handling routines will free the packet */
+ }
+ }
+}
+/*
* a server has acknowledged a message we sent to him; remove it from
* server unacked queue
*/
@@ -1298,6 +1383,8 @@ static void
srv_nack_release(server)
ZServerDesc_t *server;
{
+ /* XXX release any private queue for this server */
+
register ZNotAcked_t *nacked, *nack2;
/* search the not-yet-acked list for anything destined to him, and
@@ -1318,3 +1405,105 @@ ZServerDesc_t *server;
nacked = nacked->q_forw;
return;
}
+
+/*
+ * Queue this notice to be transmitted to the server when it is ready.
+ */
+static void
+server_queue(server, len, pack, auth, who)
+ZServerDesc_t *server;
+int len;
+caddr_t pack;
+int auth;
+struct sockaddr_in *who;
+{
+ register ZSrvPending_t *pending;
+
+ if (!server->zs_update_queue) {
+ if (!(pending =
+ (ZSrvPending_t *)xmalloc(sizeof(ZSrvPending_t)))) {
+ syslog(LOG_CRIT, "zs_update_queue head malloc");
+ abort();
+ }
+ pending->q_forw = pending->q_back = pending;
+ server->zs_update_queue = pending;
+ }
+ if (!(pending = (ZSrvPending_t *)xmalloc(sizeof(ZSrvPending_t)))) {
+ syslog(LOG_CRIT, "zs_update_queue malloc");
+ abort();
+ }
+ pending->pend_packet = pack;
+ pending->pend_len = len;
+ pending->pend_auth = auth;
+ pending->pend_who = *who;
+
+ /* put it on the end of the list */
+ xinsque(pending, server->zs_update_queue->q_back);
+ return;
+}
+
+/*
+ * Pull a notice off the hold queue.
+ */
+
+ZSrvPending_t *
+server_dequeue(server)
+register ZServerDesc_t *server;
+{
+ ZSrvPending_t *pending;
+
+ if (!server->zs_update_queue)
+ return(NULLZSPT);
+ pending = server->zs_update_queue->q_forw;
+ /* pull it off */
+ xremque(pending);
+ if (server->zs_update_queue->q_forw == server->zs_update_queue) {
+ /* empty queue now */
+ xfree(server->zs_update_queue);
+ server->zs_update_queue = NULLZSPT;
+ }
+ return(pending);
+}
+
+/*
+ * free storage used by a pending queue entry.
+ */
+
+void
+server_pending_free(pending)
+register ZSrvPending_t *pending;
+{
+ xfree(pending->pend_packet);
+ xfree(pending);
+ return;
+}
+
+/*
+ * Queue something to be handled later by this server.
+ */
+
+void
+server_self_queue(notice, auth, who)
+ZNotice_t *notice;
+int auth;
+struct sockaddr_in *who;
+{
+ caddr_t pack;
+ int packlen;
+ Code_t retval;
+
+ if (!(pack = (caddr_t) xmalloc(sizeof(ZPacket_t)))) {
+ syslog(LOG_CRIT, "srv_self_queue malloc");
+ abort();
+ }
+
+ packlen = sizeof(ZPacket_t);
+ if ((retval = ZFormatRawNotice(notice, pack, packlen, &packlen))
+ != ZERR_NONE) {
+ syslog(LOG_CRIT, "srv_self_queue format: %s",
+ error_message(retval));
+ abort();
+ }
+ server_queue(me_server, packlen, pack, auth, who);
+ return;
+}