并发版爬虫,在上一篇单机版爬虫的基础上演变而来
这里只有并发引擎的代码,基本的解析器代码参考: https://www.cnblogs.com/ITPower/articles/12450374.html
一. 单节点版爬虫的问题
拉取数据的速度太慢,慢有两部分. 一部分是网络请求,根据url拉取zhenai网的数据. 另一部分是: 解析. 两部分相比较,第一部分更慢
既然慢,我们就要想办法解决慢的问题.
其实拉取数据和数据解析,可以看成是一个部分. 他们都是具体的工作者. 因此,我们把他们化为工作者模型
那么,一个工作者工作,速度很慢. 很多个工作者同时工作,速度肯定比一个工作者要快很多.
这里想到java的多线程同时工作. go里面应该就是多协程一起工作了. 将工作者抽象出来,然后创建多个协程,比如,5个,10个,20个
在单机版的爬虫里面. 会将数据延绵不断的放入队列中. 那么,在并发版的应该也有一个类似于队列的东西,来保存request. 这里定义为一个Scheduler
这是并发版的架构模型. 有一个engine,一个scheduler,多个worker. 这里的一个,多个映射到代码里表示的是协程.
sheduler是一个协程调度者,他把收到的request分发给worker. worker拿到request进行处理. 处理结果输出为Requests,Items,交给engine执行引擎,执行引擎再把新的Request放入到Scheduler调度者中,然后循环往复
这里每条线都代表一个chan. 协程和协程之间的通信,使用的是chan.
二. 并发版爬虫第一版---所有的worker公用一个输入
什么意思呢?
在单机版是一个队列,单线程执行,去队列里去url,然后get网页内容,在进行解析.
在并发版,我们让多个工作者同时工作,去队列里取request,然后同时工作,get网页内容,解析网页.
在go里多个工作者同时工作,我们可以考虑开多个协程. 同时去队列去数据. 这第一版,把队列改成了任务调度器Scheduler. 任务调度器是单独的一个goroutine.
拿到的request放到Scheduler里面,然后多个worker去Scheduler里拿request进行处理.
增加一个并发的引擎--concurrectEngine
package engine import "fmt"
// 定义了一个 type ConcurrectEngine struct { Scheduler Scheduler WorkerCount int } type Scheduler interface{ Submit(Request) ConfigureMasterWorkerChan(chan Request) } func (c *ConcurrectEngine) Run(seeds ...Request) { // 第一步: 做初始化工作 in := make(chan Request) out := make(chan ParseResult) c.Scheduler.ConfigureMasterWorkerChan(in) 创建工作者,从in输入管道中取出request进行处理,处理后放入到ParseResult的管道中 for i := 0; i<c.WorkerCount; i++ { 创建WorkerCount个工作者 c.createWorker(in,out) } 第二步: 把种子放到任务调度器中 for _,seed := range seeds { c.Scheduler.Submit(seed) } 第三步: 处理工作者线程返回的ParseResult,将items打印,requests再次添加到任务调度器中 for { pr := <- out range pr.Items { fmt.Printf(内容项: %v \n,item) } range pr.Req{ c.Scheduler.Submit(p) } } } func (c *ConcurrectEngine) createWorker(in chan Request,1)"> chan ParseResult) { go func() { 从in中取出request请求 req := <- parseResult,e := worker(req) if e != nil { return } out <- parseResult }() }
拆解分析
定义了一个并发的引擎结构,目的是,区别于之前写的单机版的Engine. type ConcurrectEngine }
调度器接口
这里是定义了一个并发引擎. 每个引擎engine都有自己的Run方法.
定义了调度器的接口. 既然是调度器,就要有一个接受请求的方法. type Scheduler { 接收请求的方法 Submit(Request) 这里比较巧妙的地方是,其实调度器的chan类型和工作者输入的chan类型是同一个. 调度器输出的request,就是工作者的输入request,这里让他们指向同一个地址. ConfigureMasterWorkerChan(chan Request) }
定义了调度器的接口.
在任务开始前,我们需要将要处理的request放入到调度器中. 所以,调度器需要有一个方法submit(request)
调度器的输出request,其实就是工作者的输入request. 这里让他们指向同一个地址,一个变就都变了. 不用再进行拷贝了
引擎执行的代码
在并发的引擎结构中,第一个方法是调度器的接口. 再具体使用的时候,传什么类型的调度器,就执行其具体的调度规则
func (c * range pr.Req{
c.Scheduler.Submit(p)
}
}
}
开始工作.
首先,我们再来思考这个模型
刚开始,种子请求过来了,我们将其放入到调度器中. 调度器是单独工作的. 只需要有一个调度器.
然后从调度器中取出一个request. 通过管道in chan request,将请求发送给worker. worker处理请求,处理完成以后,将处理结果放入out chan ParseResut类型的管道中. 通过管道进行通讯
然后引擎从管道中取出ParseResult,将其中的items部分打印出来,将新的requests添加到Scheduler调度器中.
循环反复执行
看看engine的Run代码
第一步: 做了初始化操作. 输入管道,输出管道. 以及Scheduler调度器中的管道就是in输入管道.
第二步: 将初始的种子请求,放入到任务调度器中.
第三步: 从调度器中取出一个请求,进行任务处理.
第四步: 处理返回的处理结果.
具体调度器的代码
package sechduler import aaa/crawler/zhenai/engine type SimpleScheduler { workerChan chan engine.Request } 这里需要是一个地址拷贝,而不是值拷贝. 也就是workerChan和in使用的是同一个地址 func (s *SimpleScheduler) ConfigureMasterWorkerChan( chan engine.Request) { s.workerChan = } 将请求添加到任务调度器 func (s *SimpleScheduler) Submit(r engine.Request) { go func() { s.workerChan <- r }() }
这里,调度器里的submit定义为一个go routine,原因是,如果不定义为goroutine,会出现循环等待. 然后卡死,为什么会卡死呢?
根本原因还是管道的特性. 必须要有人从管道中取走数据,且同时有人向管道中放数据,这样才可以,没有人取数据或没有人发数据,管道就会一直等待.
func (c *ConcurrectEngine) createWorker( { 从in中取出request请求 req := <- parseResult,1)"> worker(req) nil { } parseResult } }() }
看这个工作者,从in中取出request,然后处理后发送给out. 在run中在取出out的ParseResult,发送给任务调度器. 任务调度器的submit方法里,将request添加到workerChan. 也就是只有workerChan添加成功了,反过来这一些列的流程才能继续执行. 但是workerChan是否能够添加成功呢? 这又取决于,是否有woker取走workerChan中的request. 现在有10个goroutine,10个goroutine都工作起来的,都开始等待新的worker取走request. 但是有没有新的worker执行工作了,所以,就进入了循环等待. 出现卡住的现象
要解决这个问题,其实也很简单. 保证workerChan不会循环等待. 给workerChan开一个单独的goroutine. 这样,在这里就不会循环等待了. 执行到submit,就开了一个goroutine. 然后这个动作就执行完了,那么worker就有功夫去workerChan中取数据了,这样submit中的request也可以添加到workerChan了,整个链路有活起来了.
其实最终的架构变成了这样
为每一个request创建了一个goroutine. 然后等待去发送到worker. 发送完成以后,就关闭了.
执行结果:
三. 并发爬虫第二版---将scheduler和worker都变成一个队列
为什么要将Scheduler和worker变成队列呢?
在上面,我们刚开始会出现循环等待,程序卡死,为了解决这个问题,我们给scheduler的submit添加了一个单独的goroutine. 让request放入workerChan的过程中迅速执行完毕,不要等待.
但这样有个缺点,我开了一个goroutine,这个goroutine执行的怎么样,执行了么?我们是不知道的. 没有办法跟踪
对于worker来说,有10个worker. 每次都是这10个worker去workerChan里面抢request,如果我想要把某个request分发给指定的worker去执行,这样是不可以的.
这样如果我们想做负载均衡,就会很困难了. 因此. 我们将scheduler和worker都变成一个队列. 然后可以受我们的控制. 我们可以主动分发
感受go使用channel进行通信
这个demo写完了以后,最大的感受就是go使用channel进行通信. 在scheduler和request使用的是channel进行通信,scheduler和worker之间也是使用的channel进行通信.
结构
这个结构的重点,在于Scheduler,在Scheduler里面,管理了两个队列. 一个是Request队列,一个是worker队列. 过来的请求,发送给空闲的worker去工作.
也就是说,worker还是原来的worker . request还是原来的request,变化的是调度器,调度器变成了一个队列调度器
来看看具体的代码实现
type QueuedScheduler { RequestChan chan engine.Request 工作者channel,他的类型是request类型的chan. 工作者需要接受request类型的chan,然后进行工作 多个工作者之间,也是一个chan WorkerChan chan chan engine.Request } func (q *QueuedScheduler) Submit(r engine.Request) { q.RequestChan <- r } func (q *QueuedScheduler) ConfigureMasterWorkerChan(chan engine.Request) { panic(implement me) } /** * 告诉我哪一个worker已经准备好,可以工作了 */ func (q *QueuedScheduler) WorkerReady(worker chan engine.Request) { q.WorkerChan <- worker } func (q *QueuedScheduler) Run() { q.RequestChan = make(chan engine.Request) q.WorkerChan = make(chan chan engine.Request) go func() { var requestQ []engine.Request workerQ []chan engine.Request { requestActive engine.Request workerActive chan engine.Request if len(requestQ) > 0 && len(workerQ) > 0 { requestActive = requestQ[] workerActive = workerQ[] } selectcase r := <-q.RequestChan: 发送给worker requestQ = append(requestQ,r) case w := <-q.WorkerChan: 将下一个请求发送给worker workerQ = append(workerQ,w) case workerActive <- requestActive: requestQ = requestQ[1:] workerQ = workerQ[:] } } }() }
这个结构,充分体现了,使用channel进行通信
1. 外部有一个请求过来了,那么调动submit将请求发送到request chan
2. 外部有一个worker已经准备好可以工作了,调用workerReady,将准备好的worker放入到workerChan
3. 接下来,如果有请求来,我们就想请求添加到request队列. 如果有worker准备好了,从workerChan中取出放入到worker队列
4. 当request队列中有请求过来,且worker队列中有等待的worker的时候,就把这个请求发送给这个worker,让worker开始工作,处理request
engine做简单修改
" WorkChan chan {} } ConfigureMasterWorkerChan(chan Request) WorkerReady(chan Request) Run() } func (c * 第一步: 做初始化工作 out := make(chan ParseResult) c.Scheduler.Run() 创建WorkerCount个工作者 c.createWorker(c.Scheduler,requests再次添加到任务调度器中 itemCount := 0 内容项: %d,%v \n } range pr.Req { c.Scheduler.Submit(p) } } } func (c *ConcurrectEngine) createWorker(sche Scheduler,1)"> chan ParseResult) { in := make(chan Request) go func() { 告诉调度器,我已经准备好开始工作了 sche.WorkerReady(in) parseResult } }() }
这样功能是完成了,but,来看看simple和queued这两个Scheduler调度器. 先来看看他们的接口
ConfigureMasterWorkerChan(chan Request)
WorkerReady(chan Request)
Run()
}
simple 实现了接口的前两个方法,而queued没有实现第二个方法. 下面我们来统一一下接口的方法. 先看看第二个方法
第二个方法是干什么用的?
ConfigureMasterWorkerChan(chan Request)
在simple中,我们所有的worker都有一个共同的输入request. 通过 这个方法ConfigureMasterWorkerChan(chan Request),我们将worker的输入和simple中的workerChan关联了
在Queued中,我们的每一个worker都有一个自己的输入 request,和第一个的区别是第一个是所有worker公用一个chan request. 而第二种是每个worker自己一个request
我们对这件事进行一个抽象,在engine中,调用哪一个Scheduler,他是不知道的. 那么到底worker是公用一个chan request,还是每个worker有一个chan reqeust呢,他也是不知道的.
那么谁知道呢? 具体的Scheduler调度器知道. 也就是,Simple Scheduler知道,Queued Scheduler也知道.
因此,我们写一个方法,来问各种类型的调度器要chan request.
上面是把获得的item打印了,下面我们来设计入库的模块
分析: 其实,我们要做是什么呢? 将打印变成入库操作. but,有一个问题. 我们在fetch阶段,能直接入库么?和数据库交互的速度,肯定会影响fetch的速度.
其实这里有两种解决方案
1. 放到内存list中. 批量保存
2. 使用chan,间数据发送到管道里面,在单开一个goroutine,去取数据,将取出来的数据save到数据库.
因为,我们是在学习go,采用第二种方式
三. docker和ElasticSearh
数据库---我们使用elasticSearch,搭建在docker容器之上
因为都是初次接触,一步一步. 先搭建起docker的环境,然后在docker上安装elasticSearch.
Docker 的主要用途,目前有三大类
Docker 的主要用途,目前有三大类。
(1)提供一次性的环境。比如,本地测试他人的软件、持续集成的时候提供单元测试和构建的环境。
(2)提供弹性的云服务。因为 Docker 容器可以随开随关,很适合动态扩容和缩容。
(3)组建微服务架构。通过多个容器,一台机器可以跑多个服务,因此在本机就可以模拟出微服务架构。
而我们这里应该是使用docker的第三类用途. 后面我们要做分布式版的爬虫,可以使用docker来模拟多台服务器间调用.
docker的安装和使用
第一步: 官网docker: https://docs.docker.com/
我是mac: 下载地址: https://hub.docker.com/editions/community/docker-ce-desktop-mac/
第三步: 基本命令
查看docker有哪些命令
docker
查看docker的版本信息
docker version
查看docker的基本信息
docker info
比如docker有两部分组成,sercer和client. 我们启动的docker desktop就是一个server. 在控制台使用docker命令就是一个client
Server中的Registry是下载镜像的地址.
elasticSearch是什么
参考这篇文章,说的很明白:https://blog.csdn.net/paicmis/article/details/82535018
Lucene是单机的模式,如果你的数据量超过了一台物理机的容量,你需要扩容,将数据拆分成2份放在不同的集群,这个就是典型的分布式计算了。需要拷贝容错,机器宕机,数据一致性等复杂的场景,这个实现就比较复杂了。
ES解决了这些问题
1、自动维护数据的分布到多个节点的索引的建立,还有搜索请求分布到多个节点的执行
2、自动维护数据的冗余副本,保证了一旦机器宕机,不会丢失数据
3、封装了更多高级的功能,例如聚合分析的功能,基于地理位置的搜索
ElasticSearch的功能
分布式的搜索引擎和数据分析引擎
搜索:网站的站内搜索,IT系统的检索
数据分析:电商网站,统计销售排名前10的商家
全文检索,结构化检索,数据分析
全文检索:我想搜索商品名称包含某个关键字的商品
结构化检索:我想搜索商品分类为日化用品的商品都有哪些
数据分析:我们分析每一个商品分类下有多少个商品
对海量数据进行近实时的处理
分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索
海联数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了
近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析
ElasticSearch的应用场景
维基百科
The Guardian(国外新闻网站)
Stack Overflow(国外的程序异常讨论论坛)
GitHub(开源代码管理)
电商网站
日志数据分析
商品价格监控网站
BI系统
站内搜索
ElasticSearch的特点
可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上,服务小公司
Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起
对用户而言,是开箱即用的,非常简单,作为中小型的应用,直接3分钟部署一下ES
Elasticsearch作为传统数据库的一个补充,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理;
先大概了解一下elasticSearch可以干什么,接下来我们在使用
ElasticSearch安装和使用
第一步: 下载并运行elasticSearch
下载并运行elasticSearch docker run -d -p9200:9200 daocloud.io/library/elasticsearch 这里的-p 后面跟的是端口号. 第一个9200表示映射到物理机的端口是9200 第二个9200表示elasticSearch在虚拟机中运行的端口是9200
第二步: 查询运行情况
表示当前已经运行起来的elasticSearch
第三步: 在浏览器输入localhost:9200
看到如上信息,表示已经启动成功了
第四步: 简单了解如何使用elasticSearch
打开postman
我们可以直接运行localhost:9200,GET请求,获取到当前已经连接的elasticSearch数据库
接下来我们添加一条记录
在elasticSearch中Index/Type/id,对应于数据库的是index--->database,Type---->table,id--->记录的id
比如:
添加一条记录为1的数据
请求方式: POST 请求的url: localhost:9200/pachong/user/1
执行后的结果
{ _index": pachong_typeuser_id1_versionresultcreated_shards: { total2successfulFailed },1)">": true }
前三个字段分别是: 库名,表明,记录的id. result表示当前是created.
来查询刚刚保存的记录
请求方式: GET 请求url: localhost:1
found_sourcenamelxlage12 } }
如果想查询所有的记录呢?
9200/pachong/user/_search 输入参数: 空
返回结果:
9200/pachong/user/_search?q=ykk
查询结果
40.25811607 } } ] } }
四. 将数据保存到elasticSearch
通过上面的demo,我们知道elasticSearch使用的是restful的风格增删改查数据的. 那么我们可以直接使用http.Get,http.Post就可以实现
处理这种方式,市面上还有对应的elasticSearch客户端,我们使用客户端会更方便
百度 elasticSearch client--->找到对应的官网, https://www.elastic.co/guide/en/elasticsearch/client/index.html
点击最后一个社区版客户端. 进去之后查看go的elasticSearch client
这里我们使用第二类Google go,点击进去是插件的源码,看后面的README.
我们使用的elasticSearch版本是5.12,所以下载对应的5版本的client.
在go中执行下载client.
下载完成就可以使用了,使用文档: https://godoc.org/gopkg.in/olivere/elastic.v5
func Save(item interface{}) (string 第一步: 创建一个elasticSearch client 文档: https://godoc.org/gopkg.in/olivere/elastic.v5 client,err := elastic.NewClient(elastic.SetURL(http://localhost:9200), sniff: 是用来维护客户端集群的状态的. 但是我们的集群不跑在本机上,而是跑在docker上. docker只有一个内网,内网我们看不见.所以没有办法维护状态,设置为false elastic.SetSniff()) if err != nil { return "" 将item数据保存到的elasticSearch中 elastic search 保存数据使用的Index. elasticSearch 数据中的三部分分别是 /index/type/id . 对应数据库的/database/table/id response,err := client.Index(). Index(dataing_profile). Type(zhenai). BodyJson(item). Do(context.Background()) +v 表示打印结构体带上结构体的名称 response.Id,nil }
第一步: 建立elasticSearch 连接
第二步: 保存数据. elasticSearch保存数据的方法是Index
完毕,是不是很简单....
接下来写一个单元测试,测试我们是否添加数据成功了
func TestSave(t *testing.T) { tests := [] { name item model.Profile }{ {冰靓晴雪已婚23白羊座15449北京10000-20000销售总监大学本科四川理财有房有车不要小孩 range tests { t.Run(tt.name,func(t *testing.T) { id,1)"> Save(tt.item) nil { panic(e) } client,e := elastic.NewClient(elastic.SetSniff()) nil { panic(e) } resp,1)"> client.Get(). Index(). Type(). Id(id). Do(context.Background()) nil { panic(e) } fmt.Printf(%s",*resp.Source) user := model.Profile{} e = json.Unmarshal(*resp.Source,&user) nil { panic(e) } if user != tt.item { t.Errorf(error) } }) } }
第一步: 写测试的cases
第三步: 取出save中保存的内容
第四步: 和初始值对比,是否一致