summaryrefslogtreecommitdiff
path: root/zhm/queue.c
diff options
context:
space:
mode:
authorGravatar Greg Hudson <ghudson@mit.edu>1997-09-14 18:12:16 +0000
committerGravatar Greg Hudson <ghudson@mit.edu>1997-09-14 18:12:16 +0000
commitce6018836c422c86a729ba39fe5433ec11b87b02 (patch)
treecc2e6601489384fcf41e0565e8906822897b46c8 /zhm/queue.c
parentac16f380e349fa39ec7e26bccb5456cb300006a5 (diff)
Pull in sources from zephyr locker. See /mit/zephyr/repository for
detailed change information.
Diffstat (limited to 'zhm/queue.c')
-rw-r--r--zhm/queue.c383
1 files changed, 179 insertions, 204 deletions
diff --git a/zhm/queue.c b/zhm/queue.c
index fd87546..5e0ca3a 100644
--- a/zhm/queue.c
+++ b/zhm/queue.c
@@ -20,249 +20,224 @@ static char rcsid_queue_c[] = "$Header$";
#endif /* lint */
typedef struct _Queue {
- long timeout;
- int retries;
- ZNotice_t z_notice;
- caddr_t z_packet;
- struct sockaddr_in reply;
+ Timer *timer;
+ int retries;
+ ZNotice_t notice;
+ caddr_t packet;
+ struct sockaddr_in reply;
+ struct _Queue *next, **prev_p;
} Queue;
-struct _qelem {
- struct _qelem *q_forw;
- struct _qelem *q_back;
- Queue *q_data;
-};
+static Queue *hm_queue;
+static int retransmits_enabled = 0;
-typedef struct _qelem Qelem;
-
-Qelem hm_queue = { &hm_queue, &hm_queue, NULL }, *is_in_queue();
+static Queue *find_notice_in_queue __P((ZNotice_t *notice));
+static Code_t dump_queue __P((void));
+static void queue_timeout __P((void *arg));
int rexmit_times[] = { 2, 2, 4, 4, 8, -1 };
-extern long time();
-extern int timeout_type;
+#ifdef DEBUG
+Code_t dump_queue();
+#endif
void init_queue()
{
- while (hm_queue.q_forw != &hm_queue) {
- free(hm_queue.q_forw->q_data->z_packet);
- free((char *)hm_queue.q_forw->q_data);
- remque(hm_queue.q_forw);
- free((char *)hm_queue.q_forw);
- }
-
- hm_queue.q_forw = hm_queue.q_back = &hm_queue;
- hm_queue.q_data = NULL;
- DPR ("Queue initialized and flushed.\n");
+ Queue *q;
+
+ while (hm_queue) {
+ q = hm_queue;
+ if (q->timer)
+ timer_reset(q->timer);
+ free(q->packet);
+ hm_queue = q->next;
+ free(q);
+ }
+
+ DPR("Queue initialized and flushed.\n");
}
Code_t add_notice_to_queue(notice, packet, repl, len)
-ZNotice_t *notice;
-caddr_t packet;
-struct sockaddr_in *repl;
-int len;
+ ZNotice_t *notice;
+ char * packet;
+ struct sockaddr_in *repl;
+ int len;
{
- Qelem *elem;
- Queue *entry;
-
- DPR ("Adding notice to queue...\n");
- if (!is_in_queue(notice)) {
- elem = (Qelem *)malloc(sizeof(Qelem));
- entry = (Queue *)malloc(sizeof(Queue));
- entry->timeout = time((time_t *)0) + rexmit_times[0];
- entry->retries = 0;
- entry->z_packet = (char *)malloc(Z_MAXPKTLEN);
- (void) memcpy(entry->z_packet, packet, Z_MAXPKTLEN);
- if (ZParseNotice(entry->z_packet, len, &entry->z_notice)
- != ZERR_NONE) {
- syslog(LOG_ERR, "ZParseNotice failed, but succeeded before");
- free(entry->z_packet);
- } else {
- entry->reply = *repl;
- elem->q_data = entry;
- elem->q_forw = elem;
- elem->q_back = elem;
- insque(elem, hm_queue.q_back);
- }
- }
-#ifdef DEBUG
- if (!is_in_queue(notice))
- return(ZERR_NONOTICE);
- else
-#endif /* DEBUG */
- return(ZERR_NONE);
+ Queue *entry;
+
+ DPR("Adding notice to queue...\n");
+ if (!find_notice_in_queue(notice)) {
+ entry = (Queue *) malloc(sizeof(Queue));
+ entry->retries = 0;
+ entry->packet = (char *) malloc(Z_MAXPKTLEN);
+ memcpy(entry->packet, packet, Z_MAXPKTLEN);
+ if (ZParseNotice(entry->packet, len, &entry->notice) != ZERR_NONE) {
+ syslog(LOG_ERR, "ZParseNotice failed, but succeeded before");
+ free(entry->packet);
+ } else {
+ entry->reply = *repl;
+ LIST_INSERT(&hm_queue, entry);
+ }
+ entry->timer = (retransmits_enabled) ?
+ timer_set_rel(rexmit_times[0], queue_timeout, entry) : NULL;
+ }
+ return(ZERR_NONE);
}
Code_t remove_notice_from_queue(notice, kind, repl)
-ZNotice_t *notice;
-ZNotice_Kind_t *kind;
-struct sockaddr_in *repl;
+ ZNotice_t *notice;
+ ZNotice_Kind_t *kind;
+ struct sockaddr_in *repl;
{
- Qelem *elem;
-
- DPR ("Removing notice from queue...\n");
- if ((elem = is_in_queue(notice)) == NULL)
- return(ZERR_NONOTICE);
- else {
- *kind = elem->q_data->z_notice.z_kind;
- *repl = elem->q_data->reply;
- free(elem->q_data->z_packet);
- free((char *)elem->q_data);
- remque(elem);
- free((char *)elem);
- if (hm_queue.q_forw == &hm_queue)
- (void)alarm(0);
+ Queue *entry;
+
+ DPR("Removing notice from queue...\n");
+ entry = find_notice_in_queue(notice);
+ if (entry == NULL)
+ return(ZERR_NONOTICE);
+
+ *kind = entry->notice.z_kind;
+ *repl = entry->reply;
+ timer_reset(entry->timer);
+ free(entry->packet);
+ LIST_DELETE(entry);
#ifdef DEBUG
- dump_queue();
+ dump_queue();
#endif /* DEBUG */
- return(ZERR_NONE);
- }
+ return(ZERR_NONE);
}
+/* We have a server; transmit all of our packets. */
void retransmit_queue(sin)
-struct sockaddr_in *sin;
+ struct sockaddr_in *sin;
{
- Qelem *srch;
- Code_t ret;
+ Queue *entry;
+ Code_t ret;
+
+ DPR("Retransmitting queue to new server...\n");
+ ret = ZSetDestAddr(sin);
+ if (ret != ZERR_NONE) {
+ Zperr (ret);
+ com_err("queue", ret, "setting destination");
+ }
+ for (entry = hm_queue; entry; entry = entry->next) {
+ DPR("notice:\n");
+ DPR2("\tz_kind: %d\n", entry->notice.z_kind);
+ DPR2("\tz_port: %u\n", ntohs(entry->notice.z_port));
+ DPR2("\tz_class: %s\n", entry->notice.z_class);
+ DPR2("\tz_clss_inst: %s\n", entry->notice.z_class_inst);
+ DPR2("\tz_opcode: %s\n", entry->notice.z_opcode);
+ DPR2("\tz_sender: %s\n", entry->notice.z_sender);
+ DPR2("\tz_recip: %s\n", entry->notice.z_recipient);
+ ret = send_outgoing(&entry->notice);
+ if (ret != ZERR_NONE) {
+ Zperr(ret);
+ com_err("queue", ret, "sending raw notice");
+ }
+ entry->timer = timer_set_rel(rexmit_times[0], queue_timeout, entry);
+ entry->retries = 0;
+ }
+ retransmits_enabled = 1;
+}
- DPR ("Retransmitting queue to new server...\n");
- if ((ret = ZSetDestAddr(sin)) != ZERR_NONE) {
- Zperr (ret);
- com_err("queue", ret, "setting destination");
- }
- if ((srch = hm_queue.q_forw) != &hm_queue) {
- do {
- DPR ("notice:\n");
- DPR2 ("\tz_kind: %d\n", srch->q_data->z_notice.z_kind);
- DPR2 ("\tz_port: %u\n",
- ntohs(srch->q_data->z_notice.z_port));
- DPR2 ("\tz_class: %s\n", srch->q_data->z_notice.z_class);
- DPR2 ("\tz_clss_inst: %s\n",
- srch->q_data->z_notice.z_class_inst);
- DPR2 ("\tz_opcode: %s\n", srch->q_data->z_notice.z_opcode);
- DPR2 ("\tz_sender: %s\n", srch->q_data->z_notice.z_sender);
- DPR2 ("\tz_recip: %s\n", srch->q_data->z_notice.z_recipient);
- if ((ret = send_outgoing(&srch->q_data->z_notice))
- != ZERR_NONE) {
- Zperr (ret);
- com_err("queue", ret, "sending raw notice");
- }
- srch->q_data->timeout = time(0) + rexmit_times[0];
- srch->q_data->retries = 0;
- srch = srch->q_forw;
- } while (srch != &hm_queue);
- timeout_type = NOTICES;
- (void)alarm(rexmit_times[0]);
- }
+/* We lost our server; nuke all of our timers. */
+void disable_queue_retransmits()
+{
+ Queue *entry;
+
+ for (entry = hm_queue; entry; entry = entry->next) {
+ if (entry->timer)
+ timer_reset(entry->timer);
+ entry->timer = NULL;
+ }
+ retransmits_enabled = 0;
}
#ifdef DEBUG
-Code_t dump_queue()
+static Code_t dump_queue()
{
- Qelem *srch;
- caddr_t mp;
- int ml;
-
- DPR ("Dumping queue...\n");
- if ((srch = hm_queue.q_forw) == &hm_queue)
- printf("Queue is empty.\n");
- else do {
- printf("notice:\n");
- printf("\tz_kind: %d\n", srch->q_data->z_notice.z_kind);
- printf("\tz_port: %u\n", ntohs(srch->q_data->z_notice.z_port));
- printf("\tz_class: %s\n", srch->q_data->z_notice.z_class);
- printf("\tz_clss_inst: %s\n", srch->q_data->z_notice.z_class_inst);
- printf("\tz_opcode: %s\n", srch->q_data->z_notice.z_opcode);
- printf("\tz_sender: %s\n", srch->q_data->z_notice.z_sender);
- printf("\tz_recip: %s\n", srch->q_data->z_notice.z_recipient);
- printf("\tMessage:\n");
- mp = srch->q_data->z_notice.z_message;
- for (ml = strlen(mp)+1;
- ml <= srch->q_data->z_notice.z_message_len; ml++) {
- printf("\t%s\n", mp);
- mp += strlen(mp)+1;
- ml += strlen(mp);
- }
- srch = srch->q_forw;
- } while (srch != &hm_queue);
+ Queue *entry;
+ caddr_t mp;
+ int ml;
+
+ DPR("Dumping queue...\n");
+ if (!hm_queue) {
+ printf("Queue is empty.\n");
+ return;
+ }
+
+ for (entry = hm_queue; entry; entry = entry->next) {
+ printf("notice:\n");
+ printf("\tz_kind: %d\n", entry->notice.z_kind);
+ printf("\tz_port: %u\n", ntohs(entry->notice.z_port));
+ printf("\tz_class: %s\n", entry->notice.z_class);
+ printf("\tz_clss_inst: %s\n", entry->notice.z_class_inst);
+ printf("\tz_opcode: %s\n", entry->notice.z_opcode);
+ printf("\tz_sender: %s\n", entry->notice.z_sender);
+ printf("\tz_recip: %s\n", entry->notice.z_recipient);
+ printf("\tMessage:\n");
+ mp = entry->notice.z_message;
+ for (ml = strlen(mp) + 1; ml <= entry->notice.z_message_len; ml++) {
+ printf("\t%s\n", mp);
+ mp += strlen(mp)+1;
+ ml += strlen(mp);
+ }
+ }
}
#endif /* DEBUG */
int queue_len()
{
- int length = 0;
- Qelem *srch;
+ int length = 0;
+ Queue *entry;
- if ((srch = hm_queue.q_forw) != &hm_queue) {
- do {
- length++;
- srch = srch->q_forw;
- } while (srch != &hm_queue);
- }
- return(length);
+ for (entry = hm_queue; entry; entry = entry->next)
+ length++;
+ return length;
}
-Qelem *is_in_queue(notice)
-ZNotice_t *notice;
+static Queue *find_notice_in_queue(notice)
+ ZNotice_t *notice;
{
- Qelem *srch;
+ Queue *entry;
- srch = hm_queue.q_forw;
- if (srch == &hm_queue)
- return(NULL);
- do {
- if (ZCompareUID(&(srch->q_data->z_notice.z_uid), &(notice->z_uid)))
- return(srch);
- srch = srch->q_forw;
- } while (srch != &hm_queue);
- return(NULL);
+ for (entry = hm_queue; entry; entry = entry->next) {
+ if (ZCompareUID(&entry->notice.z_uid, &notice->z_uid))
+ return entry;
+ }
+ return NULL;
}
-void resend_notices(sin)
-struct sockaddr_in *sin;
+static void queue_timeout(arg)
+ void *arg;
{
- Qelem *srch;
- Code_t ret;
-
- DPR ("Resending notices...\n");
- if ((ret = ZSetDestAddr(sin)) != ZERR_NONE) {
- Zperr(ret);
- com_err("queue", ret, "setting destination");
- }
- if ((srch = hm_queue.q_forw) == &hm_queue) {
- syslog (LOG_INFO, "No notices, shouldn't have happened!");
- } else do {
- if (srch->q_data->timeout <= time((time_t *)0)) {
- srch->q_data->retries++;
- if (rexmit_times[srch->q_data->retries] == -1) {
- new_server((char *)NULL);
- break;
- } else {
- DPR ("notice:\n");
- DPR2 ("\tz_kind: %d\n", srch->q_data->z_notice.z_kind);
- DPR2 ("\tz_port: %u\n",
- ntohs(srch->q_data->z_notice.z_port));
- DPR2 ("\tz_class: %s\n",
- srch->q_data->z_notice.z_class);
- DPR2 ("\tz_clss_inst: %s\n",
- srch->q_data->z_notice.z_class_inst);
- DPR2 ("\tz_opcode: %s\n",
- srch->q_data->z_notice.z_opcode);
- DPR2 ("\tz_sender: %s\n",
- srch->q_data->z_notice.z_sender);
- DPR2 ("\tz_recip: %s\n",
- srch->q_data->z_notice.z_recipient);
- if ((ret = send_outgoing(&srch->q_data->z_notice))
- != ZERR_NONE) {
- Zperr(ret);
- com_err("queue", ret, "sending raw notice");
- }
- srch->q_data->timeout = time((time_t *)0) +
- rexmit_times[srch->q_data->retries];
- }
- srch = srch->q_forw;
- }
- } while (srch != &hm_queue);
- if (timeout_type == NOTICES)
- (void)alarm(rexmit_times[0]);
+ Queue *entry = (Queue *) arg;
+ Code_t ret;
+
+ ret = ZSetDestAddr(&serv_sin);
+ if (ret != ZERR_NONE) {
+ Zperr(ret);
+ com_err("queue", ret, "setting destination");
+ }
+ entry->retries++;
+ if (rexmit_times[entry->retries] == -1) {
+ new_server(NULL);
+ return;
+ }
+ DPR("Resending notice:\n");
+ DPR2("\tz_kind: %d\n", entry->notice.z_kind);
+ DPR2("\tz_port: %u\n", ntohs(entry->notice.z_port));
+ DPR2("\tz_class: %s\n", entry->notice.z_class);
+ DPR2("\tz_clss_inst: %s\n", entry->notice.z_class_inst);
+ DPR2("\tz_opcode: %s\n", entry->notice.z_opcode);
+ DPR2("\tz_sender: %s\n", entry->notice.z_sender);
+ DPR2("\tz_recip: %s\n", entry->notice.z_recipient);
+ ret = send_outgoing(&entry->notice);
+ if (ret != ZERR_NONE) {
+ Zperr(ret);
+ com_err("queue", ret, "sending raw notice");
+ }
+ entry->timer = timer_set_rel(rexmit_times[entry->retries], queue_timeout,
+ entry);
}
+