Helper Threads: Building blocks for non-blocking libraries.



What is the Helper Threads Toolkit?

It's a library to help developers build libraries. More specifically, to build non-blocking libraries.

Using Lua as a scripting addition for an application works wonderfully; but using it as the main development language shows some deficiencies. Most notably, it's almost impossible to get a good multithreading environment.

A coroutine-based dispatcher can be a complex project on its own, especially if it has to manage all kinds of potentially blocking libraries. The Helper Threads Toolkit makes this easier, because any library built with it in mind uses the same interface to represent unfinished tasks and to signal a task's completion. This allows the dispatcher to be very simple: it just resumes the appropriate coroutine when a task is done. At the same time, the toolkit is a framework that makes it easy to write those libraries in C, because it manages all the thread handling and locking.

Concepts:

Task

A task is an action that could be time-consuming, so it's separated into four steps: Preparation, Work, Update(s) and Finishing.

The first step creates an object (what I call a task object) used to identify the task in the other three steps. The second step, Work, occurs in the background, without blocking the main Lua execution. The third step, Update, can occur any number of times (including zero), as needed by the Work process. Finally, the Finishing step is in fact the one Update done after the Work step finishes; it occurs exactly once, and then the task is disposed of.

Queue

These are First-In, First-Out (FIFO) queues holding task objects. A task is added at the end of the queue with queue:addtask(task), and retrieved at the front with queue:wait([timeout]).

There are two types of queues: input queues, where prepared tasks are added; and output queues, where threads add tasks to signal their completion or any intermediate event.

Thread

A thread is a separate execution context. It runs only C code and can't touch or access the Lua state. This restriction makes the library much simpler and improves performance significantly. Each thread has an input queue and an output queue; either one can be shared by several threads.

During execution, a thread waits on its input queue for a task. When it gets one, it calls the task's work function. After that function returns, the thread marks the task as "Done" and puts it in its output queue. Finally, it goes back to waiting on the input queue, and the cycle repeats.
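
The loop described above can be modelled in a few lines of plain Lua. This is only a single-threaded illustration of the ordering and the state changes, not the toolkit's C implementation: new_fifo and run_pending are hypothetical names, the "Busy" label is borrowed from the C API section, and a real helper thread would block on an empty input queue instead of returning.

```lua
-- Single-threaded model of a helper thread's loop (illustration only).

-- A tiny FIFO: push at the back, pop from the front.
local function new_fifo()
	local q = { first = 1, last = 0, items = {} }
	function q:push(item)
		self.last = self.last + 1
		self.items[self.last] = item
	end
	function q:pop()
		if self.first > self.last then return nil end
		local item = self.items[self.first]
		self.items[self.first] = nil
		self.first = self.first + 1
		return item
	end
	return q
end

-- Handles every task currently in `input`, in order. A real thread would
-- wait for the next task here rather than return.
local function run_pending(input, output)
	local handled = 0
	while true do
		local task = input:pop()
		if task == nil then break end
		task.state = "Busy"
		task.work(task.udata)    -- the time-consuming part
		task.state = "Done"
		output:push(task)        -- now visible to the Lua side
		handled = handled + 1
	end
	return handled
end
```

Because a single thread takes tasks from the front of the queue and finishes each one before taking the next, tasks leave the output queue in the order they entered the input queue.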

Preparation

To start a task, the library has to get some parameters describing the work to be performed. It usually also has to allocate some memory to hold these parameters and any results. All this is done in the "Preparation" step, which ends by returning the task object to the calling Lua code.

Work

This is where everything gets done. It's the time-consuming step, which must not block the Lua code.

When a thread picks a task from its input queue, the task's work function is called. This function does all the work according to the parameters collected at the "Preparation" step.

Since this function is called in a separate execution context, it can't directly interact with the Lua code; but if needed it can signal an event, making the unfinished task object appear in the output queue. This signaling is optional, and can be repeated as many times as needed. There's also the option to pause the thread until the task is 'Updated'.

Update(s)

When the task object appears in the output queue, the Lua code should do an 'update' on the task. This is an opportunity for the C library to interact with the running Lua code. These updates should be brief, just to copy some data and/or state between the two contexts.

The task's work function initiates these interactions by making the task object appear in the output queue, optionally blocking itself until the Lua code responds. In this case, the thread is resumed as soon as the update is done.

Finishing

This step doesn't have its own API functions; it's just the last update. When the work function finishes, the helper library automatically puts the task object in the output queue with a state of "Done"; after the Lua code calls the update function, the task is considered "Finished" and is disposed of.

This is where most libraries will return data from the work just finished to the calling Lua code.

The Lua API:

Note that this API doesn't provide a function to create a task object. These are created by C libraries that use the Helper Threads Toolkit. For example, the nb_file.read(file, len) function (part of the sample "nb_file" library) creates and returns a task object that identifies the read operation and encapsulates the given parameters file and len.

In most cases, the functions described here would be called by a Lua dispatcher; that is, some Lua framework that manages several Lua threads of execution using coroutines or maybe some other methods.

helper.newqueue ()

Returns a newly created queue object.

helper.newthread (input, output)

Returns a newly created thread object. It's spawned and running, so if the input queue has task objects in it, they'll be executed ASAP. Either or both queues can be shared with other threads.

If more than one thread has the same input queue, you can't predict which one will execute a given task. If the input queue is used by only one thread, the tasks will be executed sequentially, with no reordering.

Each task will be added to the output queue, where the Lua code can get them using the peek() or wait() methods. If several threads have the same output queue, a single wait() line could get events from all those threads (a single task for each call, of course).
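
As a hedged sketch of the sharing described above, the following function builds a small pool of threads that all read from one input queue and report to one output queue. new_pool is a hypothetical name, and the helper module is passed in as a parameter because this document does not say how the module is exposed after require; only helper.newqueue() and helper.newthread() are used.

```lua
-- Builds `nthreads` helper threads sharing one input and one output queue.
-- `helper` is the toolkit's Lua module, supplied by the caller.
local function new_pool(helper, nthreads)
	local input = helper.newqueue()
	local output = helper.newqueue()
	local threads = {}
	for i = 1, nthreads do
		-- Keep each thread object referenced, in case its lifetime is
		-- tied to the Lua object (an assumption, not a documented fact).
		threads[i] = helper.newthread(input, output)
	end
	return { input = input, output = output, threads = threads }
end
```

With such a pool, tasks added through pool.input:addtask(task) may complete in any order, so a dispatcher should identify each finished task by the object returned from pool.output:wait() and not by its position in the sequence.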

helper.update (task [, ...])

When a task appears in an output queue, the Lua code should respond by calling this function. The task's update handler (in the C library) has access to any extra parameters given here, and can return any value(s) to the Lua code.

If the task is in the "Done" state, this update will be the last one for this task; and the task will be marked as "Finished" after this call, and eventually disposed.

If the task is in the "Ready" state, it means it has been prepared; but it's not in any queue, so no thread has begun executing it. In this case, the task is executed immediately and in a blocking manner. This makes it easy for a Lua dispatcher to 'wrap' the task preparation functions and provide a simple blocking API over the non-blocking one provided by the underlying C library.
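
The wrapping mentioned above might look like the following sketch. make_blocking is a hypothetical helper, and the helper module is passed in as a parameter; it relies only on the documented behaviour that helper.update() runs a "Ready" task immediately and returns whatever the task's update handler returns.

```lua
-- Turns a task-producing function into a plain blocking function.
local function make_blocking(helper, prepare)
	return function(...)
		local task = prepare(...)      -- task is "Ready", not in any queue
		return helper.update(task)     -- runs it now and returns its results
	end
end
```

For instance, make_blocking(helper, nb_file.read) would give a function that is called as read_now(file, len) and returns only when the read has completed.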

helper.state (task)

Returns a string indicating the task's state. The states referred to in this document are "Ready", "Waiting", "Busy", "Paused", "Done" and "Finished".
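
The state names used in this document suggest the transition table below. It is a reading of the text, not something taken from the source code: in particular, the moment at which a task becomes "Busy" is an assumption, and the blocking helper.update() on a "Ready" task is not modelled. The names transitions and cause are hypothetical.

```lua
-- Transitions inferred from the prose; outer keys are current states,
-- inner keys are next states, values name the event that causes the move.
local transitions = {
	Ready    = { Waiting = "queue:addtask" },
	Waiting  = { Ready = "queue:remove", Busy = "picked by a thread" },
	Busy     = { Paused = "signal_task with pause", Done = "work() returned" },
	Paused   = { Busy = "helper.update" },
	Done     = { Finished = "helper.update" },
	Finished = {},
}

-- Returns the event that moves a task from `from` to `to`, or nil.
local function cause(from, to)
	local row = transitions[from]
	return row and row[to]
end
```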

queue:addtask (task)

Use this function to add tasks to input queues. The task should be in the "Ready" state, or it will be rejected. If the queue is currently empty and has a thread waiting on it, the task will be picked up and executed immediately.

queue:remove (task)

Removes a task from the queue. If it was in the "Waiting" state, it's returned to the "Ready" state. Returns the given task if successful, or nil if not (maybe it wasn't in this queue anymore).

queue:peek ()

Returns the front task of a queue, or nil if the queue is empty. Neither blocks nor modifies the queue in any way.

queue:wait ([timeout])

Removes and returns the front task from the queue. If the queue is empty, it waits at most timeout seconds before returning nil. If no timeout is given, blocks indefinitely until a task appears in the queue.
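
A dispatcher's inner loop could combine queue:wait() with helper.state() and helper.update() roughly as follows. This is a sketch under assumptions: drain_events and on_event are hypothetical names, the helper module is passed in as a parameter, and the comparison with the string "Done" assumes that helper.state() returns the state names exactly as they are quoted in this document.

```lua
-- Handles events from `output` until it stays empty for `timeout` seconds.
-- on_event(task, finished, ...) receives the values returned by the update.
local function drain_events(helper, output, timeout, on_event)
	local handled = 0
	while true do
		local task = output:wait(timeout)
		if task == nil then break end      -- timed out: nothing pending
		-- Read the state first: a "Done" task becomes "Finished" on update.
		local finished = helper.state(task) == "Done"
		on_event(task, finished, helper.update(task))
		handled = handled + 1
	end
	return handled
end
```

A timeout should always be given here; without one, the loop would block indefinitely once the queue is empty.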

Included Tasks

The Helper library includes a few tasks that can be useful for dispatchers:

helper.null ()

The simplest possible task. When a thread picks a null task from its input queue, it will immediately put it in the output queue without any processing. Even if it doesn't do anything, it has to be finished with a call to helper.update (task).

helper.waiter (queue [, timeout])

A background version of queue:wait ([timeout]), this task will wait up to timeout seconds for a task to appear in queue. The task will be returned by the helper.update() call. Its purpose is to allow a Lua thread (running under a coroutine-based dispatcher) to manage its own tasks, queues and threads.

Note that the timeout parameter is taken with respect to the task creation time, not with respect to the time the task was picked by the thread.
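
To make the note above concrete, the time a waiter task can still spend waiting once a thread picks it up can be worked out as below. remaining_wait is a hypothetical function written only for this illustration, and clamping to zero for a task picked up after its deadline is an assumption about the behaviour.

```lua
-- Seconds left to wait when the task is picked up, given that the
-- timeout is measured from the moment the task was created.
local function remaining_wait(timeout, created_at, picked_at)
	local left = timeout - (picked_at - created_at)
	if left < 0 then return 0 end
	return left
end
```

A waiter created with a 5 second timeout that sits in the input queue for 3 seconds therefore waits at most 2 more seconds: remaining_wait(5, 100, 103) gives 2.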

The C API

This API is defined in the helper.c file, for use by C programmers that use the Helper Threads Toolkit to create non-blocking libraries. For each 'task-able' operation, the library writer has to provide three C callback functions and register them as the task operations.

int (*prepare) (lua_State *L, void **udata)

This user-defined function is called in the main thread. It should get any needed parameters from the Lua State, and set *udata to a pointer holding any userdata associated with the task.

int (*work) (void *udata)

This callback is the one doing all the work. It gets called by the helper thread and must not touch or access the Lua state in any way. The udata pointer provided is the same one set by prepare() for this task; it should point to a private structure holding any parameters needed.

One-shot tasks only need to do their work in this function, probably blocking on I/O, and store any results in the userdata structure. When this function returns, the helper threads library sets the task's state to "Done" and puts it in the output queue, possibly waking up Lua code that is waiting on that queue.

Long-running tasks might want to signal events to the Lua code, maybe to return some intermediate results or to request some more data to work on. Use the signal_task() function for that.

int (*update) (lua_State *L, void *udata)

This callback is executed each time the Lua code calls the helper.update(task) function. It has access both to the Lua state (to get any extra parameters and/or return any results) and to the task's userdata.

After this function returns, the task state will be updated. If the task was in the "Paused" state, it'll be reset to the "Busy" state and any waiting thread will be resumed. If it was in the "Done" state (because the work() callback has already returned), it'll be set to the "Finished" state and disposed soon.

typedef struct task_ops {
	int (*prepare) (lua_State *L, void **udata);
	int (*work) (void *udata);
	int (*update) (lua_State *L, void *udata);
} task_ops;

This struct holds the three callbacks for a task. Used in the add_helperfunc() function and task_reg structure.

void add_helperfunc (lua_State *L, const task_ops *ops)

Used to create a task type associated with the callbacks in the ops parameter. Pushes onto the Lua stack the Lua function that creates, prepares and returns the task.

typedef struct task_reg {
	const char *name;
	const task_ops *ops;
} task_reg;

This struct associates a name with a task_ops structure. Used in the tasklib() function.

void tasklib (lua_State *L, const char *libname, const task_reg *l)

Analogous to the luaL_openlib() function. libname is a C string with the name to be given to the library. If libname is NULL, it uses the table that should be at the top of the Lua stack. l points to an array of task_reg structures (terminated by a {NULL, NULL} record) specifying the names and callbacks of any 'task-able' operations. On return, the library's table is left on the Lua stack.

void signal_task (int pause)

This utility function can be called by the work callback to signal the Lua code. The current task will appear in the output queue, where it will eventually be picked up by the queue:wait() function.

If pause is non-zero, the task is put in the "Paused" state and the thread blocks, waiting for the Lua code to call the helper.update(task) function. This is useful if the operation can't continue without some interaction with the Lua code.

Examples

timer.c

This code implements two different timers: one-shot and repeated-ticks.

nb_file.c

A small file I/O library. Uses the same file handles as the standard io package. Since the real I/O occurs some time after the task is created, it's important not to manipulate the file through the same handle until the task is done. It's OK to use other handles, even if they refer to the same file (at least on POSIX systems).

nb_tcp.c

A very minimal TCP I/O library. Only those functions that could block (newclient(), accept(), read(), write()) return a task; the others (newserver(), close()) don't need one and work as usual. There are two kinds of objects: Server Port objects and TCP Stream objects. The first kind is used only to open a port and accept connections, while the second represents open connections, created either as a client (with newclient()) or as a server, after accepting a connection.

sched.lua

A simple coroutine-based scheduler. Programs written using this scheduler shouldn't have to call any function from the helper package; just wrap any task-producing function with sched.yield() to block the current Lua thread.
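
sched.lua itself is not reproduced here. As a rough illustration of how such a scheduler could be built on the Lua API described earlier, here is a minimal sketch. It is not the actual sched.lua: new_sched is a hypothetical constructor, the helper module is passed in as a parameter, a single helper thread is used, and the code assumes that queue:wait() returns the same Lua value that was passed to queue:addtask() and that helper.state() returns "Done" for a completed task.

```lua
-- Minimal coroutine scheduler sketch (not the real sched.lua).
local function new_sched(helper)
	local sched = {}
	local input, output = helper.newqueue(), helper.newqueue()
	sched._thread = helper.newthread(input, output)  -- keep it referenced
	local waiting = {}   -- task object -> coroutine blocked on it
	local pending = 0    -- number of coroutines currently blocked

	-- Resumes `co`; if it yields a task, queues that task for the thread.
	local function step(co, ...)
		local ok, task = coroutine.resume(co, ...)
		if not ok then error(task, 0) end
		if coroutine.status(co) ~= "dead" then
			waiting[task] = co
			pending = pending + 1
			input:addtask(task)
		end
	end

	-- Starts a new Lua thread; it runs until its first sched.yield().
	function sched.add_thread(fn)
		step(coroutine.create(fn))
	end

	-- Blocks the calling coroutine until `task` is done, then returns
	-- whatever the final helper.update(task) returned.
	function sched.yield(task)
		return coroutine.yield(task)
	end

	-- Runs until no coroutine is blocked on a task.
	function sched.run()
		while pending > 0 do
			local task = output:wait()
			local co = waiting[task]
			if co and helper.state(task) == "Done" then
				waiting[task] = nil
				pending = pending - 1
				step(co, helper.update(task))
			else
				helper.update(task)  -- intermediate event: results ignored
			end
		end
	end

	return sched
end
```

The design keeps all knowledge of queues and task states inside the scheduler, which is why client code only ever needs sched.add_thread(), sched.yield() and sched.run().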

Example 1:

For example, this is a scheduler-friendly file copying function that uses nb_file I/O functions:

require "sched"
require "nb_file"

local BLKSIZE = 10000	-- bytes requested per read
		
local function copy (infname, outfname)
	local infile = assert (io.open (infname, "r"))
	local outfile = assert (io.open (outfname, "w"))
		
	while true do
		local blk = sched.yield (nb_file.read (infile, BLKSIZE))
		if blk == nil then break end
		
		sched.yield (nb_file.write (outfile, blk))
	end
		
	outfile:close ()
	infile:close ()
end

Note how sched.yield() is used to hide all the management of the task objects, so that the code looks like 'normal' blocking-style code. This copy() function can be used like this to do two copies in parallel:

sched.add_thread (function ()
	copy ("srcfile", "dstfile-A")
end)
sched.add_thread (function ()
	copy ("srcfile", "dstfile-B")
end)

sched.run ()

Example 2:

A simple line-oriented "echo server"

require "helper"
require "sched"
require "nb_tcp"

function do_listen ()
	-- listens on a port and spawns a thread for each connection.
	local srv = assert (nb_tcp.newserver (3000))
	while true do
		local conn = assert (sched.yield (srv:accept ()))
		sched.add_thread (function ()
			handle_client (conn)
		end)
	end
end

function handle_client (conn)
	-- reads each line and responds with an echo.
	local ln
	repeat
		ln = assert (sched.yield (conn:read ("*l")))
		assert (sched.yield (conn:write ("=>"..ln.."\n")))
		print (ln)
	until ln == "quit"
	conn:close()
end

sched.add_thread (do_listen)
sched.run()

Copyright © 2006 Javier Guerra G. All rights reserved.