OpenStack Nova (8): Instance Creation (The Pipeline)

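The previous chapters covered how Nova creates its WSGI servers and the basics of routing. Now let's look at the pipeline defined in the Paste Deploy configuration.

api-paste.ini defines the following pipelines for the Nova API.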

    [composite:openstack_compute_api_v2]
    use = call:nova.api.auth:pipeline_factory
    noauth = faultwrap sizelimit noauth ratelimit osapi_compute_app_v2
    keystone = faultwrap sizelimit authtoken keystonecontext ratelimit osapi_compute_app_v2
    keystone_nolimit = faultwrap sizelimit authtoken keystonecontext osapi_compute_app_v2

As the earlier chapters showed, the keystone pipeline is the one in use here. Let's go through its stages one by one.
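
To make the idea of a pipeline concrete, here is a minimal sketch of how a list of filters can be folded around an app so that the first name in the list sees the request first. The names `compose_pipeline`, `tag_filter` and `app` are hypothetical and only illustrate the wrapping order; this is not Nova's `pipeline_factory`.

```python
def compose_pipeline(filters, app):
    """Wrap app with filters so the first filter in the list runs first."""
    for make_filter in reversed(filters):
        app = make_filter(app)
    return app


def tag_filter(name):
    """Hypothetical filter factory: records its name, then calls the next stage."""
    def factory(next_app):
        def middleware(trace):
            trace.append(name)
            return next_app(trace)
        return middleware
    return factory


def app(trace):
    # Stand-in for the final application at the end of the pipeline.
    trace.append("osapi_compute_app_v2")
    return trace


stages = "faultwrap sizelimit authtoken keystonecontext ratelimit".split()
pipeline = compose_pipeline([tag_filter(s) for s in stages], app)
print(pipeline([]))
```

The last filter wraps the app first, which is why the outermost stage (faultwrap) ends up being the first to handle a request and the last to see the response.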

faultwrap

    [filter:faultwrap]
    paste.filter_factory = nova.api.openstack:FaultWrapper.factory

    class FaultWrapper(wsgi.Middleware):
        """Calls the middleware stack, captures any exceptions into faults."""

        @webob.dec.wsgify(RequestClass=wsgi.Request)
        def __call__(self, req):
            try:
                return req.get_response(self.application)
            except Exception as ex:
                LOG.exception(_("FaultWrapper: %s"), unicode(ex))
                return faults.Fault(webob.exc.HTTPInternalServerError())

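As you can see, this one is very simple: it calls the rest of the pipeline to obtain a response object, and turns any uncaught exception into an HTTP 500 fault.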
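
The same idea can be sketched with nothing but the WSGI calling convention. `FaultWrapSketch` and `broken_app` below are hypothetical names, and the sketch uses the standard library instead of webob, so it is an illustration of the pattern and not Nova's class.

```python
import logging

LOG = logging.getLogger(__name__)


class FaultWrapSketch(object):
    """Plain-WSGI stand-in for a fault-wrapping middleware (hypothetical)."""

    def __init__(self, application):
        self.application = application

    def __call__(self, environ, start_response):
        try:
            return self.application(environ, start_response)
        except Exception:
            # Log the traceback and answer with a generic 500 response.
            LOG.exception("FaultWrapSketch caught an unhandled error")
            body = b"Internal Server Error"
            start_response("500 Internal Server Error",
                           [("Content-Type", "text/plain"),
                            ("Content-Length", str(len(body)))])
            return [body]


def broken_app(environ, start_response):
    # Deliberately failing application used for the demonstration.
    raise RuntimeError("boom")


statuses = []
result = FaultWrapSketch(broken_app)(
    {}, lambda status, headers: statuses.append(status))
print(statuses, result)
```

Note that this simple version only catches errors raised while the wrapped app is being called, not errors raised later while the response body is iterated.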

sizelimit

    [filter:sizelimit]
    paste.filter_factory = nova.api.sizelimit:RequestBodySizeLimiter.factory

    class RequestBodySizeLimiter(wsgi.Middleware):
        """Limit the size of incoming requests."""

        def __init__(self, *args, **kwargs):
            super(RequestBodySizeLimiter, self).__init__(*args, **kwargs)

        @webob.dec.wsgify(RequestClass=wsgi.Request)
        def __call__(self, req):
            if req.content_length > CONF.osapi_max_request_body_size:
                msg = _("Request is too large.")
                raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
            if req.content_length is None and req.is_body_readable:
                limiter = LimitingReader(req.body_file,
                                         CONF.osapi_max_request_body_size)
                req.body_file = limiter
            return self.application

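As its name suggests, this stage only checks the size of the REST request body. A request whose Content-Length exceeds osapi_max_request_body_size is rejected immediately; a request without a Content-Length has its body wrapped in a LimitingReader, which enforces the same limit while the body is being read.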
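
The LimitingReader class itself is not shown in this article. The sketch below is one way such a wrapper could work, assuming it counts bytes as they are read and fails once the limit is passed; `LimitingReaderSketch` and `BodyTooLarge` are hypothetical names.

```python
import io


class BodyTooLarge(Exception):
    """Hypothetical error raised once the body exceeds the limit."""


class LimitingReaderSketch(object):
    """Wraps a file-like object and counts the bytes read through it."""

    def __init__(self, data, limit):
        self.data = data
        self.limit = limit
        self.bytes_read = 0

    def read(self, size=-1):
        chunk = self.data.read(size)
        self.bytes_read += len(chunk)
        if self.bytes_read > self.limit:
            raise BodyTooLarge("Request is too large.")
        return chunk


# A 10-byte body passes a 16-byte limit.
ok_reader = LimitingReaderSketch(io.BytesIO(b"x" * 10), limit=16)
print(len(ok_reader.read()))

# A 32-byte body is rejected as soon as the count passes the limit.
try:
    LimitingReaderSketch(io.BytesIO(b"x" * 32), limit=16).read()
except BodyTooLarge as exc:
    print("rejected:", exc)
```

Counting during the read is what makes the check work for requests that carry no Content-Length header.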

authtoken

    [filter:authtoken]
    paste.filter_factory = keystoneclient.middleware.auth_token:filter_factory
    auth_host = 127.0.0.1
    auth_port = 35357
    auth_protocol = http
    admin_tenant_name = %SERVICE_TENANT_NAME%
    admin_user = %SERVICE_USER%
    admin_password = %SERVICE_PASSWORD%
    # signing_dir is configurable, but the default behavior of the authtoken
    # middleware should be sufficient. It will create a temporary directory
    # in the home directory for the user the nova process is running as.
    #signing_dir = /var/lib/nova/keystone-signing
    # Workaround for https://bugs.launchpad.net/nova/+bug/1154809
    auth_version = v2.0

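I did not download the keystoneclient code, so I cannot analyze what actually happens from the source. If you have read the earlier Keystone articles, though, the behavior is clear: the middleware takes the token from the current REST request, sends it to the Keystone service, and gets back the validation result.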
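
Since the real middleware is not examined here, the following is only a sketch of the flow just described, written against the plain WSGI interface. Everything in it is an assumption made for illustration: `AuthTokenSketch`, the injected `validate_token` callable (standing in for the round trip to Keystone), `fake_validate`, `echo_app` and `call` are all hypothetical.

```python
class AuthTokenSketch(object):
    """Hypothetical stand-in for a token-validating WSGI filter."""

    def __init__(self, application, validate_token):
        self.application = application
        # Assumed callable: token -> dict of identity fields, or None when
        # the token is rejected. A real deployment would ask Keystone here.
        self.validate_token = validate_token

    def __call__(self, environ, start_response):
        token = environ.get("HTTP_X_AUTH_TOKEN")
        identity = self.validate_token(token) if token else None
        if identity is None:
            start_response("401 Unauthorized",
                           [("Content-Type", "text/plain")])
            return [b"Authentication required"]
        # Hand the verified identity to later stages as request headers.
        environ["HTTP_X_USER_ID"] = identity["user_id"]
        environ["HTTP_X_TENANT_ID"] = identity["tenant_id"]
        return self.application(environ, start_response)


def fake_validate(token):
    # Stand-in for the Keystone call; accepts exactly one made-up token.
    if token == "good-token":
        return {"user_id": "u1", "tenant_id": "t1"}
    return None


def echo_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [environ["HTTP_X_USER_ID"].encode("ascii")]


def call(app, environ):
    """Run a WSGI app once and return (status, body)."""
    statuses = []
    body = app(environ, lambda status, headers: statuses.append(status))
    return statuses[0], b"".join(body)


secured = AuthTokenSketch(echo_app, fake_validate)
print(call(secured, {"HTTP_X_AUTH_TOKEN": "good-token"}))
print(call(secured, {}))
```

The identity headers set here are the kind of input the next stage, keystonecontext, reads.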

keystonecontext

    [filter:keystonecontext]
    paste.filter_factory = nova.api.auth:NovaKeystoneContext.factory

    class NovaKeystoneContext(wsgi.Middleware):
        """Make a request context from keystone headers."""

        @webob.dec.wsgify(RequestClass=wsgi.Request)
        def __call__(self, req):
            user_id = req.headers.get('X_USER')
            user_id = req.headers.get('X_USER_ID', user_id)
            if user_id is None:
                LOG.debug("Neither X_USER_ID nor X_USER found in request")
                return webob.exc.HTTPUnauthorized()

            roles = self._get_roles(req)

            if 'X_TENANT_ID' in req.headers:
                # This is the new header since Keystone went to ID/Name
                project_id = req.headers['X_TENANT_ID']
            else:
                # This is for legacy compatibility
                project_id = req.headers['X_TENANT']
            project_name = req.headers.get('X_TENANT_NAME')
            user_name = req.headers.get('X_USER_NAME')

            # Get the auth token
            auth_token = req.headers.get('X_AUTH_TOKEN',
                                         req.headers.get('X_STORAGE_TOKEN'))

            # Build a context, including the auth_token...
            remote_address = req.remote_addr
            if CONF.use_forwarded_for:
                remote_address = req.headers.get('X-Forwarded-For',
                                                 remote_address)

            service_catalog = None
            if req.headers.get('X_SERVICE_CATALOG') is not None:
                try:
                    catalog_header = req.headers.get('X_SERVICE_CATALOG')
                    service_catalog = jsonutils.loads(catalog_header)
                except ValueError:
                    raise webob.exc.HTTPInternalServerError(
                        _('Invalid service catalog json.'))

            ctx = context.RequestContext(user_id,
                                         project_id,
                                         user_name=user_name,
                                         project_name=project_name,
                                         roles=roles,
                                         auth_token=auth_token,
                                         remote_address=remote_address,
                                         service_catalog=service_catalog)

            req.environ['nova.context'] = ctx
            return self.application

        def _get_roles(self, req):
            """Get the list of roles."""

            if 'X_ROLES' in req.headers:
                roles = req.headers.get('X_ROLES', '')
            else:
                # Fallback to deprecated role header:
                roles = req.headers.get('X_ROLE', '')
                if roles:
                    LOG.warn(_("Sourcing roles from deprecated X-Role HTTP "
                               "header"))
            return [r.strip() for r in roles.split(',')]

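As the code shows, this middleware extracts the context information from the current HTTP headers and stores it in req.environ['nova.context'] so that later stages can use it.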
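
The header handling can be tried out in isolation with a plain dict in place of the request headers. `build_context_sketch` is a hypothetical helper that mirrors the fallback order of the code above; unlike the middleware, it returns a dict instead of a RequestContext and does not raise when the tenant header is missing.

```python
def build_context_sketch(headers):
    """Collect identity fields from a plain dict of headers (hypothetical)."""
    # Prefer the newer *_ID headers and fall back to the legacy names.
    user_id = headers.get("X_USER_ID", headers.get("X_USER"))
    if user_id is None:
        return None  # the middleware answers 401 Unauthorized in this case
    project_id = headers.get("X_TENANT_ID", headers.get("X_TENANT"))
    roles = headers.get("X_ROLES", headers.get("X_ROLE", ""))
    return {
        "user_id": user_id,
        "project_id": project_id,
        "roles": [r.strip() for r in roles.split(",")],
        "auth_token": headers.get("X_AUTH_TOKEN"),
    }


ctx = build_context_sketch({
    "X_USER_ID": "u1",
    "X_TENANT_ID": "t1",
    "X_ROLES": "admin, member",
    "X_AUTH_TOKEN": "tok",
})
print(ctx)
print(build_context_sketch({}))
```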

ratelimit

    [filter:ratelimit]
    paste.filter_factory = nova.api.openstack.compute.limits:RateLimitingMiddleware.factory

This is a rate-limiting model based on the leaky bucket algorithm.

    class RateLimitingMiddleware(base_wsgi.Middleware):
        """
        Rate-limits requests passing through this middleware. All limit
        information is stored in memory for this implementation.
        """

        def __init__(self, application, limits=None, limiter=None, **kwargs):
            """
            Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
            application and sets up the given limits.

            @param application: WSGI application to wrap
            @param limits: String describing limits
            @param limiter: String identifying class for representing limits

            Other parameters are passed to the constructor for the limiter.
            """
            base_wsgi.Middleware.__init__(self, application)

            # The instance is built by the factory, and the [filter:ratelimit]
            # section sets neither option, so limiter and limits are both None.
            # Select the limiter class
            if limiter is None:
                limiter = Limiter
            else:
                limiter = importutils.import_class(limiter)

            # Parse the limits, if any are provided
            if limits is not None:
                limits = limiter.parse_limits(limits)

            # With limits left as None, DEFAULT_LIMITS is used here.
            self._limiter = limiter(limits or DEFAULT_LIMITS, **kwargs)

        @webob.dec.wsgify(RequestClass=wsgi.Request)
        def __call__(self, req):
            """
            Represents a single call through this middleware. We should record
            the request if we have a limit relevant to it. If no limit is
            relevant to the request, ignore it.

            If the request should be rate limited, return a fault telling the
            user they are over the limit and need to retry later.
            """
            verb = req.method
            url = req.url
            context = req.environ.get("nova.context")

            if context:
                username = context.user_id
            else:
                username = None

            # Check the rate for the current user.
            delay, error = self._limiter.check_for_delay(verb, url, username)

            # A non-empty delay means the rate was exceeded. The delay is how
            # long to wait before this kind of request can be sent again.
            if delay:
                msg = _("This request was rate-limited.")
                retry = time.time() + delay
                return wsgi.RateLimitFault(msg, error, retry)

            req.environ["nova.limits"] = self._limiter.get_limits(username)

            return self.application

Next, look at DEFAULT_LIMITS and the implementation of Limit.

    DEFAULT_LIMITS = [
        Limit("POST", "*", ".*", 120, utils.TIME_UNITS['MINUTE']),
        Limit("POST", "*/servers", "^/servers", 120, utils.TIME_UNITS['MINUTE']),
        Limit("PUT", "*", ".*", 120, utils.TIME_UNITS['MINUTE']),
        Limit("GET", "*changes-since*", ".*changes-since.*", 120,
              utils.TIME_UNITS['MINUTE']),
        Limit("DELETE", "*", ".*", 120, utils.TIME_UNITS['MINUTE']),
        Limit("GET", "*/os-fping", "^/os-fping", 12, utils.TIME_UNITS['MINUTE']),
    ]

    class Limit(object):
        """
        Stores information about a limit for HTTP requests.
        """

        UNITS = dict([(v, k) for k, v in utils.TIME_UNITS.items()])

        def __init__(self, verb, uri, regex, value, unit):
            """
            Initialize a new `Limit`.

            @param verb: HTTP verb (POST, PUT, etc.)
            @param uri: Human-readable URI
            @param regex: Regular expression format for this limit
            @param value: Integer number of requests which can be made
            @param unit: Unit of measure for the value parameter
            """

            # The parameters are self-explanatory. value is the number of
            # requests allowed per unit of time. unit is the time unit, always
            # expressed in seconds: 120 per minute becomes 120 per 60 seconds.
            self.verb = verb
            self.uri = uri
            self.regex = regex
            self.value = int(value)
            self.unit = unit
            self.unit_string = self.display_unit().lower()
            self.remaining = int(value)

            if value <= 0:
                raise ValueError("Limit value must be > 0")

            self.last_request = None
            self.next_request = None
            # water_level: how much of the bucket is currently in use.
            self.water_level = 0
            # capacity: the maximum level, simply equal to the time unit.
            self.capacity = self.unit
            # request_value: the share of the capacity one request consumes.
            # For 120 requests per 60 seconds the capacity is 60, so each
            # request adds 60 / 120 = 0.5.
            self.request_value = float(self.capacity) / float(self.value)
            msg = _("Only %(value)s %(verb)s request(s) can be "
                    "made to %(uri)s every %(unit_string)s.")
            self.error_message = msg % self.__dict__

        def __call__(self, verb, url):
            """
            Represents a call to this limit from a relevant request.

            @param verb: string http verb (POST, GET, etc.)
            @param url: string URL
            """
            # Match on the HTTP verb and the URL regular expression.
            if self.verb != verb or not re.match(self.regex, url):
                return

            now = self._get_time()

            if self.last_request is None:
                self.last_request = now

            # The bucket leaks by the time elapsed since the last call, which
            # gives the current water level.
            leak_value = now - self.last_request
            self.water_level -= leak_value
            self.water_level = max(self.water_level, 0)
            # Every call raises the water level.
            self.water_level += self.request_value
            # Difference between the water level and the capacity.
            difference = self.water_level - self.capacity

            self.last_request = now
            # A positive difference means the caller has to wait that many
            # seconds before there is room again. This is where the delay
            # value used in RateLimitingMiddleware comes from.
            if difference > 0:
                self.water_level -= self.request_value
                self.next_request = now + difference
                return difference

            cap = self.capacity
            water = self.water_level
            val = self.value

            self.remaining = math.floor(((cap - water) / cap) * val)
            self.next_request = now

        def _get_time(self):
            """Retrieve the current time. Broken out for testability."""
            return time.time()

        def display_unit(self):
            """Display the string name of the unit."""
            return self.UNITS.get(self.unit, "UNKNOWN")

        def display(self):
            """Return a useful representation of this class."""
            return {
                "verb": self.verb,
                "URI": self.uri,
                "regex": self.regex,
                "value": self.value,
                "remaining": int(self.remaining),
                "unit": self.display_unit(),
                "resetTime": int(self.next_request or self._get_time()),
            }
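
The water-level arithmetic is easier to follow with small numbers and an explicit clock. The sketch below keeps only the bucket logic of Limit and takes the current time as an argument; `LeakyBucketSketch` and `hit` are hypothetical names, and the limit of 2 requests per 60 seconds is chosen purely to keep the numbers readable.

```python
class LeakyBucketSketch(object):
    """Bucket logic only: capacity equals the time unit in seconds."""

    def __init__(self, value, unit_seconds):
        self.capacity = float(unit_seconds)
        # Share of the bucket that a single request fills.
        self.request_value = self.capacity / value
        self.water_level = 0.0
        self.last_request = None

    def hit(self, now):
        """Return None if the request is allowed, else the seconds to wait."""
        if self.last_request is None:
            self.last_request = now
        # Leak by the elapsed time, never dropping below empty.
        leaked = now - self.last_request
        self.water_level = max(self.water_level - leaked, 0.0)
        # Add this request, then compare against the capacity.
        self.water_level += self.request_value
        self.last_request = now
        difference = self.water_level - self.capacity
        if difference > 0:
            # Over the limit: undo the addition and report the wait time.
            self.water_level -= self.request_value
            return difference
        return None


bucket = LeakyBucketSketch(value=2, unit_seconds=60)  # each request adds 30
results = [bucket.hit(0), bucket.hit(0), bucket.hit(0), bucket.hit(30)]
print(results)
```

With these numbers the first two requests at t=0 fill the bucket to 30 and then 60, the third would reach 90 and is told to wait 30 seconds, and a request at t=30 is accepted because 30 units have leaked out in the meantime.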

There is also a class that manages the Limit objects.

    class Limiter(object):
        """
        Rate-limit checking class which handles limits in memory.
        """

        def __init__(self, limits, **kwargs):
            """
            Initialize the new `Limiter`.

            @param limits: List of `Limit` objects
            """
            self.limits = copy.deepcopy(limits)
            # This is the key point. check_for_delay takes a username, yet
            # nothing in the configuration mentions usernames. The defaultdict
            # hands every unknown user a fresh deep copy of the default limits.
            self.levels = collections.defaultdict(lambda: copy.deepcopy(limits))

            # Pick up any per-user limit information
            for key, value in kwargs.items():
                if key.startswith(LIMITS_PREFIX):
                    username = key[len(LIMITS_PREFIX):]
                    self.levels[username] = self.parse_limits(value)

        def get_limits(self, username=None):
            """
            Return the limits for a given user.
            """
            return [limit.display() for limit in self.levels[username]]

        def check_for_delay(self, verb, url, username=None):
            """
            Check the given verb/user/user triplet for limit.

            @return: Tuple of delay (in seconds) and error message (or None, None)
            """
            delays = []

            # Fetch the limits for the current user. No per-user limits are
            # configured here, so this yields a copy of DEFAULT_LIMITS.
            for limit in self.levels[username]:
                delay = limit(verb, url)
                if delay:
                    delays.append((delay, limit.error_message))

            if delays:
                delays.sort()
                return delays[0]

            return None, None

        # Note: This method gets called before the class is instantiated,
        # so this must be either a static method or a class method. It is
        # used to develop a list of limits to feed to the constructor. We
        # put this in the class so that subclasses can override the
        # default limit parsing.
        @staticmethod
        def parse_limits(limits):
            """
            Convert a string into a list of Limit instances. This
            implementation expects a semicolon-separated sequence of
            parenthesized groups, where each group contains a
            comma-separated sequence consisting of HTTP method,
            user-readable URI, a URI reg-exp, an integer number of
            requests which can be made, and a unit of measure. Valid
            values for the latter are "SECOND", "MINUTE", "HOUR", and
            "DAY".

            @return: List of Limit instances.
            """

            # Handle empty limit strings
            limits = limits.strip()
            if not limits:
                return []

            # Split up the limits by semicolon
            result = []
            for group in limits.split(';'):
                group = group.strip()
                if group[:1] != '(' or group[-1:] != ')':
                    raise ValueError("Limit rules must be surrounded by "
                                     "parentheses")
                group = group[1:-1]

                # Extract the Limit arguments
                args = [a.strip() for a in group.split(',')]
                if len(args) != 5:
                    raise ValueError("Limit rules must contain the following "
                                     "arguments: verb, uri, regex, value, unit")

                # Pull out the arguments
                verb, uri, regex, value, unit = args

                # Upper-case the verb
                verb = verb.upper()

                # Convert value--raises ValueError if it's not integer
                value = int(value)

                # Convert unit
                unit = unit.upper()
                if unit not in utils.TIME_UNITS:
                    raise ValueError("Invalid units specified")
                unit = utils.TIME_UNITS[unit]

                # Build a limit
                result.append(Limit(verb, uri, regex, value, unit))

            return result
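
The defaultdict trick in Limiter.__init__ is worth seeing on its own: each user gets an independent deep copy of the template the first time that user is looked up, so one user's counters never affect another's. `BucketState`, `template` and the user names below are made up for the demonstration.

```python
import collections
import copy


class BucketState(object):
    """Hypothetical stand-in for a Limit object with mutable state."""

    def __init__(self):
        self.hits = 0


template = [BucketState()]
# Unknown keys get their own deep copy of the template list.
levels = collections.defaultdict(lambda: copy.deepcopy(template))

levels["alice"][0].hits += 1
levels["alice"][0].hits += 1
levels[None][0].hits += 1  # requests without a context share the None key

print(levels["alice"][0].hits, levels[None][0].hits,
      levels["bob"][0].hits, template[0].hits)
```

Without the deep copy every user would share the same objects, and the limit would effectively become global.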

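That concludes the rate-limiting part. There is a fair amount of code, but the principle is simple: match each request by HTTP verb and a URL regular expression, then apply the leaky bucket algorithm to limit the rate.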

osapi_compute_app_v2

    [app:osapi_compute_app_v2]
    paste.app_factory = nova.api.openstack.compute:APIRouter.factory

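This is the last stage of the pipeline, and it is also our app. As the code shows, it simply generates the URL routing information.
One thing is different from before: mapper.resource is used to generate RESTful-style routes.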
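
To give a feel for what "RESTful-style routes" means, the sketch below hand-builds the conventional route table for one resource, including extra collection and member actions. It is an approximation written for this article, not the output of the routes library (which, among other things, also generates form-oriented routes that are left out here), and `resource_routes` is a hypothetical helper.

```python
def resource_routes(collection_name, collection=None, member=None):
    """Return (method, path, action) tuples for a REST resource (sketch)."""
    base = "/%s" % collection_name
    item = base + "/{id}"
    table = [
        ("GET", base, "index"),
        ("POST", base, "create"),
        ("GET", item, "show"),
        ("PUT", item, "update"),
        ("DELETE", item, "delete"),
    ]
    # Extra actions on the whole collection, e.g. {'detail': 'GET'}.
    for action, method in sorted((collection or {}).items()):
        table.append((method, "%s/%s" % (base, action), action))
    # Extra actions on a single member, e.g. {'action': 'POST'}.
    for action, method in sorted((member or {}).items()):
        table.append((method, "%s/%s" % (item, action), action))
    return table


server_routes = resource_routes("servers",
                                collection={"detail": "GET"},
                                member={"action": "POST"})
for route in server_routes:
    print("%-6s %-24s -> %s" % route)
```

Under this convention, creating an instance corresponds to a POST on the collection path, which is dispatched to the controller's create action.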

    class APIRouter(nova.api.openstack.APIRouter):
        """
        Routes requests on the OpenStack API to the appropriate controller
        and method.
        """
        ExtensionManager = extensions.ExtensionManager

        def _setup_routes(self, mapper, ext_mgr, init_only):
            if init_only is None or 'versions' in init_only:
                self.resources['versions'] = versions.create_resource()
                mapper.connect("versions", "/",
                               controller=self.resources['versions'],
                               action='show',
                               conditions={"method": ['GET']})

            mapper.redirect("", "/")

            if init_only is None or 'consoles' in init_only:
                self.resources['consoles'] = consoles.create_resource()
                mapper.resource("console", "consoles",
                                controller=self.resources['consoles'],
                                parent_resource=dict(member_name='server',
                                                     collection_name='servers'))

            if init_only is None or 'consoles' in init_only or \
                    'servers' in init_only or 'ips' in init_only:
                self.resources['servers'] = servers.create_resource(ext_mgr)
                # Routes for servers: these are the ones used to create
                # an instance.
                mapper.resource("server", "servers",
                                controller=self.resources['servers'],
                                collection={'detail': 'GET'},
                                member={'action': 'POST'})

            if init_only is None or 'ips' in init_only:
                self.resources['ips'] = ips.create_resource()
                mapper.resource("ip", "ips",
                                controller=self.resources['ips'],
                                parent_resource=dict(member_name='server',
                                                     collection_name='servers'))

            if init_only is None or 'images' in init_only:
                self.resources['images'] = images.create_resource()
                mapper.resource("image", "images",
                                controller=self.resources['images'],
                                collection={'detail': 'GET'})

            if init_only is None or 'limits' in init_only:
                self.resources['limits'] = limits.create_resource()
                mapper.resource("limit", "limits",
                                controller=self.resources['limits'])

            if init_only is None or 'flavors' in init_only:
                self.resources['flavors'] = flavors.create_resource()
                mapper.resource("flavor", "flavors",
                                controller=self.resources['flavors'],
                                collection={'detail': 'GET'},
                                member={'action': 'POST'})

            if init_only is None or 'image_metadata' in init_only:
                self.resources['image_metadata'] = image_metadata.create_resource()
                image_metadata_controller = self.resources['image_metadata']

                mapper.resource("image_meta", "metadata",
                                controller=image_metadata_controller,
                                parent_resource=dict(member_name='image',
                                                     collection_name='images'))

                mapper.connect("metadata",
                               "/{project_id}/images/{image_id}/metadata",
                               controller=image_metadata_controller,
                               action='update_all',
                               conditions={"method": ['PUT']})

            if init_only is None or 'server_metadata' in init_only:
                self.resources['server_metadata'] = \
                    server_metadata.create_resource()
                server_metadata_controller = self.resources['server_metadata']

                mapper.resource("server_meta", "metadata",
                                controller=server_metadata_controller,
                                parent_resource=dict(member_name='server',
                                                     collection_name='servers'))

                mapper.connect("metadata",
                               "/{project_id}/servers/{server_id}/metadata",
                               controller=server_metadata_controller,
                               action='update_all',
                               conditions={"method": ['PUT']})

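One question remains, though. The URL for creating an instance looks like this:
/v2/{tenant_id}/servers
It contains a tenant_id, and nothing shown so far parses it.
Go back to nova.api.openstack.APIRouter, the parent class of APIRouter: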

    def __init__(self, ext_mgr=None, init_only=None):
        if ext_mgr is None:
            if self.ExtensionManager:
                ext_mgr = self.ExtensionManager()
            else:
                raise Exception(_("Must specify an ExtensionManager class"))
        # mapper is an instance of ProjectMapper, which is a custom class.
        mapper = ProjectMapper()
        self.resources = {}
        self._setup_routes(mapper, ext_mgr, init_only)
        self._setup_ext_routes(mapper, ext_mgr, init_only)
        self._setup_extensions(ext_mgr)
        super(APIRouter, self).__init__(mapper)

Now look at ProjectMapper:

    class ProjectMapper(APIMapper):
        def resource(self, member_name, collection_name, **kwargs):
            if 'parent_resource' not in kwargs:
                # Prefix every route with {project_id}.
                kwargs['path_prefix'] = '{project_id}/'
            else:
                parent_resource = kwargs['parent_resource']
                p_collection = parent_resource['collection_name']
                p_member = parent_resource['member_name']
                kwargs['path_prefix'] = '{project_id}/%s/:%s_id' % (p_collection,
                                                                     p_member)
            routes.Mapper.resource(self, member_name,
                                   collection_name,
                                   **kwargs)
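
The prefix logic can be checked in isolation. `path_prefix_for` below is a hypothetical helper that repeats the two branches of ProjectMapper.resource as a pure function, so the resulting prefixes can be printed directly.

```python
def path_prefix_for(parent_resource=None):
    """Return the path prefix chosen for a resource (sketch of the logic above)."""
    if parent_resource is None:
        # Top-level resource: only the project id is prepended.
        return "{project_id}/"
    # Nested resource: project id, parent collection, then the parent's id.
    return "{project_id}/%s/:%s_id" % (parent_resource["collection_name"],
                                       parent_resource["member_name"])


print(path_prefix_for())
print(path_prefix_for(dict(member_name="server", collection_name="servers")))
```

So a top-level resource such as servers is routed under {project_id}/, which is where the tenant_id segment of /v2/{tenant_id}/servers is captured, while a nested resource such as ips is routed under {project_id}/servers/:server_id.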

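At this point the routing information is essentially complete. There are also some extension routes, which you can read through on your own.

The next chapter moves on to what happens inside the Controller.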
