<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<HTML><HEAD><TITLE>8 Programming Patterns</TITLE><LINK href="ozdoc.css" rel="stylesheet" type="text/css"></HEAD><BODY><TABLE align="center" border="0" cellpadding="6" cellspacing="6" class="nav"><TR bgcolor="#DDDDDD"><TD><A href="node9.html#chapter.concurrency.cheap"><< Prev</A></TD><TD><A href="node8.html">- Up -</A></TD></TR></TABLE><DIV id="chapter.concurrency.patterns"><H1><A name="chapter.concurrency.patterns">8 Programming Patterns</A></H1><P>In this chapter, we present a number of patterns that take advantage of concurrency. When programming in Oz, you don't have to agonize over whether you really need to invest in a new thread. You just do it! This bears repeating because most people with experience of threads in other languages just don't believe it. Threads aren't just for long-running computations: you can spawn threads to perform single operations asynchronously.</P><P>In the previous chapter, we demonstrated that it is realistic to create a huge number of threads. However, we exercised the worst case: all threads wanted a piece of the action all the time. In reality, the situation is usually much better: most threads are blocked, waiting for some event, and only a very small number of them compete for processor time.</P><H2><A name="label22">8.1 Stream Processing Agents</A></H2><P>A very common pattern is for a thread to implement an agent that processes all messages that appear on a stream. 
For example, here, procedure <CODE>Process</CODE> is applied to each element of stream <CODE>Messages</CODE>, one after the other: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">thread</SPAN> {ForAll Messages Process} <SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> Typically, the tail of the stream is uninstantiated, at which point the <CODE>ForAll</CODE> procedure, and thus the thread, suspends until a new message comes in that instantiates the stream further.</P><DIV id="subsection.stream.merging"><H3><A name="subsection.stream.merging">8.1.1 Stream Merging</A></H3><P>As an application of this technique, we consider now the <EM>fair merge</EM> of two streams <CODE>L1</CODE> and <CODE>L2</CODE> into one single new stream <CODE>L3</CODE>. For this, we create the new port <CODE>Mailbox</CODE> connected to stream <CODE>L3</CODE>, and two agents to forward the messages of <CODE>L1</CODE> and <CODE>L2</CODE> to the <CODE>Mailbox</CODE>: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Merge</SPAN> L1 L2 L3}<BR> Mailbox = {Port<SPAN class="keyword">.</SPAN>new L3}<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Forward</SPAN> Msg} {Port<SPAN class="keyword">.</SPAN>send Mailbox Msg} <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">in</SPAN> <BR> <SPAN class="keyword">thread</SPAN> {ForAll L1 Forward} <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">thread</SPAN> {ForAll L2 Forward} <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> Fairness of merging is guaranteed by the fairness of thread scheduling. Actually, the code above can easily be generalized. 
Here is an abstraction that returns two results: a merged stream <CODE>L</CODE> and a procedure <CODE>AlsoMerge</CODE> to cause yet another stream to be merged into <CODE>L</CODE>: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">MakeMerger</SPAN> AlsoMerge L}<BR> Mailbox = {Port<SPAN class="keyword">.</SPAN>new L}<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Forward</SPAN> Msg} {Port<SPAN class="keyword">.</SPAN>send Mailbox Msg} <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">in</SPAN> <BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">AlsoMerge</SPAN> LL}<BR> <SPAN class="keyword">thread</SPAN> {ForAll LL Forward} <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> </P></DIV><H2><A name="label23">8.2 Communication Patterns</A></H2><P>A great advantage of <EM>concurrency for free</EM> is that it gives you a new way to manage design complexity: you can partition your design into a number of small simple agents. You then use streams to connect them together: agents exchange and process messages.</P><P>There are two major designs for stream-based communication among agents: one is the <EM>email</EM> model, the other the <EM>newsgroup</EM> model. Of course, in realistic applications, you should mix these models as appropriate.</P><H3><A name="label24">8.2.1 Email Model</A></H3><P>In the email model, each agent is equipped with its own mailbox. In the simplest case, the agent is known to others only through its mailbox. 
For example, here is a function that takes a message processing function as argument, creates an agent, and returns its mailbox: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">MakeAgent</SPAN> Process Mailbox}<BR> <SPAN class="keyword">thread</SPAN> {ForAll {Port<SPAN class="keyword">.</SPAN>new $ Mailbox} Process} <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> </P><H3><A name="label25">8.2.2 Newsgroup Model</A></H3><P>In the newsgroup model, all agents process and post to the same stream of messages.</P><H4><A name="label26">Forward Inference Engine: Implementation</A></H4><P>We illustrate the newsgroup model with an application to forward inference rules. A forward inference rule has the form <IMG alt="\forall\bar{x}\; C\Rightarrow D" src="latex1.png"> where <IMG alt="C" src="latex2.png"> and <IMG alt="D" src="latex3.png"> are conjunctions of literals and all variables of the conclusion appear in the premise. The newsgroup will be where inferred literals are published. A rule is said to be partially recognized when some, but not all, of the premise literals have been discovered on the newsgroup. A partially recognized rule is implemented by an agent that reads the newsgroup in search of candidates for the next premise literal. When a rule has been fully recognized, its conclusion is then <EM>asserted</EM>, which normally results in the publication of new literals.</P><P>We express the engine in the form of a functor that exports the list of <CODE>Literals</CODE> being published as well as a procedure to <CODE>Assert</CODE> literals and rules. 
</P><DL><DT><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A name="label27">Forward Inference Module</A><SPAN class="chunkborder">>=</SPAN></SPAN></DT><DD class="code"><CODE><SPAN class="keyword">functor</SPAN> <BR><SPAN class="keyword">import</SPAN> Search<BR><SPAN class="keyword">export</SPAN> Literals Assert<BR><SPAN class="keyword">define</SPAN> <BR> Literals Box={Port<SPAN class="keyword">.</SPAN>new Literals}<BR> </CODE><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A href="node10.html#label28">Assert conclusion</A><SPAN class="chunkborder">></SPAN></SPAN><CODE> <BR> </CODE><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A href="node10.html#label30">Replace symbolic by actual variables</A><SPAN class="chunkborder">></SPAN></SPAN><CODE> <BR> </CODE><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A href="node10.html#label31">Agent for partially recognized rule</A><SPAN class="chunkborder">></SPAN></SPAN><CODE> <BR><SPAN class="keyword">end</SPAN></CODE></DD></DL><P> What can be asserted are literals and rules, and conjunctions thereof. A conjunction is represented as a list. Since we don't want to publish the same literal twice (or else we might have termination problems), we maintain here a database of all published literals, indexed according to their outermost predicate. A real implementation might prefer to replace this by an adaptive discrimination tree. Whenever we are about to publish a literal, we first check that it isn't already in the database: only in that case do we enter it and then publish it. 
</P><DL><DT><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A name="label28">Assert conclusion</A><SPAN class="chunkborder">>=</SPAN></SPAN></DT><DD class="code"><CODE>Database = {Dictionary<SPAN class="keyword">.</SPAN>new}<BR><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Assert</SPAN> Conclusion}<BR> <SPAN class="keyword">if</SPAN> {IsList Conclusion} <SPAN class="keyword">then</SPAN> {ForAll Conclusion Assert}<BR> <SPAN class="keyword">elsecase</SPAN> Conclusion <SPAN class="keyword">of</SPAN> rule(VarList Premises Conclusion) <SPAN class="keyword">then</SPAN> <BR> </CODE><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A href="node10.html#label29">Assert rule</A><SPAN class="chunkborder">></SPAN></SPAN><CODE> <BR> <SPAN class="keyword">else</SPAN> <BR> Pred = {Label Conclusion}<BR> Lits = {Dictionary<SPAN class="keyword">.</SPAN>condGet Database Pred nil}<BR> <SPAN class="keyword">in</SPAN> <BR> <SPAN class="keyword">if</SPAN> {Member Conclusion Lits} <SPAN class="keyword">then</SPAN> <SPAN class="keyword">skip</SPAN> <SPAN class="keyword">else</SPAN> <BR> {Dictionary<SPAN class="keyword">.</SPAN>put Database Pred Conclusion<SPAN class="keyword">|</SPAN>Lits}<BR> {Port<SPAN class="keyword">.</SPAN>send Box Conclusion}<BR> <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></DD></DL><P> Asserting a rule consists of creating an agent to recognize it. The agent is equipped with (1) an index (2) a predicate. The index indicates which premise literal to recognize next; it starts from <CODE>N</CODE>, the last one, and decreases down to 1. The predicate constrains a representation <CODE>rule(premises:P conclusion:C)</CODE> of the partially recognized rule. In order to create this representation, we invoke <CODE>Abstract</CODE> to replace the quantified symbolic variables of the rule by new free Oz variables. 
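</P><P>To make this representation concrete, here is a hand-built sketch, not part of the engine, of the record that such a predicate is expected to describe for a transitivity rule <CODE>rule([x y z] [a(x y) a(y z)] a(x z))</CODE>. It bypasses <CODE>Abstract</CODE> and creates the fresh variables directly with <CODE>Record<SPAN class="keyword">.</SPAN>make</CODE>; the names <CODE>Vars</CODE>, <CODE>P</CODE> and <CODE>C</CODE> are assumptions local to this illustration, which is meant to be fed to the OPI: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">declare</SPAN> <BR>%% <SPAN class="comment">one fresh Oz variable per symbolic variable x, y and z<BR></SPAN>Vars = {Record<SPAN class="keyword">.</SPAN>make o [x y z]}<BR>%% <SPAN class="comment">premises and conclusion share these variables<BR></SPAN>P = [a(Vars<SPAN class="keyword">.</SPAN>x Vars<SPAN class="keyword">.</SPAN>y) a(Vars<SPAN class="keyword">.</SPAN>y Vars<SPAN class="keyword">.</SPAN>z)]<BR>C = a(Vars<SPAN class="keyword">.</SPAN>x Vars<SPAN class="keyword">.</SPAN>z)<BR>{Browse rule(premises:P conclusion:C)}</CODE></BLOCKQUOTE><P> Because the variables are shared, unifying a premise with a published literal also instantiates the matching positions of the conclusion, which is the effect the agents rely on. 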
</P><DL><DT><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A name="label29">Assert rule</A><SPAN class="chunkborder">>=</SPAN></SPAN></DT><DD class="code"><CODE><SPAN class="keyword">local</SPAN> <BR> N = {Length Premises}<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">RulePredicate</SPAN> RuleExpression}<BR> <SPAN class="keyword">case</SPAN> {Abstract VarList Premises<SPAN class="keyword">#</SPAN>Conclusion} <SPAN class="keyword">of</SPAN> P<SPAN class="keyword">#</SPAN>C <SPAN class="keyword">then</SPAN> <BR> RuleExpression=rule(premises:P conclusion:C)<BR> <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">in</SPAN> {Agent N RulePredicate} <SPAN class="keyword">end</SPAN></CODE></DD></DL><P> Below, we create a mapping from symbolic variables to new free Oz variables, then recursively process the expression to effect the replacements. Note that we carry the list <CODE>Avoid</CODE> of symbolic variables that are quantified in a nested rule expression. 
</P><DL><DT><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A name="label30">Replace symbolic by actual variables</A><SPAN class="chunkborder">>=</SPAN></SPAN></DT><DD class="code"><CODE><SPAN class="keyword">fun</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Abstract</SPAN> VarList E}<BR> Vars = {Record<SPAN class="keyword">.</SPAN>make o VarList}<BR> <SPAN class="keyword">fun</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Loop</SPAN> E Avoid}<BR> <SPAN class="keyword">if</SPAN> {IsAtom E} <SPAN class="keyword">then</SPAN> <BR> <SPAN class="keyword">if</SPAN> {Member E Avoid} <SPAN class="keyword">then</SPAN> E<BR> <SPAN class="keyword">elseif</SPAN> {HasFeature Vars E} <SPAN class="keyword">then</SPAN> Vars<SPAN class="keyword">.</SPAN>E<BR> <SPAN class="keyword">else</SPAN> E <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">elseif</SPAN> {IsRecord E} <SPAN class="keyword">then</SPAN> <BR> <SPAN class="keyword">case</SPAN> E <SPAN class="keyword">of</SPAN> rule(VarL Prem Conc) <SPAN class="keyword">then</SPAN> <BR> rule(VarL {Loop Prem {Append VarL Avoid}}<BR> {Loop Conc {Append VarL Avoid}})<BR> <SPAN class="keyword">else</SPAN> {Record<SPAN class="keyword">.</SPAN>map E <SPAN class="keyword">fun</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">$</SPAN> F} {Loop F Avoid} <SPAN class="keyword">end</SPAN>} <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">else</SPAN> E <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">in</SPAN> {Loop E nil} <SPAN class="keyword">end</SPAN></CODE></DD></DL><P> The agent is equipped with the index <CODE>I</CODE> of the next premise literal to be recognized and with <CODE>RulePredicate</CODE> to constrain the representation of the partially recognized rule. 
For each literal that is being published, the agent finds all possible solutions that result from unifying it with the <CODE>I</CODE>th premise literal, and produces the corresponding refined predicates. For each new predicate produced, a new agent is created to recognize the next premise literal; unless of course all premise literals have been recognized, in which case we retrieve the corresponding instantiated conclusion and assert it. </P><DL><DT><SPAN class="chunktitle"><SPAN class="chunkborder"><</SPAN><A name="label31">Agent for partially recognized rule</A><SPAN class="chunkborder">>=</SPAN></SPAN></DT><DD class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Agent</SPAN> I RulePredicate}<BR> {ForAll Literals<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">$</SPAN> Literal}<BR> {ForAll<BR> {Search<SPAN class="keyword">.</SPAN>allP<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">$</SPAN> RuleExpression}<BR> {RulePredicate RuleExpression}<BR> {Nth RuleExpression<SPAN class="keyword">.</SPAN>premises I}=Literal<BR> <SPAN class="keyword">end</SPAN> 1 _}<BR> <SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">$</SPAN> NewRulePredicate}<BR> <SPAN class="keyword">if</SPAN> I<SPAN class="keyword">==</SPAN>1 <SPAN class="keyword">then</SPAN> {Assert {NewRulePredicate}<SPAN class="keyword">.</SPAN>conclusion}<BR> <SPAN class="keyword">else</SPAN> <SPAN class="keyword">thread</SPAN> {Agent I<SPAN class="keyword">-</SPAN>1 NewRulePredicate} <SPAN class="keyword">end</SPAN> <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">end</SPAN>}<BR> <SPAN class="keyword">end</SPAN>}<BR><SPAN class="keyword">end</SPAN></CODE></DD></DL><P></P><H4><A name="label32">Forward Inference Engine: Usage</A></H4><P>The functor can be compiled as follows: </P><BLOCKQUOTE 
class="code"><CODE>ozc -c forward.oz</CODE></BLOCKQUOTE><P> and you might experiment with it in the OPI as follows (where <I>DIR</I> is the directory where the compiled functor is located). </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">declare</SPAN> [Forward] = {Module<SPAN class="keyword">.</SPAN>link [<SPAN class="string">'</SPAN></CODE><I>DIR</I><CODE><SPAN class="string">/forward.ozf'</SPAN>]}<BR>{Browse Forward<SPAN class="keyword">.</SPAN>literals}<BR>{Forward<SPAN class="keyword">.</SPAN>assert rule([x y z] [a(x y) a(y z)] a(x z))}<BR>{Forward<SPAN class="keyword">.</SPAN>assert a(one two)}<BR>{Forward<SPAN class="keyword">.</SPAN>assert a(two three)}</CODE></BLOCKQUOTE><P> We asserted one rule expressing the transitivity of binary predicate <CODE>a</CODE>, and then two facts. In the browser, you will now observe: </P><BLOCKQUOTE class="code"><CODE>a(one two)<SPAN class="keyword">|</SPAN>a(two three)<SPAN class="keyword">|</SPAN>a(one three)<SPAN class="keyword">|</SPAN>_<SPAN class="keyword"><</SPAN>Future<SPAN class="keyword">></SPAN></CODE></BLOCKQUOTE><P></P><H2><A name="label33">8.3 Synchronization</A></H2><P>In Oz, synchronization is done on data and typically takes the form of waiting for a variable to become instantiated. Furthermore, this happens automatically: every operation that requires <EM>determined</EM> data will suspend until this data becomes determined. For example, this is why you can write: </P><BLOCKQUOTE class="code"><CODE>{ForAll Messages Process}</CODE></BLOCKQUOTE><P> where <CODE>Messages</CODE> is a stream whose tail only incrementally becomes instantiated with new messages. 
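</P><P>A minimal sketch of such an incrementally instantiated stream, assuming it is fed to the OPI; the tail variables <CODE>T1</CODE> and <CODE>T2</CODE> and the use of <CODE>Browse</CODE> as the message processor are choices made for this illustration only: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">declare</SPAN> Messages T1 T2 <SPAN class="keyword">in</SPAN> <BR>%% <SPAN class="comment">the agent suspends at once: Messages is still unbound<BR></SPAN><SPAN class="keyword">thread</SPAN> {ForAll Messages Browse} <SPAN class="keyword">end</SPAN> <BR>%% <SPAN class="comment">each binding extends the stream and wakes the agent up<BR></SPAN>Messages = hello<SPAN class="keyword">|</SPAN>T1<BR>T1 = world<SPAN class="keyword">|</SPAN>T2<BR>%% <SPAN class="comment">closing the stream lets ForAll, and thus the thread, terminate<BR></SPAN>T2 = nil</CODE></BLOCKQUOTE><P> 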
The <CODE>ForAll</CODE> operation suspends when it reaches the uninstantiated tail of the stream, and resumes automatically when further messages become available.</P><P>If you need to synchronize explicitly on a variable <CODE>X</CODE>, you may write: </P><BLOCKQUOTE class="code"><CODE>{Wait X}</CODE></BLOCKQUOTE><P> which suspends this thread until <CODE>X</CODE> becomes determined.</P><P>The truth is actually much more general: a conditional suspends until its condition can be decided, one way or the other. What makes this possible is the fact that the information in the <EM>constraint store</EM> increases monotonically. A conditional suspends until its condition is <EM>entailed</EM> by the store (implied), or <EM>disentailed</EM> (its negation is implied). Thus, the <CODE>Wait</CODE> operation mentioned above can (almost) be coded as follows: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">Wait</SPAN> X}<BR> <SPAN class="keyword">if</SPAN> X<SPAN class="keyword">==</SPAN>a <SPAN class="keyword">then</SPAN> <SPAN class="keyword">skip</SPAN> <SPAN class="keyword">else</SPAN> <SPAN class="keyword">skip</SPAN> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> This suspends until it can be decided whether or not <CODE>X</CODE> is equal to <CODE>a</CODE>. I said ``almost'' because in between being free and determined, a variable may be kinded (i. e. 
its type is known), and the code above does not account for this possibility.</P><P>The <CODE>ForAll</CODE> procedure is actually implemented as follows: </P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">proc</SPAN><SPAN class="variablename"> </SPAN>{<SPAN class="functionname">ForAll</SPAN> L P}<BR> <SPAN class="keyword">case</SPAN> L <SPAN class="keyword">of</SPAN> H<SPAN class="keyword">|</SPAN>T <SPAN class="keyword">then</SPAN> {P H} {ForAll T P}<BR> <SPAN class="keyword">elseof</SPAN> nil <SPAN class="keyword">then</SPAN> <SPAN class="keyword">skip</SPAN> <SPAN class="keyword">end</SPAN> <BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> The case statement (a conditional) suspends until it can be determined whether <CODE>L</CODE> matches <CODE>H<SPAN class="keyword">|</SPAN>T</CODE>, i. e. is a list pair.</P><H3><A name="label34">8.3.1 The Short-Circuit Technique</A></H3><P>The short-circuit technique is the standard means of programming an n-way rendez-vous in concurrent constraint programming. The problem is the following: given n concurrent threads, how can we synchronize on the fact that they have all terminated? The idea is to have a <EM>determined</EM> termination token, and to require that each thread, when it terminates, passes the token that it got from its left neighbour to its right neighbour. When the termination token really arrives at the <EM>rightmost</EM> end, we know that all threads have terminated.</P><P>In the example below, we create <CODE>Token0</CODE> with value <CODE><SPAN class="keyword">unit</SPAN></CODE>, and then each thread, when it terminates, passes the token on to the next thread. When the value <CODE><SPAN class="keyword">unit</SPAN></CODE> reaches <CODE>Token5</CODE>, we know that all threads have terminated. 
</P><BLOCKQUOTE class="code"><CODE><SPAN class="keyword">local</SPAN> Token0 Token1 Token2 Token3 Token4 Token5 <SPAN class="keyword">in</SPAN> <BR> Token0 = <SPAN class="keyword">unit</SPAN> <BR> <SPAN class="keyword">thread</SPAN> <SPAN class="keyword">...</SPAN> Token1=Token0 <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">thread</SPAN> <SPAN class="keyword">...</SPAN> Token2=Token1 <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">thread</SPAN> <SPAN class="keyword">...</SPAN> Token3=Token2 <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">thread</SPAN> <SPAN class="keyword">...</SPAN> Token4=Token3 <SPAN class="keyword">end</SPAN> <BR> <SPAN class="keyword">thread</SPAN> <SPAN class="keyword">...</SPAN> Token5=Token4 <SPAN class="keyword">end</SPAN> <BR> %% <SPAN class="comment">synchronize on the termination of all 5 threads<BR></SPAN> {Wait Token5}<BR><SPAN class="keyword">end</SPAN></CODE></BLOCKQUOTE><P> This technique was used in <A href="node9.html#death.in.oz">Section ``Death by Concurrency in Oz''</A>. Of course it can be used for any arbitrary n-way rendez-vous, and not exclusively for synchronizing on the termination of a collection of threads.</P></DIV><TABLE align="center" border="0" cellpadding="6" cellspacing="6" class="nav"><TR bgcolor="#DDDDDD"><TD><A href="node9.html#chapter.concurrency.cheap"><< Prev</A></TD><TD><A href="node8.html">- Up -</A></TD></TR></TABLE><HR><ADDRESS><A href="http://www.ps.uni-sb.de/~duchier/">Denys Duchier</A>, <A href="http://www.ps.uni-sb.de/~kornstae/">Leif Kornstaedt</A> and <A href="http://www.ps.uni-sb.de/~schulte/">Christian Schulte</A><BR><SPAN class="version">Version 1.4.0 (20110908185330)</SPAN></ADDRESS></BODY></HTML>