[ Reference Manual | Alphabetic Index ]

Engines and Threads

Built-ins for controlling independent engines and threads   [more]

Predicates

condition_signal(+Handle, +Mode)
Signal a condition on a handle
condition_wait(+Handle, +Timeout)
Wait for a condition to be signaled on a handle
engine_clone(+Original, +Clone, +Options)
Clone the complete state of an engine
engine_create(-Engine, ++Options)
Create a new ECLiPSe engine
engine_join(+Engine, +Timeout, -Status)
Wait for an engine to stop running, and return its status
engine_post(+Engine, +EventGoal)
Post an event goal to an engine
engine_resume(+Engine, ?Term, -Status)
Resume execution of an engine
engine_resume_thread(+Engine, ?Term)
Asynchronously resume execution of an engine
engine_self(-Engine)
Get a handle of the engine executing the call
get_engine_property(+Engine, +Name, -Value)
Obtain properties of an engine, including current status
with_mutex(++Handle, +Goal)
Equivalent to once(Goal) but with mutual exclusion
yield(+ToParent, -FromParent)
Stop the running engine in the yielded-state, and wait for resume

Description

Engines

An 'engine' is an entity with independent control flow and data areas. This implies:

On the other hand, engines share or can share the following:

Traditionally, ECLiPSe had only one (implicit) engine per process. The multi-engine model is a slight generalization of the engine model in the ECLiPSe foreign language interfaces.

Engines can be created, instructed to execute ECLiPSe code, and their status and results queried. The programming paradigm is that of an engine as a co-routine, which is either running or stopped. An engine is set running using a resume-operation, and will then run until it comes to a stop (which can be for several reasons, and is encoded in the engine status).

Example:

    ?-  engine_create(E, []),
        engine_resume(E, writeln(hello), Status).
    hello

    E = $&(engine,"376oe7")
    Status = true
    Yes (0.00s cpu)
Here, an engine E is created, and set running (resumed) with a goal to execute. When the engine terminates, the engine_resume/3 returns with a status code. Since the goal succeeded, the status is 'true'.

A more interesting case is where the engine executes a nondeterministic goal:

    ?-  engine_create(E, []),
        engine_resume(E, (
                member(X,[one,two]),
                writeln(X)
            ), S1),
        writeln(first_resume:S1),
        engine_resume(E,
                fail, S2),
        writeln(second_resume:S2),
        engine_resume(E,
                fail, S3),
        writeln(third_resume:S3).

    one
    first_resume : true
    two
    second_resume : true
    third_resume : false
On the first resume, the engine executes the goal and succeeds with the first solution. At this point, despite returning from the resume, the engine retains all computation state associated with the success of the executed goal. In particular, it can be made to backtrack, and deliver another solution. This is done by resuming with a 'fail'. This second resume returns again with a status of 'true', indicating another solution. The third resume returns 'false', indicating no further solution. The engine is now in the same state as just after creation.

Engines and Threads

An engine can be resumed in its own thread, which makes it execute concurrently with the resumer:

Example:

    ?-  engine_create(E, []),
        engine_resume_thread(E, writeln(hello)),
        % do other work here, while engine is running concurrently
        % ...,
        engine_join(E, block, Status).
    hello

    E = $&(engine,"376oe7")
    Status = true
    Yes (0.00s cpu)
The only difference from above is that the engine_resume/3 operation is now split into engine_resume_thread/2 and engine_join/3. The engine_resume_thread/2 sets the engine running in its own thread, and returns immediately. The main program can now do work independently of the separately running engine. To re-synchronize, engine_join/3 is called, which, if necessary, will wait for the engine to stop, and report its status.

The backtracking example above works equally well by replacing the engine_resume/3 calls with pairs of engine_resume_thread/2 and engine_join/3.

Engine status

An engine can be in one of the following states:

running
Engine is running and cannot be used, except for engine_join/3, get_engine_property/3 and engine_post/2.
true
Engine is stopped after success of its goals. Engine is ready to be resumed with new (additional) goals, which form a conjunction with the already succeeded goals.
false
Engine is stopped after failure of its goals. Engine is ready to be resumed with new goals. This is also the initial state of a new engine.
exception(Term)
Engine is stopped after an uncaught throw(Term). Engine is ready to be resumed with new goals.
yielded(Out)
Engine is stopped in a yield(Out,In) call. The term Out is a copy of the Out argument of the yield/2 call. The engine is ready to be resumed. When resumed, the yield/2 call will succeed, with its In argument unified with a copy of the resume's Term argument.
flushio(Stream)
Engine is stopped in a flush operation of the queue-stream Stream. Data is ready to be read from the Stream, after which the engine should be resumed. After resume, the engine will continue after the flush operation. Any term passed to the resume is ignored.
waitio(Stream)
Engine is stopped in a read operation of the empty queue-stream Stream. Data is expected to be written to the Stream, after which the engine should be resumed. After resume, the engine will continue with its read operation. Any term passed to the resume is ignored.
exited(ExitCode)
Engine has stopped following an exit(ExitCode) where ExitCode is an integer. The engine cannot be resumed.
The status code is returned by engine_resume/3, engine_join/3 and can also be obtained via get_engine_property/3.

Communication via yield/2

The yield/2 built-in is used in combination with resume to synchronize execution and exchange data directly between two engines:

    ?-  engine_create(E, []),
        engine_resume(E, (yield(hello,In),writeln(received(In))), S1),
        writeln(S1),
        engine_resume(E, dear, S2),
        writeln(S2).
    yielded(hello)
    received(dear)
    true

The next example does the same as findall/3, but using an auxiliary engine:

    engine_findall(X, Goal, Xs) :-
        engine_create(E, []),
        engine_resume(E, (Goal,yield(X,Cont),Cont), Status),
        (
            fromto(Status,yielded(Sol),Status1,false),
            foreach(Sol,Xs),
            param(E)
        do
            engine_resume(E, fail, Status1)
        ).

The next example adds solutions lazily to a list:

    lazy_findall(X, Goal, Xs) :-
        engine_create(E, [thread]),
        engine_resume_thread(E, (Goal,yield(X,_),fail)),
        solutions(E, Xs).

    delay solutions(_,Xs) if var(Xs).
    solutions(E, Xs) :-
        engine_join(E, block, Status),
        ( Status = yielded(X) ->
            Xs = [X|Xs1],
            engine_resume_thread(E, fail),
            solutions(E, Xs1)
        ; Status = false ->
            Xs = []
        ;
            throw(unexpected_status(Status))
        ).


    ?- lazy_findall(X, member(X,[a,b,c]), Xs), Xs = [A,B|T].
    Xs = [a, b|T]
    A = a
    B = b
    T = T
    Delayed goals:
	    solutions($&(engine,"376oe7"), T)
    Yes (0.00s cpu)

Communication via in-memory queue streams

The flushio and waitio functionality refers to queue-streams, which can be used for communication. These should be set up as follows:

    open(queue(""), update, Stream, [yield(on)])
This should be done in the main program, and the Stream handle passed as an argument to the goal during resume.

Example for returning data from an engine via a queue:

    ?-  open(queue(""), update, Q, [yield(on)]),
        engine_create(E, []),
        engine_resume(E, (write(Q,hello),flush(Q)), Status),
        ( Status = flushio(Stream) ->
            read(Stream, Result)
        ;
            writeln(unexpected:Status)
        ).

    Q = $&(stream,7)
    E = $&(engine,"376oe7")
    Status = flushio(7)
    Result = hello

Example for passing data to engine via a queue:

    ?-  open(queue(""), update, Q, [yield(on)]),
        engine_create(E, []),
        engine_resume(E, (read(Q,Term),writeln(received(Term))), Status),
        ( Status = waitio(Stream) ->
            write(Stream, hello),
            engine_resume(E, unused, _)
        ;
            writeln(unexpected:Status)
        ).

    received(hello)

    Q = $&(stream,9)
    E = $&(engine,"376nlr")
    Status = waitio(9)

Communication via shared data structures

Data can be transferred via non-backtrackable storage, i.e. bags, stores, shelves, or records:

    ?-  bag_create(B),
        engine_create(E, [thread]),
        engine_resume_thread(E, (between(1,20,1,X),bag_enter(B,X),fail)),
        engine_join(E, block, Status),
        bag_retrieve(B, Xs).

    B = $&(bag,"1hqv")
    E = $&(engine,"376nlr")
    Status = false
    Xs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
The anonymous (handle-based) versions of these facilities should be used, as they are more efficient and are disposed of automatically by garbage collection. The use of dynamic predicates and assert/retract is not recommended.

Mutual Exclusion

The basic operations on non-backtrackable storage objects (bags, shelves, stores, records) and streams are all atomic, i.e. operations such as

are all safe to use in a multi-threaded situation, and access the storage or stream object under mutual exclusion. To make larger code sections atomic, use with_mutex/2. As most objects internally contain a mutex, their handles themselves can be used as the mutex. In the following example we make sure that a sequence of write operations cannot be interrupted by another thread writing to the same stream:
    atomic_write_list(Stream, Xs) :-
        with_mutex(Stream, write_list(Stream, Xs)).

    write_list(Stream, Xs) :-
        ( foreach(X,Xs), param(Stream) do writeln(Stream, X) ).
Without with_mutex/2, each writeln/2 would be atomic, but another thread that has the same stream handle could write to the stream between these writeln-calls.

Here is an example of how to make a read-modify-write operation on a store thread-safe:

    add_to_store(Store, Key, Increment) :-
        with_mutex(Store, (
            store_get(Store, Key, OldVal),
            NewVal is OldVal+Increment,
            store_set(Store, Key, OldVal)
        )).

A couple of common atomic operations have been introduced as new primitives:
shelf_get_and_dec(+Shelf,+Index,-Value)
get and post-decrement an integer stored in a shelf slot
shelf_inc_and_get(+Shelf,+Index,-Value)
pre-increment an get an integer stored in a shelf slot
shelf_test_and_set(+Shelf,+Index,+Old,+New)
if the old shelved value is identical to Old, set it to New, else fail

Waiting for Conditions

The following predicates cause a the calling engine-thread to block until a particular continuation condition arises:

sleep(Seconds)
puts the thread to sleep for the given time.
stream_select(Streams,Timeout,ReadyStreams)
puts the thread to sleep until streams are ready for I/O.
engine_join(Engine,Timeout,Status)
puts the current thread to sleep until Engine stops running.
record_wait_remove(+Queue,?Value,+Timeout)
puts the current thread to sleep until entry Value appears in Queue, then removes this entry.
record_wait_append(+Queue,+Value,+Timeout,+Max)
puts the current thread to sleep until Queue has less than Max entries, then enters Value into Queue.
condition_wait(Handle,Timeout)
general lower-level operation for synchronizing access to shared data structures: puts the current thread to sleep until Handle is signaled via condition_signal/2.

Communication via Record-Queues

The synchronized operations record_wait_append/4 and record_wait_remove/3 can be used to implement bounded queues for thread communication. On the receiving side, record_wait_remove/3 waits for entries to appear in the queue. On the sending side, record_wait_append/4 waits for the queue to have space for new entries. Arbitrary terms can be transferred.

    produce_consume(N) :-
	record_create(Q),
	engine_create(E, []),
	engine_resume_thread(E, consume(Q)),
	produce(Q, N).

    produce(Q, N) :-
	( for(I,1,N), param(Q) do
	    writeln(producing(I)),
	    record_wait_append(Q, I, block, 20)
	).

    consume(Q) :-
	record_wait_remove(Q, Msg, block),
	writeln(consuming(Msg)),
	consume(Q).

Interrupting Running Engines

ECLiPSe's event mechanism can be used to interrupt a running engine and to insert an arbitrary goal into the execution. This is the same mechanism used for timer-controlled after-events and for signal-triggered events.

Use engine_post/2 to post such a goal to a particular engine:

?- engine_create(E, [thread]),
   engine_resume_thread(E, (repeat,fail)),   % run forever
   sleep(3),
   engine_post(E, abort),
   engine_join(E, block, Status).

E = $&(engine,"376oe7")
Status = exception(abort)
Yes (3.00s cpu)

See Also

record_wait_append / 4, record_wait_remove / 3, exit / 1
Generated from engines.eci on 2022-09-03 14:26