我正在编写一个ETL流程,从产品数据库中读取事件级数据,转换/汇总并写入分析数据仓库.我使用clojure的core.async库将这些进程分离成并发执行的组件.这是我现在的代码的主要部分
(ns data-staging.main (:require [clojure.core.async :as async]) (:use [clojure.core.match :only (match)] [data-staging.map-vecs] [data-staging.tables]) (:gen-class)) (def submissions (make-table "Submission" "Valid")) (def photos (make-table "Photo")) (def videos (make-table "Video")) (def votes (make-table "Votes")) ;; define channels used for sequential data processing (def chan-in (async/chan 100)) (def chan-out (async/chan 100)) (defn write-thread [table] "infinitely loops between reading subsequent 10000 rows from table and ouputting a vector of the rows(maps) into 'chan-in'" (while true (let [next-rows (get-rows table)] (async/>!! chan-in next-rows) (set-max table (:max-id (last next-rows)))))) (defn aggregator [] "takes output from 'chan-in' and aggregates it by coupon_id,date. then adds / drops any fields that are needed / not needed and inputs into 'chan-out'" (while true (->> (async/<!! chan-in) aggregate (async/>!! chan-out)))) (defn read-thread [] "reads data from chan out and interts into Analytics DB" (while true (upsert (async/<!! chan-out)))) (defn -main [] (async/thread (write-thread submissions)) (async/thread (write-thread photos)) (async/thread (write-thread videos)) (async/thread-call aggregator) (async/thread-call read-thread))
你可以看到,我将每个os组件放在自己的线程上并使用阻止>!呼叫频道.感觉就像使用非阻塞>!对于这种用例,调用以及执行例程可能更好,特别是对于大部分时间执行i / o并等待产品数据库中的新行的数据库读取.是这样的,如果是这样,实施它的最好办法是什么?我对两种方法之间的所有权衡有一点不清楚,以及如何有效地使用go例程.还有关于如何提高整体架构的任何其他建议将非常感谢!
解决方法
就个人而言,我认为你在这里使用线程可能是正确的. go-blocks的魔法非阻塞性质来自“停车”,这是core.async的状态机使用的特殊类型的伪阻塞,但是由于您的数据库调用真正的阻塞而不是将状态机置于停放状态你只是从core.async线程池阻止一些线程.这取决于你的同步调用需要多长时间,所以这是基准可以提供信息的东西,但我强烈怀疑线程是正确的方法.
一个例外是您的聚合器功能.它看起来像我喜欢它可以被折叠成chan-out的定义,as(def chan-out(map< aggregate chan-in)). 对于线程与线程的一般概述,Martin Trojer写了一个很好的examination of the two approaches,哪一个在哪种情况下更快. Cliff的Notes版本是,go-blocks适用于使用与core.async一起使用的已经异步的库,而线程有助于使异步进程不在同步部分中.例如,如果您的数据库有一个基于回调的API,那么go-block将是一个明确的胜利.但是由于它是同步的,它们并不适合.