A LinkedBlockingQueue Transport for Conduit (or one way of sucking async data into a distributed processing system)
This post discusses how to connect a LinkedBlockingQueue to a Conduit composition. It is useful for passing event-driven input into a stream processing system, specifically one that can be easily distributed.
Its power is that it is based on arrows, which are an abstraction over function composition (I write this, but all I really understand is that arrows are like monads, only even scarier). This abstraction enables each function in the composition to be connected to the others in an arbitrary way, which allows all sorts of cleverness, primarily seamless distribution of the composition.
By way of brief explanation, this is what function composition looks like 'natively'.
Values flow from function to function.
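In Clojure, the 'native' version is just `comp` (or threading). A tiny sketch, where `f1`, `f2` and `f3` are hypothetical processing steps of my own invention:

```clojure
;; Hypothetical processing steps, composed directly:
(defn f1 [x] (inc x))
(defn f2 [x] (* 2 x))
(defn f3 [x] (str x))

;; Values flow from function to function; note comp applies right to left.
(def pipeline (comp f3 f2 f1))

(pipeline 20) ;; => "42"
```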
This is what it looks like in Conduit (or with any arrow, I guess).
Conduit provides plumbing, in that each otherwise-normal function is wrapped with code that transports one function's output to the input of the next. Here's the cool part: the transport can be completely arbitrary and easily swapped out. So the diamond above could represent transport over a rabbitmq queue, an IRC channel, whatever. So you can use a declarative style to define your application in one place, and yet have f1, f3 and f4 running on one box and f2 on another, connected by rabbitmq (say). Now that is da' kizzle kazzle, as you can write your application and then easily distribute it as it grows, and the example I linked above shows exactly how to do that.
My problem was getting stuff into the Conduit system. I have a web app that generates events that need to be processed: event-driven input. I needed to convert that input into a more sanitary thing, and so lifted a page directly from the CGrand Master himself. In that post he shows how to create a LinkedBlockingQueue (henceforth LBQ, to save pixels) together with a function, f, which, when called with an argument, will put the argument on the queue, and a seq, s, over which you can map, reduce, filter, etc. This is really one of my favourite snippets, as it completely changes the 'polarity' of the data source from an async event-type source to a seq.
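From memory, the trick in that post looks roughly like the sketch below. This is a reconstruction of the pattern, not a verbatim copy: `pipe` returns the put-function and the lazy seq as a pair, and uses two sentinel objects, one to close the seq and one to stand in for nil (which an LBQ cannot hold).

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Sketch of the queue-to-seq trick: returns [f s], where calling (f x)
;; puts x on the queue and s is a lazy seq of everything enqueued.
(defn pipe []
  (let [q   (LinkedBlockingQueue.)
        EOQ (Object.)   ; sentinel: end of queue
        NIL (Object.)   ; sentinel: stand-in for nil values
        s   (fn queue-seq []
              (lazy-seq
                (let [x (.take q)]
                  (when-not (identical? EOQ x)
                    (cons (when-not (identical? NIL x) x)
                          (queue-seq))))))]
    [(fn enqueue
       ([]  (.put q EOQ))                 ; no-arg call closes the seq
       ([x] (.put q (if (nil? x) NIL x))))
     (s)]))
```

The consumer blocks in `.take` until a producer calls the function, so the seq can be mapped, reduced and filtered like any other, with the elements arriving asynchronously.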
So what I needed was to write an LBQ transport for Conduit; then I would be able to create a queue, hand f to the web app as a callback, and connect s to Conduit. This would allow me to have my extant, potentially distributed, system consume events from a web app with no fuss. So I wrote one.
It's easy enough to use. For the sake of explanation, let's create the simplest processing step we can: identity.
Firstly, let's create the output: just a vector atom that will hold the results of all the processing, and a function that conjs an x into that vector.
(def effect (atom []))

(defn side-effect [sink x]
  (swap! sink conj x))
This is our absolutely bog-standard Clojure function. To connect it to the Conduit plumbing is absolutely trivial. We partial the first argument of side-effect so that it's bound to the effect atom, resulting in a function only of x. Then we use a-arr, which takes a normal function and returns a proc; that is, it connects the function to Conduit. a-arr is the little green circle around the function in the second picture above.
(def identity-proc (a-arr (partial side-effect effect)))
Next, create an LBQ which will be the head of the conduit.
(def my-queue (create-queue))
Finally, declare that identity-proc has its input data transported over the LBQ, my-queue, with id "gogo".
(def my-tproc (a-lbq my-queue "gogo" identity-proc))
So we have declared a proc and told Conduit to route the contents of my-queue to it. Now we have to tell Conduit to start watching the queue and routing its data.
(def my-future (future (lbq-run my-tproc my-queue)))
lbq-run is just a blocking read over the effectively infinite seq on the LBQ, and we put it in a future so that we can continue playing in the REPL. OK, all the plumbing is connected. Let's put something on the queue and, using the gogo id, let Conduit know to route it to our proc.
(enqueue my-queue "gogo" 13)
And now read what's in the output, the effect atom:
@effect ;; Should be [13]
Presto. Kill the future for hygiene:
(.cancel my-future true)
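With everything running, handing enqueue to an event source is just a matter of closing over the queue and the id. A hypothetical sketch of that wiring (the handler name and the shape of `event` are assumptions; my-queue and "gogo" are as defined above):

```clojure
;; Hypothetical callback for the web app: each incoming event
;; lands on my-queue tagged with the "gogo" id, and Conduit
;; routes it to identity-proc from there.
(defn on-event [event]
  (enqueue my-queue "gogo" event))
```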
So that's how I envisaged it - purely as a way to get stuff into Conduit locally. Start everything up as above, pass the enqueue function as the callback to the event based data source and enjoy the show. Of course you can put the LBQ proc in the middle of a Conduit composition too:
(def my-tproc (a-lbq my-queue "gogo" (a-arr (partial side-effect effect))))
(def my-future (future (lbq-run my-tproc my-queue)))
(def ow (a-comp (a-arr identity) my-tproc))
(conduit-map ow [1 2 3])
(.cancel my-future true)
@effect ;; Should be [13 1 2 3]
I'm not sure why you might want to do so, perhaps buffering, but you can.
Conduit is a powerful abstraction over function composition, with a sweet spot for distributing that composition. It is stream based, while many data sources are event based, and this transport provides a neat way to switch the polarity of the input so it can be coupled into Conduit. There is much more to Conduit, and the pages I linked to above will give you the full story.