
今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。



  redis.StrictRedis: Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server.


# Typical usage: build a client, then call a command method on it.
# ``xxxx`` is a placeholder for real connection values / a real command name.
r = redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
r.xxxx()


  1. r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
  2. r.xxxx()




class StrictRedis(object):
    # ... (the rest of the class is omitted in this excerpt)

    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs=None, ssl_ca_certs=None):
        """Create a client bound to a connection pool.

        If the caller passes ``connection_pool`` it is used as-is; otherwise
        a new ``ConnectionPool`` is created for this client.
        """
        if not connection_pool:
            # ... (the code that assembles ``kwargs`` is omitted in this
            # excerpt; presumably it is built from the arguments above)
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool


# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
    """Execute a command and return a parsed response.

    ``args[0]`` is the command name. A connection is borrowed from
    ``self.connection_pool`` for the duration of the call and is always
    released back to the pool, whether the command succeeds or fails.
    On ConnectionError/TimeoutError the connection is disconnected and the
    command is retried once, except that a TimeoutError is re-raised when
    the connection's ``retry_on_timeout`` is false.
    """
    pool = self.connection_pool
    command_name = args[0]
    # Borrow a connection via ConnectionPool.get_connection().
    connection = pool.get_connection(command_name, **options)
    try:
        # Send the command; this is Connection.send_command().
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        connection.disconnect()
        if not connection.retry_on_timeout and isinstance(e, TimeoutError):
            raise
        # Single retry on the same (now disconnected) connection object.
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    finally:
        # Hand the connection back via ConnectionPool.release().
        pool.release(connection)


class ConnectionPool(object):
    # ... (other members are omitted in this excerpt)

    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        """Set up the pool; runs when a ConnectionPool is instantiated.

        ``connection_class`` is the class used to create connections,
        ``max_connections`` caps how many may be created (``None`` or 0
        falls back to 2 ** 31), and ``connection_kwargs`` are passed to
        ``connection_class`` whenever a new connection is made.

        Raises ValueError if ``max_connections`` is not an integer or is
        negative.
        """
        max_connections = max_connections or 2 ** 31
        # Validate the max_connections supplied by the caller.
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        # Store the configuration.
        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        # Initialise the bookkeeping state of the pool.
        self.reset()

    def reset(self):
        """(Re)initialise the pool's bookkeeping state."""
        self.pid = os.getpid()
        # Counter of connections created so far.
        self._created_connections = 0
        # Empty list that holds the connections available for reuse.
        self._available_connections = []
        # Empty set that holds the connections currently in use.
        self._in_use_connections = set()
        self._check_lock = threading.Lock()

    # ... (members omitted in this excerpt, including the _checkpid()
    # that is called below)

    def get_connection(self, command_name, *keys, **options):
        "Get a connection from the pool"
        self._checkpid()
        try:
            # Take (and remove) an idle connection. On the very first call
            # _available_connections is empty, so pop() raises IndexError and
            # make_connection() is used instead.
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        # Record the connection as being in use.
        self._in_use_connections.add(connection)
        return connection

    def make_connection(self):
        "Create a new connection"
        # Called when no idle connection is available. Refuse to go past the
        # limit; max_connections is set through the constructor argument.
        if self._created_connections >= self.max_connections:
            raise ConnectionError("Too many connections")
        # Count the connection that is about to be created.
        self._created_connections += 1
        # By default this is Connection(**self.connection_kwargs).
        return self.connection_class(**self.connection_kwargs)

    def release(self, connection):
        "Releases the connection back to the pool"
        # The connection is not closed here; it is only put back in the pool.
        self._checkpid()
        if connection.pid != self.pid:
            return
        # Move it from the in-use set to the list of available connections.
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

    def disconnect(self):
        "Disconnects all connections in the pool"
        # Covers both idle and in-use connections.
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭连接使用的是Connection.disconnect方法。Connection类的实现如下:

class Connection(object):
    "Manages TCP communication to and from a Redis server"

    def __del__(self):
        # Runs when the object is being destroyed: call disconnect() to
        # release the connection. Any error is deliberately ignored, since
        # this is best-effort cleanup.
        try:
            self.disconnect()
        except Exception:
            pass


def _connect(self):
    """Try each address returned for (host, port) and return a connected socket.

    Every failed attempt closes its socket and remembers the error in
    ``err`` before moving on to the next address.
    """
    err = None
    for res in socket.getaddrinfo(self.host, self.port, 0,
                                  socket.SOCK_STREAM):
        family, socktype, proto, canonname, socket_address = res
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            # TCP_NODELAY
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # TCP_KEEPALIVE
            # Per the article author: the constructor default is
            # socket_keepalive=False, so this branch is skipped by default.
            # NOTE(review): the author describes that default as a "short
            # connection"; this flag only turns on SO_KEEPALIVE - confirm
            # the wording.
            if self.socket_keepalive:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for k, v in iteritems(self.socket_keepalive_options):
                    sock.setsockopt(socket.SOL_TCP, k, v)

            # set the socket_connect_timeout before we connect
            # Per the article author: the constructor default is
            # socket_connect_timeout=None, i.e. connect() runs in blocking
            # mode.
            sock.settimeout(self.socket_connect_timeout)

            # connect
            sock.connect(socket_address)

            # set the socket_timeout now that we're connected
            # Per the article author: the constructor default is
            # socket_timeout=None.
            sock.settimeout(self.socket_timeout)
            return sock

        except socket.error as _:
            err = _
            if sock is not None:
                sock.close()
    # ... (the remainder of the method is omitted in this excerpt)


def disconnect(self):
    """Disconnects from the Redis server.

    The parser is always notified first. If there is no socket the method
    returns right after that; otherwise the socket is shut down and closed,
    socket errors are ignored, and ``self._sock`` is reset to ``None``.
    """
    self._parser.on_disconnect()
    if self._sock is None:
        return
    try:
        # shutdown() first, then close().
        self._sock.shutdown(socket.SHUT_RDWR)
        self._sock.close()
    except socket.error:
        pass
    self._sock = None





