Performance improvements with isReset and observeLatest

I’ve created a Node.js app that connects to a Microsoft SQL Server and continuously reads from its tables.
To recover from an app crash, I store the latest database ID that was read in a fish.
When the app restarts, it uses the state of the fish to retrieve the latest ID that was successfully read and uses it as an offset for the database connection.
It looks like this:

const specificId = await new Promise<number>((resolve) => {
  cancelSubscription = actyx.observe(
    (specificId) => {

console.log(`Last stored ID in fish: ${specificId}`)
offsetId = specificId

The corresponding fish looks like this:

lastDatabaseId: (machineId: string): Fish<number, Event> => ({
    fishId: FishId.of('ErrorFish', machineId, 0),
    initialState: 0,
    where: errorTag,
    // isReset: () => true,
    onEvent: (state, event) => {
      if (
        event.eventType === 'error_occured' &&
        event.machineId === machineId &&
        parseInt(event.sourceSpecificId.toString()) > state
      ) {
        // sourceSpecificId arrives as a string in the event, so it has to be parsed
        return parseInt(event.sourceSpecificId.toString())
      }
      return state
    },
}),

I know that this is bad for the performance of my app, because the fish replays all historical events when it wakes up. The goal is to get just the latest state without running through the whole history when the app restarts.
I also know that the fish parameter “isReset” and the pond method “observeLatest” are good candidates to improve the performance here. But I failed to implement both correctly and wasn’t able to find a satisfying example.

Do you have an example for my case on how to use both?
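
To make the cost described above concrete, here is a minimal sketch of the idea behind a reset. It is a simplified in-memory model, not the Actyx Pond implementation: `ModelFish`, `wake` and `lastDatabaseIdWithReset` are hypothetical names, and the model assumes that a reset event lets the fish ignore everything that came before it. It also assumes that IDs arrive in increasing order.

```typescript
// Simplified in-memory model of a fish; this is NOT the Actyx Pond implementation.
type ErrorEvent =
  | { eventType: 'error_occured'; machineId: string; sourceSpecificId: number }
  | { eventType: 'errorAcknowledged'; machineId: string }

type ModelFish<S, E> = {
  initialState: S
  onEvent: (state: S, event: E) => S
  isReset?: (event: E) => boolean
}

// Replays the history, starting at the most recent reset event if there is one,
// and reports how many events had to be applied.
const wake = <S, E>(fish: ModelFish<S, E>, history: E[]): { state: S; applied: number } => {
  const isReset = fish.isReset
  let start = 0
  if (isReset !== undefined) {
    history.forEach((event, index) => {
      if (isReset(event)) start = index
    })
  }
  const relevant = history.slice(start)
  const state = relevant.reduce<S>((s, e) => fish.onEvent(s, e), fish.initialState)
  return { state, applied: relevant.length }
}

const lastDatabaseId = (machineId: string): ModelFish<number, ErrorEvent> => ({
  initialState: 0,
  onEvent: (state, event) =>
    event.eventType === 'error_occured' && event.machineId === machineId
      ? Math.max(state, event.sourceSpecificId)
      : state,
})

// Assumes IDs arrive in increasing order, so the latest matching event alone
// determines the state. Only such an event is treated as a reset.
const lastDatabaseIdWithReset = (machineId: string): ModelFish<number, ErrorEvent> => ({
  ...lastDatabaseId(machineId),
  isReset: (event) => event.eventType === 'error_occured' && event.machineId === machineId,
})

// Pitfall: treating every event as a reset loses the ID when the latest
// event is of another type or belongs to another machine.
const lastDatabaseIdBlanketReset = (machineId: string): ModelFish<number, ErrorEvent> => ({
  ...lastDatabaseId(machineId),
  isReset: () => true,
})

const history: ErrorEvent[] = [
  { eventType: 'error_occured', machineId: 'M1', sourceSpecificId: 1 },
  { eventType: 'error_occured', machineId: 'M2', sourceSpecificId: 7 },
  { eventType: 'error_occured', machineId: 'M1', sourceSpecificId: 2 },
  { eventType: 'errorAcknowledged', machineId: 'M1' },
]

const full = wake(lastDatabaseId('M1'), history) // state 2, 4 events applied
const short = wake(lastDatabaseIdWithReset('M1'), history) // state 2, 2 events applied
const blanket = wake(lastDatabaseIdBlanketReset('M1'), history) // state 0, 1 event applied
```

In this model, a reset condition that matches only the events that fully determine the state keeps the result correct while shortening the replay. A blanket `() => true` returns the initial state whenever the latest event is not the one you care about.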

Hi @alko,

You could improve your code and its performance a lot if you add an ID to the tag.

const emitErrorOccurredEvent = (pond: Pond, machineId: string, sourceSpecificId: string): PendingEmission =>
  pond.emit(errorTag.withId(machineId), {
    eventType: 'error_occured',

Now, you can query this particular event very precisely.

Pond.default(manifest).then(async (pond) => {
  await emitErrorOccurredEvent(pond, 'M1', Date()).toPromise()

  const lastEvent = await new Promise((res) => {
    const cancel ={ query: errorTag.withId('M1') }, (event) => {

If you cannot improve the tagging, check out observeBestMatch().
You can use it much like observeLatest(), but you can additionally filter on the machineId to pick the best match.
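
The difference between the two options can be sketched with a small in-memory model. This is not the Actyx API: `Stored`, `withId`, `latestWithTags` and `bestMatch` are hypothetical helpers, and the sketch assumes that tagging with an ID attaches both the plain tag and a `tag:id` variant to the event.

```typescript
// Simplified in-memory model; names and shapes are illustrative, not the Actyx API.
type ErrorOccurred = { eventType: 'error_occured'; machineId: string; sourceSpecificId: number }
type Stored<E> = { tags: string[]; payload: E }

// Assumption: tagging with an ID attaches the plain tag and `tag:id`.
const withId = (tag: string, id: string): string[] => [tag, `${tag}:${id}`]

const hasTags = <E>(entry: Stored<E>, required: string[]): boolean =>
  required.every((tag) => entry.tags.indexOf(tag) !== -1)

// Option 1: the tags are precise enough, so the latest matching event is the answer.
const latestWithTags = <E>(log: Stored<E>[], required: string[]): E | undefined =>
  log
    .filter((entry) => hasTags(entry, required))
    .map((entry) => entry.payload)
    .pop()

// Option 2: the tags are coarse, so a comparison function decides which event wins.
const bestMatch = <E>(
  log: Stored<E>[],
  required: string[],
  shouldReplace: (candidate: E, current: E) => boolean,
): E | undefined =>
  log
    .filter((entry) => hasTags(entry, required))
    .reduce<E | undefined>(
      (best, entry) =>
        best === undefined || shouldReplace(entry.payload, best) ? entry.payload : best,
      undefined,
    )

const occurred = (machineId: string, sourceSpecificId: number): Stored<ErrorOccurred> => ({
  tags: withId('Error', machineId),
  payload: { eventType: 'error_occured', machineId, sourceSpecificId },
})

const log: Stored<ErrorOccurred>[] = [
  occurred('M1', 10),
  occurred('M2', 99),
  occurred('M1', 11),
  occurred('M2', 100),
]

// Precise query: only events tagged for machine M1 are considered.
const viaTags = latestWithTags(log, withId('Error', 'M1'))

// Coarse query on 'Error' only: the comparison prefers M1 events with higher IDs.
// If the log contains no M1 event at all, the result is an event of another
// machine, so the caller still has to check machineId.
const viaBestMatch = bestMatch(
  log,
  ['Error'],
  (candidate, current) =>
    candidate.machineId === 'M1' &&
    (current.machineId !== 'M1' || candidate.sourceSpecificId > current.sourceSpecificId),
)
```

Both calls yield the M1 event with `sourceSpecificId` 11 here. The tag-based variant needs no comparison logic at all, which is why improving the tags is the simpler route when it is possible.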

Hi @alex_AX,

We have another challenge: an ErrorEvent can be of different types, and we only want the latest errorOccuredEvent, because only this type contains the sourceSpecificId we are looking for. If an errorAcknowledgedEvent happens to be the latest event, we have no sourceSpecificId.

Am I right that I can solve this with the observeLatest method?
If not, would it be better to use the observeBestMatch method, or the lastDatabaseId fish that @alko described, with improved tagging and an isReset implementation?

Our ErrorEvent definition looks like this:

export type errorOccuredEvent = {
  eventType: 'error_occured'
  errorId: string
  reason: string
  machineId: string
  machineName: string
  variablename: string
  sourceSpecificId: number
}

export type errorHintAddedEvent = {
  eventType: 'errorHintAdded'
  errorId: string
  machineId: string
  hint: string
}

export type errorAcknowledgedEvent = {
  eventType: 'errorAcknowledged'
  errorId: string
  machineId: string
}

export type Event = errorHintAddedEvent | errorAcknowledgedEvent | errorOccuredEvent

The ErrorTag looks like this:

const errorTag = Tag<Event>('Error')

We emit the event like this:

const emitErrorOccured = (
  pond: Pond,
  errorId: string,
  machineId: string,
  reason: string,
  variableName: string,
  sourceSpecificId: number,
) =>
      eventType: 'error_occured',
      machineId: machineId,
      errorId: errorId,
      reason: reason,
      variablename: variableName,
      sourceSpecificId: sourceSpecificId,

So with observeLatest it should look something like this:

const latestEvent = await new Promise<Event>((resolve) => {
            cancelSubscription =
                query: ErrorFish.tags.errorTag.and(
              (event: Event) => {
                // TODO: only get latest ErrorOccuredEvent

Hi @jokr,

as discussed previously:

  1. Add an additional Tag called Error.occurred whenever you emit errorOccuredEvent
  2. In your observeLatest call, add this Tag to the query, e.g. observeLatest({
  query: MachineFish.tags.machineTag.withId(ID)
    .and(Tag('Error.occurred')) // should probably be extracted and put into ErrorFish
}, ...)

This will trigger your callback only for the latest known event tagged with Error.occurred originating from the machine having ID. You don’t need to filter manually in the callback then, b/c the query already returns only the events you’re looking for.

Further performance tweaks should not be required.
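
Why the extra tag removes the need for manual filtering can be sketched with a small in-memory model. This is not the Actyx API: `Tagged` and `latest` are hypothetical helpers, and the tag strings `Machine:M1` and `Error.occurred` only mirror the naming used in the steps above.

```typescript
// Simplified in-memory model of tag-based querying; not the Actyx API.
type ErrorEvent =
  | { eventType: 'error_occured'; machineId: string; sourceSpecificId: number }
  | { eventType: 'errorAcknowledged'; machineId: string }

type Tagged = { tags: string[]; payload: ErrorEvent }

// Latest event (last in the log) that carries all required tags.
const latest = (log: Tagged[], required: string[]): ErrorEvent | undefined =>
  log
    .filter((entry) => required.every((tag) => entry.tags.indexOf(tag) !== -1))
    .map((entry) => entry.payload)
    .pop()

const log: Tagged[] = [
  {
    tags: ['Machine:M1', 'Error', 'Error.occurred'],
    payload: { eventType: 'error_occured', machineId: 'M1', sourceSpecificId: 41 },
  },
  {
    tags: ['Machine:M1', 'Error', 'Error.occurred'],
    payload: { eventType: 'error_occured', machineId: 'M1', sourceSpecificId: 42 },
  },
  {
    // Newer than both error_occured events, but without the extra tag.
    tags: ['Machine:M1', 'Error'],
    payload: { eventType: 'errorAcknowledged', machineId: 'M1' },
  },
  {
    tags: ['Machine:M2', 'Error', 'Error.occurred'],
    payload: { eventType: 'error_occured', machineId: 'M2', sourceSpecificId: 7 },
  },
]

// Without the extra tag, the acknowledgement hides the ID we are looking for.
const withoutExtraTag = latest(log, ['Machine:M1'])

// With the extra tag, the query itself selects the right event type.
const withExtraTag = latest(log, ['Machine:M1', 'Error.occurred'])
```

In this model, `withoutExtraTag` is the `errorAcknowledged` event, while `withExtraTag` is the `error_occured` event with `sourceSpecificId` 42, so the callback never sees an event without the ID.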
