Acting In C++

May 21, 2015

Because they’re a perfect fit for mega-core processors, and because they’re safer and more enjoyable to program than raw multi-threaded systems, I’ve been a fan of actor-based concurrent systems ever since I experimented with Erlang and Akka (via its Java API). Thus, as a C++ programmer, I was excited to discover the “C++ Actor Framework” (CAF). Like the Akka toolkit/runtime, the CAF is based on the design of the longest-living, proven, production-ready, pragmatic, actor-based language that I know of – the venerable Erlang programming language.

Curious to check out the CAF, I downloaded, compiled, and installed the framework source code with nary a problem (on CentOS 7.0, GCC 4.8.2). I then built and ran the sample “Hello World” program presented in the user manual:

[Image: CAF “Hello World” program listing]

As you can see, the CAF depends on one of my favorite C++11 features – lambda functions – to implement actor behavior.

The logic in main() uses the CAF spawn() function to create two concurrently running actors, named “mirror” and “hello_world”, that exchange messages of type std::string. While spawning the “hello_world” actor, main() passes it a handle (via const actor&) to the previously created “mirror” actor so that “hello_world” can asynchronously communicate with “mirror” via message-passing. Then, much like the parent thread in a multi-threaded program calls join() on the threads it creates to wait for them to complete their work, the main() thread of control uses the CAF await_all_actors_done() function to do the same for the two actors it spawns.
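
For comparison, the thread-joining half of that analogy can be sketched in plain standard C++. This is not CAF code; the run_workers() helper and the squaring workload are made up purely for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper (not part of CAF): starts `worker_count` threads and
// returns only after every one of them has been joined.
std::vector<int> run_workers(int worker_count) {
  assert(worker_count > 0);
  std::vector<int> results(static_cast<std::size_t>(worker_count), 0);
  std::vector<std::thread> workers;
  workers.reserve(results.size());
  for (int i = 0; i < worker_count; ++i) {
    // Each thread writes to its own element, so no lock is needed here.
    workers.emplace_back([&results, i] {
      results[static_cast<std::size_t>(i)] = i * i;
    });
  }
  // The plain-thread counterpart of waiting for all spawned actors:
  // block until every worker has finished.
  for (auto& worker : workers) {
    worker.join();
  }
  return results;
}
```

Calling run_workers(4) blocks until all four threads are done and then hands back their results, which is roughly the role await_all_actors_done() plays for the spawned actors.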

When I ran the program as a local C++ application in the Eclipse IDE, here is the output that appeared in the console window:

[Image: CAF mirror console output]

The figure below illustrates the exchange of messages between the program’s actors at runtime. Upon being created, the “hello_world” actor sends a text message (via sync_send()) containing “Hello World” as its payload to the previously spawned “mirror” actor. The “hello_world” actor then immediately chains the then() function onto that call to await the response message it expects back from the “mirror” actor. Note that a lambda function that will receive the response message string and print it to the console (via the framework’s thread-safe aout object) is passed as an argument to then().

[Figure: CAF HW actors]

Looking at the code for the “mirror” actor, we can deduce that when the framework spawns an instance of it, the framework gives the actor a handle to itself, and the actor returns the behavior (again, in the form of a lambda function) that it wants the framework to invoke each time a message is sent to the actor. The “mirror” actor’s behavior prints the received message to the console, tells the framework that it wants to terminate itself (via the call to quit()), and returns a mirrored translation of the input message to the framework for passage back to the sending actor (in this case, the “hello_world” actor).
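
The “return a behavior as a lambda” idea can be mimicked without the framework. The sketch below is plain standard C++, not the CAF API: the Behavior alias and make_mirror_behavior() are hypothetical names, std::function stands in for the framework’s behavior type, and reversing the string is my assumption of what “mirrored” means.

```cpp
#include <functional>
#include <iostream>
#include <ostream>
#include <string>

// Hypothetical stand-in for a message handler: takes the incoming text and
// returns the reply. This is not CAF's behavior type.
using Behavior = std::function<std::string(const std::string&)>;

// Returns the handler to be invoked for each incoming message.
// `out` must outlive the returned handler because it is captured by reference.
Behavior make_mirror_behavior(std::ostream& out) {
  return [&out](const std::string& what) -> std::string {
    // Print the received message ...
    out << what << '\n';
    // ... and reply with the characters in reverse order.
    return std::string(what.rbegin(), what.rend());
  };
}
```

A caller playing the role of the framework would hold on to the returned handler and invoke it per message, e.g. make_mirror_behavior(std::cout)("Hello World!"). Unlike a real actor, nothing here runs concurrently or terminates itself.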

Note that the application contains no explicit code for threads, tasks, mutexes, locks, atomics, condition variables, promises, futures, or thread-safe containers to futz with. The CAF hides all the underlying synchronization and message-passing minutiae beneath the abstractions it provides so that the programmer can concentrate on the application domain functionality. Great stuff!
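
To get a feel for what is being hidden, here is a rough hand-rolled equivalent of a single request/response exchange in plain standard C++. It is only a sketch under my own assumptions: Request, Mailbox, and mirror_via_thread() are hypothetical names, and nothing here reflects how the CAF is actually implemented internally.

```cpp
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical message: a payload plus a promise used to deliver the reply.
struct Request {
  std::string text;
  std::promise<std::string> reply;
};

// Hypothetical mailbox: a queue guarded by a mutex and a condition variable.
class Mailbox {
public:
  void push(Request req) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(req));
    }
    ready_.notify_one();  // wake the waiting consumer
  }

  Request pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Sleep until at least one message is queued.
    ready_.wait(lock, [this] { return !queue_.empty(); });
    Request req = std::move(queue_.front());
    queue_.pop();
    return req;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<Request> queue_;
};

// Sends one request to a worker thread that mirrors it, then waits for
// the reply and for the worker to finish.
std::string mirror_via_thread(const std::string& text) {
  Mailbox mailbox;
  std::thread mirror([&mailbox] {
    Request req = mailbox.pop();  // blocks until a message arrives
    req.reply.set_value(std::string(req.text.rbegin(), req.text.rend()));
  });

  Request req{text, std::promise<std::string>{}};
  std::future<std::string> answer = req.reply.get_future();
  mailbox.push(std::move(req));

  std::string result = answer.get();  // blocks until the worker replies
  mirror.join();
  return result;
}
```

Even this toy version needs a thread, a mutex, a condition variable, a promise, and a future for one message and one reply, which is exactly the bookkeeping the actor abstractions keep out of the application code.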

I modified the code slightly so that the application actors exchange user-defined message types instead of std::strings.

[Figure: req-resp CAF]

Here is the modified code:

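// Assumed preamble (not shown in the original listing)
#include <cstdint>
#include <iostream>
#include <string>

#include "caf/all.hpp"

using std::endl;
using std::int32_t;
using std::string;
using namespace caf;

struct ReqMessage {
  int32_t anInt32_t;
  double aDouble;
  string aString;
};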

struct RespMessage {
  enum class AckNack{ACK, NACK};
  AckNack response;
};

void printMsg(event_based_actor* self, const ReqMessage& msg) {
  aout(self) << "anInt32_t = " << msg.anInt32_t << endl
             << "aDouble = " << msg.aDouble << endl
             << "aString = " << msg.aString << endl
             << endl;
}

void fillReqMessage(ReqMessage& msg) {
  msg.aDouble = 55.5;
  msg.anInt32_t = 22;
  msg.aString = "Bulldozer00 Rocks!";
}

behavior responder(event_based_actor* self) {
  // return the (initial) actor behavior
  return {
    // a handler for messages of type ReqMessage
    // that replies with a RespMessage
    [=](const ReqMessage& msg) -> RespMessage {
      // print the content of ReqMessage via aout
      // (thread-safe cout wrapper)
      printMsg(self, msg);
      // terminate this actor *after* the return statement
      // ('become' a quitter; otherwise it loops forever)
      self->quit();
      // reply with an "ACK"
      return RespMessage{RespMessage::AckNack::ACK};
    }
  };
}

void requester(event_based_actor* self, const actor& responder) {
  // create & send a ReqMessage to our buddy ...
  ReqMessage req{};
  fillReqMessage(req);
  self->sync_send(responder, req).then(
  // ... wait for a response ...
  [=](const RespMessage& resp) {
    // ... and print it
    RespMessage::AckNack payload{resp.response};
    string txt =
      (payload == RespMessage::AckNack::ACK) ? "ACK" : "NACK";
    aout(self) << txt << endl;
  });
}

int main() {
  // create a new actor that calls 'responder()'
  auto responder_actor = spawn(responder);
  // create another actor that calls 'requester(responder_actor)'
  spawn(requester, responder_actor);
  // wait until all other actors we have spawned are done
  await_all_actors_done();
  // run cleanup code before exiting main
  shutdown();
}

And here is the Eclipse console output:

[Image: req-resp CAF output]

Categories: C++, C++11