Index ¦ Archives ¦ Atom

Salt zmq_filtering测试

Salt 2014.7新增 zmq_filtering 配置项, 基于zeromq PUB-SUB Envelopes 技术, 在Master端(publisher)进行message过滤(ZeroMQ 3.0+版本,之前版本是在subscriber端进行过滤), 以实现如果该指令只是少量主机执行的话, 只将指令发送到匹配的Minion端, 而并非发送到所有的Minion端. 需要注意的是, 目前zmq_filtering只作用list target, 即使用-L来指定target. 本文将对其进行功能及效果测试.

前置阅读

环境说明

  • OS: CentOS 6.5 X86_64
  • Salt: Master/Minion架构, 2014.7.1版本

开工

配置zmq_filtering

Note

zmq_filtering参数在Master端及Minion端均需要配置才能生效, 默认为False

Master端配置zmq_filtering

echo "zmq_filtering: True" >> /etc/salt/master
service salt-master restart

Minion端配置zmq_filtering

echo "zmq_filtering: True" >> /etc/salt/minion
service salt-minion restart

操作完毕后, Master进行test.ping测试

salt '*' test.ping

输出如下内容:

minion-02.example.com:
    True
minion-01.example.com:
    True

测试zmq_filtering

Note

以下操作如非特别声明, 均在Master端进行

在Master端开启一个新的控制台, 使用tcpdump进行抓包

tcpdump -i eth0 src port 4505

使用Globbing target进行测试

salt 'minion-01.example.com' test.ping

tcpdump抓包结果如下:

07:21:18.104954 IP salt-master.example.com.4505 > salt-minion-01.example.com.58024: Flags [P.], seq 396:602, ack 1, win 227, options [nop,nop,TS val 4253627 ecr 4239803], length 206
07:21:18.105190 IP salt-master.example.com.4505 > salt-minion-02.example.com.33595: Flags [P.], seq 396:602, ack 1, win 227, options [nop,nop,TS val 4253627 ecr 4294907239], length 206

从结果来看, 虽然指定了minion-01.example.com, 因为Salt是PUB-SUB结构, 消息均会发送到所有的Minion

使用List target进行测试

salt -L 'minion-01.example.com' test.ping

tcpdump抓包结果如下:

07:23:35.378587 IP salt-master.example.com.4505 > salt-minion-01.example.com.58024: Flags [P.], seq 602:839, ack 1, win 227, options [nop,nop,TS val 4390900 ecr 4245316], length 237

从结果看, 在Master进行了过滤, 虽然是PUB-SUB, 但消息只发送给了salt-minion-01.example.com, 并没有发送到其他Minion上, 达到了zmq_filtering的效果.

测试ZeroMQ PUB-SUB Message Envelopes性能

开启zmq_filtering, 如果不是所有Minion均需要执行的操作, 通过在Master端进行消息过滤, 能够大大降低Master端发送指令时的带宽消耗, 那么zmq_filtering的性能又如何?

由于zmq_filtering只是利用ZeroMQ的PUB-SUB Message Envelopes, 其性能测试个人觉得只需要测试ZeroMQ PUB/SUB即可. 因此就假设了如下极端场景:

  • publisher 1节点, subscriber 1000节点(单节点开启1000个线程)
  • 进行1000次消息发送, 每条消息均需要发送到所有subscriber

直接上代码:

publisher.py

import sys
import zmq
import time
import json
import hashlib

def pub():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)

    socket.bind("tcp://*:5556")
    return context, socket

def sub_ids(subs=100):
    sub_list = []
    for each_sub in xrange(1, subs + 1):
        idx = hashlib.md5(str(each_sub)).hexdigest()
        sub_list.append(idx)
    return sub_list

def main(broadcast=True, times=100, subs=100):
    context, socket = pub()
    message = {'tgt_type': 'glob', 'jid': '20150106053023956920', 'tgt': '*', 'ret': '', 'user': 'sudo_vagrant', 'arg': [], 'fun': 'test.ping'}
    message = json.dumps(message)
    if not broadcast:
        sub_list = sub_ids(subs=subs)

    # sleep 30 seconds, guarantee all subscribes have  subscribed
    time.sleep(30)

    print "regain consciousness"
    start_time = time.time()
    for each in xrange(1, times+1):
        if broadcast:
            socket.send(message)
        else:
            for each_sub in sub_list:
               socket.send(each_sub, flags=zmq.SNDMORE)
               socket.send(message)

    end_time = time.time()

    print "-------------------------------"
    print "Exec times: %s, Exec time: %dms" %(times, end_time * 1000 - start_time * 1000)

if __name__ == '__main__':
    subs = 1000
    times = 1000
    if len(sys.argv) == 2 and sys.argv[1] == 'unicast':
        broadcast = False
    else:
        broadcast = True
    print "broadcast subscriber: %s" % broadcast
    main(broadcast=broadcast, times=times, subs=subs)

subscriber.py

import sys
import zmq
import threading
import hashlib

def sub(pub_uri, times=100, idx=None):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(pub_uri)
    if idx:
        socket.setsockopt(zmq.SUBSCRIBE, idx)
    else:
        socket.setsockopt(zmq.SUBSCRIBE, '')
    for each in xrange(1, times + 1):
        socket.recv()
    socket.close()


def main(subs=100, times=100, broadcast=True):
    pub_uri = 'tcp://salt-master.example.com:5556'
    sub_list = []
    for each_sub in xrange(1, subs+1):
        if broadcast:
            sub_list.append(threading.Thread(target=sub, args=(pub_uri,), kwargs={'times': times}))
        else:
            idx = hashlib.md5(str(each_sub)).hexdigest()
            sub_list.append(threading.Thread(target=sub, args=(pub_uri,), kwargs={'times': times, 'idx': idx}))
    for each_sub in sub_list:
        each_sub.start()
    print "subscriber start ok"
    for each_sub in sub_list:
        each_sub.join()
    print "subscriber done"


if __name__ == '__main__':
    subs = 1000
    times = 1000
    if len(sys.argv) == 2 and sys.argv[1] == 'unicast':
        broadcast = False
    else:
        broadcast = True
    print "broadcast subscriber: %s" % broadcast
    main(subs=subs, times=times, broadcast=broadcast)

测试用例1: 默认的PUB-SUB

python publisher.py    # 在Publisher(Master)端进行
python subscriber.py   # 在Subcriber(Minion-01)端进行

执行时间为 1588ms

测试用例2: 启用PUB-SUB Message Envelopes

python publisher.py unicast   # 在Publisher(Master)端进行
python subscriber.py unicast   # 在Subcriber(Minion-01)端进行

执行时间为 6786ms

两者相差5s左右, 由于本次测试, 为极端情况(1000次且每次都需要发送到所有subscriber), 增加的成本在可承受范围之内.

QA

Q: 如果只是单边启动zmq_filtering, 是否会影响使用?

A: 这里边有两种情况

  • Master端配置了zmq_filtering, 对于没有配置zmq_filtering的Minion, 将像以前一样, master端依然会将消息发送给它(不管target是否匹配), 对于已经开启zmq_filtering的minion, 则如果list target不匹配, master则不会发送消息给它.
  • Master没有配置zmq_filtering, 而Minion进行了配置, 则该Minion收不到任何指令

Q: zmq_filtering适用场景

A: 当前zmq_filtering只会匹配list target, 对于其他的target方式, 则采用默认的PUB-SUB. zmq_filtering适用于大规模集群, 但每次执行只是少数主机运行指令的场景.

© pengyao. Built using Pelican. Theme by Giulio Fidente on github. .