Prevent observeRegistry from using all intermediate states of the RegistryFish

Hi,

Is there a way to prevent the observeRegistry function from reading all intermediate states of the registryFish? I need the function that is called for each ID and fish in the registryFish to complete before the registryFish state is read again. Is that somehow possible?

Our problem:
We use the registryFish for reporting, where each value in the registryFish (and the corresponding sub-fish) is sent to an Azure Function. With the current behaviour of the observeRegistry function, the emission to Azure is triggered multiple times for the same value. This happens when multiple events are sent to the registry fish at once and it updates once for each of them.

Thank you <3

Hi Rebecca,

I hope I understand you correctly.

There is a property in the pondOptions called stateEffectDebounce. You can use it to debounce the state effect.

Pond.of({}, { stateEffectDebounce: 2500 } ).then((pond) => {
  pond.observe(SuperFish.swim(), (st) => {
    console.log(JSON.stringify(st, undefined, 2))
  })
})

If you need more control over each state change, I suggest using the rx-pond directly. You could replace the combineLatest() that produces the state array with merge() to get the states in sequence and benefit from RxJS's internal backpressure.
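
To illustrate the difference, here is a minimal sketch in plain TypeScript. It does not use RxJS or the Pond; the Update type and both function names are made up for this example and only mimic how combineLatest() and merge() treat a burst of updates.

```typescript
// Hypothetical shape of one state update coming from one entity fish.
type Update = { id: string; state: number }

// combineLatest-like: after every single update, emit the latest state of ALL
// ids as one array (only once every id has reported at least one state).
function snapshotsLikeCombineLatest(ids: string[], updates: Update[]): number[][] {
  const latest = new Map<string, number>()
  const emitted: number[][] = []
  for (const u of updates) {
    latest.set(u.id, u.state)
    if (ids.every(id => latest.has(id))) {
      emitted.push(ids.map(id => latest.get(id) as number))
    }
  }
  return emitted
}

// merge-like: forward every update on its own, in arrival order.
function sequenceLikeMerge(updates: Update[]): Update[] {
  return updates.slice()
}

// Two fish, each receiving two events in one burst.
const burst: Update[] = [
  { id: 'a', state: 1 },
  { id: 'b', state: 1 },
  { id: 'a', state: 2 },
  { id: 'b', state: 2 },
]

// The whole array is emitted again for every change: [[1, 1], [2, 1], [2, 2]].
// The middle entry [2, 1] is an intermediate state.
const snapshots = snapshotsLikeCombineLatest(['a', 'b'], burst)

// Each of the four updates appears exactly once, one after the other.
const single = sequenceLikeMerge(burst)
```

With the array variant, a callback that reports every entry would send the state of fish b twice with the same value (in the first and second snapshot), which matches the duplicate emissions described above.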

Unfortunately, this did not solve the problem. The state changes of the fishes observed via the registry fish's IDs seem to keep firing the function. I tried to prevent this with the following:

export const observeRegistryExhaustive$ = <RegS, RegE, Prop, State, Event>(
  rxPond: RxPond,
  registryFish: Fish<RegS, RegE>,
  mapToProperty: (regState: RegS) => ReadonlyArray<Prop | undefined>,
  makeEntityFish: (p: Prop) => Fish<State, Event>,
  onStateChanged: (states: State[]) => void,
): Observable<void> =>
  rxPond.observe(registryFish).pipe(
    distinctUntilChanged(deepEqual),
    map(mapToProperty),
    map((props): Prop[] => props.filter((p): p is Prop => p !== undefined)),
    switchMap(
      (regState): Observable<State[]> =>
        regState.length === 0
          ? Observable.of([])
          : Observable.combineLatest(
              regState.map(
                id =>
                  rxPond.observe(makeEntityFish(id)),
              ),
            ),
    ),
    distinctUntilChanged(deepEqual),
    exhaustMap(states => {
      return Observable.from(Observable.of(onStateChanged(states)))
    }),
  )

But the problem stays the same. I’m not too experienced with RxJS, so it may be that exhaustMap is not the best way to do this. Any suggestions?

Thank you for the help, @alex_AX. The problem was solved by making the onStateChanged function return a Promise

onStateChanged: (states: State[]) => Promise<void>

and changing the last part to

exhaustMap(states => {
  console.log('states', states)
  return Observable.from(onStateChanged(states))
}),

to make the calls async.
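
My understanding of why this helps, as a sketch in plain TypeScript without RxJS: exhaustMap only ignores new values while the inner observable is still running. A callback that returns void has already finished by the time it is wrapped, so nothing is ever ignored; a pending Promise keeps the gate closed. The exhaustLike helper and the sample values below are hypothetical and only imitate that behaviour.

```typescript
// A handler may finish synchronously (void) or hand back a Promise.
type Handler<T> = (value: T) => Promise<void> | void

// Exhaust-like gate: while a returned Promise is still pending,
// newly arriving values are dropped instead of being handled.
function exhaustLike<T>(handler: Handler<T>): (value: T) => boolean {
  let busy = false
  return (value: T): boolean => {
    if (busy) return false // dropped
    const result = handler(value)
    if (result instanceof Promise) {
      busy = true
      const release = () => { busy = false }
      result.then(release, release) // reopen the gate on success or failure
    }
    return true // handled
  }
}

// Synchronous handler: it is done before the next value arrives,
// so every state in the burst gets reported.
const reportedSync: string[] = []
const pushSync = exhaustLike<string>(s => { reportedSync.push(s) })
const acceptedSync = ['s1', 's2', 's3'].map(s => pushSync(s))

// Promise-returning handler: a stand-in for an upload that has not finished yet.
// The Promise deliberately stays pending for the duration of this example.
const reportedAsync: string[] = []
const pushAsync = exhaustLike<string>(s => {
  reportedAsync.push(s)
  return new Promise<void>(() => { /* still uploading */ })
})
const acceptedAsync = ['s1', 's2', 's3'].map(s => pushAsync(s))
```

Here acceptedSync is all true and reportedSync contains all three states, while acceptedAsync is true only for the first state and reportedAsync contains just 's1'. Note that states arriving while a call is pending are dropped, not queued.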

I’m very happy that I was able to help you :slight_smile:

For other fellows:
We ended up in a personal debugging session :slight_smile:.
If you are stuck on the same issue and are missing some context, please let me know.