//! Service utilities for running fuel sync.

use std::sync::Arc;

use crate::{
    import::{
        Config,
        Import,
    },
    ports::{
        self,
        BlockImporterPort,
        ConsensusPort,
        PeerToPeerPort,
    },
    state::State,
    sync::SyncHeights,
};

use fuel_core_services::{
    stream::{
        BoxStream,
        IntoBoxStream,
    },
    RunnableService,
    RunnableTask,
    Service,
    ServiceRunner,
    SharedMutex,
    StateWatcher,
};
use fuel_core_types::fuel_types::BlockHeight;
use futures::StreamExt;
use tokio::sync::Notify;

#[cfg(test)]
mod tests;

/// Creates an instance of runnable sync service.
pub fn new_service
// NOTE(review): `P`, `E` and `C` are used below but no generic parameter list is
// visible on this function, and the return type is a bare `anyhow::Result`.
// The `<...>` spans of this file appear to have been lost — confirm against the
// repository copy before relying on this text.
(
// Block height supplied by the caller; how it is used is not visible here.
current_fuel_block_height: BlockHeight,
// Peer-to-peer port (`P: PeerToPeerPort`).
p2p: P,
// Block importer port (`E: BlockImporterPort`).
executor: E,
// Consensus port (`C: ConsensusPort`).
consensus: C,
// `Config` value taken from the `import` module.
params: Config,
) -> anyhow::Result
where
// All three ports carry the same `Send + Sync + 'static` bounds.
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// NOTE(review): the function body is not visible. The two lines below read like
// a struct field list followed by the tail of a tuple struct wrapping `Import`
// (presumably the `SyncTask` / `ImportTask` definitions, whose headers are
// missing) — TODO confirm.
sync_heights: SyncHeights,
import_task_handle: ServiceRunner (Import );
// Inherent impl for `SyncTask`, bounded on the three port traits.
// NOTE(review): the generic parameter lists on `impl` and `SyncTask` are not
// visible; they appear to have been stripped from this text.
impl SyncTask
where
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// Constructor; only its first parameter survives in the visible text.
fn new(
// NOTE(review): everything after `BoxStream` up to the next impl header seems
// to be lost — the rest of `new`'s parameters and its body are missing, and the
// remainder of this line is the header of a `RunnableTask` impl for `SyncTask`
// (without its `impl` keyword). TODO restore from the repository copy.
height_stream: BoxStream RunnableTask for SyncTask
// Bounds for the `RunnableTask for SyncTask` impl whose header sits at the end
// of the preceding line.
where
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// Wraps `run` in a debug-level `tracing` span; `skip_all` keeps the arguments
// out of the span, `err` and `ret` record the error / return value.
#[tracing::instrument(level = "debug", skip_all, err, ret)]
// The `StateWatcher` argument is bound to `_`, so this signature does not name it.
// NOTE(review): the generic argument of `anyhow::Result` and the whole body of
// `run` are not visible; the tail of this line is the header of the
// `RunnableService for SyncTask` impl (without its `impl` keyword).
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result RunnableService for SyncTask
// Bounds for the `RunnableService for SyncTask` impl whose header sits at the
// end of the preceding line.
where
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// Service name.
const NAME: &'static str = "SyncTask";
// No shared data is exposed: the type is unit.
type SharedData = ();
// NOTE(review): the generic arguments of `SyncTask` appear to be missing here.
type Task = SyncTask ;
// No task parameters: the type is unit.
type TaskParams = ();
// Returns unit, matching `SharedData = ()`.
fn shared_data(&self) -> Self::SharedData {}
// Consumes the service (`mut self`) and receives the state watcher; the
// `TaskParams` argument is bound to `_`.
async fn into_task(
mut self,
watcher: &StateWatcher,
_: Self::TaskParams,
// NOTE(review): the return type's generic argument and the body of `into_task`
// are not visible; the tail of this line is the header of the
// `RunnableTask for ImportTask` impl (without its `impl` keyword).
) -> anyhow::Result RunnableTask for ImportTask
// Bounds for the `RunnableTask for ImportTask` impl whose header sits at the
// end of the preceding line.
where
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// Wraps `run` in a debug-level `tracing` span; `skip_all` keeps the arguments
// out of the span, `err` and `ret` record the error / return value.
#[tracing::instrument(level = "debug", skip_all, err, ret)]
// Unlike the `SyncTask` variant, the watcher is a named parameter here;
// presumably the (missing) body uses it — TODO confirm.
// NOTE(review): the generic argument of `anyhow::Result` and the whole body of
// `run` are not visible; the tail of this line is the header of the
// `RunnableService for ImportTask` impl (without its `impl` keyword).
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result RunnableService for ImportTask
// Bounds for the `RunnableService for ImportTask` impl whose header sits at the
// end of the preceding line.
where
P: PeerToPeerPort + Send + Sync + 'static,
E: BlockImporterPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
{
// Service name.
const NAME: &'static str = "ImportTask";
// No shared data is exposed: the type is unit.
type SharedData = ();
// No task parameters: the type is unit.
type TaskParams = ();
// NOTE(review): the generic arguments of `ImportTask` appear to be missing here.
type Task = ImportTask ;
// Returns unit, matching `SharedData = ()`.
fn shared_data(&self) -> Self::SharedData {}
// Consumes the service; both the state watcher and the task parameters are
// bound to `_` in this signature.
async fn into_task(
self,
_: &StateWatcher,
_: Self::TaskParams,
// NOTE(review): the return type's generic argument and the body are not
// visible in this text.
) -> anyhow::Result