summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
blob: f042b4e4e86077291c2dd546913619ad39d9b95a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
{- git-annex assistant out of band network messager interface
 -
 - Copyright 2012-2013 Joey Hess <joey@kitenet.net>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

{-# LANGUAGE BangPatterns #-}

module Assistant.NetMessager where

import Assistant.Common
import Assistant.Types.NetMessager

import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D

sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m = 
	(atomically . flip writeTChan m) <<~ (netMessages . netMessager)

waitNetMessage :: Assistant (NetMessage)
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)

notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart =
	flip writeSV () <<~ (netMessagerRestart . netMessager)

{- This can be used to get an early indication if the network has
 - changed, to immediately restart a connection. However, that is not
 - available on all systems, so clients also need to deal with
 - restarting dropped connections in the usual way. -}
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)

{- Store a new important NetMessage for a client, and if an equivilant
 - older message is already stored, remove it from both importantNetMessages
 - and sentImportantNetMessages. -}
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
storeImportantNetMessage m client matchingclient = go <<~ netMessager
  where
	go nm = atomically $ do
		q <- takeTMVar $ importantNetMessages nm
		sent <- takeTMVar $ sentImportantNetMessages nm
		putTMVar (importantNetMessages nm) $
			M.alter (Just . maybe (S.singleton m) (S.insert m)) client $
				M.mapWithKey removematching q
		putTMVar (sentImportantNetMessages nm) $
			M.mapWithKey removematching sent
	removematching someclient s
		| matchingclient someclient = S.filter (not . equivilantImportantNetMessages m) s
		| otherwise = s

{- Indicates that an important NetMessage has been sent to a client. -}
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
  where
	go v = atomically $ do
		sent <- takeTMVar v
		putTMVar v $
			M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent

{- Checks for important NetMessages that have been stored for a client, and
 - sent to a client. Typically the same client for both, although 
 - a modified or more specific client may need to be used. -}
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
  where
	go nm = atomically $ do
		stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
		sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
		return (fromMaybe S.empty stored, fromMaybe S.empty sent)

{- Queues a push initiation message in the queue for the appropriate
 - side of the push but only if there is not already an initiation message
 - from the same client in the queue. -}
queuePushInitiation :: NetMessage -> Assistant ()
queuePushInitiation msg@(Pushing clientid stage) = do
	tv <- getPushInitiationQueue side
	liftIO $ atomically $ do
		r <- tryTakeTMVar tv
		case r of
			Nothing -> putTMVar tv [msg]
			Just l -> do
				let !l' = msg : filter differentclient l
				putTMVar tv l'
  where
	side = pushDestinationSide stage
	differentclient (Pushing cid _) = cid /= clientid
	differentclient _ = True
queuePushInitiation _ = noop

{- Waits for a push inititation message to be received, and runs
 - function to select a message from the queue. -}
waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage
waitPushInitiation side selector = do
	tv <- getPushInitiationQueue side
	liftIO $ atomically $ do
		q <- takeTMVar tv
		if null q
			then retry
			else do
				let (msg, !q') = selector q
				unless (null q') $
					putTMVar tv q'
				return msg

{- Stores messages for a push into the appropriate inbox.
 -
 - To avoid overflow, only 1000 messages max are stored in any
 - inbox, which should be far more than necessary.
 -
 - TODO: If we have more than 100 inboxes for different clients,
 - discard old ones that are not currently being used by any push.
 -}
storeInbox :: NetMessage -> Assistant ()
storeInbox msg@(Pushing clientid stage) = do
	inboxes <- getInboxes side
	stored <- liftIO $ atomically $ do
		m <- readTVar inboxes
		let update = \v -> do
			writeTVar inboxes $
				M.insertWith' const clientid v m
			return True
		case M.lookup clientid m of
			Nothing -> update (1, tostore)
			Just (sz, l)
				| sz > 1000 -> return False
				| otherwise ->
					let !sz' = sz + 1
					    !l' = D.append l tostore
					in update (sz', l')
	if stored
		then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
		else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
  where
	side = pushDestinationSide stage
	tostore = D.singleton msg
storeInbox _ = noop

{- Gets the new message for a push from its inbox.
 - Blocks until a message has been received. -}
waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
waitInbox clientid side = do
	inboxes <- getInboxes side
	liftIO $ atomically $ do
		m <- readTVar inboxes
		case M.lookup clientid m of
			Nothing -> retry
			Just (sz, dl)
				| sz < 1 -> retry
				| otherwise -> do
					let msg = D.head dl
					let dl' = D.tail dl
					let !sz' = sz - 1
					writeTVar inboxes $
						M.insertWith' const clientid (sz', dl') m
					return msg

emptyInbox :: ClientID -> PushSide -> Assistant ()
emptyInbox clientid side = do
	inboxes <- getInboxes side
	liftIO $ atomically $
		modifyTVar' inboxes $
			M.delete clientid
	
getInboxes :: PushSide -> Assistant Inboxes
getInboxes side =
	getSide side . netMessagerInboxes <$> getAssistant netMessager

getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage])
getPushInitiationQueue side =
	getSide side . netMessagerPushInitiations <$> getAssistant netMessager

netMessagerDebug :: ClientID -> [String] -> Assistant ()
netMessagerDebug clientid l = debug $
	"NetMessager" : l ++ [show $ logClientID clientid]