Aggregating cross-chain DEX data
Ethereum has adopted the rollup-centric roadmap, positioning itself as the global settlement layer for potential layer-2s more so than being direct target for "traditional" layer-1 applications. The move allows layer-2s to acquire the economic security of Ethereum as a base layer, the layer-2s on the other hand provide Ethereum with fees for all the block (and blob!) space required to commit each proof. Having such a clear separation of responsibilities provides the means needed to scale the current biggest bottleneck: transaction throughput. One of the downsides of the approach is the increased scatteredness of liquidity and the sheer amount of liquidity pools getting deployed.
One of the problems we're trying to solve at Polia is how to most efficiently provide direct exposure to real-time data from all relevant (liquidity unpoor) decentralised exchanges, without accruing too much unnecessary noise and pressure for the clients that'll be consuming the data. Doing so would allow anyone the ability to acquire and keep up-to-date a "world-view" of the current state of all decentralised liquidity pools across multiple networks.
The system used to provide the service consists of two core components, both developed in Rust:
- node ETL - pipeline aggregating filtered live data into an append-only stream
- websocket gateway - WS server synthesising and exposing the data for client consumption
graph LR
A[Networks nodes - Ethereum,Zksync,Arbitrum etc.] -->|Raw Data| B(Node ETL instances)
B -->|Filtered Data| C[FIFO Stream]
C -->|Aggregated Data| D(Websocket Gateway)
D -->|Synthesised Data| E[Clients]
subgraph "ETL Pipeline"
B
C
end
subgraph "Data Exposure"
D
end
classDef component fill:#f9f,stroke:#333,stroke-width:2px;
class B,D component;
Node ETL
The Node ETL provides the means to listen, aggregate and transform the raw events notifications received from each supported network. Each EVM compatible layer-2 offers the same means to acquire live data as the underlying layer-1 - pubsub JSON-RPC notifications. As DEFI is mostly still dominated by uniswap-v2 and uniswap-v3, and the hundreds of clones that have been deployed in their honor, the events we're most interested in to get an idea of the prices of roughly ~80% of all deployed liquidity pools are respectively:
- uniswap-v2 -
event Sync(uint112 reserve0, uint112 reserve1) - uniswap-v3 -
event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)
Each running instance of Node ETL has a Manager that keeps a list of collectors for each supported network. The Manager is actively tracking the healthiness and liveness of each collector. With the increase of the number of networks, you'd want to also increase the running instances of Node ETL - each supporting different networks, for example, one Node ETL instance might handle Ethereum and Arbitrum, while another manages Zksync, Base, and Optimism. This division allows for optimal resource allocation and prevents any single instance from getting throttled.
#[derive(Debug, Clone)]
pub struct Manager<N>
where
N: NetworkT,
{
pub manager_config: ManagerConfig,
pub collectors: Vec<Collector<N>>,
queue_rx: mpsc::Sender<Resource<N::Block, N::Log>>,
}
Each collector consists of a node instance, database pool and a queue that'll propagate the events of interest to a dedicated channel that'll process them further. As the websocket connections are fundamentally unstable the node client has to be self-healing and reconnect whenever a connection gets broken up or closed.
#[derive(Debug, Clone)]
pub struct Collector<N>
where
N: NetworkT,
{
config: CollectorConfig,
node: N,
#[cfg(feature = "postgres")]
pool: PgPool,
#[cfg(feature = "redis-stream")]
queue_rx: mpsc::Sender<Resource<N::Block, N::Log>>,
}
Each node client subscribes for the desired events, by applying criteria (also known as Filter in the official docs), transforms the result into internally meaningful representation, inserts it as is into the database and sends it to a dedicated channel for further processing.
async fn stream_logs(
&self,
stop_signal: watch::Receiver<bool>,
criteria: Option<&Criteria>,
) -> crate::Result<()> {
let mut stream = self.node.sub_logs(criteria).await?;
while let Some(log) = stream.next().await {
if stop_signal.has_changed()? {
break;
}
let log = match log {
Ok(log) => serde_json::from_str::<<N as NetworkT>::Log>(log.get())?,
Err(err) => {
error!(
kind = "log_error_count",
network = &self.config.network.to_string(),
);
continue;
}
};
#[cfg(feature = "postgres")]
{
event
.insert(
&self.pool,
&self.config.network.to_string(),
&log.core().tx_hash,
)
.await?;
}
#[cfg(feature = "redis-stream")]
{
let event = match_events(log);
match self.queue_rx.send(Resource::Log(event)).await {
Ok(_) => {}
Err(err) => {
warn!(kind="propagate_error", network = &self.config.network.to_string(), err = ?err);
}
}
}
}
Ok(())
}
Large prerequisite is acquiring a list of all the deployed pools as that's necessary to apply correct filter criteria (and prevent yourself from drowning in logs (especially in some layer-2s)).
The whole process can be illustrated with the diagram below.
graph TD
A[Node ETL] --> B[Manager]
B --> C1[Collector 1]
B --> C2[Collector 2]
B --> C3[Collector N]
C1 --> D1[Node Instance]
C1 --> D2[Database Pool]
C1 --> D3[Redis Stream]
I[Raw Events] <-->|Listen| D1
D1 -->|Aggregate & Transform| J[Processed Events]
J -->|Insert| D2[Database Pool]
J -->|Send| D3[Redis Stream]
subgraph "Collector"
D1
D2
D3
J
end
subgraph "Events"
I
end
classDef manager fill:#f9f,stroke:#333,stroke-width:2px;
classDef collector fill:#bbf,stroke:#333,stroke-width:2px;
classDef process fill:#bfb,stroke:#333,stroke-width:2px;
class B manager;
class C1,C2,C3 collector;
class I,J process;
Websocket Gateway
The websocket gateway complements the Node ETL by handling client interactions, including authentication, topic subscriptions, and real-time event delivery. The server is utilising the actor model to segregate the responsibilities between the different logical components.
The lifecycle of a client establishing a connection is roughly summarised by the below diagram. Please note that the diagram represents happy-path scenarios only.
sequenceDiagram
actor C as Client[Entity]
participant SES as ClientSession[Actor]
participant DB as Database[Actor]
participant SRV as Server[Actor]
participant DEFI as DEFI[Actor]
participant RED as Redis
C ->> SES: Begin Handshake
C ->> SES: Send an API Key
SES ->> DB: Verify the API Key
SES ->> SRV: Add the new client to the list of clients with established sessions
SRV ->> DB: Acquire client's relevant information (used credits/plan details etc.)
SRV ->> C: Establish connection
C ->> SES: Subscribe to specific topics
SES ->> SRV: Verify the correctness of the message
loop Continuous Communication
DEFI ->> RED: Poll for new events
RED -->> DEFI: Return new events
DEFI -->> DB: Optionally log the events before propagating
DEFI -->> SRV: Send synthesised events for further client distribution
end
SRV ->> SES: Send a msg to all client sessions each time a relevant event happens
SRV ->> SRV: Modify the user's used credits
SES ->> C: ClientSession actor sends the messages to all end client entities
rect rgb(240, 240, 240)
Note over C,SRV: Session End
C ->> SES: Close connection
SES ->> SRV: Notify of client disconnect
SES ->> SES: Cleanup client-specific resources
SRV ->> DB: Update client's session data (total time, credits used, etc.)
SRV ->> SRV: Remove client from active subscribers list
end
The authentication happens through the Authorization header - as recommended by the Websocket RFC (more on how to authenticate can be found in the official docs).
All messages are parsed to determine the networks and dexes of interest of the one connecting. Clients can subscribe to as many networks and/or dexes as they're interested in - so long as they can handle the amount of data.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientSession {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(txt)) => {
match serde_json::from_str::<ClientPayload>(&txt) {
Ok(payload) => {
let information = self.information.clone();
let networks = parse_comma_separated_values(payload.networks);
let dex = parse_comma_separated_values(payload.dex);
match payload.method {
Method::Subscribe => {
self.server_addr.do_send(Subscribe {
client_id: self.id,
information: information.clone(),
networks: networks.clone(),
kind: payload.kind.unwrap_or_default(),
dex: dex.clone(),
});
}
Method::Unsubscribe => {
self.server_addr.do_send(Unsubscribe {
client_id: self.id,
information: information.clone(),
networks: networks.clone(),
kind: payload.kind.unwrap_or_default(),
dex: dex.clone(),
});
}
}
}
Err(_) => {
ctx.text(
json!({
"error": "Invalid Format. Please visit https://docs.polia.io for more information on how to format messages. Support is available at support@polia.io."
}).to_string()
);
}
}
}
Ok(ws::Message::Binary(_)) => {}
_ => (),
}
}
}
As previously outlined, the ETL-processed logs are propagated to an append-only stream in Redis. This allows the DEFI actor to maintain an event loop with connection to Redis, thereby accessing the newest events from the latest block. Running the node ETL as a standalone process, rather than directly integrating its responsibilities into the websocket-gateway, offers several advantages:
- It enables easy scaling of the number of websocket instances running simultaneously (redis's append-only logs can be consumed by multiple entities).
- It prevents an exponential increase in node event consumption. This is particularly crucial when using an RPC provider like Infura, especially when integrating multiple networks. While running dedicated nodes can alleviate this to some extent, relying solely on external RPC providers for multi-network integration can lead to significant consumption-related challenges.
- The setup allows for the utilisation of real-time events by processes that offer additional functionality and flexibility.