use crate::{ fuel_core_graphql_api::ports::ConsensusModulePort, service::adapters::{ BlockImporterAdapter, BlockProducerAdapter, P2PAdapter, PoAAdapter, TxPoolAdapter, }, }; use anyhow::anyhow; use fuel_core_poa::{ ports::{ BlockImporter, P2pPort, TransactionPool, TransactionsSource, }, service::{ Mode, SharedState, }, }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ fuel_tx::TxId, fuel_types::BlockHeight, services::{ block_importer::{ BlockImportInfo, UncommittedResult as UncommittedImporterResult, }, executor::{ Error as ExecutorError, UncommittedResult, }, txpool::ArcPoolTx, }, tai64::Tai64, }; use tokio_stream::{ wrappers::BroadcastStream, StreamExt, }; impl PoAAdapter { pub fn new(shared_state: Option) -> Self { Self { shared_state } } pub async fn manually_produce_blocks( &self, start_time: Option, mode: Mode, ) -> anyhow::Result<()> { self.shared_state .as_ref() .ok_or(anyhow!("The block production is disabled"))? .manually_produce_block(start_time, mode) .await } } #[async_trait::async_trait] impl ConsensusModulePort for PoAAdapter { async fn manually_produce_blocks( &self, start_time: Option, number_of_blocks: u32, ) -> anyhow::Result<()> { self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks }) .await } } impl TransactionPool for TxPoolAdapter { fn pending_number(&self) -> usize { self.service.pending_number() } fn total_consumable_gas(&self) -> u64 { self.service.total_consumable_gas() } fn remove_txs(&self, ids: Vec<(TxId, ExecutorError)>) -> Vec { self.service.remove_txs( ids.into_iter() .map(|(tx_id, err)| (tx_id, err.to_string())) .collect(), ) } fn transaction_status_events(&self) -> BoxStream { Box::pin( BroadcastStream::new(self.service.new_tx_notification_subscribe()) .filter_map(|result| result.ok()), ) } } #[async_trait::async_trait] impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { async fn produce_and_execute_block( &self, height: BlockHeight, block_time: Tai64, source: TransactionsSource, ) -> anyhow::Result> { match source { TransactionsSource::TxPool => { self.block_producer .produce_and_execute_block_txpool(height, block_time) .await } TransactionsSource::SpecificTransactions(txs) => { self.block_producer .produce_and_execute_block_transactions(height, block_time, txs) .await } } } } #[async_trait::async_trait] impl BlockImporter for BlockImporterAdapter { async fn commit_result( &self, result: UncommittedImporterResult, ) -> anyhow::Result<()> { self.block_importer .commit_result(result) .await .map_err(Into::into) } fn block_stream(&self) -> BoxStream { Box::pin( BroadcastStream::new(self.block_importer.subscribe()) .filter_map(|result| result.ok()) .map(BlockImportInfo::from), ) } } #[cfg(feature = "p2p")] impl P2pPort for P2PAdapter { fn reserved_peers_count(&self) -> BoxStream { if let Some(service) = &self.service { Box::pin( BroadcastStream::new(service.subscribe_reserved_peers_count()) .filter_map(|result| result.ok()), ) } else { Box::pin(tokio_stream::pending()) } } } #[cfg(not(feature = "p2p"))] impl P2pPort for P2PAdapter { fn reserved_peers_count(&self) -> BoxStream { Box::pin(tokio_stream::pending()) } }