Creating a Step
What is a step?
A step is a unit of processing logic in the SDK and can be used to define logic for extracting, transforming, or storing data. Steps are the building blocks of a processor. The Aptos core processors, for example, implement (1) getting a stream of transactions from Transaction Stream, (2) extracting the data, (3) writing to a database, and (4) tracking progress as four separate steps.
There are two types of steps in the SDK:
- AsyncStep: Processes a batch of input items and returns a batch of output items.
- PollableAsyncStep: Does the same as AsyncStep, but it also periodically polls its internal state and returns a batch of output items if available.
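The difference between the two step types can be sketched with simplified, synchronous stand-in traits. The Step and PollableStep traits and the BufferStep struct below are hypothetical names invented for illustration. They are not the SDK's AsyncStep and PollableAsyncStep, which are async and carry more machinery. The sketch only shows the idea that a pollable step can hold items in internal state and emit them later when polled.

```rust
use std::time::Duration;

/// Simplified, synchronous stand-in for a step that only reacts to input.
/// (Assumption: the real SDK traits are async and differ in detail.)
trait Step {
    type Input;
    type Output;
    fn process(&mut self, input: Self::Input) -> Option<Self::Output>;
}

/// Stand-in for a pollable step: same as `Step`, plus a periodic poll hook.
trait PollableStep: Step {
    fn poll_interval(&self) -> Duration;
    fn poll(&mut self) -> Option<Vec<Self::Output>>;
}

/// Hypothetical buffering step: holds items until the next poll.
struct BufferStep {
    buffered: Vec<u64>,
    interval: Duration,
}

impl Step for BufferStep {
    type Input = Vec<u64>;
    type Output = Vec<u64>;

    fn process(&mut self, input: Vec<u64>) -> Option<Vec<u64>> {
        // Store the items; nothing is emitted until the step is polled.
        self.buffered.extend(input);
        None
    }
}

impl PollableStep for BufferStep {
    fn poll_interval(&self) -> Duration {
        self.interval
    }

    fn poll(&mut self) -> Option<Vec<Vec<u64>>> {
        if self.buffered.is_empty() {
            None
        } else {
            // Release everything buffered so far as a single output batch.
            Some(vec![std::mem::take(&mut self.buffered)])
        }
    }
}
```

In this model, a runner would call process for every incoming batch and call poll once per poll_interval, forwarding whatever either call returns to the next step.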
How to create a Step
To create a step with the SDK, follow these instructions:
- Implement the Processable trait. This trait defines several important details about the step: the input and output types, the processing logic, and the run type (either AsyncRunType or PollableAsyncRunType).

      #[async_trait]
      impl Processable for MyExtractorStep {
          // The Input is a vec of Transaction
          type Input = Vec<Transaction>;
          // The Output is a vec of MyData
          type Output = Vec<MyData>;
          // Depending on the type of step this is, the RunType is either
          // - AsyncRunType
          // - PollableAsyncRunType
          type RunType = AsyncRunType;

          // Processes a batch of input items and returns a batch of output items.
          async fn process(
              &mut self,
              input: TransactionContext<Vec<Transaction>>,
          ) -> Result<Option<TransactionContext<Vec<MyData>>>, ProcessorError> {
              let transactions = input.data;
              let data = transactions.iter().map(|transaction| {
                  // Define the processing logic to extract MyData from a Transaction
              }).collect();

              Ok(Some(TransactionContext {
                  data,
                  metadata: input.metadata,
              }))
          }
      }
  In most cases, you’re going to be processing a list of inputs into a list of outputs. To speed up the processing, we recommend using rayon to run these otherwise sequential computations in parallel. You can see an example of how we use rayon’s par_iter to parallelize the processing here.

  In the example code above, you’ll notice that the input and output types are wrapped within a TransactionContext. TransactionContext contains relevant metadata about the batch of data being processed, such as the transaction versions and timestamp, which is used for metrics and logging.

- Implement the NamedStep trait. This is used for logging.

      impl NamedStep for MyExtractorStep {
          fn name(&self) -> String {
              "MyExtractorStep".to_string()
          }
      }
- Implement either the AsyncStep trait or the PollableAsyncStep trait, which defines how the step will be run in the processor.
  - If you’re using AsyncStep, add this to your code:

        impl AsyncStep for MyExtractorStep {}
  - If you’re creating a PollableAsyncStep, you will need to define the poll interval and what the step should do every time it polls.

        #[async_trait]
        impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
        where
            Self: Sized + Send + Sync + 'static,
            T: Send + 'static,
        {
            fn poll_interval(&self) -> std::time::Duration {
                // Define duration
            }

            async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
                // Define code here on what this step should do every time it polls
                // Optionally return a batch of output items
            }
        }
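To make the extractor logic from the first instruction concrete, here is a self-contained sketch that uses only the Rust standard library. It makes several assumptions: TransactionContext, Metadata, Transaction, and MyData are simplified stand-ins with hypothetical fields, not the SDK's types. The parallel map uses std::thread::scope as a plain substitute for rayon, so that the example has no external dependencies. In a real processor, rayon's parallel iterators would likely replace the manual chunking.

```rust
use std::thread;

/// Stand-in for the batch metadata; the field names are assumptions.
#[derive(Clone, Debug, PartialEq)]
struct Metadata {
    start_version: u64,
    end_version: u64,
}

/// Simplified stand-in for the SDK's TransactionContext wrapper.
struct TransactionContext<T> {
    data: T,
    metadata: Metadata,
}

/// Hypothetical, simplified transaction.
struct Transaction {
    version: u64,
    payload: String,
}

/// Hypothetical record extracted from one transaction.
#[derive(Debug, PartialEq)]
struct MyData {
    version: u64,
    payload_len: usize,
}

/// The per-item extraction logic.
fn extract(txn: &Transaction) -> MyData {
    MyData {
        version: txn.version,
        payload_len: txn.payload.len(),
    }
}

/// Maps every transaction to MyData on up to `workers` threads, keeps the
/// input order, and forwards the batch metadata unchanged.
fn process(
    input: TransactionContext<Vec<Transaction>>,
    workers: usize,
) -> TransactionContext<Vec<MyData>> {
    let workers = workers.max(1);
    // Ceiling division; at least 1 because `chunks(0)` would panic.
    let chunk_size = ((input.data.len() + workers - 1) / workers).max(1);

    let data: Vec<MyData> = thread::scope(|scope| {
        // Spawn one scoped thread per chunk before joining any of them.
        let handles: Vec<_> = input
            .data
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || chunk.iter().map(extract).collect::<Vec<MyData>>())
            })
            .collect();

        // Joining in spawn order preserves the original item order.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    });

    TransactionContext {
        data,
        metadata: input.metadata,
    }
}
```

The point to notice is that only the data field changes type. The metadata is passed through untouched, which mirrors how the example process function above builds its output context from input.metadata.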
Parsing Transactions
When building the extractor step, you’ll need to define how you want to parse your data from transactions. Read more about how to parse your data from transactions here.
Common SDK steps
The SDK comes with a set of common steps that you can use to build your processor.
- TransactionStreamStep provides a stream of Aptos transactions to the processor. Read more about it here.
- TimedBufferStep buffers a batch of items and periodically polls to release the items to the next step.
- VersionTrackerStep tracks the progress of the processor and checkpoints the processor’s progress. Read more about it here.
- OrderByVersionStep orders transaction contexts by their starting versions. It buffers these ordered contexts and releases them at every poll interval.
- WriteRateLimitStep limits the number of bytes written to the database per second.
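As an illustration of the ordering idea behind OrderByVersionStep, the following is a minimal standard-library sketch. It is not the SDK's implementation. Batch and OrderBuffer are hypothetical names, and the rule that only a gap-free run of batches is released on each poll is an assumption added here. The description above only states that contexts are ordered by starting version and released at every poll interval.

```rust
use std::collections::BTreeMap;

/// Stand-in for a transaction context covering versions start..=end.
#[derive(Debug, PartialEq)]
struct Batch {
    start_version: u64,
    end_version: u64,
}

/// Hypothetical ordering buffer (not the SDK's OrderByVersionStep).
struct OrderBuffer {
    /// Batches waiting to be released, keyed by starting version.
    pending: BTreeMap<u64, Batch>,
    /// First version that has not been released yet.
    next_version: u64,
}

impl OrderBuffer {
    fn new(starting_version: u64) -> Self {
        Self {
            pending: BTreeMap::new(),
            next_version: starting_version,
        }
    }

    /// Accepts a batch in any arrival order; nothing is emitted here.
    fn process(&mut self, batch: Batch) {
        self.pending.insert(batch.start_version, batch);
    }

    /// Called once per poll interval. Releases the gap-free run of batches
    /// that begins at `next_version`, in ascending version order.
    /// (Assumption: batches that would leave a gap stay buffered.)
    fn poll(&mut self) -> Option<Vec<Batch>> {
        let mut ready = Vec::new();
        while let Some(batch) = self.pending.remove(&self.next_version) {
            self.next_version = batch.end_version + 1;
            ready.push(batch);
        }
        if ready.is_empty() {
            None
        } else {
            Some(ready)
        }
    }
}
```

With this design, a batch that arrives early simply waits in the buffer until the batches before it have arrived, so downstream steps always see versions in ascending order.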