The Tale of Batman and Mr. Whiskers

His Father's Parentheses

Bruce still remembers the day Alfred gave him his father's parentheses as if it was yesterday . He remembers feeling lost, and uttering:

Alfred, all I see are a bunch of parentheses, I can't fathom the elegance or utility of them.

Alfred replied:

Master Bruce, you are not ready yet. Go get trained in the art of lisp and then you'll realize that a gun is a liar's weapon.

Bruce is ready now. He has emerged from the depths of recursion to alter the reality using only his sheer will and determination, wrapped in his father's parentheses. A gun is a liar's weapon and he ain't no liar, he is Batman.

Mr. Whiskers

Mr. Whiskers was once a kitten, so quick that it would seem that he was at three places at once . He was trained in the art of lisp from a young age by Batman. Agility was his biggest strength and arrogance was his biggest weakness. He would make fun of scrum masters, yelling:

You wouldn't know agile even if it came and hit you in the face. Look at me, this is agile, I am agile.

Batman tried to teach him that the humble improve but he wouldn't listen. He also paid no heed to the lessons on concurrency and parallelism, he was fast and he was furious and thus wasn't that curious.

Mr. Whiskers is no longer a kitten, he has lost his agility and is now just a bitter cynical cat . Today at the bottom of his glass of whiskey, Mr. Whiskers found rock bottom and decided to change his life for the better.

He pleaded Batman to teach him concurrency and parallelism, at first Batman hesitated but in Mr. Whiskers' eyes Batman saw desperation, desperation for finding structure among ruins, structure only a bunch of parentheses could provide and so he agreed.

Let the Lessons Begin

Batman: Alright Mr. Whiskers, time to meet your old friend.
Mr. Whiskers: Clojure? Clojure! My paws are tingling with excitement!!
Batman: Yeah, the concurrency model we are going to talk about is called Communicating Sequential Processes (CSP). The core.async library brings CSP support to Clojure, so let's use that:

(ns csp.core
  (:require-macros [cljs.core.async.macros :refer [go]])
  (:require [cljs.core.async :refer [chan <! >! timeout close! dropping-buffer sliding-buffer alts!]]))

Some basics first:

A concurrent program has multiple logical threads of control. These threads may or may not run in parallel. Concurrency is about dealing with lots of things at once.

A parallel program potentially runs more quickly than a sequential program by executing different parts of the computation simultaneously (in parallel). It may or may not have more than one logical thread of control. Parallelism is about doing lots of things at once.

Mr. Whiskers: I didn't get that Batman, be kind, rewind.
Batman: Remember that day when I got attacked by Penguin and Joker at once, that day I had to multitask, at any one instant I was beating up only one of them but I still had to deal with both of them concurrently. I would beat up Joker, then while he was down, I would turn around and beat up Penguin, so on and so forth. That was concurrent, but it wasn't parallel as I was alone in beating them up.

Concurrent but not Parallel

Once I was with Robin and was attacked by Penguin, Joker and Riddler. I dealt with Penguin and Joker concurrently, and parallely to me, Robin took care of Riddler. That was both concurrent and parallel.

Concurrent and Parallel

Once Robin and I had to beat up Bane together. That was parallel but not concurrent.

Parallel but not Concurrent

Mr. Whiskers: Alright Batman, enough with the bragging, I get it now.
Batman: Alright, so let's talk about CSP now. A program using the communicating sequential processes (CSP) model consists of independent, concurrently executing entities that communicate by sending each other messages.
Mr. Whiskers: Like how we are communicating right now?
Batman: The concurrency model is based on observation of the real world, so yeah kinda like how we are communicating right now. The two core concepts of core.async are channels and go blocks. Let's first talk about go blocks:

go block is how you create a communicating sequential process. You put the task you need to run concurrently in a go block.

(println "before")
  (println "current")

  (println "after")

Code flow above is completely sequential.

(println "before")
  ;; wait for 1000ms 
  (<! (timeout 1000))
  (println "In go block"))
  (println "after")
In go block

Code flow above isn't completely sequential. The code in go block executes concurrently in a separate core.async process, hence after is printed before In go block.

Mr. Whiskers: I didn't get that weird timeout thingy.
Batman: Yeah we'll discuss timeout later, all that line does is cause the core.async process to kinda sleep for 1000ms. We'll talk more about go blocks later, let's talk about channels now:

channel is a thread-safe queue—any task with a reference to a channel can add messages to one end(put), and any task with a reference to it can remove messages from the other(take).

By default, channels are synchronous (or unbuffered)— putting on a channel blocks until something takes from it:

  ;; creating synchronous (or unbuffered) channel
    (def no-receiver (chan))
  ;; putting on a channel using >!
  ;; >! can only be used inside a go block
  (>! no-receiver "message in a bottle")
  (println "this will never be echoed as my message will never be received"))

Similarly, taking from a synchronous channel blocks until something puts on it:

  (def no-sender (chan))
  ;; taking from a channel using <!
  ;; <! can only be used inside a go block
  (<! no-sender)
  (println "this will never be echoed as no message will ever be sent"))

An unbuffered channel with both a sender (batman) and a receiver (mr-whiskers):

  (def channel (chan))
    (def mr-whiskers
    (println "I am Mr. Whiskers :)")
    (println "Let me just say that" (<! channel))
    (println "OMG, Batman is putting words in my mouth!!")))
  (def batman 
    (>! channel "you suck!")))

We can create a buffered channel by passing a buffer size to chan:

  ;; Creates a channel with a buffer large enough to contain five messages
    (def buffered-channel (chan 5))

Putting on a buffered channel wouldn't block until buffer is full:

  (doseq [num (range 6)]
    (>! buffered-channel num)
    (println "I was able to put" num "on buffered-channel")))

Taking from a buffered channel wouldn't block until buffer is empty:

  (while true
    (println "I was able to take" (<! buffered-channel) "from buffered-channel")))

Mr. Whiskers: The output above makes my head hurt!
Batman: That's the hangover talking, nothing perplexing about the output. The go block putting on the buffered-channel was blocked on putting 5 as the buffer was full, when the go block above took from the buffered-channel the buffer was no longer full and put 5 succeeded. The above go block then emptied the buffer and got blocked.
Mr. Whiskers: Batman, I am claustrophobic, any way I can avoid getting blocked?
Batman: Taking from an empty buffer is always going to block but you do have options for not getting blocked on a put operation, we can either create a channel with a dropping buffer:

;; 5 messages can be stored in the buffer
  ;; subsequent messages will be dropped but put operation won't block
  (def dropping-channel (chan (dropping-buffer 5)))

  (doseq [num (range 10)]
    (>! dropping-channel num))
  (while true
    (println "I was able to take" (<! dropping-channel) "from dropping-channel")))

Or a channel with a sliding buffer:

;; 5 messages can be stored in the buffer
  ;; new messages will be stored and older messages will be dropped but put operation won't block
  (def sliding-channel (chan (sliding-buffer 5)))

  (doseq [num (range 11)]
    (>! sliding-channel num))
  (while true
    (println "I was able to take" (<! sliding-channel) "from sliding-channel")))

Mr. Whiskers: Enough of channels Batman, let's close this discussion.
Batman: From closing this discussion I remembered that we still haven't discussed about closing a channel.
Mr. Whiskers: Arghhhhh
Batman: Closing a channel:

(def to-be-closed (chan))

;; you can close a channel using close!
;; a channel once closed cannot be reopened
(close! to-be-closed)

Putting on a closed channel returns false:

  (println (>! to-be-closed "test")))

Taking from a closed channel returns nil:

  (println (<! to-be-closed)))

Mr. Whiskers: Enough with the channels Batman! Let me divert the discussion by asking a question. See this is a do block which returns 0:

(def do-return-value 
   (println "I am going to return 0")

(println do-return-value)

What does a go block return?
Batman: I am glad you asked, it returns a channel.
Mr. Whiskers: You must be kidding me, I guess we are still talking about channels.
Batman: The go block returns a channel immediately and the return value is put on that channel once it is available:

(def returned-channel

  (println (<! returned-channel)))

While we are on the topic of things that return channels:

;; Returns a channel which closes in 1000ms
    (time (<! (timeout 1000))))
"Elapsed time: 1001.320000 msecs"

Mr. Whiskers: Oh I get it now, taking from that channel basically blocks us until it is closed i.e. for 1000ms.
Batman: Alright Mr. Whiskers, the last thing I need to talk about is how to deal with multiple channels at once. Let's say you have three channels:

;; Red is put on this channel every 1.5 seconds
  (def red-channel (chan))
  (while true
    (<! (timeout 1500))
    (>! red-channel "Red")))
;; Green is put on this channel every four seconds
  (def green-channel (chan))
  (while true
    (<! (timeout 4000))
    (>! green-channel "Green")))
;; Blue is put on this channel every five seconds
  (def blue-channel (chan))
  (while true
    (<! (timeout 5000))
    (>! blue-channel "Blue")))

What if we want to multiplex the three channels into one:

Multiplexing three channels into one

The basic idea is that when taking from the multiplexed channel, we are going to get blocked only if there is radio silence on all the three channels. We use alts! for multiplexing in core.async:

(go (dotimes [_ 5]
      ;; The alts! function must be called inside a go block and takes a vector of
      ;; channels as its argument. This is like saying, “Try to do a blocking take on
      ;; each of these channels simultaneously. As soon as a take succeeds, return
      ;; a vector whose first element is the value taken and whose second element
      ;; is the winning channel. If there is a tie between two or more channels, choose one
      ;; randomly if :priority option is false or use the order of the channels in the vector
      ;; to break the tie if :priority option is true.”
      (let [[value channel] (alts! [green-channel blue-channel red-channel]
                                   :priority true)]
        (println value))))

Mr. Whiskers: This was fun Batman :)
Batman: This was just an introduction to core.async, you should watch the course on core.async by Eric Normand and read the relevant sections of Clojure for the Brave and True and Seven Concurrency Models in Seven Weeks to further your understanding. Now bugger off you insufferable ball of fur.
Mr. Whiskers: