How to read gRPC metadata on the client side with Java

I am using Java with the protoc 3.0 compiler. My proto file is shown below.
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
    syntax = "proto3";

    package Telemetry;

    // Interface exported by Agent
    service OpenConfigTelemetry {
        // Request an inline subscription for data at the specified path.
        // The device should send telemetry data back on the same
        // connection as the subscription request.
        rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}

        // Terminates and removes an existing telemetry subscription
        rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}

        // Get the list of current telemetry subscriptions from the
        // target. This command returns a list of existing subscriptions
        // not including those that are established via configuration.
        rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}

        // Get Telemetry Agent Operational States
        rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}

        // Return the set of data encodings supported by the device for
        // telemetry data
        rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
    }

    // Message sent for a telemetry subscription request
    message SubscriptionRequest {
        // Data associated with a telemetry subscription
        SubscriptionInput input = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;

        // The below configuration is not defined in Openconfig RPC.
        // It is a proposed extension to configure additional
        // subscription request features.
        SubscriptionAdditionalConfig additional_config = 3;
    }

    // Data associated with a telemetry subscription
    message SubscriptionInput {
        // List of optional collector endpoints to send data for
        // this subscription.
        // If no collector destinations are specified, the collector
        // destination is assumed to be the requester on the rpc channel.
        repeated Collector collector_list = 1;
    }

    // Collector endpoints to send data specified as an ip+port combination.
    message Collector {
        // IP address of collector endpoint
        string address = 1;

        // Transport protocol port number for the collector destination.
        uint32 port = 2;
    }

    // Data model path
    message Path {
        // Data model path of interest
        // Path specification for elements of OpenConfig data models
        string path = 1;

        // Regular expression to be used in filtering state leaves
        string filter = 2;

        // If this is set to true, the target device will only send
        // updates to the collector upon a change in data value
        bool suppress_unchanged = 3;

        // Maximum time in ms the target device may go without sending
        // a message to the collector. If this time expires with
        // suppress-unchanged set, the target device must send an update
        // message regardless of whether the data values have changed.
        uint32 max_silent_interval = 4;

        // Time in ms between collection and transmission of the
        // specified data to the collector platform. The target device
        // will sample the corresponding data (e.g., a counter) and
        // immediately send to the collector destination.
        //
        // If sample-frequency is set to 0, then the network device
        // must emit an update upon every datum change.
        uint32 sample_frequency = 5;
    }

    // Configure subscription request additional features.
    message SubscriptionAdditionalConfig {
        // limit the number of records sent in the stream
        int32 limit_records = 1;

        // limit the time the stream remains open
        int32 limit_time_seconds = 2;
    }

    // Reply to inline subscription for data at the specified path is done in
    // two parts.
    // 1. Reply data message sent out using out-of-band channel.
    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.

    // 1. Reply data message sent out using out-of-band channel.
    message SubscriptionReply {
        // Response message to a telemetry subscription creation or
        // get request.
        SubscriptionResponse response = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;
    }

    // Response message to a telemetry subscription creation or get request.
    message SubscriptionResponse {
        // Unique id for the subscription on the device. This is
        // generated by the device and returned in a subscription
        // request or when listing existing subscriptions
        uint32 subscription_id = 1;
    }

    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.
    message OpenConfigData {
        // router name:export IP address
        string system_id = 1;

        // line card / RE (slot number)
        uint32 component_id = 2;

        // PFE (if applicable)
        uint32 sub_component_id = 3;

        // Path specification for elements of OpenConfig data models
        string path = 4;

        // Sequence number, monotonically increasing for each
        // system_id, component_id, sub_component_id + path.
        uint64 sequence_number = 5;

        // timestamp (milliseconds since epoch)
        uint64 timestamp = 6;

        // List of key-value pairs
        repeated KeyValue kv = 7;
    }

    // Simple Key-value, where value could be one of scalar types
    message KeyValue {
        // Key
        string key = 1;

        // One of possible values
        oneof value {
            double double_value = 5;
            int64 int_value = 6;
            uint64 uint_value = 7;
            sint64 sint_value = 8;
            bool bool_value = 9;
            string str_value = 10;
            bytes bytes_value = 11;
        }
    }

    // Message sent for a telemetry subscription cancellation request
    message CancelSubscriptionRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription cancellation request
    message CancelSubscriptionReply {
        // Return code
        ReturnCode code = 1;

        // Return code string
        string code_str = 2;
    }

    // Result of the operation
    enum ReturnCode {
        SUCCESS = 0;
        NO_SUBSCRIPTION_ENTRY = 1;
        UNKNOWN_ERROR = 2;
    }

    // Message sent for a telemetry get request
    message GetSubscriptionsRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription get request
    message GetSubscriptionsReply {
        // List of current telemetry subscriptions
        repeated SubscriptionReply subscription_list = 1;
    }

    // Message sent for telemetry agent operational states request
    message GetOperationalStateRequest {
        // Per-subscription_id level operational state can be requested.
        //
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers including agent-level
        // operational stats
        // --- or ---
        // If subscription_id is not present then send only agent-level
        // operational stats
        uint32 subscription_id = 1;

        // Control verbosity of the output
        VerbosityLevel verbosity = 2;
    }

    // Verbosity Level
    enum VerbosityLevel {
        DETAIL = 0;
        TERSE = 1;
        BRIEF = 2;
    }

    // Reply to telemetry agent operational states request
    message GetOperationalStateReply {
        // List of key-value pairs where
        // key = operational state definition
        // value = operational state value
        repeated KeyValue kv = 1;
    }

    // Message sent for a data encoding request
    message DataEncodingRequest {
    }

    // Reply to data encodings supported request
    message DataEncodingReply {
        repeated EncodingType encoding_list = 1;
    }

    // Encoding Type Supported
    enum EncodingType {
        UNDEFINED = 0;
        XML = 1;
        JSON_IETF = 2;
        PROTO3 = 3;
    }

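To make the service call (rpc telemetrySubscribe), I first need to read the header that carries the subscription ID and only then start reading messages. With Java I can connect to the service, and I have added an interceptor, but when I print or retrieve the header it is null. My code for setting up the intercepted call is as follows: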

    ClientInterceptor interceptor = new HeaderClientInterceptor();
    originChannel = OkHttpChannelBuilder.forAddress(host, port)
            .usePlaintext(true)
            .build();
    // Route every call made on the channel through the interceptor.
    Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
    telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

This is the interceptor code that reads the metadata.

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Wrap the listener so the response headers can be inspected.
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        Key<String> CUSTOM_HEADER_KEY =
                                Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

                        System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
                        // The posted snippet is cut off here; the lines below are the
                        // assumed minimal completion that forwards the headers onward.
                        super.onHeaders(headers);
                    }
                }, headers);
            }
        };
    }

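Is there another way to read the metadata, or the first message that carries the subscription ID? All I need is to read that first message with the subscription ID and send the same subscription ID back to the server so that the stream can start. With the same proto file I have equivalent Python code that communicates with the server as shown below, for reference: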

    sub_req = SubscribeRequestMsg("host", port)
    data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
    metadata = data_itr.initial_metadata()

    if metadata[0][0] == "responseKey":
        metainfo = metadata[0][1]
        print(metainfo)

        # Parse the text-format header value into a SubscriptionReply.
        subreply = agent_pb2.SubscriptionReply()
        subreply.SetInParent()
        google.protobuf.text_format.Merge(metainfo, subreply)

        if subreply.response.subscription_id:
            SUB_ID = subreply.response.subscription_id

With the Python code above I can retrieve the metadata object easily, but I do not know how to retrieve it in Java.

After reading the Metadata, this is what I get: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

But I know the metadata carries one more entry, namely:

    response {
      subscription_id: 2
    }

How can I extract that last response, the one containing the subscription ID, from the header? I have tried many options and I am lost here.

Solution

The method you are using receives the request metadata, not the response metadata:

    public void start(Listener<RespT> responseListener, Metadata headers) {

For response metadata you need a ClientCall.Listener, and you have to wait for its onHeaders callback:

    public void onHeaders(Metadata headers)
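One way to get at the onHeaders result without hand-writing a listener is grpc-java's MetadataUtils.newCaptureMetadataInterceptor, which stores the response headers and trailers in AtomicReference holders. The following is a minimal sketch rather than a definitive implementation. It assumes grpc-java (the io.grpc and io.grpc.stub packages) is on the classpath; the class name HeaderCapture and its method names are hypothetical, and the header name is taken from the question in lower case, since gRPC header names are lower-cased.

```java
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: captures the response metadata of calls made on a channel.
public final class HeaderCapture {
    // Header name taken from the question (assumption: the server really sends it).
    private static final Metadata.Key<String> RESPONSE_KEY =
            Metadata.Key.of("responsekey", Metadata.ASCII_STRING_MARSHALLER);

    private final AtomicReference<Metadata> headers = new AtomicReference<>();
    private final AtomicReference<Metadata> trailers = new AtomicReference<>();

    // Returns a channel whose calls record their response headers and trailers.
    public Channel wrap(Channel origin) {
        return ClientInterceptors.intercept(
                origin, MetadataUtils.newCaptureMetadataInterceptor(headers, trailers));
    }

    // Returns the header value, or null if no headers have arrived yet
    // or the key is absent.
    public String responseKeyOrNull() {
        Metadata captured = headers.get();
        return captured == null ? null : captured.get(RESPONSE_KEY);
    }
}
```

With this sketch the stub would be built as OpenConfigTelemetryGrpc.newStub(capture.wrap(originChannel)). Because headers are delivered before the first message, responseKeyOrNull() can be read from the stream observer's onNext. The holders are shared by every call on the wrapped channel, so this suits one subscription call at a time.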

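The way you describe using metadata seems odd to me. Metadata is generally meant for additional error details or for cross-cutting features that are not specific to an RPC method (such as authentication, tracing, and so on).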
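Once the header value is available, the subscription ID still has to be pulled out of its text-format payload (the `response { subscription_id: 2 }` shown in the question). The faithful counterpart of the Python text_format.Merge call would be protobuf-java's TextFormat.merge on a SubscriptionReply builder. The sketch below is a dependency-free approximation that uses a regular expression instead; the class name SubscriptionIdParser is hypothetical, and it assumes the value contains a single `subscription_id: <digits>` field.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts subscription_id from a text-format header value.
public final class SubscriptionIdParser {
    // uint32 needs at most 10 decimal digits; longer digit runs are rejected.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("\\bsubscription_id\\s*:\\s*(\\d{1,10})\\b");

    private SubscriptionIdParser() {}

    // Returns the subscription ID, or empty if the value is null or has no such field.
    public static OptionalLong parse(String headerValue) {
        if (headerValue == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(headerValue);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Sample value copied from the question.
        String header = "response {\n  subscription_id: 2\n}";
        System.out.println(parse(header)); // prints OptionalLong[2]
    }
}
```

The regular expression keeps the example free of generated classes. For anything beyond a single scalar field, parsing with the generated SubscriptionReply type is the safer choice.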
