Beanstalkd的使用(Golang)

前端之家收集整理的这篇文章主要介绍了Beanstalkd的使用(Golang)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
最近需要引入一种新的消息队列,这个队列最好有专业、简单、消息不丢失等特性,但又不会引入过多的复杂性,
特别是在目前单枪匹马的情况下。然后发现Beanstalkd看起来是我所需要的.
Beanstalkd 支持任务优先级 (priority),延时 (delay),超时重发 (time-to-run) 和预留 (buried),
同时支持binlog.最后速度还可以。
看了下源码,c语言代码量小而清晰.作者从07年维护到14年,Star也很高,质量应当是有保障的。
队列作者提供了Go客户端,从作者项目列表中可以看到已经写了不少Go相关的东西,看来Go很受后台开发的欢迎。

Beanstalkd 主页在这: http://kr.github.io/beanstalkd

写了个调用例子如下.

/*
  xcl (2015-8-15)
  多TubeName 多消费者
*/

package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"runtime"
	"strings"
	"time"
)

var (
	TubeName1 string = "channel1"
	TubeName2 string = "channel2"
)

func Producer(fname,tubeName string) {
	if fname == "" || tubeName == "" {
		return
	}

	c,err := beanstalk.Dial("tcp","127.0.0.1:11300")
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.Tube.Name = tubeName
	c.TubeSet.Name[tubeName] = true
	fmt.Println(fname," [Producer] tubeName:",tubeName," c.Tube.Name:",c.Tube.Name)

	for i := 0; i < 5; i++ {
		msg := fmt.Sprintf("for %s %d",i)
		c.Put([]byte(msg),30,120*time.Second)
		fmt.Println(fname," [Producer] beanstalk put body:",msg)
		//time.Sleep(1 * time.Second)
	}

	c.Close()
	fmt.Println("Producer() end.")
}

func Consumer(fname,"127.0.0.1:11300")
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.Tube.Name = tubeName
	c.TubeSet.Name[tubeName] = true

	fmt.Println(fname," [Consumer] tubeName:",c.Tube.Name)

	substr := "timeout"
	for {
		fmt.Println(fname," [Consumer]///////////////////////// ")
		//从队列中取出
		id,body,err := c.Reserve(1 * time.Second)
		if err != nil {
			if !strings.Contains(err.Error(),substr) {
				fmt.Println(fname," [Consumer] [",c.Tube.Name,"] err:",err," id:",id)
			}
			continue
		}
		fmt.Println(fname,"] job:",id," body:",string(body))

		//从队列中清掉
		err = c.Delete(id)
		if err != nil {
			fmt.Println(fname,"] Delete err:",id)
		} else {
			fmt.Println(fname,"] Successfully deleted. id:",id)
		}
		fmt.Println(fname," [Consumer]/////////////////////////")
		//time.Sleep(1 * time.Second)
	}
	fmt.Println("Consumer() end. ")
}

func main() {
	runtime.GOMAXPROCS(runtime.Numcpu())

	go Producer("PA",TubeName1)
	go Producer("PB",TubeName2)

	go Consumer("CA",TubeName1)
	go Consumer("CB",TubeName2)

	time.Sleep(10 * time.Second)
}

/*
运行结果:

XCLdeiMac:src xcl$ clear
XCLdeiMac:src xcl$ go run testmq.go
CB  [Consumer] tubeName: channel2  c.Tube.Name: channel2
CA  [Consumer] tubeName: channel1  c.Tube.Name: channel1
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
PB  [Producer] tubeName: channel2  c.Tube.Name: channel2
PA  [Producer] tubeName: channel1  c.Tube.Name: channel1
PB  [Producer] beanstalk put body: for channel2 0
PA  [Producer] beanstalk put body: for channel1 0
CA  [Consumer] [ channel1 ] job: 47027  body: for channel1 0
CB  [Consumer] [ channel2 ] job: 47026  body: for channel2 0
PB  [Producer] beanstalk put body: for channel2 1
PA  [Producer] beanstalk put body: for channel1 1
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47026
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47027
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47028  body: for channel1 1
PA  [Producer] beanstalk put body: for channel1 2
CB  [Consumer] [ channel2 ] job: 47029  body: for channel2 1
PB  [Producer] beanstalk put body: for channel2 2
PA  [Producer] beanstalk put body: for channel1 3
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47028
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47029
PB  [Producer] beanstalk put body: for channel2 3
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
PB  [Producer] beanstalk put body: for channel2 4
CB  [Consumer] [ channel2 ] job: 47030  body: for channel2 2
CA  [Consumer] [ channel1 ] job: 47031  body: for channel1 2
PA  [Producer] beanstalk put body: for channel1 4
Producer() end.
Producer() end.
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47031
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47030
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] job: 47033  body: for channel2 3
CA  [Consumer] [ channel1 ] job: 47032  body: for channel1 3
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47033
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47032
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47034  body: for channel1 4
CB  [Consumer] [ channel2 ] job: 47035  body: for channel2 4
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47035
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47034
CB  [Consumer]/////////////////////////
XCLdeiMac:src xcl$
*/
可用beanstool来查看队列状态

也可以参考我写下面两段,来查。

ar,er := c.ListTubes()
if er != nil {
	fmt.Println("[Example]  er:",er)
} else {
	for i,v := range ar {
		fmt.Println("[Example] ListTubes  i:",i," v:",v)
		c.Tube.Name = v
		id,err := c.Reserve(5 * time.Second)
		if err != nil {
			fmt.Println("[Example] err:"," name:",c.Tube.Name)
			continue
		} else {
			fmt.Println("[Example] job:",id)
			fmt.Println("[Example] body:",string(body))
		}

	}
}

func tubeStatus(c *beanstalk.Conn) {
	fmt.Println("[tubeStatus]/////////////////////////")
	fmt.Println("Tube(",") Stats:")
	m,er := c.Tube.Stats()
	if er != nil {
		fmt.Println("[tubeStatus] err:",er)
	} else {
		for k,v := range m {
			fmt.Println(k," : ",v)
		}
	}
	fmt.Println("[tubeStatus]/////////////////////////")
}
从测试看,Beanstalkd 足以满足我现在的需求了.

BLOG: http://blog.csdn.net/xcl168

原文链接:https://www.f2er.com/go/190266.html

猜你在找的Go相关文章