The "kill pattern" in Clojure's core.async

Posted on January 3, 2016 by Josh

You’ll find, at times, that you have a long-running channel which you need to respond to a kill signal. I first needed this to perform graceful shutdowns with a Component workflow.

A program’s worth a thousand words, so I’ll just let you play with the following code. I commented it pretty thoroughly.

(ns clj-scratch.core-async
  (:require [clojure.core.async :refer [<! <!!
                                        >! >!!
                                        alt! alt!! alts! alts!!
                                        chan go timeout close!]
             :as a]))

;; setup channels ----------

(def a> (chan 3)) ; notice, when `a>` has more than 3 items on it,
                  ; future puts don't fail, they just pause and wait
                  ; for room on the channel
                  
(def kill> (chan)) ; when `kill>` recieves a signal, the following go
                   ; loop recognizes it and ceases pulling

;; take from channels ----------

(go (loop [coll (alts! [kill> a>] :priority true)]
      (let [[x ch] coll]
        (cond
          (= ch kill>) (println "kill signal recieved")
          (nil? x)     (println "a> closed")
          :else        (do (println "val: " x)
                           (<! (timeout 100)) ; slow down takes
                           (recur (alts! [kill> a>] :priority true)))))))
;; put on `a>` ----------

(go (loop []
      (when (>! a>  (rand 100)) ; this `when` pattern allows failed
                                ; puts to cause the loop to end
        (<! (timeout 500))              ; slow down puts
        (recur)))
    (println "can't put on closed channel"))

;; killing and closing `a>`----------

(comment
  ;;;; try these ways of killing the process

  ;; send the kill signal. 
  ;; Try putting on a few, they'll still be sitting on
  ;; the `kill>` channel if you try to start another take loop
  (go (>! kill> :this-can-be-any-value))

  ;; note, `a>` is still alive, and can be put on, and taken from. Try
  ;; the following line, or go start another taking go loop
  (go (println "new taker: " (<! a>)))

  ;; or just `close!` it for good
  (close! a>))