event stream: future stream of events from a relay #118

Open
opened 2023-01-06 13:45:37 +13:00 by mikedilger · 5 comments
mikedilger commented 2023-01-06 13:45:37 +13:00 (Migrated from github.com)

Today I spent a lot of time trying to create a future Stream of events from a relay, so we could do something like this

let relay_url = Url::new(&person_relay.relay);
let filter = ...;
let mut future = self.launch_query(relay_url, vec![filter]).await?;

// Do it in a separate task so we don't block the overlord.                                                                                                                                  
task::spawn(async move {
    while let Some(event) = future.next().await {
        if let Err(e) = crate::process::process_new_event(&event, false, None, None).await {
            tracing::error!("{}", e);
        } else {
            // .... async fetch nip-05 validation here
        }
    }
});

I got it compiling, but there are numerous issues that do not seem easy to solve:

  1. Relays may never send an EOSE (happened in my test). And so the while-let loop above will never complete. Integrating this with a tokio timeout remains a challenge.
  2. Minions may die (happened in my test). And if so they never signal the future to wake up, nor that it is complete. Something has to signal the future than an error has occured so it can complete into an error state. And that something can't be the minion itself, unless it can do it from a higher level function that catches the error, so that needs to know of the futures that were being handled. And minions only know that a future is being handled by virtue of the subscription being flagged as such.
  3. There is always a risk that due to some mistake in coding, the future never wakes up.

I'll keep on this a bit.

Today I spent a lot of time trying to create a future Stream of events from a relay, so we could do something like this ````rust let relay_url = Url::new(&person_relay.relay); let filter = ...; let mut future = self.launch_query(relay_url, vec![filter]).await?; // Do it in a separate task so we don't block the overlord. task::spawn(async move { while let Some(event) = future.next().await { if let Err(e) = crate::process::process_new_event(&event, false, None, None).await { tracing::error!("{}", e); } else { // .... async fetch nip-05 validation here } } }); ```` I got it compiling, but there are numerous issues that do not seem easy to solve: 1. Relays may never send an EOSE (happened in my test). And so the while-let loop above will never complete. Integrating this with a tokio timeout remains a challenge. 2. Minions may die (happened in my test). And if so they never signal the future to wake up, nor that it is complete. Something has to signal the future than an error has occured so it can complete into an error state. And that something can't be the minion itself, unless it can do it from a higher level function that catches the error, so that needs to know of the futures that were being handled. And minions only know that a future is being handled by virtue of the subscription being flagged as such. 3. There is always a risk that due to some mistake in coding, the future never wakes up. I'll keep on this a bit.
mikedilger commented 2023-01-06 18:21:14 +13:00 (Migrated from github.com)

I've pushed a branch 'eventstream' where 'Update Metadata' happens via an event stream future "Stream" loop, one per relay the person is known to have an association with. It works in the happy case, but I believe a number of conditions could cause it to get stuck forever, so it's not merging into master until those are handled.

Also, if I get this working well, all the other subscriptions will change to this model and I'll clean up the code and simplify.

BTW: this branch may be totally rewritten and force pushed.

I've pushed a branch 'eventstream' where 'Update Metadata' happens via an event stream future "Stream" loop, one per relay the person is known to have an association with. It works in the happy case, but I believe a number of conditions could cause it to get stuck forever, so it's not merging into master until those are handled. Also, if I get this working well, all the other subscriptions will change to this model and I'll clean up the code and simplify. BTW: this branch may be totally rewritten and force pushed.
mikedilger commented 2024-03-19 08:56:54 +13:00 (Migrated from github.com)

BTW, if I had (early on) pushed harder to develop full async semantics we could be doing things like this by now:

discovery_relays.get_relay_list(pubkey).and_then(|relay_list| {
    let inbox_relays = relay_list.inboxes();
    inbox_relays.get_event(id)
}

Notice how all the heavy work of spinning up multiple minions, waiting for responses from all of them, knowing when the future completes (EOSE or EVENT from relay, or a timeout), would all be hidden inside those async futures.

BTW, if I had (early on) pushed harder to develop full async semantics we could be doing things like this by now: ```rust discovery_relays.get_relay_list(pubkey).and_then(|relay_list| { let inbox_relays = relay_list.inboxes(); inbox_relays.get_event(id) } ``` Notice how all the heavy work of spinning up multiple minions, waiting for responses from all of them, knowing when the future completes (EOSE or EVENT from relay, or a timeout), would all be hidden inside those async futures.
bu5hm4nn commented 2024-03-19 10:11:25 +13:00 (Migrated from github.com)

That looks attractive.
On recovering from dead minions: Maybe we need to have some kind of garbage collection that runs periodically and checks for stale futures?

That looks attractive. On recovering from dead minions: Maybe we need to have some kind of garbage collection that runs periodically and checks for stale futures?
mikedilger commented 2024-03-19 10:28:06 +13:00 (Migrated from github.com)

Well if we did this, the futures would self-timeout, we wouldn't need the garbage collection.

Part of me wants to 'start over' with a new rust-nostr library that uses the efficient binary structures and parsing of Chorus and also has this kind of async-future driven system, and also uses the innovations in the work of Yuki on his nostr rust library, and layer it with types being the lowest level library, then an actions library above that.

Well if we did this, the futures would self-timeout, we wouldn't need the garbage collection. Part of me wants to 'start over' with a new rust-nostr library that uses the efficient binary structures and parsing of Chorus and also has this kind of async-future driven system, and also uses the innovations in the work of Yuki on his nostr rust library, and layer it with types being the lowest level library, then an actions library above that.
bu5hm4nn commented 2024-03-20 10:51:45 +13:00 (Migrated from github.com)

It's probably going to be worth it because nostr is not ever going to be complete, so if we can speed up implementation time for new features, it is going to compound quickly.

It's probably going to be worth it because nostr is not ever going to be complete, so if we can speed up implementation time for new features, it is going to compound quickly.
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
nostr/gossip#118
No description provided.