module Cardano.Streaming
  ( withChainSyncEventStream,
    ChainSyncEvent (..),
    ChainSyncEventException (..),
  )
where

import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
                    ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
                    LocalChainSyncClient (LocalChainSyncClient),
                    LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxMonitoringClient, localTxSubmissionClient),
                    LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
                    NetworkId, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
                                     ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
                                     ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw, throwIO)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

-- | An event produced by the chain-sync client and handed to the stream
-- consumer. The type parameter @a@ is the payload carried by
-- roll-forward events (a block, in 'withChainSyncEventStream').
data ChainSyncEvent a
  = -- | The node sent the next item on the chain, together with the
    -- chain tip it reported alongside it.
    RollForward a ChainTip
  | -- | The node asked the client to roll back to the given point, and
    -- reported the given chain tip. This event is also emitted once, as
    -- the very first event, when the initial intersection is found.
    RollBackward ChainPoint ChainTip
  deriving (Show, Functor, Generic)

-- | Exceptions raised by the chain-sync streaming client.
data ChainSyncEventException
  = -- | None of the requested starting points could be intersected with
    -- the node's chain.
    NoIntersectionFound
  deriving (Show)

instance Exception ChainSyncEventException

-- | `withChainSyncEventStream` uses the chain-sync mini-protocol to
-- connect to a locally running node and fetch blocks from the given
-- starting point.
--
-- The chain-sync client runs in a separate thread for the duration of the
-- consumer. Exceptions raised in that thread are re-thrown, unwrapped, in
-- the calling thread.
withChainSyncEventStream ::
  -- | Path to the node socket
  FilePath ->
  NetworkId ->
  -- | The point on the chain to start streaming from
  [ChainPoint] ->
  -- | The stream consumer
  (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
  IO b
withChainSyncEventStream socketPath networkId points consumer = do
  -- The chain-sync client runs in a different thread passing the blocks it
  -- receives to the stream consumer through a MVar. The chain-sync client
  -- thread and the stream consumer will each block on each other and stay
  -- in lockstep.
  --
  -- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a
  -- (bounded) queue could perform better. Indeed a properly-sized buffer
  -- can reduce the time the two threads are blocked waiting for each
  -- other. The problem here is "properly-sized". A bounded queue like
  -- Control.Concurrent.STM.TBQueue allows us to specify a max queue length
  -- but block size can vary a lot (TODO quantify this) depending on the
  -- era. We have an alternative implementation with customizable queue
  -- size (TBMQueue) but it needs to be extracted from the
  -- plutus-chain-index-core package. Using a simple MVar doesn't seem to
  -- slow down marconi's indexing, likely because the difference is
  -- negligible compared to existing network and IO latencies. Therefore,
  -- let's stick with a MVar now and revisit later.
  nextBlockVar <- newEmptyMVar

  let client = chainSyncStreamingClient points nextBlockVar

      -- Only the chain-sync mini-protocol is used; the other local
      -- protocols are left disabled.
      localNodeClientProtocols =
        LocalNodeClientProtocols
          { localChainSyncClient = LocalChainSyncClient client,
            localStateQueryClient = Nothing,
            localTxMonitoringClient = Nothing,
            localTxSubmissionClient = Nothing
          }

      connectInfo =
        LocalNodeConnectInfo
          { localConsensusModeParams = CardanoModeParams epochSlots,
            localNodeNetworkId = networkId,
            localNodeSocketPath = socketPath
          }

      -- This is a parameter needed only for the Byron era. Since the Byron
      -- era is over and the parameter has never changed it is ok to
      -- hardcode this. See comment on `Cardano.Api.ConsensusModeParams` in
      -- cardano-node.
      epochSlots = EpochSlots 21600

      -- Run the node connection in a background thread while the consumer
      -- drains the stream in the current one.
      streamEvents =
        withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
          -- Make sure all exceptions in the client thread are passed to the
          -- consumer thread
          link a
          -- Run the consumer
          consumer $ S.repeatM $ takeMVar nextBlockVar

  -- Let's rethrow exceptions from the client thread unwrapped, so that the
  -- consumer does not have to know anything about async
  streamEvents `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throwIO e

-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol. This client is fire-and-forget
-- and does not require any control.
--
-- Every event received from the node is written to the given 'MVar' before
-- the next one is requested, so the client blocks until the previous event
-- has been taken.
--
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
  [ChainPoint] ->
  MVar (ChainSyncEvent e) ->
  ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient points nextChainEventVar =
  ChainSyncClient $ pure $ SendMsgFindIntersect points onIntersect
  where
    -- Handlers for the reply to the initial intersection request.
    onIntersect =
      ClientStIntersect
        { recvMsgIntersectFound = \cp ct -> publish (RollBackward cp ct),
          -- There is nothing we can do here
          recvMsgIntersectNotFound = \_tip ->
            ChainSyncClient $ throwIO NoIntersectionFound
        }

    -- Handlers for the reply to each "request next" message.
    onNext =
      ClientStNext
        { recvMsgRollForward = \bim ct -> publish (RollForward bim ct),
          recvMsgRollBackward = \cp ct -> publish (RollBackward cp ct)
        }

    -- Hand one event over to the consumer, then ask the node for the next
    -- one.
    publish event = ChainSyncClient $ do
      putMVar nextChainEventVar event
      pure $ SendMsgRequestNext onNext (pure onNext)