Go语言-爬虫框架colly基础

摘要

文章内容是本人基于官方文档的学习过程中记录整理

官方文档

Colly是一款基于Go语言实现的轻量级爬虫框架,也几乎是目前Go语言中唯一比较知名的爬虫框架了,结合Go语言高并发的特点,Colly天生具备非常优秀的高并发能力,同时Colly的设计非常优雅,开发起来也非常快速,同时Colly还具备分布式和强大的扩展能力。

特性

  1. - 简单的API
  2. - 快速(单核上> 1k请求/秒)
  3. - 控制请求延迟和每个域名的最大并发数
  4. - 自动cookie和session处理
  5. - 同步/异步/并行抓取
  6. - 高速缓存
  7. - 对非unicode响应自动编码
  8. - Robots.txt支持
  9. - 分布式抓取
  10. - 支持通过环境变量配置
  11. - 随意扩展

安装

1
go get -u github.com/gocolly/colly/v2...

使用go mod管理

1
2
3
4
5
6
# 设置GO111MODULE开启go mod
go env -w GO111MODULE=on
# 初始化
go mod init
# 拉取依赖,需要在项目中先import依赖包
go mod tidy

快速上手

导入colly依赖包

1
import "github.com/gocolly/colly/v2"

创建Collector对象

Colly要做的第一件事就是创建一个 Collector对象。通过 Collector管理网络通信并负责在 Collector job 运行时执行附加的回调函数。

1
c := colly.NewCollector()

添加回调函数

我们可以把不同类型的回调函数附加到收集器上来监听不同的采集任务,然后通过回调对事件进行处理。

可用的回调函数有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
c.OnRequest(func(r *colly.Request) {
fmt.Println("Visiting", r.URL)
})
c.OnError(func(_ *colly.Response, err error) {
log.Println("Something went wrong:", err)
})
c.OnResponse(func(r *colly.Response) {
fmt.Println("Visited", r.Request.URL)
})
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
e.Request.Visit(e.Attr("href"))
})
c.OnHTML("tr td:nth-of-type(1)", func(e *colly.HTMLElement) {
fmt.Println("First column of a table row:", e.Text)
})
c.OnXML("//h1", func(e *colly.XMLElement) {
fmt.Println(e.Text)
})
c.OnScraped(func(r *colly.Response) {
fmt.Println("Finished", r.Request.URL)
})

回调函数的执行顺序

  1. OnRequest:请求执行之前调用
  2. OnError:请求过程中出现Error时调用
  3. OnResponse:收到response之后调用
  4. OnHTML:如果收到的内容是HTML,就在onResponse执行后调用
  5. OnXML:如果收到的内容是HTML或者XML,就在onHTML执行后调用
  6. OnScraped:在OnXML执行后调用,完成所有工作后执行

访问目标地址

1
c.Visit("https://c.isme.pub/")

其他访问方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 通过一个指定的url开始采集器的采集任务,之后会调用设置好的回调函数,请求方式主要为Get请求
func (c *Collector) Visit(URL string) error {
if c.CheckHead {
if check := c.scrape(URL, "HEAD", 1, nil, nil, nil, true); check != nil {
return check
}
}
return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

// 发送head请求
func (c *Collector) Head(URL string) error {
return c.scrape(URL, "HEAD", 1, nil, nil, nil, false)
}

// 通过制定url发送Post请求开始采集器的采集任务,之后也会调用设置好的回调函数
// 参数为form参数
func (c *Collector) Post(URL string, requestData map[string]string) error {
return c.scrape(URL, "POST", 1, createFormReader(requestData), nil, nil, true)
}

// 同上,传参为原始二进制参数
func (c *Collector) PostRaw(URL string, requestData []byte) error {
return c.scrape(URL, "POST", 1, bytes.NewReader(requestData), nil, nil, true)
}

// 同上
func (c *Collector) PostMultipart(URL string, requestData map[string][]byte) error {
boundary := randomBoundary()
hdr := http.Header{}
hdr.Set("Content-Type", "multipart/form-data; boundary="+boundary)
hdr.Set("User-Agent", c.UserAgent)
return c.scrape(URL, "POST", 1, createMultipartReader(boundary, requestData), nil, hdr, true)
}

// 支持任意方法,需要传参方法、参数、上下文
// 支持的方法如下
// - "GET"
// - "HEAD"
// - "POST"
// - "PUT"
// - "DELETE"
// - "PATCH"
// - "OPTIONS"
func (c *Collector) Request(method, URL string, requestData io.Reader, ctx *Context, hdr http.Header) error {
return c.scrape(URL, method, 1, requestData, ctx, hdr, true)
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"github.com/gocolly/colly/v2"
"log"
"strings"
)

func main() {
c := colly.NewCollector()
c.OnRequest(func(r *colly.Request) {
fmt.Println("Visiting OnRequest", r.URL)
})
c.OnError(func(_ *colly.Response, err error) {
log.Println("Something went wrong:", err)
})
c.OnResponse(func(r *colly.Response) {
fmt.Println("Visited OnResponse", r.Request.URL)
})
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
c.Visit(e.Request.AbsoluteURL(link))
}
})
c.OnScraped(func(r *colly.Response) {
fmt.Println("Finished OnScraped", r.Request.URL)
})
c.Visit("https://c.isme.pub/")
}

配置

Colly是一个高度可定制的爬虫框架。它为开发者提供了大量的配置选项,并且为每个选项提供了比较合理的默认值。

全部可配置参数

创建不带参数的collector

1
c := colly.NewCollector()

创建collector时设置参数

1
2
3
4
5
6
c := colly.NewCollector(
// 默认参数为"colly - https://github.com/gocolly/colly"
colly.UserAgent("isme.pub"),
// 允许重复访问,默认false,设置为true
colly.AllowURLRevisit(),
)

创建collector后随时可以修改参数

1
2
3
c := colly.NewCollector()
c.UserAgent = "isme.pub"
c.AllowURLRevisit = true

任意位置修改参数

collector的配置可以在爬虫执行到任何阶段改变。

一个经典的例子:通过随机修改 user-agent,可以帮助我们实现简单的反爬。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"github.com/gocolly/colly/v2"
"math/rand"
"strings"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
func RandomString() string {
b := make([]byte, rand.Intn(10)+10)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}

func main() {
c := colly.NewCollector()

c.OnRequest(func(r *colly.Request) {
r.Headers.Set("User-Agent", RandomString())
})

c.Visit("https://c.isme.pub/")
}

通过环境变量来修改参数

colly支持通过设置环境变量来配置collector的默认配置,这样我们就可以在不重新编译的情况下修改配置了。

需要注意的是环境变量的配置是在 collector 初始化的最后一步生效的,程序正式启动之后,对每个配置的修改,都会覆盖从环境变量中读取到的配置

支持的配置项,如下:

1
2
3
4
5
6
7
8
9
10
ALLOWED_DOMAINS (字符串切片) 允许访问的域名,比如 []string{"c.isme.pub", "z.isme.pub"}
CACHE_DIR (string) 缓存目录配置
DETECT_CHARSET (y/n) 是否检测响应内容的编码
DISABLE_COOKIES (y/n) 是否禁止cookies
DISALLOWED_DOMAINS (字符串切片) 禁止访问的域名,同ALLOWED_DOMAINS类型
IGNORE_ROBOTSTXT (y/n) 是否忽略ROBOTS协议
MAX_BODY_SIZE (int) 配置响应包的最大大小
MAX_DEPTH (int - 0 means infinite) 设置访问最大深度
PARSE_HTTP_ERROR_RESPONSE (y/n) 是否解析HTTP的响应错误
USER_AGENT (string) 浏览器ua

HTTP配置

Colly使用Golang默认的http客户端作为网络层。可以通过更改默认的HTTP roundtripper来调整HTTP配置

1
2
3
4
5
6
7
8
9
10
11
12
13
c := colly.NewCollector()
c.WithTransport(&http.Transport{
Proxy: http.ProxyFromEnvironment, // 代理
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, // 超时时间
KeepAlive: 30 * time.Second, // KeepAlive 超时时间
DualStack: true, // 是否开启双协议栈
}).DialContext,
MaxIdleConns: 100, // 最大空闲连接数
IdleConnTimeout: 90 * time.Second, // 空闲连接超时时间
TLSHandshakeTimeout: 10 * time.Second, // TLS握手超时时间
ExpectContinueTimeout: 1 * time.Second, // 如果非零,如果指定请求包含“Expect: 100-continue”报头,则在写完请求报头后等待服务器第一个响应报头的时间。0表示没有超时,并立即发送正文,而无需等待服务器批准。这个时间不包括发送请求头的时间。
}

Debug调试

在部分场景下,在回调函数中通过log.Println()输出信息就足够了,但是对于一些复杂的场景就明显不够用了。Colly内置了调试收集器的功能。我们可以使用调试器接口和不同类型的调试器来实现信息的收集工作。

将调试器附加到收集器

附加一个基本的日志调试器需要 Colly 的 repo 中的debug( github.com/gocolly/colly/debug) 包。

Debugger

logdebugger

1
2
3
4
5
6
7
8
9
import (
"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/debug"
)

func main() {
c := colly.NewCollector(colly.Debugger(&debug.LogDebugger{}))
// [..]
}

LogDebugger

LogDebuggercolly提供给我们的内置数据结构,我们可以通过定义发现,我们只需要给他提供一个io.Writer类型的对象就可以进行日志输出了

1
2
3
4
5
6
7
8
9
10
11
12
13
// LogDebugger is the simplest debugger which prints log messages to the STDERR
type LogDebugger struct {
// Output is the log destination, anything can be used which implements them
// io.Writer interface. Leave it blank to use STDERR
Output io.Writer
// Prefix appears at the beginning of each generated log line
Prefix string
// Flag defines the logging properties.
Flag int
logger *log.Logger
counter int32
start time.Time
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/debug"
"math/rand"
"os"
"strings"
)

func main() {
logFile, err := os.OpenFile("isme.log", os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
panic(err)
}
c := colly.NewCollector(
colly.Debugger(&debug.LogDebugger{Output: logFile}),
)

c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
c.Visit(e.Request.AbsoluteURL(link))
}
})

c.Visit("https://c.isme.pub/")
}

日志输出

1
2
3
4
5
6
7
8
9
$ tail isme.log
...
[000021] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.679625ms)
[000022] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.713417ms)
[000023] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.727375ms)
[000024] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.740792ms)
[000025] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.755167ms)
[000026] 1 [ 4 - html] map["selector":"a[href]" "url":"https://c.isme.pub/k8s/"] (199.766875ms)
...

分布式爬虫

分布式爬虫可以通过爬取任务的要求,通过不同的方法来实现。大多数情况下,爬虫跨站网络层通信就足够了(并发发起网络请求)。

Colly中分布式爬虫可以通过几种方式来实现:

  • 代理层
  • 执行层
  • 存储层

代理池

通过设置代理池,我们可以将不同的HTTP请求通过不同的代理进行访问,这样有2个优点:

  1. 网络层面上可以提高整体的爬取速度
  2. 防止单个IP访问速度过快导致被封禁

但是使用这种方式,代理池切换器仍然是集中式的,爬取任务的执行还是从单个节点上发起的。

内置代理切换器proxy.RoundRobinProxySwitcher

Colly有一个内置的代理切换器,可以在每个请求上轮换代理列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/proxy"
)

func main() {
c := colly.NewCollector()

if p, err := proxy.RoundRobinProxySwitcher(
"socks5://127.0.0.1:1337",
"socks5://127.0.0.1:1338",
"http://127.0.0.1:8080",
); err == nil {
c.SetProxyFunc(p)
}
// ...
}

自定义代理切换器

Colly支持用户自定义代理切换器,并且通过SetProxyFunc()更换代理切换器。

1
2
3
4
5
6
7
8
9
10
11
var proxies []*url.URL = []*url.URL{
&url.URL{Host: "127.0.0.1:8080"},
&url.URL{Host: "127.0.0.1:8081"},
}

func randomProxySwitcher(_ *http.Request) (*url.URL, error) {
return proxies[random.Intn(len(proxies))], nil
}

// ...
c.SetProxyFunc(randomProxySwitcher)

分布式爬虫

对于分布式爬虫,我们需要管理的是调度器和执行器,通过调度器发起任务,将任务发送给部署在不同节点的执行器中,实际的爬取任务让执行节点来执行,这样就可以实现真正意义上的分布式爬虫了。

要实现分布式爬虫,还需要解决几个问题:

  1. 需要先定义好调度器和执行器之间的通信协议,如:HTTPTCPGoogle App Engine
  2. 需要集中式存储持久化cookie和访问的url处理结果

Colly 内置了 Google App Engine 的方法。Collector.Appengine(*http.Request)

HTTP执行器的实现

下面是一个Colly提供的HTTP执行器的源码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"encoding/json"
"log"
"net/http"

"github.com/gocolly/colly/v2"
)

type pageInfo struct {
StatusCode int
Links map[string]int
}

func handler(w http.ResponseWriter, r *http.Request) {
URL := r.URL.Query().Get("url")
if URL == "" {
log.Println("missing URL argument")
return
}
log.Println("visiting", URL)

c := colly.NewCollector()

p := &pageInfo{Links: make(map[string]int)}

// count links
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Request.AbsoluteURL(e.Attr("href"))
if link != "" {
p.Links[link]++
}
})

// extract status code
c.OnResponse(func(r *colly.Response) {
log.Println("response received", r.StatusCode)
p.StatusCode = r.StatusCode
})
c.OnError(func(r *colly.Response, err error) {
log.Println("error:", r.StatusCode, err)
p.StatusCode = r.StatusCode
})

c.Visit(URL)

// dump results
b, err := json.Marshal(p)
if err != nil {
log.Println("failed to serialize response:", err)
return
}
w.Header().Add("Content-Type", "application/json")
w.Write(b)
}

func main() {
// example usage: curl -s 'http://127.0.0.1:7171/?url=http://go-colly.org/'
addr := ":7171"

http.HandleFunc("/", handler)

log.Println("listening on", addr)
log.Fatal(http.ListenAndServe(addr, nil))
}

基于官方提供的示例简单实现一个调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/debug"
"github.com/imroc/req"
"os"
"strings"
)


func main() {
logFile, err := os.OpenFile("isme.log", os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
panic(err)
}
c := colly.NewCollector(
colly.Debugger(&debug.LogDebugger{Output: logFile}),
)

c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
// 这里可以自定义一个节点轮询器,用来轮询启动爬虫任务的节点,来将不同的任务发送给不同的执行节点
url := "http://127.0.0.1:7171/?url=" + e.Request.AbsoluteURL(link)
s,_:=req.Get(url)
fmt.Println(s.String())
}
})

c.Visit("https://c.isme.pub/")
}

分布式爬虫的集中存储

我们虽然将执行任务分配到了不同的执行节点,但是请求的cookie、url、处理结果等都临时存储在执行节点的内存中,对于一些简单场景下的任务,这就足够了,但是在处理大规模请求,需要长时间处理大量数据时,这种方式就不能满足使用了,因此我们需要将数据存储在集中存储中,以此来实现不同执行节点的数据共享

内置存储

目前Colly支持的存储有以下几种:

1
2
3
4
5
6
7
8
9
10
11
12
13
type Storage interface {
// Init initializes the storage
Init() error
// Visited receives and stores a request ID that is visited by the Collector
Visited(requestID uint64) error
// IsVisited returns true if the request was visited before IsVisited
// is called
IsVisited(requestID uint64) (bool, error)
// Cookies retrieves stored cookies for a given host
Cookies(u *url.URL) string
// SetCookies stores cookies for a given host
SetCookies(u *url.URL, cookies string)
}

memory

Colly默认的存储方式,可以通过collector.SetStorage()方法修改成其他的存储。

redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"strings"

"github.com/gocolly/colly/v2"
"github.com/gocolly/redisstorage"
)

func main() {
c := colly.NewCollector()

storage := &redisstorage.Storage{
Address: "127.0.0.1:6379",
Password: "",
DB: 2,
Prefix: "isme",
}

err := c.SetStorage(storage)
if err != nil {
panic(err)
}

// delete previous data from storage
if err := storage.Clear(); err != nil {
log.Fatal(err)
}

// close redis client
defer storage.Client.Close()

c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
}
})

c.Visit("https://c.isme.pub/")
}

sqlite3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"log"

"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/extensions"
"github.com/gocolly/colly/v2/queue"
"github.com/velebak/colly-sqlite3-storage/colly/sqlite3"
)

func main() {
c := colly.NewCollector(
colly.AllowedDomains("c.isme.pub"),
)

storage := &sqlite3.Storage{
Filename: "./results.db",
}

defer storage.Close()

err := c.SetStorage(storage)
if err != nil {
panic(err)
}

q, _ := queue.New(8, storage)
q.AddURL("https://c.isme.pub")

//c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 4})

// Find and visit all links
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
q.AddURL(e.Request.AbsoluteURL(e.Attr("href")))
})

q.Run(c)
log.Println(c)
}

mongo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"

"github.com/gocolly/colly/v2"
"github.com/zolamk/colly-mongo-storage/colly/mongo"
)

func main() {

c := colly.NewCollector()

storage := &mongo.Storage{
Database: "colly",
URI: "mongodb://127.0.0.1:27017",
}

if err := c.SetStorage(storage); err != nil {
panic(err)
}

// Find and visit all links
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
e.Request.Visit(e.Attr("href"))
})

c.OnRequest(func(r *colly.Request) {
fmt.Println("Visiting", r.URL)
})

c.Visit("https://c.isme.pub/")
}

自定义存储

Colly还支持用户自定义存储,只需要用户实现colly/storage.Storage接口,就可以当做存储后端来用了,至于如何实现,我们后面会出单独的篇章来讲。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Storage interface {
// Init initializes the storage
Init() error
// Visited receives and stores a request ID that is visited by the Collector
Visited(requestID uint64) error
// IsVisited returns true if the request was visited before IsVisited
// is called
IsVisited(requestID uint64) (bool, error)
// Cookies retrieves stored cookies for a given host
Cookies(u *url.URL) string
// SetCookies stores cookies for a given host
SetCookies(u *url.URL, cookies string)
}

分布式爬虫-队列

Collyv2版本添加了使用存储当做队列的功能,我们可以通过调度器将需要爬取的任务加入到队列中,然后执行节点可以从集中存储中获取爬取任务并执行任务。

Colly中支持的存储只要实现了以下接口都可以用作队列使用:

1
2
3
4
5
6
7
8
9
10
11
type Storage interface {
// Init initializes the storage
Init() error
// AddRequest adds a serialized request to the queue
AddRequest([]byte) error
// GetRequest pops the next request from the queue
// or returns error if the queue is empty
GetRequest() ([]byte, error)
// QueueSize returns with the size of the queue
QueueSize() (int, error)
}

memoy队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"strings"

"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/queue"
)

func main() {
url := "https://c.isme.pub/"
c := colly.NewCollector(
colly.MaxDepth(1),
)

// create a request queue with 2 consumer threads
q, _ := queue.New(
2, // Number of consumer threads
&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
)

c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
q.AddURL(e.Request.AbsoluteURL(link))
}
})

q.AddURL(url)
q.Run(c)
}

redis队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
"fmt"
"log"
"strings"

"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/queue"
"github.com/gocolly/redisstorage"
)

func main() {
urls := []string{
"https://c.isme.pub/",
"https://c.isme.pub/linux",
"https://c.isme.pub/go",
"https://c.isme.pub/k8s",
}

c := colly.NewCollector()

// create the redis storage
storage := &redisstorage.Storage{
Address: "127.0.0.1:6379",
Password: "",
DB: 2,
Prefix: "isme",
}

// add storage to the collector
err := c.SetStorage(storage)
if err != nil {
panic(err)
}

// delete previous data from storage
if err := storage.Clear(); err != nil {
log.Fatal(err)
}

// close redis client
defer storage.Client.Close()

// create a new request queue with redis storage backend
q, _ := queue.New(2, storage)

c.OnResponse(func(r *colly.Response) {
log.Println("Cookies:", c.Cookies(r.Request.URL.String()))
})

c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
if strings.HasPrefix(e.Request.AbsoluteURL(link), "https://c.isme.pub") {
text := strings.ReplaceAll(strings.ReplaceAll(e.Text, "\n", ""), " ", "")
fmt.Printf("Link found: %q -> %s\n", text, e.Request.AbsoluteURL(link))
q.AddURL(e.Request.AbsoluteURL(link))
}
})

// add URLs to the queue
for _, u := range urls {
q.AddURL(u)
}
// consume requests
q.Run(c)
}

使用多个采集器

在我们实际的爬取任务中,经常会碰到需要处理不同逻辑的多个页面的情况,比如:通过父页面获取到多个子页面的链接,然后需要用另一种逻辑去处理子页面。

对于这种复杂的爬取任务,只使用一个采集器就不合适了。对于这种场景,我们可以针对不同处理逻辑的页面,定义多个不同的采集器,通过不同的采集器来处理不同的页面。

克隆采集器

通过Clone()可以将父采集器的配置复制到克隆出来的采集器中,克隆的时候只会克隆配置,不会克隆采集器的回调方法。

1
2
3
4
5
6
c := colly.NewCollector(
colly.UserAgent("myUserAgent"),
colly.AllowedDomains("foo.com", "bar.com"),
)
// Custom User-Agent and allowed domains are cloned to c2
c2 := c.Clone()

在多个采集器间传递自定义数据

使用收集器的Request()功能能够与其他收集器共享上下文。

共享上下文示例:

1
2
3
4
c.OnResponse(func(r *colly.Response) {
r.Ctx.Put(r.Headers.Get("Custom-Header"))
c2.Request("GET", "https://foo.com/", nil, r.Ctx, nil)
})

这个 Context 只是 Colly 实现的数据共享的数据结构,并非 Go 标准库中的 Context

官方示例

coursera course scraper,这个示例中使用了两个收集器:

  • 一个解析列表视图并处理分页
  • 另一个收集课程详细信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main

import (
"encoding/json"
"log"
"os"
"strings"

"github.com/gocolly/colly/v2"
)

// Course stores information about a coursera course
type Course struct {
Title string
Description string
Creator string
Level string
URL string
Language string
Commitment string
HowToPass string
Rating string
}

func main() {
// Instantiate default collector
c := colly.NewCollector(
// Visit only domains: coursera.org, www.coursera.org
colly.AllowedDomains("coursera.org", "www.coursera.org"),

// Cache responses to prevent multiple download of pages
// even if the collector is restarted
colly.CacheDir("./coursera_cache"),
)

// Create another collector to scrape course details
detailCollector := c.Clone()

courses := make([]Course, 0, 200)

// On every a element which has href attribute call callback
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
// If attribute class is this long string return from callback
// As this a is irrelevant
if e.Attr("class") == "Button_1qxkboh-o_O-primary_cv02ee-o_O-md_28awn8-o_O-primaryLink_109aggg" {
return
}
link := e.Attr("href")
// If link start with browse or includes either signup or login return from callback
if !strings.HasPrefix(link, "/browse") || strings.Index(link, "=signup") > -1 || strings.Index(link, "=login") > -1 {
return
}
// start scaping the page under the link found
e.Request.Visit(link)
})

// Before making a request print "Visiting ..."
c.OnRequest(func(r *colly.Request) {
log.Println("visiting", r.URL.String())
})

// On every a HTML element which has name attribute call callback
c.OnHTML(`a[name]`, func(e *colly.HTMLElement) {
// Activate detailCollector if the link contains "coursera.org/learn"
courseURL := e.Request.AbsoluteURL(e.Attr("href"))
if strings.Index(courseURL, "coursera.org/learn") != -1 {
detailCollector.Visit(courseURL)
}
})

// Extract details of the course
detailCollector.OnHTML(`div[id=rendered-content]`, func(e *colly.HTMLElement) {
log.Println("Course found", e.Request.URL)
title := e.ChildText(".course-title")
if title == "" {
log.Println("No title found", e.Request.URL)
}
course := Course{
Title: title,
URL: e.Request.URL.String(),
Description: e.ChildText("div.content"),
Creator: e.ChildText("div.creator-names > span"),
}
// Iterate over rows of the table which contains different information
// about the course
e.ForEach("table.basic-info-table tr", func(_ int, el *colly.HTMLElement) {
switch el.ChildText("td:first-child") {
case "Language":
course.Language = el.ChildText("td:nth-child(2)")
case "Level":
course.Level = el.ChildText("td:nth-child(2)")
case "Commitment":
course.Commitment = el.ChildText("td:nth-child(2)")
case "How To Pass":
course.HowToPass = el.ChildText("td:nth-child(2)")
case "User Ratings":
course.Rating = el.ChildText("td:nth-child(2) div:nth-of-type(2)")
}
})
courses = append(courses, course)
})

// Start scraping on http://coursera.com/browse
c.Visit("https://coursera.org/browse")

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")

// Dump json to the standard output
enc.Encode(courses)
}

优化爬虫配置

Colly的默认配置是针对一些简单、数据量小的场景下的配置,如果你需要对大量的目标进行爬取,就需要对默认的配置进行优化了

持久化集中存储

前面提到过了,我们需要将一些数据存储在集中存储中。

存储官方文档

递归启动异步任务

默认情况下,Colly会阻塞请求直到请求返回,因此Collector.Visit从回调中递归调用就会产生大量的堆栈,这个时候我们就需要开启异步Collector.Async = true来避免这种问题。开启异步功能后,需要增加c.Wait()来等待任务的结束。

禁用KeepAlive连接

Colly默认会开启KeepAlive来提高爬取速度,但是这会导致文件描述符的占用增加,我们要么修改系统参数来增加进程可以使用的文件描述符,或者我们可以关闭KeepAlive来减少文件描述符的增加。

1
2
3
4
c := colly.NewCollector()
c.WithTransport(&http.Transport{
DisableKeepAlives: true,
})

扩展插件

Colly还提供了一些扩展功能,主要与爬虫相关的一些常用功能: colly/extensions/

  • referer
  • random_user_agent
  • url_length_filter

以下示例启用随机 User-Agent 切换器和 Referrer 设置器扩展并访问 httpbin.org 两次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import (
"log"

"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/extensions"
)

func main() {
c := colly.NewCollector()
visited := false

extensions.RandomUserAgent(c)
extensions.Referrer(c)

c.OnResponse(func(r *colly.Response) {
log.Println(string(r.Body))
if !visited {
visited = true
r.Request.Visit("/get?q=2")
}
})

c.Visit("http://httpbin.org/get")
}

扩展插件官方源码

官方地址

通过阅读官方提供的扩展源码我们可以发现,扩展的实现非常的简单,基本都是基于c.OnResponsec.OnRequest来实现的,这样我们就能很轻松的自定义扩展了。

RandomUserAgent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package extensions

import (
"github.com/gocolly/colly/v2"
)

// Referer sets valid Referer HTTP header to requests.
// Warning: this extension works only if you use Request.Visit
// from callbacks instead of Collector.Visit.
func Referer(c *colly.Collector) {
c.OnResponse(func(r *colly.Response) {
r.Ctx.Put("_referer", r.Request.URL.String())
})
c.OnRequest(func(r *colly.Request) {
if ref := r.Ctx.Get("_referer"); ref != "" {
r.Headers.Set("Referer", ref)
}
})
}

Referer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package extensions

import (
"github.com/gocolly/colly/v2"
)

// Referer sets valid Referer HTTP header to requests.
// Warning: this extension works only if you use Request.Visit
// from callbacks instead of Collector.Visit.
func Referer(c *colly.Collector) {
c.OnResponse(func(r *colly.Response) {
r.Ctx.Put("_referer", r.Request.URL.String())
})
c.OnRequest(func(r *colly.Request) {
if ref := r.Ctx.Get("_referer"); ref != "" {
r.Headers.Set("Referer", ref)
}
})
}

URLLengthFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package extensions

import (
"github.com/gocolly/colly/v2"
)

// URLLengthFilter filters out requests with URLs longer than URLLengthLimit
func URLLengthFilter(c *colly.Collector, URLLengthLimit int) {
c.OnRequest(func(r *colly.Request) {
if len(r.URL.String()) > URLLengthLimit {
r.Abort()
}
})
}

更多官方示例

Colly提供的官方文档内容特别少,但是通过官方文档以及源码的学习,我们就已经能够初步掌握Colly的使用技巧了。后续除了多加练习以外,官方还提供了一些不同场景的示例以供我们参考、学习。

官方示例