summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
blob: 2191e06f2fa95380248dbf9af71d24305236bbd9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
{- git-annex assistant out of band network messager interface
 -
 - Copyright 2012 Joey Hess <joey@kitenet.net>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module Assistant.NetMessager where

import Assistant.Common
import Assistant.Types.NetMessager

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S

sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m = 
	(atomically . flip writeTChan m) <<~ (netMessages . netMessager)

waitNetMessage :: Assistant (NetMessage)
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)

notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart =
	flip writeSV () <<~ (netMessagerRestart . netMessager)

waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)

{- Runs an action that runs either the send or receive side of a push.
 -
 - While the push is running, netMessagesPush will get messages put into it
 - relating to this push, while any messages relating to other pushes
 - on the same side go to netMessagesDeferred. Once the push finishes,
 - those deferred messages will be fed to handledeferred for processing.
 -}
runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush side clientid handledeferred a = do
	nm <- getAssistant netMessager
	let runningv = getSide side $ netMessagerPushRunning nm
	let setup = void $ atomically $ swapTMVar runningv $ Just clientid
	let cleanup = atomically $ do
		void $ swapTMVar runningv Nothing
		emptytchan (getSide side $ netMessagesPush nm)
	r <- E.bracket_ setup cleanup <~> a
	(void . forkIO) <~> processdeferred nm
	return r
  where
	emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
	processdeferred nm = do
		s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
		mapM_ rundeferred (S.toList s)
	rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
		<~> handledeferred m

{- While a push is running, matching push messages are put into
 - netMessagesPush, while others that involve the same side go to
 - netMessagesDeferredPush.
 -
 - When no push is running involving the same side, returns False.
 -
 - To avoid bloating memory, only messages that initiate pushes are
 - deferred.
 -}
queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m@(Pushing clientid stage) = do
	nm <- getAssistant netMessager
	liftIO $ atomically $ do
		v <- readTMVar (getSide side $ netMessagerPushRunning nm)
		case v of
			Nothing -> return False
			(Just runningclientid)
				| runningclientid == clientid -> queue nm
				| isPushInitiation stage -> defer nm
				| otherwise -> discard
  where
	side = pushDestinationSide stage
	queue nm = do
		writeTChan (getSide side $ netMessagesPush nm) m
		return True
	defer nm = do
		let mv = getSide side $ netMessagesPushDeferred nm
		s <- takeTMVar mv
		putTMVar mv $ S.insert m s
		return True
	discard = return True
queueNetPushMessage _ = return False

waitNetPushMessage :: PushSide -> Assistant (NetMessage)
waitNetPushMessage side = (atomically . readTChan)
	<<~ (getSide side . netMessagesPush . netMessager)