Files
CTFCUP-25/aws_sigma_service/executor/src/queue.rs
2025-12-05 07:14:11 +00:00

113 lines
3.7 KiB
Rust

use core::cell::RefCell;
use core::ops::DerefMut;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use boa_engine::context::time::JsInstant;
use boa_engine::job::{GenericJob, Job, JobExecutor, NativeAsyncJob, PromiseJob, TimeoutJob};
use boa_engine::{Context, JsResult};
use futures_concurrency::future::FutureGroup;
use futures_lite::{StreamExt, future};
use tokio::task;
/// Job queues backing the `JobExecutor` implementation for this type.
///
/// Every queue sits behind a `RefCell` because the executor methods only
/// receive shared access (`&self` / `Rc<Self>`) yet need to push and pop jobs.
pub(crate) struct Queue {
/// Async jobs that have been enqueued but not started yet; `run_jobs_async`
/// takes them out and inserts their futures into its future group.
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
/// Promise jobs; `drain_jobs` takes the whole queue and runs it on each call.
promise_jobs: RefCell<VecDeque<PromiseJob>>,
/// Timeout jobs keyed by the instant computed at enqueue time
/// (context clock `now()` plus the job's `timeout()`).
timeout_jobs: RefCell<BTreeMap<JsInstant, TimeoutJob>>,
/// Generic jobs; `drain_jobs` pops and runs one of them per call.
generic_jobs: RefCell<VecDeque<GenericJob>>,
}
impl Queue {
    /// Creates a queue in which every job list is empty.
    pub(crate) fn new() -> Self {
        Self {
            async_jobs: RefCell::default(),
            promise_jobs: RefCell::default(),
            timeout_jobs: RefCell::default(),
            generic_jobs: RefCell::default(),
        }
    }

    /// Runs every timeout job whose key is strictly earlier than the context
    /// clock's current instant.
    ///
    /// Jobs reporting `is_cancelled()` are never invoked: cancelled entries
    /// that are not due yet are pruned from the map, and due entries are
    /// checked again right before being called, so a job cancelled by an
    /// earlier callback of the same batch is skipped as well.
    ///
    /// An error returned by a job is printed to stderr and does not stop the
    /// remaining jobs from running.
    pub(crate) fn drain_timeout_jobs(&self, context: &mut Context) {
        let now = context.clock().now();

        let mut timeouts_borrow = self.timeout_jobs.borrow_mut();
        // `split_off` returns the entries keyed at or after `now`; those stay queued.
        let mut jobs_to_keep = timeouts_borrow.split_off(&now);
        jobs_to_keep.retain(|_, job| !job.is_cancelled());
        // Put the survivors back and take ownership of the due jobs.
        let jobs_to_run = std::mem::replace(timeouts_borrow.deref_mut(), jobs_to_keep);
        // Release the borrow before any callback runs: `enqueue_job` borrows
        // `timeout_jobs` mutably, so a callback scheduling a new timeout would
        // otherwise panic on an already-borrowed `RefCell`.
        drop(timeouts_borrow);

        for job in jobs_to_run.into_values() {
            // The batch was fixed before any callback ran, so re-check the
            // flag here instead of calling a job that has been cancelled.
            if job.is_cancelled() {
                continue;
            }
            if let Err(e) = job.call(context) {
                eprintln!("Uncaught {e}");
            }
        }
    }

    /// Performs one round of synchronous job processing.
    ///
    /// In order: runs the due timeout jobs, runs at most one generic job, runs
    /// every promise job that was queued when the promise queue was taken, and
    /// finally calls `Context::clear_kept_objects`. Promise jobs enqueued while
    /// the batch is running land in the fresh queue and are left for the next
    /// call. Job errors are printed to stderr and otherwise ignored.
    pub(crate) fn drain_jobs(&self, context: &mut Context) {
        self.drain_timeout_jobs(context);

        // Pop in its own statement so the `RefCell` borrow ends before the job runs.
        let job = self.generic_jobs.borrow_mut().pop_front();
        if let Some(generic) = job
            && let Err(err) = generic.call(context)
        {
            eprintln!("Uncaught {err}");
        }

        // Take the whole queue up front; the borrow is released before any job runs.
        let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
        for job in jobs {
            if let Err(e) = job.call(context) {
                eprintln!("Uncaught {e}");
            }
        }

        context.clear_kept_objects();
    }
}
impl JobExecutor for Queue {
    /// Stores `job` in the queue that matches its variant.
    ///
    /// A timeout job is keyed by the context clock's current instant plus the
    /// job's own `timeout()`.
    ///
    /// # Panics
    ///
    /// Panics with "unsupported job type" for any `Job` variant other than the
    /// four handled here.
    fn enqueue_job(self: Rc<Self>, job: Job, context: &mut Context) {
        match job {
            Job::PromiseJob(promise) => self.promise_jobs.borrow_mut().push_back(promise),
            Job::AsyncJob(pending) => self.async_jobs.borrow_mut().push_back(pending),
            Job::GenericJob(generic) => self.generic_jobs.borrow_mut().push_back(generic),
            Job::TimeoutJob(timeout) => {
                let deadline = context.clock().now() + timeout.timeout();
                // NOTE(review): `BTreeMap::insert` replaces an existing entry with
                // an equal key, so two timeouts resolving to the same instant would
                // leave only the later one queued - confirm whether that can happen.
                self.timeout_jobs.borrow_mut().insert(deadline, timeout);
            }
            _ => panic!("unsupported job type"),
        }
    }

    /// Drives `run_jobs_async` to completion on a freshly built current-thread
    /// tokio runtime (time driver enabled) inside a `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be built.
    fn run_jobs(self: Rc<Self>, context: &mut Context) -> JsResult<()> {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_time();
        let runtime = builder.build().unwrap();

        let local_tasks = task::LocalSet::default();
        let shared_context = RefCell::new(context);
        local_tasks.block_on(&runtime, self.run_jobs_async(&shared_context))
    }

    /// Event loop: repeats until no async future is in flight and the promise,
    /// timeout and generic queues are all empty.
    ///
    /// Each round starts the newly enqueued async jobs, polls the in-flight
    /// futures once (printing an error result to stderr), runs `drain_jobs`,
    /// and then yields to the tokio scheduler. The loop never sleeps, so it
    /// keeps spinning while jobs remain queued.
    async fn run_jobs_async(self: Rc<Self>, context: &RefCell<&mut Context>) -> JsResult<()> {
        let mut in_flight = FutureGroup::new();

        loop {
            // Start everything queued since the previous round.
            for pending in std::mem::take(&mut *self.async_jobs.borrow_mut()) {
                in_flight.insert(pending.call(context));
            }

            let nothing_left = in_flight.is_empty()
                && self.promise_jobs.borrow().is_empty()
                && self.timeout_jobs.borrow().is_empty()
                && self.generic_jobs.borrow().is_empty();
            if nothing_left {
                return Ok(());
            }

            // Poll once without waiting; only a finished future that failed is reported.
            if let Some(Some(Err(err))) = future::poll_once(in_flight.next()).await {
                eprintln!("Uncaught {err}");
            }

            // The context borrow lasts only for this statement, so it is
            // released again before the await below.
            self.drain_jobs(&mut context.borrow_mut());

            task::yield_now().await;
        }
    }
}