AsyncDispatch
This module implements asynchronous IO. This includes a dispatcher, a Future type implementation, and an async macro which allows asynchronous code to be written in a synchronous style with the await keyword.
The dispatcher acts as a kind of event loop. You must call poll on it (or a function which does so for you such as waitFor or runForever) in order to poll for any outstanding events. The underlying implementation is based on epoll on Linux, IO Completion Ports on Windows and select on other operating systems.
The poll function will not, on its own, return any events. Instead an appropriate Future object will be completed. A Future is a type which holds a value which is not yet available, but which may be available in the future. You can check whether a future is finished by using the finished function. When a future is finished it means that either the value that it holds is now available or it holds an error instead. The latter situation occurs when the operation to complete a future fails with an exception. You can distinguish between the two situations with the failed function.
Future objects can also store a callback procedure which will be called automatically once the future completes.
Futures therefore can be thought of as an implementation of the proactor pattern. In this pattern you make a request for an action, and once that action is fulfilled a future is completed with the result of that action. Requests can be made by calling the appropriate functions. For example: calling the recv function will create a request for some data to be read from a socket. The future which the recv function returns will then complete once the requested amount of data is read or an exception occurs.
Code to read some data from a socket may look something like this:
var future = socket.recv(100) future.callback = proc () = echo(future.read)
All asynchronous functions returning a Future will not block. They will not however return immediately. An asynchronous function will have code which will be executed before an asynchronous request is made, in most cases this code sets up the request.
In the above example, the recv function will return a brand new Future instance once the request for data to be read from the socket is made. This Future instance will complete once the requested amount of data is read, in this case it is 100 bytes. The second line sets a callback on this future which will be called once the future completes. All the callback does is write the data stored in the future to stdout. The read function is used for this and it checks whether the future completes with an error for you (if it did it will simply raise the error), if there is no error however it returns the value of the future.
Asynchronous procedures
Asynchronous procedures remove the pain of working with callbacks. They do this by allowing you to write asynchronous code the same way as you would write synchronous code.
An asynchronous procedure is marked using the {.async.} pragma. When marking a procedure with the {.async.} pragma it must have a Future[T] return type or no return type at all. If you do not specify a return type then Future[void] is assumed.
Inside asynchronous procedures await can be used to call any procedures which return a Future; this includes asynchronous procedures. When a procedure is "awaited", the asynchronous procedure it is awaited in will suspend its execution until the awaited procedure's Future completes. At which point the asynchronous procedure will resume its execution. During the period when an asynchronous procedure is suspended other asynchronous procedures will be run by the dispatcher.
The await call may be used in many contexts. It can be used on the right hand side of a variable declaration: var data = await socket.recv(100), in which case the variable will be set to the value of the future automatically. It can be used to await a Future object, and it can be used to await a procedure returning a Future[void]: await socket.send("foobar").
Discarding futures
Futures should never be discarded. This is because they may contain errors. If you do not care for the result of a Future then you should use the asyncCheck procedure instead of the discard keyword.
Examples
For examples take a look at the documentation for the modules implementing asynchronous IO. A good place to start is the asyncnet module.
Limitations/Bugs
- The effect system (raises: []) does not work with async procedures.
- Can't await in a except body
- Forward declarations for async procs are broken, link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
- FutureVar[T] needs to be completed manually.
Types
FutureBase = ref object of RootObj cb: proc () {.closure, gcsafe.} finished: bool error*: ref Exception ## Stored exception errorStackTrace*: string when not false: stackTrace: string ## For debugging purposes only. id: int fromProc: string
- Untyped future. Source
Future[T] = ref object of FutureBase value: T ## Stored value
- Typed future. Source
FutureVar[T] = distinct Future[T]
- Source
FutureError = object of Exception cause*: FutureBase
- Source
CompletionKey = DWORD
- Source
CompletionData = object fd*: AsyncFD cb*: proc (fd: AsyncFD; bytesTransferred: DWORD; errcode: OSErrorCode) {.closure, gcsafe.}
- Source
PDispatcher = ref object of PDispatcherBase ioPort: Handle handles: HashSet[AsyncFD]
- Source
PCustomOverlapped = ref CustomOverlapped
- Source
AsyncFD = distinct int
- Source
Procs
proc newFuture[T](fromProc: string = "unspecified"): Future[T]
-
Creates a new future.
Specifying fromProc, which is a string specifying the name of the proc that this future belongs to, is a good habit as it helps with debugging.
Source proc newFutureVar[T](fromProc = "unspecified"): FutureVar[T]
-
Create a new FutureVar. This Future type is ideally suited for situations where you want to avoid unnecessary allocations of Futures.
Specifying fromProc, which is a string specifying the name of the proc that this future belongs to, is a good habit as it helps with debugging.
Source proc clean[T](future: FutureVar[T])
- Resets the finished status of future. Source
proc complete[T](future: Future[T]; val: T)
- Completes future with value val. Source
proc complete(future: Future[void]) {.raises: [FutureError, Exception], tags: [RootEffect].}
- Completes a void future. Source
proc complete[T](future: FutureVar[T])
- Completes a FutureVar. Source
proc fail[T](future: Future[T]; error: ref Exception)
- Completes future with error. Source
proc callback=(future: FutureBase; cb: proc () {.closure, gcsafe.}) {. raises: [Exception], tags: [RootEffect].}
-
Sets the callback proc to be called when the future completes.
If future has already completed then cb will be called immediately.
Note: You most likely want the other callback setter which passes future as a param to the callback.
Source proc callback=[T](future: Future[T]; cb: proc (future: Future[T]) {.closure, gcsafe.})
-
Sets the callback proc to be called when the future completes.
If future has already completed then cb will be called immediately.
Source proc read[T](future: Future[T]): T
-
Retrieves the value of future. Future must be finished otherwise this function will fail with a ValueError exception.
If the result of the future is an error then that error will be raised.
Source proc readError[T](future: Future[T]): ref Exception
-
Retrieves the exception stored in future.
An ValueError exception will be thrown if no exception exists in the specified Future.
Source proc mget[T](future: FutureVar[T]): var T
-
Returns a mutable value stored in future.
Unlike read, this function will not raise an exception if the Future has not been finished.
Source proc finished[T](future: Future[T]): bool
-
Determines whether future has completed.
True may indicate an error or a value. Use failed to distinguish.
Source proc failed(future: FutureBase): bool {.raises: [], tags: [].}
- Determines whether future completed with an error. Source
proc asyncCheck[T](future: Future[T])
-
Sets a callback on future which raises an exception if the future finished with an error.
This should be used instead of discard to discard void futures.
Source proc `and`[T, Y](fut1: Future[T]; fut2: Future[Y]): Future[void]
- Returns a future which will complete once both fut1 and fut2 complete. Source
proc `or`[T, Y](fut1: Future[T]; fut2: Future[Y]): Future[void]
- Returns a future which will complete once either fut1 or fut2 complete. Source
proc `==`(x: AsyncFD; y: AsyncFD): bool {.borrow.}
- Source
proc newDispatcher(): PDispatcher {.raises: [], tags: [].}
- Creates a new Dispatcher instance. Source
proc getGlobalDispatcher(): PDispatcher {.raises: [], tags: [].}
- Retrieves the global thread-local dispatcher. Source
proc register(fd: AsyncFD) {.raises: [OSError], tags: [].}
- Registers fd with the dispatcher. Source
proc poll(timeout = 500) {.raises: [ValueError, Exception, OSError, FutureError], tags: [RootEffect, TimeEffect].}
- Waits for completion events and processes them. Source
proc connect(socket: AsyncFD; address: string; port: Port; domain = nativesockets.AF_INET): Future[void] {. raises: [ValueError, OSError, Exception, FutureError], tags: [RootEffect].}
-
Connects socket to server at address:port.
Returns a Future which will complete when the connection succeeds or an error occurs.
Source proc recv(socket: AsyncFD; size: int; flags = {SafeDisconn}): Future[string] {. raises: [ValueError, Exception, FutureError], tags: [RootEffect].}
-
Reads up to size bytes from socket. Returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected in which case the future will complete with a value of "".
Warning: The Peek socket flag is not supported on Windows.
Source proc recvInto(socket: AsyncFD; buf: cstring; size: int; flags = {SafeDisconn}): Future[int] {. raises: [ValueError, FutureError, Exception], tags: [RootEffect].}
-
Reads up to size bytes from socket into buf, which must at least be of that size. Returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected in which case the future will complete with a value of 0.
Warning: The Peek socket flag is not supported on Windows.
Source proc send(socket: AsyncFD; data: string; flags = {SafeDisconn}): Future[void] {. raises: [ValueError, FutureError, Exception], tags: [RootEffect].}
- Sends data to socket. The returned future will complete once all data has been sent. Source
proc acceptAddr(socket: AsyncFD; flags = {SafeDisconn}): Future[ tuple[address: string, client: AsyncFD]] {. raises: [ValueError, OSError, Exception, FutureError], tags: [RootEffect].}
-
Accepts a new connection. Returns a future containing the client socket corresponding to that connection and the remote address of the client. The future will complete when the connection is successfully accepted.
The resulting client socket is automatically registered to the dispatcher.
The accept call may result in an error if the connecting socket disconnects during the duration of the accept. If the SafeDisconn flag is specified then this error will not be raised and instead accept will be called again.
Source proc newAsyncNativeSocket(domain, sockType, protocol: cint): AsyncFD {. raises: [OSError], tags: [].}
- Creates a new socket and registers it with the dispatcher implicitly. Source
proc newAsyncNativeSocket(domain: Domain = nativesockets.AF_INET; sockType: SockType = SOCK_STREAM; protocol: Protocol = IPPROTO_TCP): AsyncFD {. raises: [OSError], tags: [].}
- Creates a new socket and registers it with the dispatcher implicitly. Source
proc closeSocket(socket: AsyncFD) {.raises: [], tags: [].}
- Closes a socket and ensures that it is unregistered. Source
proc unregister(fd: AsyncFD) {.raises: [], tags: [].}
- Unregisters fd. Source
proc sleepAsync(ms: int): Future[void] {.raises: [], tags: [TimeEffect].}
- Suspends the execution of the current async procedure for the next ms milliseconds. Source
proc accept(socket: AsyncFD; flags = {SafeDisconn}): Future[AsyncFD] {. raises: [ValueError, OSError, Exception, FutureError], tags: [RootEffect].}
- Accepts a new connection. Returns a future containing the client socket corresponding to that connection. The future will complete when the connection is successfully accepted. Source
proc recvLine(socket: AsyncFD): Future[string] {.raises: [FutureError], tags: [RootEffect].}
-
Reads a line of data from socket. Returned future will complete once a full line is read or an error occurs.
If a full line is read \r\L is not added to line, however if solely \r\L is read then line will be set to it.
If the socket is disconnected, line will be set to "".
If the socket is disconnected in the middle of a line (before \r\L is read) then line will be set to "". The partial line will be lost.
Warning: This assumes that lines are delimited by \r\L.
Note: This procedure is mostly used for testing. You likely want to use asyncnet.recvLine instead.
Source proc runForever() {.raises: [ValueError, Exception, OSError, FutureError], tags: [RootEffect, TimeEffect].}
- Begins a never ending global dispatcher poll loop. Source
proc waitFor[T](fut: Future[T]): T
- Blocks the current thread until the specified future completes. Source