计算机教程网

您现在的位置是:首页 > 主机教程 > 服务器运维

服务器运维

kafka的使用—系统保卫战

2024-10-02 22:28:26服务器运维 主机评测网
最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。

前言

最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。

讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~

下面就使用python来模拟一下我们的方案(希望大家来吐槽 :) )

软件介绍

在这里我们使用 zookeeper + kafka 的方案来做。

软件版本其他
zookeeper3.4.6
kafka2.10-0.9.0.0
pykafka2.1.2python的kafka API

zookeeper + kafka 基本使用教程

http://www.linuxidc.com/Linux/2014-07/104470.htm

先决条件

  1. 使用zookeeper、kafka创建一个topic名为 goods-topic
  2. 需要安装pykafka一个python的zookeeper、kafka API
  3. 一个goods示例数据库
  • 使用消息队列:

# 启动zookeeper/usr/local/zookeeper/bin/zkServer.sh start# 启动kafka/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /tmp/kafka-logs/kafka.out 2>&1 &# 创建 goods-topic/usr/local/kafka/bin/kafka-topics.sh /  --create /  --zookeeper localhost:2181 /  --replication-factor 1 /  --partitions 1 /  --topic test

  • 安装pykafka:

pip install pykafka

官网:http://readthedocs.org/projects/pykafka/

  • 创建示例数据库:

CREATE TABLE goods(  goods_id INT NOT NULL AUTO_INCREMENT,  goods_name VARCHAR(30) NOT NULL,  goods_price DECIMAL(13, 2) NOT NULL DEFAULT 0.00,  create_time DATETIME NOT NULL,  PRIMARY KEY(goods_id));

伪代码展示

  • 生产者端伪代码-python

import time, jsonfrom pykafka import KafkaClient # 相关的mysql操作mysql_op() # 可接受多个Client这是重点client = KafkaClient(hosts="192.168.1.233:9092, /                            192.168.1.233:9093, /                            192.168.1.233:9094")# 选择一个topictopic = client.topics['goods-topic']# 创建一个生产者producer = topic.get_producer()# 模拟接收前端生成的商品信息goods_dict = {  'option_type':'insert'  'option_obj':{    'goods_name':'goods-1',    'goods_price':10.00,    'create_time':time.strftime('%Y-%m-%d %H:%M:%S')  }}goods_json = json.dumps(goods_dict)# 生产消息producer.produce(msg)

  • 消费者端伪代码-python(作为后台进程在跑)

import time, jsonfrom pykafka import KafkaClient# 可接受多个Client这是重点client = KafkaClient(hosts="192.168.1.233:9092, /                            192.168.1.233:9093, /                            192.168.1.233:9094")# 选择一个topictopic = client.topics['goods-topic']# 生成一个消费者balanced_consumer = topic.get_balanced_consumer(  consumer_group='goods_group',  auto_commit_enable=True,  zookeeper_connect='localhost:2181')# 消费信息for message in balanced_consumer:  if message is not None:    # 解析json为dict    goods_dict = json.loads(message)    # 对数据库进行操作    if goods_dict['option_type'] == 'insert':      mysql_insert()    elif goods_dict['option_type'] == 'update':      mysql_update()    elif goods_dict['option_type'] == 'delete':      mysql_delete()    else:      order_option()

 作者信息

昵称:HH

QQ:275258836

感觉本文内容不错,读后有收获?

逛逛衣服店,鼓励作者写出更好文章。