diff options
author | Greg Hudson <ghudson@mit.edu> | 1997-09-14 18:12:16 +0000 |
---|---|---|
committer | Greg Hudson <ghudson@mit.edu> | 1997-09-14 18:12:16 +0000 |
commit | ce6018836c422c86a729ba39fe5433ec11b87b02 (patch) | |
tree | cc2e6601489384fcf41e0565e8906822897b46c8 /zhm/queue.c | |
parent | ac16f380e349fa39ec7e26bccb5456cb300006a5 (diff) |
Pull in sources from zephyr locker. See /mit/zephyr/repository for
detailed change information.
Diffstat (limited to 'zhm/queue.c')
-rw-r--r-- | zhm/queue.c | 383 |
1 files changed, 179 insertions, 204 deletions
diff --git a/zhm/queue.c b/zhm/queue.c index fd87546..5e0ca3a 100644 --- a/zhm/queue.c +++ b/zhm/queue.c @@ -20,249 +20,224 @@ static char rcsid_queue_c[] = "$Header$"; #endif /* lint */ typedef struct _Queue { - long timeout; - int retries; - ZNotice_t z_notice; - caddr_t z_packet; - struct sockaddr_in reply; + Timer *timer; + int retries; + ZNotice_t notice; + caddr_t packet; + struct sockaddr_in reply; + struct _Queue *next, **prev_p; } Queue; -struct _qelem { - struct _qelem *q_forw; - struct _qelem *q_back; - Queue *q_data; -}; +static Queue *hm_queue; +static int retransmits_enabled = 0; -typedef struct _qelem Qelem; - -Qelem hm_queue = { &hm_queue, &hm_queue, NULL }, *is_in_queue(); +static Queue *find_notice_in_queue __P((ZNotice_t *notice)); +static Code_t dump_queue __P((void)); +static void queue_timeout __P((void *arg)); int rexmit_times[] = { 2, 2, 4, 4, 8, -1 }; -extern long time(); -extern int timeout_type; +#ifdef DEBUG +Code_t dump_queue(); +#endif void init_queue() { - while (hm_queue.q_forw != &hm_queue) { - free(hm_queue.q_forw->q_data->z_packet); - free((char *)hm_queue.q_forw->q_data); - remque(hm_queue.q_forw); - free((char *)hm_queue.q_forw); - } - - hm_queue.q_forw = hm_queue.q_back = &hm_queue; - hm_queue.q_data = NULL; - DPR ("Queue initialized and flushed.\n"); + Queue *q; + + while (hm_queue) { + q = hm_queue; + if (q->timer) + timer_reset(q->timer); + free(q->packet); + hm_queue = q->next; + free(q); + } + + DPR("Queue initialized and flushed.\n"); } Code_t add_notice_to_queue(notice, packet, repl, len) -ZNotice_t *notice; -caddr_t packet; -struct sockaddr_in *repl; -int len; + ZNotice_t *notice; + char * packet; + struct sockaddr_in *repl; + int len; { - Qelem *elem; - Queue *entry; - - DPR ("Adding notice to queue...\n"); - if (!is_in_queue(notice)) { - elem = (Qelem *)malloc(sizeof(Qelem)); - entry = (Queue *)malloc(sizeof(Queue)); - entry->timeout = time((time_t *)0) + rexmit_times[0]; - entry->retries = 0; - entry->z_packet = (char *)malloc(Z_MAXPKTLEN); - (void) memcpy(entry->z_packet, packet, Z_MAXPKTLEN); - if (ZParseNotice(entry->z_packet, len, &entry->z_notice) - != ZERR_NONE) { - syslog(LOG_ERR, "ZParseNotice failed, but succeeded before"); - free(entry->z_packet); - } else { - entry->reply = *repl; - elem->q_data = entry; - elem->q_forw = elem; - elem->q_back = elem; - insque(elem, hm_queue.q_back); - } - } -#ifdef DEBUG - if (!is_in_queue(notice)) - return(ZERR_NONOTICE); - else -#endif /* DEBUG */ - return(ZERR_NONE); + Queue *entry; + + DPR("Adding notice to queue...\n"); + if (!find_notice_in_queue(notice)) { + entry = (Queue *) malloc(sizeof(Queue)); + entry->retries = 0; + entry->packet = (char *) malloc(Z_MAXPKTLEN); + memcpy(entry->packet, packet, Z_MAXPKTLEN); + if (ZParseNotice(entry->packet, len, &entry->notice) != ZERR_NONE) { + syslog(LOG_ERR, "ZParseNotice failed, but succeeded before"); + free(entry->packet); + } else { + entry->reply = *repl; + LIST_INSERT(&hm_queue, entry); + } + entry->timer = (retransmits_enabled) ? + timer_set_rel(rexmit_times[0], queue_timeout, entry) : NULL; + } + return(ZERR_NONE); } Code_t remove_notice_from_queue(notice, kind, repl) -ZNotice_t *notice; -ZNotice_Kind_t *kind; -struct sockaddr_in *repl; + ZNotice_t *notice; + ZNotice_Kind_t *kind; + struct sockaddr_in *repl; { - Qelem *elem; - - DPR ("Removing notice from queue...\n"); - if ((elem = is_in_queue(notice)) == NULL) - return(ZERR_NONOTICE); - else { - *kind = elem->q_data->z_notice.z_kind; - *repl = elem->q_data->reply; - free(elem->q_data->z_packet); - free((char *)elem->q_data); - remque(elem); - free((char *)elem); - if (hm_queue.q_forw == &hm_queue) - (void)alarm(0); + Queue *entry; + + DPR("Removing notice from queue...\n"); + entry = find_notice_in_queue(notice); + if (entry == NULL) + return(ZERR_NONOTICE); + + *kind = entry->notice.z_kind; + *repl = entry->reply; + timer_reset(entry->timer); + free(entry->packet); + LIST_DELETE(entry); #ifdef DEBUG - dump_queue(); + dump_queue(); #endif /* DEBUG */ - return(ZERR_NONE); - } + return(ZERR_NONE); } +/* We have a server; transmit all of our packets. */ void retransmit_queue(sin) -struct sockaddr_in *sin; + struct sockaddr_in *sin; { - Qelem *srch; - Code_t ret; + Queue *entry; + Code_t ret; + + DPR("Retransmitting queue to new server...\n"); + ret = ZSetDestAddr(sin); + if (ret != ZERR_NONE) { + Zperr (ret); + com_err("queue", ret, "setting destination"); + } + for (entry = hm_queue; entry; entry = entry->next) { + DPR("notice:\n"); + DPR2("\tz_kind: %d\n", entry->notice.z_kind); + DPR2("\tz_port: %u\n", ntohs(entry->notice.z_port)); + DPR2("\tz_class: %s\n", entry->notice.z_class); + DPR2("\tz_clss_inst: %s\n", entry->notice.z_class_inst); + DPR2("\tz_opcode: %s\n", entry->notice.z_opcode); + DPR2("\tz_sender: %s\n", entry->notice.z_sender); + DPR2("\tz_recip: %s\n", entry->notice.z_recipient); + ret = send_outgoing(&entry->notice); + if (ret != ZERR_NONE) { + Zperr(ret); + com_err("queue", ret, "sending raw notice"); + } + entry->timer = timer_set_rel(rexmit_times[0], queue_timeout, entry); + entry->retries = 0; + } + retransmits_enabled = 1; +} - DPR ("Retransmitting queue to new server...\n"); - if ((ret = ZSetDestAddr(sin)) != ZERR_NONE) { - Zperr (ret); - com_err("queue", ret, "setting destination"); - } - if ((srch = hm_queue.q_forw) != &hm_queue) { - do { - DPR ("notice:\n"); - DPR2 ("\tz_kind: %d\n", srch->q_data->z_notice.z_kind); - DPR2 ("\tz_port: %u\n", - ntohs(srch->q_data->z_notice.z_port)); - DPR2 ("\tz_class: %s\n", srch->q_data->z_notice.z_class); - DPR2 ("\tz_clss_inst: %s\n", - srch->q_data->z_notice.z_class_inst); - DPR2 ("\tz_opcode: %s\n", srch->q_data->z_notice.z_opcode); - DPR2 ("\tz_sender: %s\n", srch->q_data->z_notice.z_sender); - DPR2 ("\tz_recip: %s\n", srch->q_data->z_notice.z_recipient); - if ((ret = send_outgoing(&srch->q_data->z_notice)) - != ZERR_NONE) { - Zperr (ret); - com_err("queue", ret, "sending raw notice"); - } - srch->q_data->timeout = time(0) + rexmit_times[0]; - srch->q_data->retries = 0; - srch = srch->q_forw; - } while (srch != &hm_queue); - timeout_type = NOTICES; - (void)alarm(rexmit_times[0]); - } +/* We lost our server; nuke all of our timers. */ +void disable_queue_retransmits() +{ + Queue *entry; + + for (entry = hm_queue; entry; entry = entry->next) { + if (entry->timer) + timer_reset(entry->timer); + entry->timer = NULL; + } + retransmits_enabled = 0; } #ifdef DEBUG -Code_t dump_queue() +static Code_t dump_queue() { - Qelem *srch; - caddr_t mp; - int ml; - - DPR ("Dumping queue...\n"); - if ((srch = hm_queue.q_forw) == &hm_queue) - printf("Queue is empty.\n"); - else do { - printf("notice:\n"); - printf("\tz_kind: %d\n", srch->q_data->z_notice.z_kind); - printf("\tz_port: %u\n", ntohs(srch->q_data->z_notice.z_port)); - printf("\tz_class: %s\n", srch->q_data->z_notice.z_class); - printf("\tz_clss_inst: %s\n", srch->q_data->z_notice.z_class_inst); - printf("\tz_opcode: %s\n", srch->q_data->z_notice.z_opcode); - printf("\tz_sender: %s\n", srch->q_data->z_notice.z_sender); - printf("\tz_recip: %s\n", srch->q_data->z_notice.z_recipient); - printf("\tMessage:\n"); - mp = srch->q_data->z_notice.z_message; - for (ml = strlen(mp)+1; - ml <= srch->q_data->z_notice.z_message_len; ml++) { - printf("\t%s\n", mp); - mp += strlen(mp)+1; - ml += strlen(mp); - } - srch = srch->q_forw; - } while (srch != &hm_queue); + Queue *entry; + caddr_t mp; + int ml; + + DPR("Dumping queue...\n"); + if (!hm_queue) { + printf("Queue is empty.\n"); + return; + } + + for (entry = hm_queue; entry; entry = entry->next) { + printf("notice:\n"); + printf("\tz_kind: %d\n", entry->notice.z_kind); + printf("\tz_port: %u\n", ntohs(entry->notice.z_port)); + printf("\tz_class: %s\n", entry->notice.z_class); + printf("\tz_clss_inst: %s\n", entry->notice.z_class_inst); + printf("\tz_opcode: %s\n", entry->notice.z_opcode); + printf("\tz_sender: %s\n", entry->notice.z_sender); + printf("\tz_recip: %s\n", entry->notice.z_recipient); + printf("\tMessage:\n"); + mp = entry->notice.z_message; + for (ml = strlen(mp) + 1; ml <= entry->notice.z_message_len; ml++) { + printf("\t%s\n", mp); + mp += strlen(mp)+1; + ml += strlen(mp); + } + } } #endif /* DEBUG */ int queue_len() { - int length = 0; - Qelem *srch; + int length = 0; + Queue *entry; - if ((srch = hm_queue.q_forw) != &hm_queue) { - do { - length++; - srch = srch->q_forw; - } while (srch != &hm_queue); - } - return(length); + for (entry = hm_queue; entry; entry = entry->next) + length++; + return length; } -Qelem *is_in_queue(notice) -ZNotice_t *notice; +static Queue *find_notice_in_queue(notice) + ZNotice_t *notice; { - Qelem *srch; + Queue *entry; - srch = hm_queue.q_forw; - if (srch == &hm_queue) - return(NULL); - do { - if (ZCompareUID(&(srch->q_data->z_notice.z_uid), &(notice->z_uid))) - return(srch); - srch = srch->q_forw; - } while (srch != &hm_queue); - return(NULL); + for (entry = hm_queue; entry; entry = entry->next) { + if (ZCompareUID(&entry->notice.z_uid, ¬ice->z_uid)) + return entry; + } + return NULL; } -void resend_notices(sin) -struct sockaddr_in *sin; +static void queue_timeout(arg) + void *arg; { - Qelem *srch; - Code_t ret; - - DPR ("Resending notices...\n"); - if ((ret = ZSetDestAddr(sin)) != ZERR_NONE) { - Zperr(ret); - com_err("queue", ret, "setting destination"); - } - if ((srch = hm_queue.q_forw) == &hm_queue) { - syslog (LOG_INFO, "No notices, shouldn't have happened!"); - } else do { - if (srch->q_data->timeout <= time((time_t *)0)) { - srch->q_data->retries++; - if (rexmit_times[srch->q_data->retries] == -1) { - new_server((char *)NULL); - break; - } else { - DPR ("notice:\n"); - DPR2 ("\tz_kind: %d\n", srch->q_data->z_notice.z_kind); - DPR2 ("\tz_port: %u\n", - ntohs(srch->q_data->z_notice.z_port)); - DPR2 ("\tz_class: %s\n", - srch->q_data->z_notice.z_class); - DPR2 ("\tz_clss_inst: %s\n", - srch->q_data->z_notice.z_class_inst); - DPR2 ("\tz_opcode: %s\n", - srch->q_data->z_notice.z_opcode); - DPR2 ("\tz_sender: %s\n", - srch->q_data->z_notice.z_sender); - DPR2 ("\tz_recip: %s\n", - srch->q_data->z_notice.z_recipient); - if ((ret = send_outgoing(&srch->q_data->z_notice)) - != ZERR_NONE) { - Zperr(ret); - com_err("queue", ret, "sending raw notice"); - } - srch->q_data->timeout = time((time_t *)0) + - rexmit_times[srch->q_data->retries]; - } - srch = srch->q_forw; - } - } while (srch != &hm_queue); - if (timeout_type == NOTICES) - (void)alarm(rexmit_times[0]); + Queue *entry = (Queue *) arg; + Code_t ret; + + ret = ZSetDestAddr(&serv_sin); + if (ret != ZERR_NONE) { + Zperr(ret); + com_err("queue", ret, "setting destination"); + } + entry->retries++; + if (rexmit_times[entry->retries] == -1) { + new_server(NULL); + return; + } + DPR("Resending notice:\n"); + DPR2("\tz_kind: %d\n", entry->notice.z_kind); + DPR2("\tz_port: %u\n", ntohs(entry->notice.z_port)); + DPR2("\tz_class: %s\n", entry->notice.z_class); + DPR2("\tz_clss_inst: %s\n", entry->notice.z_class_inst); + DPR2("\tz_opcode: %s\n", entry->notice.z_opcode); + DPR2("\tz_sender: %s\n", entry->notice.z_sender); + DPR2("\tz_recip: %s\n", entry->notice.z_recipient); + ret = send_outgoing(&entry->notice); + if (ret != ZERR_NONE) { + Zperr(ret); + com_err("queue", ret, "sending raw notice"); + } + entry->timer = timer_set_rel(rexmit_times[entry->retries], queue_timeout, + entry); } + |