module Cardano.Streaming
( withChainSyncEventStream,
ChainSyncEvent (..),
ChainSyncEventException (..),
)
where
import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
                    ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
                    LocalChainSyncClient (LocalChainSyncClient),
                    LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxMonitoringClient, localTxSubmissionClient),
                    LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
                    NetworkId, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
                                     ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
                                     ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw, throwIO)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
-- | An event produced by the chain-sync client and delivered to the stream
-- consumer. The type parameter @a@ is the payload carried by roll-forward
-- events; 'Functor' maps over that payload only.
data ChainSyncEvent a
  = -- | A new item of type @a@, together with the chain tip reported
    -- alongside it.
    RollForward a ChainTip
  | -- | A rollback to the given point, together with the chain tip reported
    -- alongside it.
    RollBackward ChainPoint ChainTip
  deriving (Show, Functor, Generic)
-- | Exceptions raised by the chain-sync streaming client.
data ChainSyncEventException
  = -- | The node answered the intersection request with "not found" for the
    -- points it was given.
    NoIntersectionFound
  deriving (Show)

-- | Uses the default 'Exception' methods, so the value can be thrown and
-- caught with the functions from "Control.Exception".
instance Exception ChainSyncEventException
-- | Connect to a local node and hand the resulting stream of chain-sync
-- events to @consumer@.
--
-- The chain-sync client runs in its own thread (started with 'withAsync') and
-- passes every event it receives to the consumer through a single 'MVar':
-- the client blocks in 'putMVar' until the consumer has taken the previous
-- event, and the consumer blocks in 'takeMVar' until the client has produced
-- the next one.
--
-- The client thread is 'link'ed to the calling thread. An exception raised in
-- the client thread therefore arrives wrapped in 'ExceptionInLinkedThread';
-- it is unwrapped and rethrown here so that callers see the underlying
-- exception (for example 'NoIntersectionFound') directly.
withChainSyncEventStream ::
  -- | Path to the node socket.
  FilePath ->
  -- | Network id stored in the connection info.
  NetworkId ->
  -- | Points offered to the node when looking for an intersection.
  [ChainPoint] ->
  -- | Stream consumer; its result is the result of the whole call.
  (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
  IO b
withChainSyncEventStream socketPath networkId points consumer =
  -- 'throwIO' rather than 'throw': we are in 'IO', so the rethrow is
  -- sequenced like any other action.
  runStream `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throwIO e
  where
    runStream = do
      -- Single-slot hand-off between the client thread and the consumer.
      nextBlockVar <- newEmptyMVar

      let client = chainSyncStreamingClient points nextBlockVar

          -- Only the chain-sync mini-protocol is used; the others are disabled.
          localNodeClientProtocols =
            LocalNodeClientProtocols
              { localChainSyncClient = LocalChainSyncClient client,
                localStateQueryClient = Nothing,
                localTxMonitoringClient = Nothing,
                localTxSubmissionClient = Nothing
              }

          connectInfo =
            LocalNodeConnectInfo
              { localConsensusModeParams = CardanoModeParams epochSlots,
                localNodeNetworkId = networkId,
                localNodeSocketPath = socketPath
              }

          -- NOTE(review): hard-coded epoch length; presumably the Byron-era
          -- value required by 'CardanoModeParams' -- confirm against the
          -- cardano-api documentation.
          epochSlots = EpochSlots 21600

      withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \clientThread -> do
        -- Forward exceptions from the client thread to this thread.
        link clientThread
        -- Each element of the stream is one 'takeMVar' on the hand-off slot.
        consumer $ S.repeatM $ takeMVar nextBlockVar
-- | Chain-sync client that publishes every event it receives into an 'MVar'.
--
-- The client first asks the node for an intersection with @points@:
--
-- * if one is found, a 'RollBackward' to the intersection point is published
--   and the client starts requesting the next message in a loop, publishing
--   a 'RollForward' or 'RollBackward' for each reply;
--
-- * if none is found, 'NoIntersectionFound' is thrown in the thread running
--   the client.
--
-- Every publication is a 'putMVar' on @nextChainEventVar@, so the client
-- waits until the previously published event has been taken.
chainSyncStreamingClient ::
  -- | Points offered to the node when looking for an intersection.
  [ChainPoint] ->
  -- | Slot into which each received event is put.
  MVar (ChainSyncEvent e) ->
  ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient points nextChainEventVar =
  ChainSyncClient $ pure $ SendMsgFindIntersect points onIntersect
  where
    -- Publish one event, then ask the node for the next message.
    publish event =
      ChainSyncClient $ do
        putMVar nextChainEventVar event
        sendRequestNext

    onIntersect =
      ClientStIntersect
        { recvMsgIntersectFound = \cp ct -> publish (RollBackward cp ct),
          -- Raise from inside the client action instead of making the
          -- handler itself a bottom value.
          recvMsgIntersectNotFound = \_ ->
            ChainSyncClient $ throwIO NoIntersectionFound
        }

    -- The same handler is used whether the reply is immediate or arrives
    -- after the client has been told to wait.
    sendRequestNext =
      pure $ SendMsgRequestNext onNext (pure onNext)
      where
        onNext =
          ClientStNext
            { recvMsgRollForward = \bim ct -> publish (RollForward bim ct),
              recvMsgRollBackward = \cp ct -> publish (RollBackward cp ct)
            }