Ok, I see. To me, it looks as if the state might contain disjoint sets of information. If so, it might help to split it into multiple ReportingFishes, each dealing with a different part of the domain model.
That said, here’s how to enable compression for snapshots. You need to do two things:
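- Implement `toJSON()` in your business types. It needs to return the compressed data, which might be counter-intuitive: `JSON.stringify` calls `toJSON()` on any object that has one and serializes its return value instead, so you can think of `toJSON()` as `serialize`.
- Implement `deserializeState` in your fish, which turns that serialized value back into your state.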
This is described in more detail at https://developer.actyx.com/docs/pond/guides/snapshots.
Here’s a small example of what this could look like.
Assume an event carrying the current time is emitted every 50 ms, and we observe it with two fishes, once without and once with compression:
```typescript
import { Pond } from '@actyx/pond'
import { BoringFish, CompressingFish, pushEventTag } from '../fish'

Pond.default()
  .then(pond => {
    // Emit an event carrying the current time every 50 ms
    setInterval(() => pond.emit(pushEventTag, { content: Date() }), 50)
    // Observe the same events once without and once with snapshot compression
    pond.observe(BoringFish.of(), state => console.log('boring', state))
    pond.observe(CompressingFish.of(), state => console.log('compressed', state))
  })
  .catch(console.error)
```
The boring fish simply appends each event’s content to its state, so the state grows over time:
```typescript
import { Fish, FishId, Tag } from '@actyx/pond'

type State = string[]
type PushEvent = { content: string }

export const pushEventTag = Tag<PushEvent>('pushed')

export const BoringFish = {
  of: (): Fish<State, PushEvent> => ({
    fishId: FishId.of('BoringFish', 'Carp', 0),
    initialState: [],
    where: pushEventTag,
    onEvent: (state, event) => {
      return [...state, event.content]
    },
  }),
}
```
In contrast, the compressing fish implements snapshot compression using pako:
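```typescript
import * as Pako from 'pako'
// Lives in the same module as BoringFish, which imports Fish and FishId
// and defines PushEvent and pushEventTag.

// Deflate the JSON into a binary string (assumes pako 1.x, whose deflate supports `to: 'string'`)
const pack = (data: unknown): string => Pako.deflate(JSON.stringify(data), { to: 'string' })
// Inverse of pack: inflate the binary string and parse the JSON it contains
const unpack = (zipped: string): unknown => JSON.parse(Pako.inflate(zipped, { to: 'string' }))

type CompressedState = {
  data: string[]
  // Invoked by JSON.stringify when a snapshot is written; returns the compressed data
  toJSON: () => string
}

// Build a state whose toJSON compresses exactly this data array
const compressedState = (data: string[]): CompressedState => ({
  data,
  toJSON: () => pack(data),
})

export const CompressingFish = {
  of: (): Fish<CompressedState, PushEvent> => ({
    fishId: FishId.of('CompressingFish', 'Fugu', 0),
    initialState: compressedState([]),
    where: pushEventTag,
    onEvent: (state, event) => compressedState([...state.data, event.content]),
    // The snapshot only contains the string returned by toJSON(), i.e. the packed data
    // array, so the full state (including toJSON) has to be rebuilt from it here
    deserializeState: (zipped) => compressedState(unpack(zipped as string) as string[]),
  }),
}
```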
Now, this is neither elegant nor optimal, but it shows that we can reduce the snapshot size by a factor of ~10 in this example.
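If you want a rough feel for the ratio without an Actyx node, a sketch like the following compares the JSON that an uncompressed snapshot of a BoringFish-like state would contain with its compressed counterpart. The data (1000 ISO timestamps, 50 ms apart) is made up, and Node’s built-in `zlib` with base64 stands in for pako, so the ratio will not match the measurement above exactly.

```typescript
import { deflateSync } from 'zlib'

// Hypothetical BoringFish-like state: 1000 ISO timestamps, 50 ms apart
const start = Date.UTC(2020, 0, 1)
const state: string[] = Array.from({ length: 1000 }, (_, i) => new Date(start + i * 50).toISOString())

// What an uncompressed snapshot would contain
const plain = JSON.stringify(state)
// What a compressed snapshot would contain (base64 keeps the JSON string ASCII-only)
const compressed = JSON.stringify(deflateSync(Buffer.from(plain, 'utf8')).toString('base64'))

console.log(`plain: ${plain.length} chars, compressed: ${compressed.length} chars`)
console.log(`ratio: ${(plain.length / compressed.length).toFixed(1)}`)
```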

You can look at your snapshots by connecting a SQLite client to the Actyx DB file at `<actyxos-data>/store/<topic>` and running `select semantics, length(data) from snapshots`.
To make the approach a bit cleaner and more reusable (thanks @alex_AX), we can just build a wrapper that adds the compression function:
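```typescript
import * as Pako from 'pako'
import { Fish } from '@actyx/pond'

type CompressingStateWrapper<S> = S & { toJSON: () => string }

// Compress a copy of the state without its own toJSON: calling JSON.stringify(this)
// inside toJSON would invoke toJSON again and recurse until the stack overflows
const compress = (state: unknown): string => {
  const plain: any = Array.isArray(state) ? Array.from(state) : { ...(state as object) }
  if (!Array.isArray(plain)) delete plain.toJSON
  return Pako.deflate(JSON.stringify(plain), { to: 'string' })
}

// Attach the compressing toJSON to a shallow copy so arrays stay arrays and the
// wrapped fish's own objects are not mutated (supports object and array states)
const withCompression = <S>(state: S): CompressingStateWrapper<S> => {
  const copy: any = Array.isArray(state) ? Array.from(state) : { ...(state as any) }
  copy.toJSON = () => compress(copy)
  return copy
}

export const asCompressingFish = <S, E>(fish: Fish<S, E>): Fish<CompressingStateWrapper<S>, E> => ({
  // Same fishId as the wrapped fish; consider a distinct FishId if you observe both variants
  fishId: fish.fishId,
  where: fish.where,
  isReset: fish.isReset,
  initialState: withCompression(fish.initialState),
  deserializeState: (zipped) => {
    const parsed = JSON.parse(Pako.inflate(zipped as string, { to: 'string' }))
    // Let the wrapped fish rebuild its state if it has its own deserializer
    return withCompression(fish.deserializeState ? fish.deserializeState(parsed) : (parsed as S))
  },
  onEvent: (state, event, metadata) => withCompression(fish.onEvent(state, event, metadata)),
})
```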
Using this, you can simply observe the state of an arbitrary fish with compressed snapshots:
```typescript
pond.observe(asCompressingFish(BoringFish.of()), state => console.log('wrapped boring', state))
```
Note that if you also observe the unwrapped fish, you’ll still get uncompressed snapshots as well.
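Two details of such a wrapper are easy to miss: `toJSON` must not call `JSON.stringify(this)`, because `JSON.stringify` would invoke `toJSON` again and recurse until the stack overflows, and an object spread turns an array state such as BoringFish’s `string[]` into a plain object. Here is a standalone sketch of just that logic that runs without a Pond. It swaps in Node’s built-in `zlib` with base64 for pako, and `withCompression`, `compress` and `restore` are hypothetical helper names.

```typescript
import { deflateSync, inflateSync } from 'zlib'

type Compressing<S> = S & { toJSON: () => string }

// Serialize a copy without the own toJSON property, so JSON.stringify does not recurse
const compress = (state: unknown): string => {
  const plain: any = Array.isArray(state) ? Array.from(state) : { ...(state as object) }
  if (!Array.isArray(plain)) delete plain.toJSON
  return deflateSync(Buffer.from(JSON.stringify(plain), 'utf8')).toString('base64')
}

// Attach toJSON to a shallow copy; arrays stay arrays
const withCompression = <S>(state: S): Compressing<S> => {
  const copy: any = Array.isArray(state) ? Array.from(state) : { ...(state as any) }
  copy.toJSON = () => compress(copy)
  return copy
}

// What a deserializeState would do with the parsed snapshot value
const restore = <S>(json: unknown): Compressing<S> =>
  withCompression(JSON.parse(inflateSync(Buffer.from(json as string, 'base64')).toString('utf8')) as S)

const arrayState = withCompression(['a', 'b'])
const objectState = withCompression({ count: 2, names: ['x'] })

// Round trip through JSON, as a snapshot write and read would do
const arrayBack = restore<string[]>(JSON.parse(JSON.stringify(arrayState)))
const objectBack = restore<{ count: number; names: string[] }>(JSON.parse(JSON.stringify(objectState)))

console.log(Array.isArray(arrayBack), arrayBack.length) // true 2
console.log(objectBack.count, objectBack.names) // 2 [ 'x' ]
```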
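It should also be possible to get rid of the string conversion and work with Pako’s binary output (`number[]`/`Uint8Array`) directly instead of `string`s. Keep in mind, though, that the snapshot is stored as JSON: a `number[]` is written out as decimal text and a `Uint8Array` as an object keyed by index, so both may well come out larger than the binary string; base64-encoding the deflated bytes is probably the more compact option. I haven’t implemented that yet. Why don’t you try it and let us know how it worked out?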
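If you do try it, what matters is the size of the bytes once they are embedded in the JSON snapshot. A rough sketch for comparing candidate encodings of the same deflated bytes follows. It uses made-up data and Node’s built-in `zlib` in place of `Pako.deflate`, which also returns a `Uint8Array` (Node’s `Buffer` is a subclass of it); the `latin1` string mimics pako’s binary string.

```typescript
import { deflateSync } from 'zlib'

// Hypothetical state similar to the example's: 1000 ISO timestamps, 50 ms apart
const start = Date.UTC(2020, 0, 1)
const state = Array.from({ length: 1000 }, (_, i) => new Date(start + i * 50).toISOString())

// Deflated bytes of the state's JSON
const bytes: Uint8Array = deflateSync(Buffer.from(JSON.stringify(state), 'utf8'))

// Candidate encodings, each as it would appear inside a JSON snapshot
const candidates: Record<string, string> = {
  binaryString: JSON.stringify(Buffer.from(bytes).toString('latin1')), // like pako's { to: 'string' }
  numberArray: JSON.stringify(Array.from(bytes)),
  uint8Array: JSON.stringify(new Uint8Array(bytes)), // plain Uint8Array, serialized as {"0":..,"1":..}
  base64: JSON.stringify(Buffer.from(bytes).toString('base64')),
}

// Report UTF-8 byte sizes of each encoded snapshot
for (const [name, json] of Object.entries(candidates)) {
  console.log(`${name}: ${Buffer.byteLength(json, 'utf8')} bytes`)
}
```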