Core.async: Take all values from collection of promise-chans

Consider a dataset like this:

(def data [{:url "http://www.url1.com" :type :a}
           {:url "http://www.url2.com" :type :a}
           {:url "http://www.url3.com" :type :a}
           {:url "http://www.url4.com" :type :b}])

The contents of those URL's should be requested in parallel. Depending on the item's :type value those contents should be parsed by corresponding functions. The parsing functions return collections, which should be concatenated, once all the responses have arrived.

So let's assume that there are functions parse-a and parse-b, which both return a collection of strings when they are passed a string containing HTML content.

It looks like core.async could be a good tool for this. One could either have separate channels for each item ore one single channel. I'm not sure which way would be preferable here. With several channels one could use transducers for the postprocessing/parsing. There is also a special promise-chan which might be proper here.

Here is a code-sketch, I'm using a callback based HTTP kit function. Unfortunately, I could not find a generic solution inside the go block.

(defn f [data] 
  (let [chans (map (fn [{:keys [url type]}] 
                     (let [c (promise-chan (map ({:a parse-a :b parse-b} type)))] 
                       (http/get url {} #(put! c %))
                       c))
                   data)
        result-c (promise-chan)] 
    (go (put! result-c (concat (<! (nth chans 0))
                               (<! (nth chans 1))
                               (<! (nth chans 2))
                               (<! (nth chans 3)))))
    result-c))

The result can be read like so:

(go (prn (<! (f data))))

I'd say that promise-chan does more harm than good here. The problem is that most of core.async API (a/merge, a/reduce etc.) relies on fact that channels will close at some point, promise-chans in turn never close.

So, if sticking with core.async is crucial for you, the better solution will be not to use promise-chan, but ordinary channel instead, which will be closed after first put!:

...
(let [c (chan 1 (map ({:a parse-a :b parse-b} type)))]
  (http/get url {} #(do (put! c %) (close! c)))
  c)
...

At this point, you're working with closed channels and things become a bit simpler. To collect all values you could do something like this:

;; (go (put! result-c (concat (<! (nth chans 0))
;;                            (<! (nth chans 1))
;;                            (<! (nth chans 2))
;;                            (<! (nth chans 3)))))
;; instead of above, now you can do this:
(->> chans
     async/merge
     (async/reduce into []))

UPD (below are my personal opinions):

Seems, that using core.async channels as promises (either in form of promise-chan or channel that closes after single put!) is not the best approach. When things grow, it turns out that core.async API overall is (you may have noticed that) not that pleasant as it could be. Also there are several unsupported constructs, that may force you to write less idiomatic code than it could be. In addition, there is no built-in error handling (if error occurs within go-block, go-block will silently return nil) and to address this you'll need to come up with something of your own (reinvent the wheel). Therefore, if you need promises, I'd recommend to use specific library for that, for example manifold or promesa.

promise-chan - clojure.core.async, A promise channel can take exactly one value that consumers will receive. Once full All rights reserved. No examples for clojure.core.async/promise-chan. core.async doesn’t provide a built-in fn that both takes the value from each channel and returns those values in the order the channels were given. But writing a fn is simple enough: (defn into [coll & chans] (go-loop [coll coll chans chans] (if (seq chans) (recur (conj coll (<! (first chans))) (rest chans)) coll)))

Use pipeline-async in async.core to launch asynchronous operations like http/get concurrently while delivering the result in the same order as the input:

(let [result (chan)] 
  (pipeline-async 
    20 result
    (fn [{:keys [url type]} ch]
      (let [parse ({:a parse-a :b parse-b} type)
            callback #(put! ch (parse %)(partial close! ch))]  
        (http/get url {} callback)))
    (to-chan data))
  result)

Promises with core.async, Is there a core.async equivalent to Promise.all? core.async doesn't provide a built-in fn that both takes the value from each channel and returns those (defn into [coll & chans] (go-loop [coll coll chans chans] (if (seq chans) (recur (conj coll ( <! Again, we return a channel with the collection as a value. A promise channel can take exactly one value that consumers will receive. Once full, puts complete but val is dropped (no transfer). Consumers will block until either a value is placed in the channel or the channel is closed, then return the value (or nil) forever. See chan for the semantics of xform and ex-handler. Source

I wanted this functionality as well because I really like core.async but I also wanted to use it in certain places like traditional JavaScript promises. I came up with a solution using macros. In the code below, <? is the same thing as <! but it throws if there's an error. It behaves like Promise.all() in that it returns a vector of all the returned values from the channels if they all are successful; otherwise it will return the first error (since <? will cause it to throw that value).

(defmacro <<? [chans]
  `(let [res# (atom [])]
     (doseq [c# ~chans]
       (swap! res# conj (serverless.core.async/<? c#)))
     @res#))

If you'd like to see the full context of the function it's located on GitHub. It's heavily inspired from David Nolen's blog post.

clojure.core.async, Use the clojure.core.async.go-checking flag to detect invalid use (see Takes a collection of source channels and returns a channel which contains all values Usage: (promise-chan) (promise-chan xform) (promise-chan xform ex-handler) Takes a function and a collection of source channels, and returns a channel which contains the values produced by applying f to the set of first items taken from each source channel, followed by applying f to the set of second items from each channel, until any one of the channels is closed, at which point the output channel will be closed.

if anyone is still looking at this, adding on to the answer by @OlegTheCat:

You can use a separate channel for errors.

(:require [cljs.core.async :as async]
            [cljs-http.client :as http])
(:require-macros [cljs.core.async.macros :refer [go]])

(go (as-> [(http/post <url1> <params1>)
           (http/post <url2> <params2>)
           ...]
          chans
          (async/merge chans (count chans))
          (async/reduce conj [] chans)
          (async/<! chans)
          (<callback> chans)))

wilkerlucio/wsscode-async: Core.async utilities package., Core.async provides a promise-chan , which is a channel that has a this result gets passed to multiple readers, all of them will get the realized value or error. Clojure - apply to all but nth element. clojure. map-indexed is a decent choice. call a function you pass with the value of one of the items form your input and the index where it was found (index first). that function can choose to produce a new value or return the existing one. user> (map-indexed (fn [i v] (if-not

clojure/core.async: Facilities for async programming and , In general, all changes endeavor to be non-breaking (by moving to new names rather than by Pull requests and GitHub issues are not accepted; please use the core.async JIRA project to ASYNC-198 (CLJ) Fix exception rewriting in go can replace return value ASYNC-159 - promise-chan in ClojureScript is broken . You don’t need Promise.all() because it’s dead. I have demonstrated how you can run promises in parallel easily and effectively without using Promise.all(). So that means it’s completely dead, right? Well, some might argue that a perfect use case is when you have an array of values and you need to map() it to an array of promises. For

cljs.core.async 101. There comes a time in all good programs…, Like vectors, channels are collections that take values from one end Write asynchronous code as if it was synchronous; They're faster than Promises . A core.async channel ( chan ) can take from 0 to 3 arguments. To start� The fundamental operations on channels are putting and taking values. Both of those operations potentially block, but the nature of the blocking depends on the nature of the thread of control in which the operation is performed. core.async supports two kinds of threads of control - ordinary threads and IOC (inversion of control) 'threads'.

Intro to Core Async | Lesson 25, While it is entirely possible to use JavaScript's Promise API from ClojureScript, we By using communication over channels, multiple processes can synchronize which creates a new lightweight process, chan , which creates a channel, and Before diving in with core.async , let's take a quick step back to talk about CSP. Now let's take a look at how to do the same thing but return a collection of values when our Task finishes. As you can see the code is very similar to the example that does not return a value.

Comments
  • What's your question, specifically?
  • Actually in the meanwhile I came to more or less the solution you are proposing. This worked smoothly until I decided to add error handling to it. I might use plain promises instead.