diff options
author | John Kohl <jtkohl@mit.edu> | 1988-02-05 09:36:05 +0000 |
---|---|---|
committer | John Kohl <jtkohl@mit.edu> | 1988-02-05 09:36:05 +0000 |
commit | 84b74a4065424a964a381ca687e30ea02cd847a1 (patch) | |
tree | c5688e73fffd2b3bf91610e2992ee97f9359653b /server/server.c | |
parent | 494b01f2d5dd9ed087b0bf5657f998cb3224734f (diff) |
concurrency support
Diffstat (limited to 'server/server.c')
-rw-r--r-- | server/server.c | 339 |
1 files changed, 264 insertions, 75 deletions
diff --git a/server/server.c b/server/server.c index d6e1969..5c3a6d9 100644 --- a/server/server.c +++ b/server/server.c @@ -63,11 +63,12 @@ static char rcsid_server_s_c[] = "$Header$"; * */ -static void server_hello(), server_flush(), admin_dispatch(), setup_server(); +static void server_hello(), server_flush(), setup_server(); static void hello_respond(), srv_responded(), send_msg(), send_msg_list(); static void srv_alive(), srv_nack_cancel(), srv_rexmit(), srv_nack_release(); -static void recover_clt(), kill_clt(), server_lost(); -static void send_stats(); +static void server_lost(); +static void send_stats(), server_queue(), server_forw_reliable(); +static Code_t admin_dispatch(), recover_clt(), kill_clt(); #ifdef notdef static Code_t server_register(); @@ -135,6 +136,8 @@ server_init() setup_server(otherservers, &limbo_addr); timer_reset(otherservers[0].zs_timer); otherservers[0].zs_timer = (timer) NULL; + otherservers[0].zs_update_queue = NULLZSPT; + otherservers[0].zs_dumping = 0; for (serv_addr = server_addrs, i = 1; i < nservers; serv_addr++, i++) { setup_server(&otherservers[i], serv_addr); @@ -144,6 +147,8 @@ server_init() otherservers[i].zs_state = SERV_UP; timer_reset(otherservers[i].zs_timer); otherservers[i].zs_timer = (timer) NULL; + otherservers[i].zs_update_queue = NULLZSPT; + otherservers[i].zs_dumping = 0; zdbug((LOG_DEBUG,"found myself")); } } @@ -254,7 +259,7 @@ ZServerDesc_t *which; */ /*ARGSUSED*/ -void +Code_t server_dispatch(notice, auth, who) ZNotice_t *notice; int auth; @@ -262,13 +267,14 @@ struct sockaddr_in *who; { ZServerDesc_t *server; struct sockaddr_in newwho; + Code_t status; zdbug((LOG_DEBUG, "server_dispatch")); if (notice->z_kind == SERVACK) { srv_nack_cancel(notice, who); srv_responded(who); - return; + return(ZERR_NONE); } /* XXX set up a who for the real origin */ bzero((caddr_t) &newwho, sizeof(newwho)); @@ -278,23 +284,29 @@ struct sockaddr_in *who; server = server_which_server(who); + /* we can dispatch to routines safely here, since they will + return ZSRV_REQUEUE if appropriate. We bounce this back + to the caller, and the caller will re-queue the message + for us to process later. */ + if (class_is_admin(notice)) { /* admins don't get acked, else we get a packet loop */ - admin_dispatch(notice, auth, who, server); - return; + /* will return requeue if bdump request and dumping */ + return(admin_dispatch(notice, auth, who, server)); } else if (class_is_control(notice)) - control_dispatch(notice, auth, &newwho, server); + status = control_dispatch(notice, auth, &newwho, server); else if (class_is_ulogin(notice)) - ulogin_dispatch(notice, auth, &newwho, server); + status = ulogin_dispatch(notice, auth, &newwho, server); else if (class_is_ulocate(notice)) - ulocate_dispatch(notice, auth, &newwho, server); - else + status = ulocate_dispatch(notice, auth, &newwho, server); + else { /* shouldn't come from another server */ syslog(LOG_WARNING, "srv_disp: pkt cls %s",notice->z_class); - - /* acknowledge it */ - ack(notice, who); - return; + status = ZERR_NONE; /* XXX */ + } + if (status != ZSRV_REQUEUE) + ack(notice, who); /* acknowledge it if processed */ + return(status); } #ifdef notdef @@ -346,6 +358,9 @@ struct sockaddr_in *who; setup_server(&otherservers[nservers], &who->sin_addr); otherservers[nservers].zs_state = SERV_STARTING; otherservers[nservers].zs_timeout = timo_tardy; + otherservers[nservers].zs_update_queue = NULLZSPT; + otherservers[nservers].zs_dumping = 0; + nservers++; zdbug((LOG_DEBUG, "srv %s is %s", inet_ntoa(otherservers[nservers].zs_addr.sin_addr), @@ -489,7 +504,7 @@ ZClient_t *client; * A client has died. remove it */ -static void +static Code_t kill_clt(notice) ZNotice_t *notice; { @@ -499,42 +514,54 @@ ZNotice_t *notice; zdbug((LOG_DEBUG, "kill_clt")); if (extract_addr(notice, &who) != ZERR_NONE) - return; + return(ZERR_NONE); /* XXX */ if (!(host = hostm_find_host(&who.sin_addr))) { syslog(LOG_WARNING, "no host kill_clt"); - return; + return(ZERR_NONE); /* XXX */ } + if (host->zh_locked) + return(ZSRV_REQUEUE); if (!(client = client_which_client(&who, notice))) { syslog(LOG_WARNING, "no clt kill_clt"); - return; + return(ZERR_NONE); /* XXX */ } /* remove the locations, too */ client_deregister(client, host, 1); - return; + return(ZERR_NONE); } /* * Another server asked us to initiate recovery protocol with the hostmanager */ -static void -recover_clt(notice) +static Code_t +recover_clt(notice, server) register ZNotice_t *notice; +ZServerDesc_t *server; { struct sockaddr_in who; ZClient_t *client; ZHostList_t *host; + Code_t status; - if (extract_addr(notice, &who) != ZERR_NONE) - return; + if ((status = extract_addr(notice, &who)) != ZERR_NONE) + return(status); if (!(host = hostm_find_host(&who.sin_addr))) { - syslog(LOG_WARNING, "recover_clt h not found"); - return; + syslog(LOG_WARNING, "recover_clt h not found, from %s", + inet_ntoa(server->zs_addr.sin_addr)); + syslog(LOG_WARNING, "%s", inet_ntoa(who.sin_addr)); + return(ZERR_NONE); /* XXX */ } + if (host->zh_locked) + return(ZSRV_REQUEUE); if (!(client = client_which_client(&who, notice))) { - syslog(LOG_WARNING, "recover_clt not found"); - return; + syslog(LOG_WARNING, "recover_clt not found, from %s", + inet_ntoa(server->zs_addr.sin_addr)); + syslog(LOG_WARNING, "%s/%d",inet_ntoa(who.sin_addr), + ntohs(who.sin_port)); + return(ZERR_NONE); /* XXX */ } hostm_losing(client, host); + return(ZERR_NONE); } /* @@ -608,7 +635,7 @@ int auth; */ /*ARGSUSED*/ -static void +static Code_t admin_dispatch(notice, auth, who, server) ZNotice_t *notice; int auth; @@ -616,6 +643,7 @@ struct sockaddr_in *who; ZServerDesc_t *server; { register char *opcode = notice->z_opcode; + Code_t status = ZERR_NONE; zdbug((LOG_DEBUG, "ADMIN received")); @@ -637,15 +665,20 @@ ZServerDesc_t *server; srv_states[(int) server->zs_state])); } } else if (!strcmp(opcode, ADMIN_BDUMP)) { +#ifdef CONCURRENT + if (bdumping) + return(ZSRV_REQUEUE); +#endif CONCURRENT bdump_get(notice, auth, who, server); } else if (!strcmp(opcode, ADMIN_LOST_CLT)) { - recover_clt(notice); + status = recover_clt(notice, server); } else if (!strcmp(opcode, ADMIN_KILL_CLT)) { - kill_clt(notice); - ack(notice, who); + status = kill_clt(notice); + if (status == ZERR_NONE) + ack(notice, who); } else syslog(LOG_WARNING, "ADMIN unknown opcode %s",opcode); - return; + return(status); } /* @@ -665,6 +698,7 @@ ZServerDesc_t *server; /* hostm transfer remque's the host and attaches it to the new server */ hostm_transfer(host, limbo_server); + srv_nack_release(server); } /* @@ -674,7 +708,7 @@ ZServerDesc_t *server; */ /*ARGSUSED*/ -void +Code_t server_adispatch(notice, auth, who, server) ZNotice_t *notice; int auth; @@ -688,7 +722,7 @@ ZServerDesc_t *server; if (!strcmp(notice->z_opcode, ADMIN_STATUS)) { /* status packet */ send_stats(who); - return; + return(ZERR_NONE); } #ifdef notdef syslog(LOG_INFO, "disp: new server?"); @@ -704,7 +738,7 @@ ZServerDesc_t *server; syslog(LOG_INFO, "srv_adisp: server attempt from %s", inet_ntoa(who->sin_addr)); #endif /* notdef */ - return; + return(ZERR_NONE); } static void @@ -744,9 +778,10 @@ struct sockaddr_in *who; num_resp = NUM_FIXED; /* start at 1 and ignore limbo */ for (i = 1; i < nservers ; i++) { - (void) sprintf(buf, "%s/%s", + (void) sprintf(buf, "%s/%s%s", inet_ntoa(otherservers[i].zs_addr.sin_addr), - srv_states[(int) otherservers[i].zs_state]); + srv_states[(int) otherservers[i].zs_state], + otherservers[i].zs_dumping ? " (DUMPING)" : ""); responses[num_resp++] = strsave(buf); } @@ -906,6 +941,10 @@ struct in_addr *addr; } host->q_forw = host->q_back = host; server->zs_hosts = host; + + server->zs_update_queue = NULLZSPT; + server->zs_dumping = 0; + return; } @@ -1150,19 +1189,17 @@ struct sockaddr_in *who; caddr_t pack; int packlen; Code_t retval; - register ZNotAcked_t *nacked; - if (bdumping) { - zdbug((LOG_DEBUG,"bdumping, won't srv_forw")); - return; - } zdbug((LOG_DEBUG, "srv_forw")); /* don't send to limbo */ for (i = 1; i < nservers; i++) { if (i == me_server_idx) /* don't xmit to myself */ continue; - if (otherservers[i].zs_state == SERV_DEAD) + if (otherservers[i].zs_state == SERV_DEAD && + otherservers[i].zs_dumping == 0) + /* if we are dumping to him, we want to + queue it, even if he's dead */ continue; if (!(pack = (caddr_t) xmalloc(sizeof(ZPacket_t)))) { @@ -1177,45 +1214,93 @@ struct sockaddr_in *who; xfree(pack); continue; } - if ((retval = ZSetDestAddr(&otherservers[i].zs_addr)) != ZERR_NONE) { - syslog(LOG_WARNING, "srv_fwd set addr: %s", - error_message(retval)); - xfree(pack); - continue; - } - if ((retval = ZSendPacket(pack, packlen)) != ZERR_NONE) { - syslog(LOG_WARNING, "srv_fwd xmit: %s", error_message(retval)); - xfree(pack); - continue; - } - /* now we've sent it, mark it as not ack'ed */ - - if (!(nacked = (ZNotAcked_t *)xmalloc(sizeof(ZNotAcked_t)))) { - /* no space: just punt */ - syslog(LOG_ERR, "srv_forw nack malloc"); - xfree(pack); - continue; + if (otherservers[i].zs_dumping) { + server_queue(&(otherservers[i]), packlen, pack, + auth, who); + return; } + server_forw_reliable(&otherservers[i],pack, packlen, notice); + } + return; +} - nacked->na_rexmits = 0; - nacked->na_packet = pack; - nacked->na_srv_idx = i; - nacked->na_packsz = packlen; - nacked->na_uid = notice->z_uid; - nacked->q_forw = nacked->q_back = nacked; - nacked->na_abstimo = 0; +static void +server_forw_reliable(server, pack, packlen, notice) +ZServerDesc_t *server; +caddr_t pack; +int packlen; +ZNotice_t *notice; +{ + Code_t retval; + register ZNotAcked_t *nacked; - /* set a timer to retransmit */ - nacked->na_timer = timer_set_rel(srv_rexmit_secs, - srv_rexmit, - (caddr_t) nacked); - /* chain in */ - xinsque(nacked, srv_nacklist); + if ((retval = ZSetDestAddr(&server->zs_addr)) != ZERR_NONE) { + syslog(LOG_WARNING, "srv_fwd_rel set addr: %s", + error_message(retval)); + xfree(pack); + return; + } + if ((retval = ZSendPacket(pack, packlen)) != ZERR_NONE) { + syslog(LOG_WARNING, "srv_fwd xmit: %s", error_message(retval)); + xfree(pack); + return; + } + /* now we've sent it, mark it as not ack'ed */ + + if (!(nacked = (ZNotAcked_t *)xmalloc(sizeof(ZNotAcked_t)))) { + /* no space: just punt */ + syslog(LOG_ERR, "srv_forw nack malloc"); + xfree(pack); + return; } + + nacked->na_rexmits = 0; + nacked->na_packet = pack; + nacked->na_srv_idx = server - otherservers; + nacked->na_packsz = packlen; + nacked->na_uid = notice->z_uid; + nacked->q_forw = nacked->q_back = nacked; + nacked->na_abstimo = 0; + + /* set a timer to retransmit */ + nacked->na_timer = timer_set_rel(srv_rexmit_secs, + srv_rexmit, + (caddr_t) nacked); + /* chain in */ + xinsque(nacked, srv_nacklist); return; } /* + * send the queued message for the server. + */ + +void +server_send_queue(server) +ZServerDesc_t *server; +{ + register ZSrvPending_t *pending; + ZNotice_t notice; + Code_t status; + + while(server->zs_update_queue) { + pending = server_dequeue(server); + if (status = ZParseNotice(pending->pend_packet, + pending->pend_len, + ¬ice)) { + syslog(LOG_ERR, + "ssq bad notice parse (%s): %s", + inet_ntoa(pending->pend_who.sin_addr), + error_message(status)); + } else { + server_forw_reliable(server, pending->pend_packet, + pending->pend_len, ¬ice); + xfree(pending); + /* ACK handling routines will free the packet */ + } + } +} +/* * a server has acknowledged a message we sent to him; remove it from * server unacked queue */ @@ -1298,6 +1383,8 @@ static void srv_nack_release(server) ZServerDesc_t *server; { + /* XXX release any private queue for this server */ + register ZNotAcked_t *nacked, *nack2; /* search the not-yet-acked list for anything destined to him, and @@ -1318,3 +1405,105 @@ ZServerDesc_t *server; nacked = nacked->q_forw; return; } + +/* + * Queue this notice to be transmitted to the server when it is ready. + */ +static void +server_queue(server, len, pack, auth, who) +ZServerDesc_t *server; +int len; +caddr_t pack; +int auth; +struct sockaddr_in *who; +{ + register ZSrvPending_t *pending; + + if (!server->zs_update_queue) { + if (!(pending = + (ZSrvPending_t *)xmalloc(sizeof(ZSrvPending_t)))) { + syslog(LOG_CRIT, "zs_update_queue head malloc"); + abort(); + } + pending->q_forw = pending->q_back = pending; + server->zs_update_queue = pending; + } + if (!(pending = (ZSrvPending_t *)xmalloc(sizeof(ZSrvPending_t)))) { + syslog(LOG_CRIT, "zs_update_queue malloc"); + abort(); + } + pending->pend_packet = pack; + pending->pend_len = len; + pending->pend_auth = auth; + pending->pend_who = *who; + + /* put it on the end of the list */ + xinsque(pending, server->zs_update_queue->q_back); + return; +} + +/* + * Pull a notice off the hold queue. + */ + +ZSrvPending_t * +server_dequeue(server) +register ZServerDesc_t *server; +{ + ZSrvPending_t *pending; + + if (!server->zs_update_queue) + return(NULLZSPT); + pending = server->zs_update_queue->q_forw; + /* pull it off */ + xremque(pending); + if (server->zs_update_queue->q_forw == server->zs_update_queue) { + /* empty queue now */ + xfree(server->zs_update_queue); + server->zs_update_queue = NULLZSPT; + } + return(pending); +} + +/* + * free storage used by a pending queue entry. + */ + +void +server_pending_free(pending) +register ZSrvPending_t *pending; +{ + xfree(pending->pend_packet); + xfree(pending); + return; +} + +/* + * Queue something to be handled later by this server. + */ + +void +server_self_queue(notice, auth, who) +ZNotice_t *notice; +int auth; +struct sockaddr_in *who; +{ + caddr_t pack; + int packlen; + Code_t retval; + + if (!(pack = (caddr_t) xmalloc(sizeof(ZPacket_t)))) { + syslog(LOG_CRIT, "srv_self_queue malloc"); + abort(); + } + + packlen = sizeof(ZPacket_t); + if ((retval = ZFormatRawNotice(notice, pack, packlen, &packlen)) + != ZERR_NONE) { + syslog(LOG_CRIT, "srv_self_queue format: %s", + error_message(retval)); + abort(); + } + server_queue(me_server, packlen, pack, auth, who); + return; +} |