我使用的是Rails 5和
Ruby 2.4.我如何知道,或者可以通过查看以下内容来判断是否有多个线程同时运行?
pool = Concurrent::FixedThreadPool.new(1) promises = links.map do |link| Concurrent::Promise.execute(executor: pool) do result = process_link(link) if result if result.kind_of?(Array) result.each do |my_obj| my_obj.update_attributes({ :a => a }) records_processed = records_processed + my_obj.matches.count end else records_processed = records_processed + result.matches.count result.update_attributes({ :a => a }) end end end end promises.map(&:wait).map(&:value!)
因为我把我的池设置为“1”,我的假设是没有任何东西同时运行,但是我不断得到这个错误…
Error during processing: (ActiveRecord::ConnectionTimeoutError) could not obtain a connection from the pool within 5.000 seconds (waited 5.002 seconds); all pooled connections were in use /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:202:in `block in wait_poll' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:193:in `loop' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:193:in `wait_poll' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:154:in `internal_poll' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:278:in `internal_poll' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:148:in `block in poll' /Users/nataliab/.rvm/rubies/ruby-2.4.0/lib/ruby/2.4.0/monitor.rb:214:in `mon_synchronize' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:158:in `synchronize' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:148:in `poll' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:717:in `acquire_connection' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:490:in `checkout' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:364:in `connection' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:883:in `retrieve_connection' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_handling.rb:128:in `retrieve_connection'
我没有得到上面的错误,如果我修改我的代码运行,我是积极的,没有并发的…
links.each do |link| result = process_link(link) if result if result.kind_of?(Array) result.each do |race| my_obj.update_attributes({ :a => a }) records_processed = records_processed + my_obj.matches.count end else records_processed = records_processed + result.matches.count result.update_attributes({ :a => a }) end end end
编辑:这是我的开发环境的数据库配置.还要注意,所有这些都在rails控制台中运行.
development: adapter: postgresql encoding: utf8 database: sims username: postgres password: password pool: 5 timeout: 15000 host: 127.0.0.1
解决方法
您假设多个线程必须同时运行,因为连接池被耗尽是不正确的.只是因为一个连接仍然从连接池中“签出”并不意味着一个查询当前正在一个线程中被检出的连接中执行,这只是意味着该线程的连接没有被重新检入.只要线程没有被明确地终止,该线程可能处于空闲状态,但是仍然保持连接到连接池的连接.
由于ActiveRecord连接是线程本地的,您可以通过在多个线程上运行ActiveRecord查询来耗尽连接池,就像您在这种情况下所做的那样. (每次调用Concurrent :: FixedThreadPool.new(1)时,都会创建一个新的线程.)即使您一次只在单个线程上运行查询,默认情况下,每个线程上的连接仍将保持打开状态直到它们被终止.
为了避免这种情况,您可以在使用它们之后手动检入连接,或者确保线程被终止(被杀死),以便它们的连接可以由池恢复(收回).
>要手动检入连接,请参阅ConnectionPool
documentation的选项.最简单的方法是将ActiveRecord代码包装在with_connection块中:
Concurrent::Promise.execute(executor: pool) do ActiveRecord::Base.connection_pool.with_connection do # update_attributes,etc end end
>要确保所有线程都被终止,请在完成使用线程池之后,在线程池上调用#shutdown
,然后调用#wait_for_termination
:
values = promises.map(&:value!) pool.shutdown pool.wait_for_termination