1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
{- git-annex assistant out of band network messager interface
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
(atomically . flip writeTChan m) <<~ (netMessages . netMessager)
waitNetMessage :: Assistant (NetMessage)
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)
notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart =
flip writeSV () <<~ (netMessagerRestart . netMessager)
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
{- Store an important NetMessage for a client, and if the same message was
- already sent, remove it from sentImportantNetMessages. -}
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
storeImportantNetMessage m client matchingclient = go <<~ netMessager
where
go nm = atomically $ do
q <- takeTMVar $ importantNetMessages nm
sent <- takeTMVar $ sentImportantNetMessages nm
putTMVar (importantNetMessages nm) $
M.alter (Just . maybe (S.singleton m) (S.insert m)) client q
putTMVar (sentImportantNetMessages nm) $
M.mapWithKey removematching sent
removematching someclient s
| matchingclient someclient = S.delete m s
| otherwise = s
{- Indicates that an important NetMessage has been sent to a client. -}
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
where
go v = atomically $ do
sent <- takeTMVar v
putTMVar v $
M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
{- Checks for important NetMessages that have been stored for a client, and
- sent to a client. Typically the same client for both, although
- a modified or more specific client may need to be used. -}
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
where
go nm = atomically $ do
stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
{- Runs an action that runs either the send or receive side of a push.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
-}
runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush side clientid handledeferred a = do
nm <- getAssistant netMessager
let runningv = getSide side $ netMessagerPushRunning nm
let setup = void $ atomically $ swapTMVar runningv $ Just clientid
let cleanup = atomically $ do
void $ swapTMVar runningv Nothing
emptytchan (getSide side $ netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm
return r
where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do
s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m
{- While a push is running, matching push messages are put into
- netMessagesPush, while others that involve the same side go to
- netMessagesPushDeferred.
-
- When no push is running involving the same side, returns False.
-
- To avoid bloating memory, only messages that initiate pushes are
- deferred.
-}
queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m@(Pushing clientid stage) = do
nm <- getAssistant netMessager
liftIO $ atomically $ do
v <- readTMVar (getSide side $ netMessagerPushRunning nm)
case v of
Nothing -> return False
(Just runningclientid)
| runningclientid == clientid -> queue nm
| isPushInitiation stage -> defer nm
| otherwise -> discard
where
side = pushDestinationSide stage
queue nm = do
writeTChan (getSide side $ netMessagesPush nm) m
return True
defer nm = do
let mv = getSide side $ netMessagesPushDeferred nm
s <- takeTMVar mv
putTMVar mv $ S.insert m s
return True
discard = return True
queueNetPushMessage _ = return False
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
waitNetPushMessage side = (atomically . readTChan)
<<~ (getSide side . netMessagesPush . netMessager)
|