我正在使用 Java 和 protoc 3.0 编译器,我的 proto 文件如下所示(来源见下面的链接)。
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
- Syntax = "proto3";
- package Telemetry;
- // Interface exported by Agent
- service OpenConfigTelemetry {
- // Request an inline subscription for data at the specified path.
- // The device should send telemetry data back on the same
- // connection as the subscription request.
- rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}
- // Terminates and removes an exisiting telemetry subscription
- rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}
- // Get the list of current telemetry subscriptions from the
- // target. This command returns a list of existing subscriptions
- // not including those that are established via configuration.
- rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}
- // Get Telemetry Agent Operational States
- rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}
- // Return the set of data encodings supported by the device for
- // telemetry data
- rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
- }
- // Message sent for a telemetry subscription request
- message SubscriptionRequest {
- // Data associated with a telemetry subscription
- SubscriptionInput input = 1;
- // List of data models paths and filters
- // which are used in a telemetry operation.
- repeated Path path_list = 2;
- // The below configuration is not defined in Openconfig RPC.
- // It is a proposed extension to configure additional
- // subscription request features.
- SubscriptionAdditionalConfig additional_config = 3;
- }
- // Data associated with a telemetry subscription
- message SubscriptionInput {
- // List of optional collector endpoints to send data for
- // this subscription.
- // If no collector destinations are specified,the collector
- // destination is assumed to be the requester on the rpc channel.
- repeated Collector collector_list = 1;
- }
- // Collector endpoints to send data specified as an ip+port combination.
- message Collector {
- // IP address of collector endpoint
- string address = 1;
- // Transport protocol port number for the collector destination.
- uint32 port = 2;
- }
- // Data model path
- message Path {
- // Data model path of interest
- // Path specification for elements of OpenConfig data models
- string path = 1;
- // Regular expression to be used in filtering state leaves
- string filter = 2;
- // If this is set to true,the target device will only send
- // updates to the collector upon a change in data value
- bool suppress_unchanged = 3;
- // Maximum time in ms the target device may go without sending
- // a message to the collector. If this time expires with
- // suppress-unchanged set,the target device must send an update
- // message regardless if the data values have changed.
- uint32 max_silent_interval = 4;
- // Time in ms between collection and transmission of the
- // specified data to the collector platform. The target device
- // will sample the corresponding data (e.g,. a counter) and
- // immediately send to the collector destination.
- //
- // If sample-frequency is set to 0,then the network device
- // must emit an update upon every datum change.
- uint32 sample_frequency = 5;
- }
- // Configure subscription request additional features.
- message SubscriptionAdditionalConfig {
- // limit the number of records sent in the stream
- int32 limit_records = 1;
- // limit the time the stream remains open
- int32 limit_time_seconds = 2;
- }
- // Reply to inline subscription for data at the specified path is done in
- // two-folds.
- // 1. Reply data message sent out using out-of-band channel.
- // 2. Telemetry data send back on the same connection as the
- // subscription request.
- // 1. Reply data message sent out using out-of-band channel.
- message SubscriptionReply {
- // Response message to a telemetry subscription creation or
- // get request.
- SubscriptionResponse response = 1;
- // List of data models paths and filters
- // which are used in a telemetry operation.
- repeated Path path_list = 2;
- }
- // Response message to a telemetry subscription creation or get request.
- message SubscriptionResponse {
- // Unique id for the subscription on the device. This is
- // generated by the device and returned in a subscription
- // request or when listing existing subscriptions
- uint32 subscription_id = 1;
- }
- // 2. Telemetry data send back on the same connection as the
- // subscription request.
- message OpenConfigData {
- // router name:export IP address
- string system_id = 1;
- // line card / RE (slot number)
- uint32 component_id = 2;
- // PFE (if applicable)
- uint32 sub_component_id = 3;
- // Path specification for elements of OpenConfig data models
- string path = 4;
- // Sequence number,monotonically increasing for each
- // system_id,component_id,sub_component_id + path.
- uint64 sequence_number = 5;
- // timestamp (milliseconds since epoch)
- uint64 timestamp = 6;
- // List of key-value pairs
- repeated KeyValue kv = 7;
- }
- // Simple Key-value,where value could be one of scalar types
- message KeyValue {
- // Key
- string key = 1;
- // One of possible values
- oneof value {
- double double_value = 5;
- int64 int_value = 6;
- uint64 uint_value = 7;
- sint64 sint_value = 8;
- bool bool_value = 9;
- string str_value = 10;
- bytes bytes_value = 11;
- }
- }
- // Message sent for a telemetry subscription cancellation request
- message CancelSubscriptionRequest {
- // Subscription identifier as returned by the device when
- // subscription was requested
- uint32 subscription_id = 1;
- }
- // Reply to telemetry subscription cancellation request
- message CancelSubscriptionReply {
- // Return code
- ReturnCode code = 1;
- // Return code string
- string code_str = 2;
- };
- // Result of the operation
- enum ReturnCode {
- SUCCESS = 0;
- NO_SUBSCRIPTION_ENTRY = 1;
- UNKNOWN_ERROR = 2;
- }
- // Message sent for a telemetry get request
- message GetSubscriptionsRequest {
- // Subscription identifier as returned by the device when
- // subscription was requested
- // --- or ---
- // 0xFFFFFFFF for all subscription identifiers
- uint32 subscription_id = 1;
- }
- // Reply to telemetry subscription get request
- message GetSubscriptionsReply {
- // List of current telemetry subscriptions
- repeated SubscriptionReply subscription_list = 1;
- }
- // Message sent for telemetry agent operational states request
- message GetOperationalStateRequest {
- // Per-subscription_id level operational state can be requested.
- //
- // Subscription identifier as returned by the device when
- // subscription was requested
- // --- or ---
- // 0xFFFFFFFF for all subscription identifiers including agent-level
- // operational stats
- // --- or ---
- // If subscription_id is not present then sent only agent-level
- // operational stats
- uint32 subscription_id = 1;
- // Control verbosity of the output
- VerbosityLevel verbosity = 2;
- }
- // Verbosity Level
- enum VerbosityLevel {
- DETAIL = 0;
- TERSE = 1;
- BRIEF = 2;
- }
- // Reply to telemetry agent operational states request
- message GetOperationalStateReply {
- // List of key-value pairs where
- // key = operational state definition
- // value = operational state value
- repeated KeyValue kv = 1;
- }
- // Message sent for a data encoding request
- message DataEncodingRequest {
- }
- // Reply to data encodings supported request
- message DataEncodingReply {
- repeated EncodingType encoding_list = 1;
- }
- // Encoding Type Supported
- enum EncodingType {
- UNDEFINED = 0;
- XML = 1;
- JSON_IETF = 2;
- PROTO3 = 3;
- }
为了进行服务调用(rpc telemetrySubscribe),我首先需要读取带有订阅 ID 的响应头,然后才能开始读取消息。现在,我已经能够用 Java 连接到该服务,也引入了拦截器,但是当我打印/获取响应头时,得到的却是 null。我的拦截器调用代码如下:
- ClientInterceptor interceptor = new HeaderClientInterceptor();
- originChannel = OkHttpChannelBuilder.forAddress(host,port)
- .usePlaintext(true)
- .build();
- Channel channel = ClientInterceptors.intercept(originChannel,interceptor);
- telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
- @Override
- public <ReqT,RespT> ClientCall<ReqT,RespT> interceptCall(MethodDescriptor<ReqT,RespT> method,CallOptions callOptions,Channel next) {
- return new SimpleForwardingClientCall<ReqT,RespT>(next.newCall(method,callOptions)) {
- @Override
- public void start(Listener<RespT> responseListener,Metadata headers) {
- super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
- @Override
- public void onHeaders(Metadata headers) {
- Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY",Metadata.ASCII_STRING_MARSHALLER);
- System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));
想知道有没有其他方法来读取元数据,或者读取第一条带有订阅 ID 的消息?我只需要读取第一条带有订阅 ID 的消息,并将同一个订阅 ID 返回给服务器,以便流可以启动。我还用同一个 proto 文件编写了 Python 代码,它可以与服务器正常通信,下面的代码仅供参考:
- sub_req = SubscribeRequestMsg("host",port)
- data_itr = stub.telemetrySubscribe(sub_req,_TIMEOUT_SECONDS)
- Metadata = data_itr.initial_Metadata()
- if Metadata[0][0] == "responseKey":
- Metainfo = Metadata[0][1]
- print Metainfo
- subreply = agent_pb2.SubscriptionReply()
- subreply.SetInParent()
- google.protobuf.text_format.Merge(Metainfo,subreply)
- if subreply.response.subscription_id:
- SUB_ID = subreply.response.subscription_id
通过上面的 Python 代码,我可以轻松获取元数据对象,但不知道如何用 Java 获取它。
在 Java 中读取 Metadata 之后,我得到的是:Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
但我知道元数据中应该还有一项内容,也就是:
- response {
- subscription_id: 2
- }