Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: unix event loop #25

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@
(tags
(thread pool domain futures fork-join)))

(package
(name moonpool-io)
(synopsis "Non blocking IO for Moonpool based on Unix")
(depends
(ocaml (>= 5.0))
dune
(poll (and (>= 3.0) (< 4.0)))
(iostream (>= 0.2.2))
(trace :with-test))
(depopts
mtime)
(tags
(moonpool io unix async)))

(package
(name moonpool-lwt)
(synopsis "Event loop for moonpool based on Lwt-engine (experimental)")
Expand Down
34 changes: 34 additions & 0 deletions moonpool-io.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.6"
synopsis: "Non blocking IO for Moonpool based on Unix"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["moonpool" "io" "unix" "async"]
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"ocaml" {>= "5.0"}
"dune" {>= "3.0"}
"poll" {>= "3.0" & < "4.0"}
"iostream" {>= "0.2.2"}
"trace" {with-test}
"odoc" {with-doc}
]
depopts: ["mtime"]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/moonpool.git"
10 changes: 8 additions & 2 deletions src/fib/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type cancel_callback = Exn_bt.t -> unit
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
Fut.Private_.unsafe_promise_of_fut

module Private_ = struct
module Private0 = struct
type 'a t = {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
Expand Down Expand Up @@ -42,7 +42,7 @@ module Private_ = struct
| Terminating_or_done _ -> true
end

include Private_
include Private0

let create_ ~ls ~runner () : 'a t =
let id = Handle.generate_fresh () in
Expand Down Expand Up @@ -317,3 +317,9 @@ let yield () : unit =
check_if_cancelled_ self;
Suspend_.yield ();
check_if_cancelled_ self

module Private_ = struct
include Private0

let cancel_from_outside = resolve_as_failed_
end
1 change: 1 addition & 0 deletions src/fib/fiber.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module Private_ : sig
type any = Any : _ t -> any [@@unboxed]

val get_cur : unit -> any option
val cancel_from_outside : _ t -> Exn_bt.t -> unit
end

(**/**)
Expand Down
4 changes: 3 additions & 1 deletion src/fib/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ let main (f : Runner.t -> 'a) : 'a =
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
(* run the main thread *)
Fifo_pool.Private_.run_thread st runner ~on_exn:(fun e bt ->
raise (Oh_no (Exn_bt.make e bt)));
let ebt = Exn_bt.make e bt in
Fiber.Private_.cancel_from_outside fiber ebt;
raise (Oh_no ebt));
match Fiber.peek fiber with
| Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt
Expand Down
Loading
Loading