diff options
Diffstat (limited to 'zhm/queue.c')
-rw-r--r-- | zhm/queue.c | 122 |
1 files changed, 60 insertions, 62 deletions
diff --git a/zhm/queue.c b/zhm/queue.c index ecffdbf..5a072aa 100644 --- a/zhm/queue.c +++ b/zhm/queue.c @@ -18,83 +18,78 @@ static char rcsid_queue_c[] = "$Id$"; #endif /* SABER */ #endif /* lint */ -static const int rexmit_times[] = { 2, 2, 4, 4, 8, -1 }; - -static Queue *find_notice_in_galaxy __P((galaxy_info *gi, ZNotice_t *notice)); +typedef struct _Queue { + Timer *timer; + int retries; + ZNotice_t notice; + caddr_t packet; + struct sockaddr_in reply; + struct _Queue *next, **prev_p; +} Queue; + +static Queue *hm_queue; +static int retransmits_enabled = 0; + +static Queue *find_notice_in_queue __P((ZNotice_t *notice)); +static Code_t dump_queue __P((void)); static void queue_timeout __P((void *arg)); +int rexmit_times[] = { 2, 2, 4, 4, 8, -1 }; + #ifdef DEBUG -Code_t dump_galaxy_queue(galaxy_info *); +Code_t dump_queue(); #endif -void init_galaxy_queue(galaxy_info *gi) +void init_queue() { Queue *q; - while (gi->queue) { - q = gi->queue; + while (hm_queue) { + q = hm_queue; if (q->timer) timer_reset(q->timer); free(q->packet); - gi->queue = q->next; + hm_queue = q->next; free(q); } DPR("Queue initialized and flushed.\n"); } -Code_t add_notice_to_galaxy(gi, notice, repl, len) - galaxy_info *gi; +Code_t add_notice_to_queue(notice, packet, repl, len) ZNotice_t *notice; + char * packet; struct sockaddr_in *repl; int len; { Queue *entry; - int length; - int retval; DPR("Adding notice to queue...\n"); - if (!find_notice_in_galaxy(gi, notice)) { + if (!find_notice_in_queue(notice)) { entry = (Queue *) malloc(sizeof(Queue)); if (entry == NULL) return(ZERR_NONOTICE); - entry->gi = gi; entry->retries = 0; - if (!(entry->packet = (char *) malloc((unsigned)sizeof(ZPacket_t)))) { + entry->packet = (char *) malloc(Z_MAXPKTLEN); + if (entry->packet == NULL) { free(entry); - return(ENOMEM); + return(ZERR_NONOTICE); } - - if ((retval = ZFormatSmallRawNotice(notice, entry->packet, &length)) - != ZERR_NONE) { + memcpy(entry->packet, packet, Z_MAXPKTLEN); + if (ZParseNotice(entry->packet, len, &entry->notice) != ZERR_NONE) { + syslog(LOG_ERR, "ZParseNotice failed, but succeeded before"); free(entry->packet); - free(entry); - return(retval); - } - - /* I dislike this, but I need a notice which represents the - packet. since the notice structure refers to the internals - of its packet, I can't use the notice which was passed in, - so I need to make a new one. */ - - if ((retval = ZParseNotice(entry->packet, length, &entry->notice)) - != ZERR_NONE) { - free(entry->packet); - free(entry); - return(retval); + } else { + entry->reply = *repl; + LIST_INSERT(&hm_queue, entry); } - - entry->reply = *repl; - LIST_INSERT(&gi->queue, entry); - - entry->timer = (gi->state == ATTACHED) ? - timer_set_rel(rexmit_times[0], queue_timeout, entry) : NULL; + entry->timer = (retransmits_enabled) ? + timer_set_rel(rexmit_times[0], queue_timeout, entry) : NULL; } return(ZERR_NONE); } -Code_t remove_notice_from_galaxy(gi, notice, kind, repl) - galaxy_info *gi; +Code_t remove_notice_from_queue(notice, kind, repl) ZNotice_t *notice; ZNotice_Kind_t *kind; struct sockaddr_in *repl; @@ -102,7 +97,7 @@ Code_t remove_notice_from_galaxy(gi, notice, kind, repl) Queue *entry; DPR("Removing notice from queue...\n"); - entry = find_notice_in_galaxy(gi, notice); + entry = find_notice_in_queue(notice); if (entry == NULL) return(ZERR_NONOTICE); @@ -120,14 +115,19 @@ Code_t remove_notice_from_galaxy(gi, notice, kind, repl) } /* We have a server; transmit all of our packets. */ -void retransmit_galaxy(gi) - galaxy_info *gi; +void retransmit_queue(sin) + struct sockaddr_in *sin; { Queue *entry; Code_t ret; DPR("Retransmitting queue to new server...\n"); - for (entry = gi->queue; entry; entry = entry->next) { + ret = ZSetDestAddr(sin); + if (ret != ZERR_NONE) { + Zperr (ret); + com_err("queue", ret, "setting destination"); + } + for (entry = hm_queue; entry; entry = entry->next) { DPR("notice:\n"); DPR2("\tz_kind: %d\n", entry->notice.z_kind); DPR2("\tz_port: %u\n", ntohs(entry->notice.z_port)); @@ -136,7 +136,7 @@ void retransmit_galaxy(gi) DPR2("\tz_opcode: %s\n", entry->notice.z_opcode); DPR2("\tz_sender: %s\n", entry->notice.z_sender); DPR2("\tz_recip: %s\n", entry->notice.z_recipient); - ret = send_outgoing(&gi->sin, &entry->notice); + ret = send_outgoing(&entry->notice); if (ret != ZERR_NONE) { Zperr(ret); com_err("queue", ret, "sending raw notice"); @@ -144,36 +144,36 @@ void retransmit_galaxy(gi) entry->timer = timer_set_rel(rexmit_times[0], queue_timeout, entry); entry->retries = 0; } + retransmits_enabled = 1; } /* We lost our server; nuke all of our timers. */ -void disable_galaxy_retransmits(gi) - galaxy_info *gi; +void disable_queue_retransmits() { Queue *entry; - for (entry = gi->queue; entry; entry = entry->next) { + for (entry = hm_queue; entry; entry = entry->next) { if (entry->timer) timer_reset(entry->timer); entry->timer = NULL; } + retransmits_enabled = 0; } #ifdef DEBUG -static Code_t dump_galaxy_queue(gi) - galaxy_info *gi; +static Code_t dump_queue() { Queue *entry; caddr_t mp; int ml; DPR("Dumping queue...\n"); - if (!gi->queue) { + if (!hm_queue) { printf("Queue is empty.\n"); return; } - for (entry = gi->queue; entry; entry = entry->next) { + for (entry = hm_queue; entry; entry = entry->next) { printf("notice:\n"); printf("\tz_kind: %d\n", entry->notice.z_kind); printf("\tz_port: %u\n", ntohs(entry->notice.z_port)); @@ -193,24 +193,22 @@ static Code_t dump_galaxy_queue(gi) } #endif /* DEBUG */ -int galaxy_queue_len(gi) - galaxy_info *gi; +int queue_len() { int length = 0; Queue *entry; - for (entry = gi->queue; entry; entry = entry->next) + for (entry = hm_queue; entry; entry = entry->next) length++; return length; } -static Queue *find_notice_in_galaxy(gi, notice) - galaxy_info *gi; +static Queue *find_notice_in_queue(notice) ZNotice_t *notice; { Queue *entry; - for (entry = gi->queue; entry; entry = entry->next) { + for (entry = hm_queue; entry; entry = entry->next) { if (ZCompareUID(&entry->notice.z_uid, ¬ice->z_uid)) return entry; } @@ -224,14 +222,14 @@ static void queue_timeout(arg) Code_t ret; entry->timer = NULL; - + ret = ZSetDestAddr(&serv_sin); if (ret != ZERR_NONE) { Zperr(ret); com_err("queue", ret, "setting destination"); } entry->retries++; if (rexmit_times[entry->retries] == -1) { - galaxy_new_server(entry->gi, NULL); + new_server(NULL); return; } DPR("Resending notice:\n"); @@ -242,7 +240,7 @@ static void queue_timeout(arg) DPR2("\tz_opcode: %s\n", entry->notice.z_opcode); DPR2("\tz_sender: %s\n", entry->notice.z_sender); DPR2("\tz_recip: %s\n", entry->notice.z_recipient); - ret = send_outgoing(&entry->gi->sin, &entry->notice); + ret = send_outgoing(&entry->notice); if (ret != ZERR_NONE) { Zperr(ret); com_err("queue", ret, "sending raw notice"); |