3434import com .fasterxml .jackson .databind .SerializationFeature ;
3535import com .fasterxml .jackson .databind .node .ObjectNode ;
3636import com .google .protobuf .DescriptorProtos ;
37+ import com .google .protobuf .DescriptorProtos .FileDescriptorProto ;
38+ import com .google .protobuf .DescriptorProtos .FileDescriptorSet ;
3739import com .google .protobuf .Descriptors ;
40+ import com .google .protobuf .Descriptors .DescriptorValidationException ;
41+ import com .google .protobuf .Descriptors .FileDescriptor ;
3842import com .google .protobuf .DynamicMessage ;
43+ import com .google .protobuf .ProtocolStringList ;
3944import com .google .protobuf .util .JsonFormat ;
4045import io .grpc .CallOptions ;
4146import io .grpc .Channel ;
6974import static org .springframework .cloud .gateway .support .GatewayToStringStyler .filterToStringCreator ;
7075
7176/**
72- * This filter takes a JSON payload, transform it into a protobuf object, send it to a
77+ * This filter takes a JSON payload, transform it into a protobuf object, send
78+ * it to a
7379 * given gRPC channel, and transform the response back to JSON.
7480 *
75- * Making it transparent for the consumer that the service under the gateway is a gRPC
81+ * Making it transparent for the consumer that the service under the gateway is
82+ * a gRPC
7683 * one.
7784 *
7885 * @author Alberto C. Ríos
@@ -104,7 +111,7 @@ public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
104111
105112 ServerWebExchangeUtils .setAlreadyRouted (exchange );
106113 return modifiedResponse .writeWith (exchange .getRequest ().getBody ())
107- .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
114+ .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
108115 }
109116
110117 @ Override
@@ -193,8 +200,7 @@ class GRPCResponseDecorator extends ServerHttpResponseDecorator {
193200 objectReader = objectMapper .readerFor (JsonNode .class );
194201 objectNode = objectMapper .createObjectNode ();
195202
196- }
197- catch (IOException | Descriptors .DescriptorValidationException e ) {
203+ } catch (IOException | Descriptors .DescriptorValidationException e ) {
198204 throw new RuntimeException (e );
199205 }
200206 }
@@ -204,24 +210,25 @@ public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
204210 exchange .getResponse ().getHeaders ().set ("Content-Type" , "application/json" );
205211
206212 return getDelegate ().writeWith (deserializeJSONRequest ().map (callGRPCServer ())
207- .map (serialiseGRPCResponse ())
208- .map (wrapGRPCResponse ())
209- .cast (DataBuffer .class )
210- .last ());
213+ .map (serialiseGRPCResponse ())
214+ .map (wrapGRPCResponse ())
215+ .cast (DataBuffer .class )
216+ .last ());
211217 }
212218
213219 private ClientCall <DynamicMessage , DynamicMessage > createClientCallForType (Config config ,
214220 Descriptors .ServiceDescriptor serviceDescriptor , Descriptors .Descriptor outputType ) {
215221 MethodDescriptor .Marshaller <DynamicMessage > marshaller = ProtoUtils
216- .marshaller (DynamicMessage .newBuilder (outputType ).build ());
222+ .marshaller (DynamicMessage .newBuilder (outputType ).build ());
217223 MethodDescriptor <DynamicMessage , DynamicMessage > methodDescriptor = MethodDescriptor
218- .<DynamicMessage , DynamicMessage >newBuilder ()
219- .setType (MethodDescriptor .MethodType .UNKNOWN )
220- .setFullMethodName (
221- MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (), config .getMethod ()))
222- .setRequestMarshaller (marshaller )
223- .setResponseMarshaller (marshaller )
224- .build ();
224+ .<DynamicMessage , DynamicMessage >newBuilder ()
225+ .setType (MethodDescriptor .MethodType .UNKNOWN )
226+ .setFullMethodName (
227+ MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (),
228+ config .getMethod ()))
229+ .setRequestMarshaller (marshaller )
230+ .setResponseMarshaller (marshaller )
231+ .build ();
225232 Channel channel = createChannel ();
226233 return channel .newCall (methodDescriptor , CallOptions .DEFAULT );
227234 }
@@ -230,10 +237,10 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
230237 throws IOException , Descriptors .DescriptorValidationException {
231238 Resource descriptorFile = resourceLoader .getResource (config .getProtoDescriptor ());
232239 DescriptorProtos .FileDescriptorSet fileDescriptorSet = DescriptorProtos .FileDescriptorSet
233- .parseFrom (descriptorFile .getInputStream ());
240+ .parseFrom (descriptorFile .getInputStream ());
234241 DescriptorProtos .FileDescriptorProto fileProto = fileDescriptorSet .getFile (0 );
235242 Descriptors .FileDescriptor fileDescriptor = Descriptors .FileDescriptor .buildFrom (fileProto ,
236- new Descriptors . FileDescriptor [ 0 ] );
243+ dependencies ( fileDescriptorSet , fileProto . getDependencyList ()) );
237244
238245 Descriptors .ServiceDescriptor serviceDescriptor = fileDescriptor .findServiceByName (config .getService ());
239246 if (serviceDescriptor == null ) {
@@ -243,9 +250,35 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
243250 List <Descriptors .MethodDescriptor > methods = serviceDescriptor .getMethods ();
244251
245252 return methods .stream ()
246- .filter (method -> method .getName ().equals (config .getMethod ()))
247- .findFirst ()
248- .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
253+ .filter (method -> method .getName ().equals (config .getMethod ()))
254+ .findFirst ()
255+ .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
256+ }
257+
258+ private FileDescriptor [] dependencies (FileDescriptorSet input , ProtocolStringList list ) {
259+ FileDescriptor [] deps = new FileDescriptor [list .size ()];
260+ for (int i = 0 ; i < list .size (); i ++) {
261+ String name = list .get (i );
262+ FileDescriptorProto file = findFileByName (input , name );
263+ if (file == null ) {
264+ throw new IllegalStateException ("Missing dependency: " + name );
265+ }
266+ try {
267+ deps [i ] = FileDescriptor .buildFrom (file , dependencies (input , file .getDependencyList ()));
268+ } catch (DescriptorValidationException e ) {
269+ throw new IllegalStateException ("Invalid descriptor: " + file .getName (), e );
270+ }
271+ }
272+ return deps ;
273+ }
274+
275+ private FileDescriptorProto findFileByName (FileDescriptorSet input , String name ) {
276+ for (FileDescriptorProto file : input .getFileList ()) {
277+ if (file .getName ().equals (name )) {
278+ return file ;
279+ }
280+ }
281+ return null ;
249282 }
250283
251284 private ManagedChannel createChannel () {
@@ -259,8 +292,7 @@ private Function<JsonNode, DynamicMessage> callGRPCServer() {
259292 DynamicMessage .Builder builder = DynamicMessage .newBuilder (descriptor );
260293 JsonFormat .parser ().merge (jsonRequest .toString (), builder );
261294 return ClientCalls .blockingUnaryCall (clientCall , builder .build ());
262- }
263- catch (IOException e ) {
295+ } catch (IOException e ) {
264296 throw new RuntimeException (e );
265297 }
266298 };
@@ -270,9 +302,8 @@ private Function<DynamicMessage, Object> serialiseGRPCResponse() {
270302 return gRPCResponse -> {
271303 try {
272304 return objectReader
273- .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
274- }
275- catch (IOException e ) {
305+ .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
306+ } catch (IOException e ) {
276307 throw new RuntimeException (e );
277308 }
278309 };
@@ -292,9 +323,8 @@ private Function<Object, DataBuffer> wrapGRPCResponse() {
292323 return jsonResponse -> {
293324 try {
294325 return new NettyDataBufferFactory (new PooledByteBufAllocator ())
295- .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
296- }
297- catch (JsonProcessingException e ) {
326+ .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
327+ } catch (JsonProcessingException e ) {
298328 return new NettyDataBufferFactory (new PooledByteBufAllocator ()).allocateBuffer ();
299329 }
300330 };
@@ -305,8 +335,7 @@ private ManagedChannel createChannelChannel(String host, int port) {
305335 NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder .forAddress (host , port );
306336 try {
307337 return grpcSslConfigurer .configureSsl (nettyChannelBuilder );
308- }
309- catch (SSLException e ) {
338+ } catch (SSLException e ) {
310339 throw new RuntimeException (e );
311340 }
312341 }
0 commit comments