2929import io .strimzi .api .kafka .model .connector .KafkaConnector ;
3030import io .strimzi .api .kafka .model .connector .KafkaConnectorSpec ;
3131import io .strimzi .api .kafka .model .connector .ListOffsets ;
32+ import io .strimzi .api .kafka .model .kafka .KafkaResources ;
3233import io .strimzi .api .kafka .model .kafka .Status ;
3334import io .strimzi .api .kafka .model .mirrormaker2 .KafkaMirrorMaker2 ;
3435import io .strimzi .operator .cluster .ClusterOperatorConfig ;
6263import io .strimzi .operator .common .BackOff ;
6364import io .strimzi .operator .common .Reconciliation ;
6465import io .strimzi .operator .common .ReconciliationLogger ;
65- import io .strimzi .operator .common .Util ;
6666import io .strimzi .operator .common .model .InvalidResourceException ;
6767import io .strimzi .operator .common .model .Labels ;
6868import io .strimzi .operator .common .model .OrderedProperties ;
8484import java .util .Optional ;
8585import java .util .Set ;
8686import java .util .TreeMap ;
87- import java .util .concurrent .ConcurrentHashMap ;
8887import java .util .function .BiFunction ;
8988import java .util .function .Function ;
9089import java .util .stream .Collectors ;
@@ -274,7 +273,7 @@ protected Future<ReconcileResult<NetworkPolicy>> connectNetworkPolicy(Reconcilia
274273 *
275274 * @return Future which completes when the reconciliation is done
276275 */
277- protected Future <Void > tlsTrustedCertsSecret (Reconciliation reconciliation , String namespace , KafkaConnectCluster connect ) {
276+ protected Future <Void > tlsTrustedCertsSecret (Reconciliation reconciliation , KafkaConnectCluster connect ) {
278277 ClientTls tls = connect .getTls ();
279278 Set <String > secretsToCopy = new HashSet <>();
280279
@@ -286,27 +285,7 @@ protected Future<Void> tlsTrustedCertsSecret(Reconciliation reconciliation, Stri
286285 return Future .succeededFuture ();
287286 }
288287
289- ConcurrentHashMap <String , String > secretData = new ConcurrentHashMap <>();
290- return Future .join (secretsToCopy .stream ()
291- .map (secretName -> secretOperations .getAsync (namespace , secretName )
292- .compose (secret -> {
293- if (secret == null ) {
294- return Future .failedFuture ("Secret " + secretName + " not found" );
295- } else {
296- secret .getData ().entrySet ().stream ()
297- .filter (e -> e .getKey ().contains (".crt" ))
298- // In case secrets contain the same key, append the secret name into the key
299- .forEach (e -> secretData .put (secretName + "-" + e .getKey (), e .getValue ()));
300- }
301- return Future .succeededFuture ();
302- }))
303- .collect (Collectors .toList ()))
304- .compose (ignore -> secretOperations .reconcile (
305- reconciliation ,
306- namespace ,
307- KafkaConnectResources .internalTlsTrustedCertsSecretName (connect .getCluster ()),
308- connect .generateTlsTrustedCertsSecret (secretData , KafkaConnectResources .internalTlsTrustedCertsSecretName (connect .getCluster ())))
309- .mapEmpty ());
288+ return ReconcilerUtils .generateTlsTrustedCertsSecret (reconciliation , secretsToCopy , KafkaConnectResources .internalTlsTrustedCertsSecretName (connect .getCluster ()), secretOperations , connect ::generateSecret ).mapEmpty ();
310289 }
311290
312291 /**
@@ -316,7 +295,7 @@ protected Future<Void> tlsTrustedCertsSecret(Reconciliation reconciliation, Stri
316295 *
317296 * @return Future which completes when the reconciliation is done
318297 */
319- protected Future <Void > oauthTrustedCertsSecret (Reconciliation reconciliation , String namespace , KafkaConnectCluster connect ) {
298+ protected Future <Void > oauthTrustedCertsSecret (Reconciliation reconciliation , KafkaConnectCluster connect ) {
320299 KafkaClientAuthentication authentication = connect .getAuthentication ();
321300 Set <String > secretsToCopy = new HashSet <>();
322301
@@ -328,42 +307,7 @@ protected Future<Void> oauthTrustedCertsSecret(Reconciliation reconciliation, St
328307 return Future .succeededFuture ();
329308 }
330309
331- List <String > certs = new ArrayList <>();
332- String oauthSecret = KafkaConnectResources .internalOauthTrustedCertsSecretName (connect .getCluster ());
333- return Future .join (secretsToCopy .stream ()
334- .map (secretName -> secretOperations .getAsync (namespace , secretName )
335- .compose (secret -> {
336- if (secret == null ) {
337- return Future .failedFuture ("Secret " + secretName + " not found" );
338- } else {
339- secret .getData ().entrySet ().stream ()
340- .filter (e -> e .getKey ().contains (".crt" ))
341- // In case secrets contain the same key, append the secret name into the key
342- .forEach (e -> certs .add (e .getValue ()));
343- }
344- return Future .succeededFuture ();
345- }))
346- .collect (Collectors .toList ()))
347- .compose (ignore -> secretOperations .reconcile (
348- reconciliation ,
349- namespace ,
350- oauthSecret ,
351- connect .generateTlsTrustedCertsSecret (Map .of (oauthSecret + ".crt" , mergeAndEncodeCerts (certs )), oauthSecret ))
352- .mapEmpty ());
353- }
354-
355- private String mergeAndEncodeCerts (List <String > certs ) {
356- if (certs .size () > 1 ) {
357- String decodedAndMergedCerts = certs .stream ()
358- .map (Util ::decodeFromBase64 )
359- .collect (Collectors .joining ("\n " ));
360-
361- return Util .encodeToBase64 (decodedAndMergedCerts );
362- } else if (certs .size () < 1 ) {
363- return "" ;
364- } else {
365- return certs .get (0 );
366- }
310+ return ReconcilerUtils .generateOauthTrustedCertsSecret (reconciliation , secretsToCopy , KafkaResources .internalOauthTrustedCertsSecretName (connect .getCluster ()), secretOperations , connect ::generateSecret );
367311 }
368312
369313 /**
0 commit comments