我正在使用sqlAlchemy 1.0.0,并希望批量生成一些UPDATE(更新,如果匹配主键,则不执行任何操作)查询.
我做了一些实验,发现批量更新看起来比批量插入或批量upsert慢得多.
你能不能帮助我指出为什么它的工作速度如此之慢,或者有没有其他方法/想法用sqlAlchemy进行BULK UPDATE(不是BULK UPSERT)?
下面是MysqL中的表:
CREATE TABLE `test` ( `id` int(11) unsigned NOT NULL,`value` int(11) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
@H_301_14@和测试代码:
from sqlalchemy import create_engine,text import time driver = 'MysqL' host = 'host' user = 'user' password = 'password' database = 'database' url = "{}://{}:{}@{}/{}?charset=utf8".format(driver,user,password,host,database) engine = create_engine(url) engine.connect() engine.execute('TRUNCATE TABLE test') num_of_rows = 1000 rows = [] for i in xrange(0,num_of_rows): rows.append({'id': i,'value': i}) print '--------- test insert --------------' sql = ''' INSERT INTO test (id,value) VALUES (:id,:value) ''' start = time.time() engine.execute(text(sql),rows) end = time.time() print 'Cost {} seconds'.format(end - start) print '--------- test upsert --------------' for r in rows: r['value'] = r['id'] + 1 sql = ''' INSERT INTO test (id,:value) ON DUPLICATE KEY UPDATE value = VALUES(value) ''' start = time.time() engine.execute(text(sql),rows) end = time.time() print 'Cost {} seconds'.format(end - start) print '--------- test update --------------' for r in rows: r['value'] = r['id'] * 10 sql = ''' UPDATE test SET value = :value WHERE id = :id ''' start = time.time() engine.execute(text(sql),rows) end = time.time() print 'Cost {} seconds'.format(end - start)
@H_301_14@num_of_rows = 100时的输出:
--------- test insert -------------- Cost 0.568960905075 seconds --------- test upsert -------------- Cost 0.569655895233 seconds --------- test update -------------- Cost 20.0891299248 seconds
@H_301_14@num_of_rows = 1000时的输出:
--------- test insert -------------- Cost 0.807548999786 seconds --------- test upsert -------------- Cost 0.584554195404 seconds --------- test update -------------- Cost 206.199367046 seconds
@H_301_14@@R_301_457@服务器的网络延迟大约为500毫秒.
看起来像批量更新它一个接一个地发送和执行每个查询,而不是批处理?
提前致谢.
最佳答案
即使@R_301_457@服务器(如您的情况)具有非常糟糕的延迟,您也可以通过技巧加快批量更新操作.您可以使用stage-table非常快速地插入新数据,然后对目标表执行一次join-update,而不是直接更新表.这样做的另一个好处是可以大大减少必须发送到@R_301_457@的语句数量.
这如何与UPDATE一起使用?
假设您有一个表条目,并且您始终有新数据,但您只想更新已存储的数据.您创建目标表entries_stage的副本,其中只包含相关字段:
entries = Table('entries',Metadata,Column('id',Integer,autoincrement=True,primary_key=True),Column('value',Unicode(64),nullable=False),) entries_stage = Table('entries_stage',autoincrement=False,unique=True),)
@H_301_14@然后使用批量插入插入数据.如果您使用MysqL的多值插入语法(sqlAlchemy本身不支持,但可以毫无困难地构建),这可以进一步加速.
INSERT INTO enries_stage (`id`,`value`) VALUES (1,'string1'),(2,'string2'),(3,'string3'),...;
@H_301_14@最后,使用stage-table中的值更新destination-table的值,如下所示:
UPDATE entries e JOIN entries_stage es ON e.id = es.id SET e.value = es.value;
@H_301_14@然后你就完成了.
插入怎么样?
当然,这也可以加速插入.由于您已经在stage-table中拥有了数据,所以您需要做的就是发出INSERT INTO … SELECT语句,其中的数据还没有在destination-table中.
INSERT INTO entries (id,value) SELECT FROM entries_stage es LEFT JOIN entries e ON e.id = es.id HAVING e.id IS NULL;
@H_301_14@关于这一点的好处是你不必执行INSERT IGNORE,REPLACE或ON DUPLICATE KEY UPDATE,这将增加你的主键,即使它们什么也不做.