{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Marconi.Indexers where
import Control.Concurrent (MVar, forkIO, modifyMVar_, newMVar, readMVar)
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Lens (view)
import Control.Lens.Operators ((^.))
import Control.Monad (void)
import Data.List (findIndex, foldl1', intersect)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe (fromMaybe, mapMaybe)
import Streaming.Prelude qualified as S
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode,
ChainPoint (ChainPoint, ChainPointAtGenesis), Hash, ScriptData, SlotNo, Tx (Tx), chainPointToSlotNo)
import Cardano.Api qualified as C
import "cardano-api" Cardano.Api.Shelley qualified as Shelley
import Cardano.Ledger.Alonzo.TxWitness qualified as Alonzo
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward))
import Control.Concurrent.STM.TMVar (TMVar)
import Marconi.Index.Datum (DatumIndex)
import Marconi.Index.Datum qualified as Datum
import Marconi.Index.ScriptTx qualified as ScriptTx
import Marconi.Index.Utxo qualified as Utxo
import Marconi.Types (TargetAddresses)
import RewindableIndex.Index.VSplit qualified as Ix
import RewindableIndex.Storable qualified as Storable
-- | Collect every datum found in the transactions of a block.
--
-- Each @(hash, datum)@ pair is tagged with the slot number taken from the
-- block header. Transactions are visited in block order and, within one
-- transaction, pairs come out in the key order of the map produced by
-- 'scriptDataFromCardanoTxBody'.
getDatums :: BlockInMode CardanoMode -> [(SlotNo, (Hash ScriptData, ScriptData))]
getDatums (BlockInMode (Block (BlockHeader blockSlot _ _) blockTxs) _) =
  [ (blockSlot, hashedDatum)
  | Tx body _ <- blockTxs
  , hashedDatum <- Map.assocs (scriptDataFromCardanoTxBody body)
  ]
-- | Extract the datums attached to a transaction body, keyed by their hash.
--
-- Only a 'Shelley.ShelleyTxBody' whose script-data component is
-- 'C.TxBodyScriptData' yields entries; every other body shape produces an
-- empty map. The keys are recomputed with 'C.hashScriptData' from the
-- converted values rather than reused from the ledger map.
scriptDataFromCardanoTxBody :: C.TxBody era -> Map (Hash ScriptData) ScriptData
scriptDataFromCardanoTxBody body = case body of
  Shelley.ShelleyTxBody _ _ _ (C.TxBodyScriptData _ (Alonzo.TxDats' ledgerDatums) _) _ _ ->
    Map.fromList
      [ (C.hashScriptData datum, datum)
      | datum <- Shelley.fromAlonzoData <$> Map.elems ledgerDatums
      ]
  _ -> mempty
-- | Shared state used to keep the chain-sync producer and the indexer
-- workers in lock-step.
data Coordinator = Coordinator
  { -- | Channel on which chain-sync events are written; each worker reads
    -- from its own duplicate of it.
    _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode))
    -- | Semaphore a worker signals (by one unit) every time it is ready for
    -- the next event.
  , _barrier :: QSemN
    -- | Number of units the producer waits for on '_barrier' before it
    -- publishes an event.
  , _indexerCount :: Int
  }
-- | Build a fresh 'Coordinator' for the given number of indexers.
--
-- The channel is a new broadcast channel and the barrier starts with zero
-- available units.
initialCoordinator :: Int -> IO Coordinator
initialCoordinator indexerCount = do
  channel <- newBroadcastTChanIO
  barrier <- newQSemN 0
  pure
    Coordinator
      { _channel = channel
      , _barrier = barrier
      , _indexerCount = indexerCount
      }
-- | Common shape of the indexer workers in this module: given the shared
-- 'Coordinator' and a file path, perform the worker's start-up in 'IO' and
-- return the list of points it reports (combined by 'startIndexers').
type Worker = Coordinator -> FilePath -> IO [Storable.StorablePoint ScriptTx.ScriptTxHandle]
-- | Worker for the datum index.
--
-- Opens the index at @path@, duplicates the coordinator's channel, forks a
-- thread that consumes events from that duplicate, and immediately returns
-- @['ChainPointAtGenesis']@.
datumWorker :: Worker
datumWorker Coordinator{_barrier, _channel} path = do
  -- NOTE(review): 2160 is presumably the chain security parameter -- confirm.
  datumIndex <- Datum.open path (Datum.Depth 2160)
  events <- atomically (dupTChan _channel)
  _ <- forkIO (consume events datumIndex)
  pure [ChainPointAtGenesis]
  where
    -- Signal readiness on the barrier, wait for the next event, apply it and
    -- continue with the resulting index.
    consume :: TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> DatumIndex -> IO ()
    consume chan ix = do
      signalQSemN _barrier 1
      event <- atomically (readTChan chan)
      applyEvent event ix >>= consume chan

    -- Roll forward: insert the block's datums as one event.
    -- Roll backward: rewind by the position of the first stored event that
    -- contains an entry with a slot strictly below the rollback slot; when
    -- the point has no slot, no such event exists, or the rewind fails, the
    -- index is kept unchanged.
    -- NOTE(review): the comparison is strict ('<'), so an event at exactly
    -- the rollback slot is counted among those rewound -- confirm intended.
    applyEvent :: ChainSyncEvent (BlockInMode CardanoMode) -> DatumIndex -> IO DatumIndex
    applyEvent (RollForward block _tip) ix =
      Ix.insert (getDatums block) ix
    applyEvent (RollBackward point _tip) ix = do
      stored <- Ix.getEvents (ix ^. Ix.storage)
      pure . fromMaybe ix $ do
        slot <- chainPointToSlotNo point
        offset <- findIndex (any ((< slot) . fst)) stored
        Ix.rewind offset ix
-- | Does the transaction output pay to one of the target addresses?
--
-- Only outputs whose address matches the 'C.ShelleyAddressInEra' case can be
-- a member; any other kind of address yields 'False'.
isInTargetTxOut
  :: TargetAddresses
  -> C.TxOut C.CtxTx era
  -> Bool
isInTargetTxOut targetAddresses (C.TxOut (C.AddressInEra (C.ShelleyAddressInEra _) shelleyAddr) _ _ _) =
  shelleyAddr `elem` targetAddresses
isInTargetTxOut _ _ = False
-- | Worker for the UTxO index.
--
-- Opens the index at @path@, duplicates the coordinator's channel, forks a
-- thread that consumes events from that duplicate, and immediately returns
-- @['ChainPointAtGenesis']@.
--
-- @indexerCallback@ is run on the current index at the start of every loop
-- iteration; its result is discarded. @maybeTargetAddresses@ is forwarded
-- unchanged to 'Utxo.getUtxoEvents'.
utxoWorker
  :: (Utxo.UtxoIndex -> IO Utxo.UtxoIndex)
  -> Maybe TargetAddresses
  -> Worker
utxoWorker indexerCallback maybeTargetAddresses Coordinator{_barrier, _channel} path = do
  -- NOTE(review): 2160 is presumably the chain security parameter -- confirm.
  utxoIndex <- Utxo.open path (Utxo.Depth 2160)
  events <- atomically (dupTChan _channel)
  _ <- forkIO (consume events utxoIndex)
  pure [ChainPointAtGenesis]
  where
    -- Signal readiness, run the callback, wait for the next event, apply it
    -- and continue with the resulting index.
    consume :: TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> Utxo.UtxoIndex -> IO ()
    consume chan ix = do
      signalQSemN _barrier 1
      _ <- indexerCallback ix
      event <- atomically (readTChan chan)
      applyEvent event ix >>= consume chan

    -- Roll forward: insert the block's UTxO event when there is one,
    -- otherwise keep the index as is.
    -- Roll backward: rewind by the position of the first stored event whose
    -- slot is strictly below the rollback slot; when the point has no slot,
    -- no such event exists, or the rewind fails, the index is kept unchanged.
    -- NOTE(review): the comparison is strict ('<'), so an event at exactly
    -- the rollback slot is counted among those rewound -- confirm intended.
    applyEvent :: ChainSyncEvent (BlockInMode CardanoMode) -> Utxo.UtxoIndex -> IO Utxo.UtxoIndex
    applyEvent (RollForward (BlockInMode (Block (BlockHeader slotNo _ blockNo) txs) _) _tip) ix =
      maybe
        (pure ix)
        (`Ix.insert` ix)
        (Utxo.getUtxoEvents maybeTargetAddresses slotNo blockNo txs)
    applyEvent (RollBackward point _tip) ix = do
      stored <- Ix.getEvents (ix ^. Ix.storage)
      pure . fromMaybe ix $ do
        slot <- chainPointToSlotNo point
        offset <- findIndex (\utxoEvent -> utxoEvent ^. Utxo.utxoEventSlotNo < slot) stored
        Ix.rewind offset ix
-- | Set up the script-transaction indexer without starting it.
--
-- Opens the indexer at @path@ with the given depth and stores it in a fresh
-- 'MVar'. Returns the event loop as an 'IO' action (the caller decides how
-- to run it) together with that 'MVar'.
--
-- @onInsert@ is called with every event right after it has been inserted;
-- its result is discarded.
scriptTxWorker_
  :: (Storable.StorableEvent ScriptTx.ScriptTxHandle -> IO [()])
  -> ScriptTx.Depth
  -> Coordinator
  -> TChan (ChainSyncEvent (BlockInMode CardanoMode))
  -> FilePath
  -> IO (IO (), MVar ScriptTx.ScriptTxIndexer)
scriptTxWorker_ onInsert depth Coordinator{_barrier} ch path = do
  indexerVar <- newMVar =<< ScriptTx.open path depth
  pure (consume indexerVar, indexerVar)
  where
    -- Signal readiness on the barrier, wait for the next event, handle it,
    -- and repeat.
    consume :: MVar ScriptTx.ScriptTxIndexer -> IO ()
    consume indexerVar = do
      signalQSemN _barrier 1
      event <- atomically (readTChan ch)
      handleEvent indexerVar event
      consume indexerVar

    -- Roll forward: insert the block's update into the indexer, then notify.
    -- Roll backward: rewind to the given point, keeping the indexer as is
    -- when the rewind yields 'Nothing'.
    handleEvent
      :: MVar ScriptTx.ScriptTxIndexer
      -> ChainSyncEvent (BlockInMode CardanoMode)
      -> IO ()
    handleEvent indexerVar event = case event of
      RollForward (BlockInMode (Block (BlockHeader slotNo hsh _) txs :: Block era) _ :: BlockInMode CardanoMode) _tip -> do
        let update = ScriptTx.toUpdate txs (ChainPoint slotNo hsh)
        modifyMVar_ indexerVar (Storable.insert update)
        void (onInsert update)
      RollBackward point _tip ->
        modifyMVar_ indexerVar $ \indexer ->
          fromMaybe indexer <$> Storable.rewind point indexer
-- | Worker for the script-transaction index.
--
-- Duplicates the coordinator's channel, builds the indexer loop with
-- 'scriptTxWorker_', forks that loop, and returns the points obtained from
-- 'Storable.resumeFromStorage' on the indexer's handle.
scriptTxWorker
  :: (Storable.StorableEvent ScriptTx.ScriptTxHandle -> IO [()])
  -> Worker
scriptTxWorker onInsert coordinator path = do
  events <- atomically (dupTChan (_channel coordinator))
  -- NOTE(review): 2160 is presumably the chain security parameter -- confirm.
  (runLoop, indexerVar) <-
    scriptTxWorker_ onInsert (ScriptTx.Depth 2160) coordinator events path
  _ <- forkIO runLoop
  indexer <- readMVar indexerVar
  Storable.resumeFromStorage (indexer ^. Storable.handle)
-- | Wrapper around a 'TMVar' that holds a 'Utxo.UtxoIndex'.
newtype UtxoQueryTMVar = UtxoQueryTMVar
  { -- | Unwrap to the underlying 'TMVar'.
    unUtxoIndex :: TMVar Utxo.UtxoIndex
  }
-- | Select the workers for which a storage path was supplied.
--
-- The three path arguments belong, in order, to the UTxO, datum and
-- script-transaction workers. A worker is included in the result (paired
-- with its path) only when its path is 'Just'; the relative order
-- UTxO, datum, script-tx is preserved.
filterIndexers
  :: Maybe FilePath
  -> Maybe FilePath
  -> Maybe FilePath
  -> Maybe TargetAddresses
  -> [(Worker, FilePath)]
filterIndexers utxoPath datumPath scriptTxPath maybeTargetAddresses =
  [ (worker, path) | (worker, Just path) <- candidates ]
  where
    -- Every known worker next to its (optional) path.
    candidates :: [(Worker, Maybe FilePath)]
    candidates =
      [ (utxoWorker pure maybeTargetAddresses, utxoPath)
      , (datumWorker, datumPath)
      , (scriptTxWorker (const (pure [])), scriptTxPath)
      ]
-- | Start every given worker and compute the points they all agree on.
--
-- A 'Coordinator' sized to the number of workers is created, each worker is
-- run with it and its path, and the lists of points they return are
-- intersected (keeping the order of the first worker's list).
--
-- When no workers are given there is nothing to intersect. Instead of
-- failing with the empty-list error of 'foldl1'', the result is then
-- @['ChainPointAtGenesis']@, the same point 'datumWorker' and 'utxoWorker'
-- report.
startIndexers
  :: [(Worker, FilePath)]
  -> IO ([ChainPoint], Coordinator)
startIndexers indexers = do
  coordinator <- initialCoordinator (length indexers)
  startingPoints <- mapM (\(worker, path) -> worker coordinator path) indexers
  pure (commonPoints startingPoints, coordinator)
  where
    -- Points shared by every list; total on the empty list of lists.
    commonPoints :: [[ChainPoint]] -> [ChainPoint]
    commonPoints []         = [ChainPointAtGenesis]
    commonPoints pointLists = foldl1' intersect pointLists
-- | Feed a stream of chain-sync events to the indexer workers.
--
-- For every event of the stream: wait until '_indexerCount' units are
-- available on the coordinator's barrier, then write the event to the
-- coordinator's channel. The stream's own return value is discarded.
mkIndexerStream
  :: Coordinator
  -> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
  -> IO ()
mkIndexerStream coordinator = void . S.mapM_ publish
  where
    -- Block on the barrier, then publish one event on the channel.
    publish :: ChainSyncEvent (BlockInMode CardanoMode) -> IO ()
    publish event = do
      waitQSemN (_barrier coordinator) (_indexerCount coordinator)
      atomically (writeTChan (_channel coordinator) event)