1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
{- git-annex assistant out of band network messager types
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Types.NetMessager where
import Common.Annex
import Assistant.Pairing
import Data.Text (Text)
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import qualified Data.Set as S
import qualified Data.Map as M
{- The kinds of message that a network messager can send out of band. -}
data NetMessage
  -- announces that pushes have been made to the repos with these uuids
  = NotifyPush [UUID]
  -- asks other clients to inform us of their presence
  | QueryPresence
  -- reports a stage of the pairing process, along with the client
  -- and the UUID that are involved in it
  | PairingNotification PairStage ClientID UUID
  -- carries one stage of a git push made over the network messager
  | Pushing ClientID PushStage
  deriving (Show, Eq, Ord)
{- An identifier for the client, or clients, that a message is to be
 - sent to. -}
type ClientID = Text
{- The stages of a git push made over the out of band network. -}
data PushStage
  -- announces that we have data to push over the out of band network
  = CanPush
  -- asks for a git push to be sent over the out of band network
  | PushRequest
  -- announces that a push is starting
  | StartingPush
  -- a chunk of the output of git receive-pack
  | ReceivePackOutput ByteString
  -- a chunk of the output of git send-pack
  | SendPackOutput ByteString
  -- sent when git receive-pack exits; carries its exit code
  | ReceivePackDone ExitCode
  deriving (Show, Eq, Ord)
{- Picks out the NetMessages that are important (and small), and so
 - should be stored to be resent when new clients are seen.
 -
 - Only a Pushing message at the CanPush or PushRequest stage counts;
 - for those, the ClientID carried by the message is returned. -}
isImportantNetMessage :: NetMessage -> Maybe ClientID
isImportantNetMessage msg = case msg of
  Pushing client CanPush -> Just client
  Pushing client PushRequest -> Just client
  _ -> Nothing
{- Replaces the ClientID carried by a PairingNotification or Pushing
 - message with the given one. Any other message is returned unchanged. -}
readdressNetMessage :: NetMessage -> ClientID -> NetMessage
readdressNetMessage msg newclient = case msg of
  PairingNotification stage _ uuid -> PairingNotification stage newclient uuid
  Pushing _ stage -> Pushing newclient stage
  _ -> msg
{- Turns a NetMessage into something that can be logged.
 -
 - The ByteString carried by a ReceivePackOutput or SendPackOutput stage
 - is swapped for a fixed placeholder; everything else is left as is. -}
sanitizeNetMessage :: NetMessage -> NetMessage
sanitizeNetMessage (Pushing client stage) = Pushing client (sanitizeStage stage)
  where
    placeholder = B8.pack "<elided>"
    sanitizeStage (ReceivePackOutput _) = ReceivePackOutput placeholder
    sanitizeStage (SendPackOutput _) = SendPackOutput placeholder
    sanitizeStage other = other
sanitizeNetMessage msg = msg
{- True for the stages that initiate either side of a push, but do not
 - actually send data: CanPush, PushRequest and StartingPush. -}
isPushInitiation :: PushStage -> Bool
isPushInitiation stage = case stage of
  CanPush -> True
  PushRequest -> True
  StartingPush -> True
  _ -> False
{- The two sides of a push.
 -
 - Show is derived, like for NetMessage and PushStage, so that a side can
 - be included in debug output. -}
data PushSide = SendPack | ReceivePack
  deriving (Eq, Ord, Show)
{- Maps each PushStage to a PushSide.
 -
 - CanPush, StartingPush and SendPackOutput map to ReceivePack;
 - PushRequest, ReceivePackOutput and ReceivePackDone map to SendPack. -}
pushDestinationSide :: PushStage -> PushSide
pushDestinationSide stage = case stage of
  CanPush -> ReceivePack
  PushRequest -> SendPack
  StartingPush -> ReceivePack
  ReceivePackOutput _ -> SendPack
  SendPackOutput _ -> ReceivePack
  ReceivePackDone _ -> SendPack
{- Holds one value of type a for each PushSide, represented as a lookup
 - function from the side to its value. -}
type SideMap a = PushSide -> a
{- Builds a SideMap by running the generator twice inside a single STM
 - transaction; the first result is used for SendPack and the second
 - for ReceivePack. -}
mkSideMap :: STM a -> IO (SideMap a)
mkSideMap gen = do
  (forSend, forReceive) <- atomically $ do
    s <- gen
    r <- gen
    return (s, r)
  return $ \side -> case side of
    SendPack -> forSend
    ReceivePack -> forReceive
{- Looks up the value that a SideMap holds for the given side. -}
getSide :: PushSide -> SideMap a -> a
getSide side = ($ side)
{- The state of a network messager. -}
data NetMessager = NetMessager
  -- channel of outgoing messages
  { netMessages :: TChan NetMessage
  -- the important messages, for each client
  , importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
  -- the important messages that are believed to have been sent,
  -- for each client
  , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
  -- writing to this restarts the net messager
  , netMessagerRestart :: MSampleVar ()
  -- only one side of a push can be running at a time
  , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
  -- incoming messages that relate to a running push
  , netMessagesPush :: SideMap (TChan NetMessage)
  -- incoming push messages that were deferred, to be processed later
  , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
  }
{- Creates a fresh NetMessager: new channels, empty maps of important
 - messages, an empty restart sample var, Nothing as the push-running
 - value of each side, and empty sets of deferred push messages. -}
newNetMessager :: IO NetMessager
newNetMessager = do
  outgoing <- newTChanIO
  important <- newTMVarIO M.empty
  sentimportant <- newTMVarIO M.empty
  restart <- newEmptySV
  running <- mkSideMap (newTMVar Nothing)
  pushmsgs <- mkSideMap newTChan
  deferred <- mkSideMap (newTMVar S.empty)
  return NetMessager
    { netMessages = outgoing
    , importantNetMessages = important
    , sentImportantNetMessages = sentimportant
    , netMessagerRestart = restart
    , netMessagerPushRunning = running
    , netMessagesPush = pushmsgs
    , netMessagesPushDeferred = deferred
    }
|