Friday, October 30, 2009

Object Calls == Message Passing in Erlang

I started playing around with Erlang last night as a result of learning about Basho's key-value store Riak at NoSQL East yesterday (more specifically, it was due to Justin Sheehy's talk, and two realizations: (1) this guy *gets* a lot of important *operational* design choices in this space, and (2) he decided to build his system in Erlang).

So I decided to read the online Erlang "course" and started working on the exercises. One of them was:

Write a function which starts N processes in a ring, and sends a message M times around all the processes in the ring. After the messages have been sent the processes should terminate gracefully.

[Figure: a unidirectional ring of nodes]

And so, summoning vaguely-remembered lectures by Bob Harper in "Fundamentals of Computer Science II" at CMU on doing object-oriented programming in Scheme, and remembering that the original Smalltalk OOP guys always said "send an object a message" rather than "invoke a method on an object", I set to work. [Editor's note: please feel free to post comments showing me better ways, I have known Erlang for all of about 12 hours at this point!]

Let's get the declarations out of the way. I need to define the entry point function which creates the ring of N nodes and sends the message around it M times, and I know I'm going to need a function representing a node in the ring, since I'm going to have to spawn processes for them.

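-module(ring).
% init_node/1 is the special node introduced further
% down; it is spawned by name, so it has to be
% exported as well.
-export([ring_msg/3, ring_node/1, init_node/1]).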

Ok, what job does a node in the ring have? Well, most of the time, when it receives a message, it just needs to pass it on to the next guy. So my node process is going to need to know about its next neighbor. Now in Erlang, what I would normally think of as an object can be modelled as a recursive function that passes its current state back into itself as an argument, and processes "method calls" by receiving messages. Interestingly, not all method calls actually have to send something back to the caller!
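
To see the "object as a recursive function" idea apart from the ring exercise, here is a minimal sketch of a counter process. The module name counter_obj and its functions are made up for illustration; they are not part of the exercise. Note that increment is a "method call" that sends nothing back, while value waits for a reply.

```erlang
-module(counter_obj).
-export([new/1, increment/1, value/1, stop/1]).

% "Constructor": spawn a process whose loop argument is the object's state.
new(Start) -> spawn(fun() -> loop(Start) end).

% A "method call" with no reply: fire and forget.
increment(Pid) -> Pid ! increment, ok.

% A "method call" with a reply: send our pid along and wait for the answer.
value(Pid) ->
    Pid ! {value, self()},
    receive
        {value_is, Pid, V} -> V
    after 1000 ->
        timeout
    end.

stop(Pid) -> Pid ! stop, ok.

% The object itself: each clause handles one message, then recurses
% with the (possibly updated) state.
loop(Count) ->
    receive
        increment -> loop(Count + 1);
        {value, From} -> From ! {value_is, self(), Count}, loop(Count);
        stop -> ok
    end.
```

The ring node below has the same shape, with Next as its only piece of state.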

ring_node(Next) -> receive
    { pass, M, Msg } ->
      % Note that we got this message.
      io:format("Node ~w~n",[Msg]),
      % Pass the message on around the ring.
      Next ! { pass, M, Msg },
      % If the count was down to zero, I can
      % exit, otherwise, I loop and wait for
      % the next incoming message.
      if M == 0 -> ok;
         true -> ring_node(Next)
      end
  end.

Ok, seems pretty straightforward. But if I had a ring of these set up, a message would just keep running around the ring. At least one node needs to be special, so that it can decrement the count M as the message comes through. It's pretty similar to the ring_node above, but is a little different.

init_node(Next) -> receive
    % message has been all the way around
    % the last time, so I can quit
    { pass, 0, _ } -> ok;
    % otherwise, log the message and pass
    % it on, decrementing the count
    { pass, M, Msg } ->
      io:format("Node ~w~n",[Msg]),
      Next ! { pass, M-1, Msg },
      % loop and wait for the next lap
      init_node(Next)
  end.

Now an interesting thing here is that the init_node and the ring_node can both handle the "pass" message, and that when they send the message on, they don't actually care what their "Next" process is. It's like both of these "objects" implement the following interface:

public interface MessagePasser {
  void pass(int count, Object msg);
}

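A small sketch of that "same interface" point, using two made-up one-shot processes (relay and decrementer are hypothetical names, not the ring code itself): both accept { pass, Count, Msg }, and whoever sends to them neither knows nor cares which kind is on the receiving end.

```erlang
-module(poly_demo).
-export([run/0]).

% Forwards the message unchanged, then exits.
relay(Next) ->
    receive
        {pass, Count, Msg} -> Next ! {pass, Count, Msg}
    end.

% Forwards the message with the count decremented, then exits.
decrementer(Next) ->
    receive
        {pass, Count, Msg} -> Next ! {pass, Count - 1, Msg}
    end.

% Chain: caller -> decrementer -> relay -> caller.
% Returns the count that comes back to the caller.
run() ->
    Self = self(),
    Relay = spawn(fun() -> relay(Self) end),
    Dec = spawn(fun() -> decrementer(Relay) end),
    Dec ! {pass, 3, hello},
    receive
        {pass, Count, hello} -> Count
    after 1000 ->
        timeout
    end.
```
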
Ok, so now if we can create a ring with 1 init_node and (N-1) ring_nodes, we're all set if we inject the initial Msg into the init_node. So let's think about constructing a ring of nodes; if we have a node handy, we can pass that in as the initial argument (think "constructor") to a ring_node process to use as its Next node, then we just count down:
ring(Last, 0) -> Last;
ring(Last, N) -> 
  RN = spawn(ring, ring_node, [Last]),
  ring(RN, N-1).
Hmm, that's close, but that's a linked-list of nodes, not a ring. But we can't pass a node in as a constructor argument to the first node we create, because we don't have any yet! So it seems like we'll need to construct a linked-list of nodes, and then "close the loop" by stitching the front and the back together. Our init_node is already a special node, so maybe we can extend it this way:

init_node(Next) -> receive
    % acknowledge the request, update state
    { setNext, N, From } -> From ! ok, init_node(N);
    % ...followed by the two pass clauses from
    % before, unchanged, and the closing end.

In other words, the init_node can get a special message telling it to "update" its Next state. In some sense, we've just done this:

public interface InitNode extends MessagePasser {
  void setNext(MessagePasser N);
}

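We want to acknowledge the request so our ring construction knows when that message has been processed -- we don't want to hand the ring back until it's all stitched together. Erlang does preserve the order of messages sent from one process to another, so the ack is not strictly required when the same process sends both the setNext and the first pass message, but in general we can't count on ordering across different senders unless we specifically wait for a response, and waiting makes the hand-off explicit. So here's the full ring construction: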
ring(N) when is_integer(N) ->
  % just pass in a placeholder for Next
  RN0 = spawn(ring, init_node, [nil]),
  ring(RN0, RN0, N-1);
% finished stitching, can return our
% init node to the caller
ring(Init) -> receive ok -> Init end.
ring(Init, Last, 0) -> Init ! { setNext, Last, self()}, ring(Init);
ring(Init, Last, N) ->
  RN = spawn(ring, ring_node, [Last]),
  ring(Init, RN, N-1).
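
The acknowledge-and-wait step can also be sketched as a small reusable helper. This is only an illustration under my own assumptions: the module ack_demo, the call/2 helper and the get_next and stop messages are hypothetical additions, and tagging each request with make_ref() is a common Erlang habit rather than something the ring code above does.

```erlang
-module(ack_demo).
-export([start/1, set_next/2, get_next/1, stop/1]).

start(Next) -> spawn(fun() -> loop(Next) end).

set_next(Pid, NewNext) -> call(Pid, {set_next, NewNext}).
get_next(Pid) -> call(Pid, get_next).
stop(Pid) -> call(Pid, stop).

% Send a request tagged with a unique reference and block until the
% reply carrying that same reference arrives (or give up after 1s).
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {Request, self(), Ref},
    receive
        {Ref, Reply} -> Reply
    after 1000 ->
        {error, timeout}
    end.

% The process acknowledges every request before carrying on.
loop(Next) ->
    receive
        {{set_next, NewNext}, From, Ref} -> From ! {Ref, ok}, loop(NewNext);
        {get_next, From, Ref} -> From ! {Ref, Next}, loop(Next);
        {stop, From, Ref} -> From ! {Ref, ok}
    end.
```

The reference means a stray ok in the caller's mailbox can never be mistaken for the acknowledgement.
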
Finally, the thing we're trying to do (including optimizing the degenerate cases):
ring_msg(0, _, _) -> ok;
ring_msg(_, 0, _) -> ok;
ring_msg(N, M, Msg) ->
  Init = ring(N), Init ! { pass, M, Msg }, ok.
Actually runs, too! It's pretty neat to see polymorphism via being able to accept the same message, and I've always loved the pattern matching in ML (both SML and OCaml variants!). Some pretty serious systems programs are getting written in this language; it's clear that the process spawning methodology lends itself well to a SEDA-style approach which is great for graceful degradation of service, and the fully-functional style (no mutation) means that you have no locks, no shared state, and hence safe concurrency (as long as you can model what you're doing properly).
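
For anyone who wants to try it, here is one way the fragments above might be assembled into a single file. This is a sketch, with a few assumptions of mine: the file is ring.erl, init_node/1 is exported because it is spawned by name, and the earlier linked-list draft ring/2 is left out since the final version does not use it.

```erlang
-module(ring).
-export([ring_msg/3, ring_node/1, init_node/1]).

% Entry point: build a ring of N nodes and send Msg around it M times.
ring_msg(0, _, _) -> ok;
ring_msg(_, 0, _) -> ok;
ring_msg(N, M, Msg) ->
    Init = ring(N),
    Init ! {pass, M, Msg},
    ok.

% Given a count: spawn the init node with a placeholder for Next.
ring(N) when is_integer(N) ->
    RN0 = spawn(ring, init_node, [nil]),
    ring(RN0, RN0, N - 1);
% Given the init pid: wait for its ack, then hand the ring back.
ring(Init) ->
    receive ok -> Init end.

% Build the chain back to front, then close the loop via setNext.
ring(Init, Last, 0) ->
    Init ! {setNext, Last, self()},
    ring(Init);
ring(Init, Last, N) ->
    RN = spawn(ring, ring_node, [Last]),
    ring(Init, RN, N - 1).

% The special node: accepts setNext and decrements the lap count.
init_node(Next) ->
    receive
        {setNext, N, From} -> From ! ok, init_node(N);
        {pass, 0, _} -> ok;
        {pass, M, Msg} ->
            io:format("Node ~w~n", [Msg]),
            Next ! {pass, M - 1, Msg},
            init_node(Next)
    end.

% An ordinary node: log, forward, and exit on the last lap.
ring_node(Next) ->
    receive
        {pass, M, Msg} ->
            io:format("Node ~w~n", [Msg]),
            Next ! {pass, M, Msg},
            if M == 0 -> ok;
               true -> ring_node(Next)
            end
    end.
```

From the Erlang shell, c(ring). followed by ring:ring_msg(3, 2, hello). should return ok right away and print "Node hello" once per hop, six times in all for three nodes and two laps.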


Jon Moore said...

Ah, just realized that I could make the init_node and the ring_node the same if I multiplied the message counter out with N*M initially, had each node decrement before passing, and then gracefully die if the counter is < N (meaning it is on its last circuit). Refactor, refactor, refactor.
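
A rough sketch of what that refactoring might look like, under some assumptions that go beyond the comment: the module name ring_uniform is invented, every node first waits for a {next, Pid} message so that no node needs special stitching code, and nodes report each delivery to the caller instead of printing, so that run/3 can return the number of deliveries once all nodes have exited.

```erlang
-module(ring_uniform).
-export([run/3]).

% Build a ring of N identical nodes, send Msg around it M times, wait
% for every node to exit, and return how many deliveries were reported.
run(N, M, Msg) when is_integer(N), N > 0, is_integer(M), M > 0 ->
    Parent = self(),
    Spawned = [spawn_monitor(fun() -> await_next(N, Parent) end)
               || _ <- lists:seq(1, N)],
    {Pids, Refs} = lists:unzip(Spawned),
    % Each node's successor is the next pid in the list; the last node
    % points back at the first.
    [First | Rest] = Pids,
    lists:foreach(fun({Pid, Next}) -> Pid ! {next, Next} end,
                  lists:zip(Pids, Rest ++ [First])),
    % The counter covers every hop: N nodes times M laps.
    First ! {pass, N * M, Msg},
    wait_for_exits(Refs),
    count_reports(0).

% A node only starts passing once it knows its neighbour; any pass
% message that arrives early simply waits in the mailbox.
await_next(N, Parent) ->
    receive
        {next, Next} -> node_loop(Next, N, Parent)
    end.

node_loop(Next, N, Parent) ->
    receive
        {pass, Count, Msg} ->
            Parent ! {seen, self(), Msg},
            % Decrement before passing; stop forwarding at zero.
            Left = Count - 1,
            case Left > 0 of
                true -> Next ! {pass, Left, Msg};
                false -> ok
            end,
            % Fewer than N hops left means this was my last lap.
            case Left < N of
                true -> ok;
                false -> node_loop(Next, N, Parent)
            end
    end.

wait_for_exits([]) -> ok;
wait_for_exits([Ref | Refs]) ->
    receive
        {'DOWN', Ref, process, _Pid, _Reason} -> wait_for_exits(Refs)
    end.

% Drain the delivery reports that are already in the mailbox.
count_reports(Acc) ->
    receive
        {seen, _Pid, _Msg} -> count_reports(Acc + 1)
    after 0 ->
        Acc
    end.
```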

Dan said...

I handled the stitching of the endpoints without a new message. You could argue that I cheated, but it works since Erlang seems to hold the message for a process until it exists (confirmed by adding a sleep before the last node() call). I also counted messages at each node individually, which simplified things a bit. Not sure which way I like better.

(sorry, can't use <pre> in a comment)
-export([start/2, make_node/2, node/3]).

node(Id, _, 0) ->
  io:format("Node ~p exiting~n", [Id]),
  ok;

node(Id, Next, Num_rem) ->
  receive
    M ->
      io:format("Node ~p got msg (~p left)~n", [Id, Num_rem - 1]),
      Next ! M,
      node(Id, Next, Num_rem - 1)
  end.

make_node(1, NMsgs) ->
  spawn(conc2, node, [1, self(), NMsgs]);

make_node(NNodes, NMsgs) ->
  spawn(conc2, node, [NNodes, make_node(NNodes - 1, NMsgs), NMsgs]).

start(NNodes, NMsgs) ->
  First = spawn(conc2, node,
         [NNodes - 1, make_node(NNodes - 2, NMsgs), NMsgs]),
  First ! yo,
  node(NNodes, First, NMsgs).
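
The behaviour Dan relies on, a message waiting in a process's mailbox until that process gets around to calling receive, can be seen in isolation with a small sketch. The module name mailbox_demo and the 100 ms sleep are arbitrary choices for illustration, not taken from his code.

```erlang
-module(mailbox_demo).
-export([run/0]).

% Send a message to a process that is still busy (sleeping) and check
% that it is still delivered once the process reaches its receive.
run() ->
    Parent = self(),
    Pid = spawn(fun() ->
                    timer:sleep(100),
                    receive
                        Msg -> Parent ! {got, Msg}
                    end
                end),
    % Sent immediately, long before the receive above is reached.
    Pid ! hello,
    receive
        {got, M} -> M
    after 1000 ->
        timeout
    end.
```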