State too big: Cxn error: Max payload size exceeded

We created a fish to gather information for reporting, which is sent to an Azure database from an observing endpoint. After some events are gathered and the fish state grows, starting the app gives the following error:

Cxn error: Max payload size exceeded
The app only contains: pond.observe(reportingFish, state => console.log(state))

To gather more information, we added a console log at the start of the onEvent function, but it is never triggered, so it seems that a previously persisted state is already “dirty”.
Our state is quite big and contains Lists and Maps:

export type ReportingFishState = {
  userOnActivity: Map<UserActivityKey, UserOnActivity>
  userBookedHours: List<UserBookedHours>
  userOnDummyActivity: Map<UserActivityKey, UserOnActivity>
  userBookedHoursDummy: List<UserBookedHours>
  interrupts: List<TrainInterruptInfo>
  correctiveTickets: List<CorrectiveTicketInfo>
  activityBookedHours: List<ActivityBookedHours>
  finishedTickets: List<FinishedTicketInfo>
}

We currently provide no serialization or deserialization function (we don’t know whether one is necessary for a correct snapshot). Is there a way to compress the state, or do you have an idea how to solve or prevent this problem?

This looks like the state really does get too big. The maximum size of a snapshot is 128 MB.
Is it possible yours exceeds this size?

I think the way forward is to look into how the contained state can be partitioned into reasonably sized chunks.

What are you trying to achieve with the ReportingFish?

It probably exceeds this size, but I’m not sure what would be the best (and easiest) way to handle this.
The reportingFish gathers events from the application and pushes them to a database in Azure, where they can be used to generate reports with PowerBI. The reportingFish itself does not send the events, of course.
Since we will need this reporting function also for future projects, we are highly interested in a good solution to this (even if it means refactoring or other efforts).

Basically there are two solutions we can think of now:
a) compress the snapshot state. You’d hook into the (de-)serialization process to do that, as you already suggested
b) don’t keep the information you already pushed to the DB in the state

While option a) will probably work for some time, it will only take you so far: the root problem, that the state is ever-growing, won’t go away. It might be sufficient, though. If you decide to go down that road: a colleague suggested pako (GitHub - nodeca/pako: high speed zlib port to javascript, works in browser & node.js); he’s already worked with that.

For me, option b) sounds simpler and more robust, though I can’t really tell without seeing the code. This would mean you don’t keep all the state in the ReportingFish, but only for as long as it hasn’t been pushed to the DB. Whenever something is written to the DB, you can remove that information from the local state.
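To make option b) concrete, here is a minimal sketch of the pruning idea. All names here (Row, gatheredTag, reportedTag, PruningReportingFish) are invented for illustration and not taken from your code: the exporter emits an acknowledgement event after a successful write to Azure, and onEvent drops the acknowledged rows from the state:

import { Fish, FishId, Tag } from '@actyx/pond'

type Row = { id: string; payload: string }
type State = { pending: Row[] }

// Hypothetical events: 'gathered' adds a row, 'reported' acknowledges
// that the listed rows have been written to the DB.
type GatheredEvent = { type: 'gathered'; row: Row }
type ReportedEvent = { type: 'reported'; ids: string[] }
type ReportEvent = GatheredEvent | ReportedEvent

const gatheredTag = Tag<GatheredEvent>('gathered')
const reportedTag = Tag<ReportedEvent>('reported')

export const PruningReportingFish: Fish<State, ReportEvent> = {
    fishId: FishId.of('PruningReportingFish', 'singleton', 0),
    initialState: { pending: [] },
    where: gatheredTag.or(reportedTag),
    onEvent: (state, event) => {
        switch (event.type) {
            case 'gathered':
                return { pending: [...state.pending, event.row] }
            case 'reported':
                // Everything that made it into the DB leaves the state again,
                // so the state only grows while the connection is down.
                return { pending: state.pending.filter(r => !event.ids.includes(r.id)) }
        }
    },
}

The exporter would then emit the reported event (tagged with reportedTag) after each successful batch write.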

Does any of this help you to proceed?

Hi,
We already remove successfully sent information from the state. But if we are for some reason not able to connect to Azure, the state grows until the connection is back. Therefore, it is hard to tell when the size becomes critical. I will have a look at the compression. Is there a better way to gather information from the application without using a fish?

Ok, I see. To me, the state looks as if it might contain disjoint sets of information. If so, it might help to split it into multiple ReportingFishes, each dealing with a different part of the domain model.
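As a sketch of that splitting idea (all tags and types here are invented for illustration, assuming e.g. interrupts and booked hours are independent): each fish subscribes only to the tags it needs, so each snapshot stays correspondingly small.

import { Fish, FishId, Tag } from '@actyx/pond'

// Invented tags/types for illustration.
type InterruptEvent = { train: string; reason: string }
type BookingEvent = { user: string; hours: number }
const interruptTag = Tag<InterruptEvent>('interrupt')
const bookingTag = Tag<BookingEvent>('booking')

// One fish per disjoint part of the domain model.
export const InterruptReportingFish: Fish<InterruptEvent[], InterruptEvent> = {
    fishId: FishId.of('InterruptReportingFish', 'singleton', 0),
    initialState: [],
    where: interruptTag,
    onEvent: (state, event) => [...state, event],
}

export const BookingReportingFish: Fish<BookingEvent[], BookingEvent> = {
    fishId: FishId.of('BookingReportingFish', 'singleton', 0),
    initialState: [],
    where: bookingTag,
    onEvent: (state, event) => [...state, event],
}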

That said:
Here’s how to enable compression for snapshots.

You need to do two things:

  1. Implement toJSON() in your business types. This needs to return the compressed data, which might be counter-intuitive; you can think of toJSON() as serialize.
  2. Implement deserializeState in your fish.

This is described in https://developer.actyx.com/docs/pond/guides/snapshots in more detail.

Here’s a small example of how this could look:

Assume we have an event with the current time emitted every 50ms, and subscribe to this event once with and once without compression:

import { Pond } from '@actyx/pond'
import { BoringFish, CompressingFish, pushEventTag } from '../fish'

Pond.default()
  .then(pond => {
    setInterval(() => pond.emit(pushEventTag, { content: Date() }), 50)
    pond.observe(BoringFish.of(), state => console.log('boring', state))
    pond.observe(CompressingFish.of(), state => console.log('compressed', state))
  })
  .catch(console.error)

The boring fish simply aggregates the state over time:

import { Fish, FishId, Tag } from '@actyx/pond'

type State = string[]
type PushEvent = { content: string }
export const pushEventTag = Tag<PushEvent>('pushed')
export const BoringFish = {
    of: (): Fish<State, PushEvent> => ({
        fishId: FishId.of('BoringFish', 'Carp', 0),
        initialState: [],
        where: pushEventTag,
        onEvent: (state, event) => {
            return [...state, event.content]
        },
    })
}

In contrast, the compressing fish implements snapshot compression using pako:

import * as Pako from 'pako'

// pack/unpack compress the state via zlib; pack returns a (binary) string
// so the result can be stored in the JSON-based snapshot.
const pack = (data: any): string => Pako.deflate(JSON.stringify(data), { to: 'string' })
const unpack = (zipped: string): any => JSON.parse(Pako.inflate(zipped, { to: 'string' }))

type CompressedState = {
    data: string[]
    toJSON: () => string
}

const INITIAL_STATE: CompressedState = {
    data: [],
    toJSON: () => pack([])
}

export const CompressingFish = {
    of: (): Fish<CompressedState, PushEvent> => ({
        fishId: FishId.of('CompressingFish', 'Fugu', 0),
        initialState: INITIAL_STATE,
        where: pushEventTag,
        onEvent: (state, event) => {
            const data = [...state.data, event.content]
            return {
                data,
                // Called by the Pond when it persists a snapshot.
                toJSON: () => pack(data)
            }
        },
        // Invert toJSON: decompress the snapshot and re-attach the compressor,
        // so the next snapshot is compressed again.
        deserializeState: (zipped) => {
            const data = unpack(zipped as string) as string[]
            return { data, toJSON: () => pack(data) }
        }
    })
}

Now, this is neither elegant nor optimal, but it shows that we can reduce the snapshot size by a factor of ~10.
You can look at your snapshots by connecting to the Actyx DB file at <actyxos-data>/store/<topic> with a SQLite client and running select semantics, length(data) from snapshots.

To make the approach a bit cleaner and more reusable (thanks @alex_AX), we can just build a wrapper that adds the compression function:

type CompressingStateWrapper<S> = S & { toJSON: () => string }

export const asCompressingFish = <S, E>(fish: Fish<S, E>): Fish<CompressingStateWrapper<S>, E> => ({
    fishId: fish.fishId,
    where: fish.where,
    initialState: {
        ...fish.initialState,
        toJSON: function () { return Pako.deflate(JSON.stringify(this), { to: 'string' }) }
    },
    // Decompress the snapshot and re-attach the compressing toJSON.
    deserializeState: (zipped) => ({
        ...JSON.parse(Pako.inflate(zipped as string, { to: 'string' })),
        toJSON: function () { return Pako.deflate(JSON.stringify(this), { to: 'string' }) },
    } as CompressingStateWrapper<S>),
    isReset: fish.isReset,
    onEvent: (state, event, metadata) => {
        const newState = fish.onEvent(state, event, metadata) as CompressingStateWrapper<S>
        newState.toJSON = function () { return Pako.deflate(JSON.stringify(this), { to: 'string' }) }
        return newState
    }
})

Using this, you can simply observe the state of an arbitrary fish with compressed snapshots like this: pond.observe(asCompressingFish(BoringFish.of()), state => console.log('wrapped boring', state)). Note that if you also observe the unwrapped fish, you’ll still get uncompressed snapshots as well.

It should also be possible to get rid of the JSON.stringify / JSON.parse dance and use number[]/Uint8Array directly rather than strings when working with Pako. I guess this will reduce the size a bit further, but I haven’t implemented that yet. Why don’t you try and let us know how it worked out? :slight_smile:
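For reference, a minimal (untested) sketch of that byte-based variant; it assumes that returning a plain number[] from toJSON is acceptable, since a Uint8Array itself does not survive the JSON round trip:

// Untested sketch: compress to a number[] so the snapshot stays JSON-compatible.
const packBytes = (data: unknown): number[] =>
    Array.from(Pako.deflate(JSON.stringify(data)))

const unpackBytes = (bytes: number[]): unknown =>
    JSON.parse(Pako.inflate(Uint8Array.from(bytes), { to: 'string' }))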

Hey,
sorry for the delayed answer. Thank you very much for your tips and the help :slight_smile: We found out that there was an error deleting the already sent objects from one of the lists. We have fixed it and will see whether this alone already solves the problem. Additionally, we split the reporting fish into multiple smaller fishes, so that the state of each fish is smaller. If this is not enough, we will try out the compression.
Is there a way to reset an already overflowing state? Since we don’t want to reset the topic, is there a way to force reloading the whole state from scratch without using the broken snapshot?


Good morning,

Sounds like a much better approach.

To get rid of the old snapshot, increase the version in the FishId. It is the last parameter.

FishId.of(semantics, name, version)
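For example, if the fish was created as FishId.of('ReportingFish', 'singleton', 0) (placeholder names), bumping the version makes the Pond ignore the old snapshot and recompute the state from the event log:

// Old snapshots taken under version 0 are ignored; state is rebuilt from events.
const fishId = FishId.of('ReportingFish', 'singleton', 1)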

Hey, alright thank you :slight_smile:

FWIW, I did a write-up on considerations about fish state size. It’s basically the same as the answer above, but with some additional context.