55
66package org .opensearch .sql .opensearch .executor ;
77
8- import static org .opensearch .sql .expression .function .BuiltinFunctionName .DISTINCT_COUNT_APPROX ;
9-
8+ import com .google .common .base .Suppliers ;
109import java .security .AccessController ;
1110import java .security .PrivilegedAction ;
1211import java .sql .PreparedStatement ;
1716import java .util .LinkedHashMap ;
1817import java .util .List ;
1918import java .util .Map ;
19+ import java .util .concurrent .ConcurrentHashMap ;
2020import java .util .concurrent .atomic .AtomicReference ;
21+ import java .util .function .Supplier ;
2122import org .apache .calcite .plan .RelOptUtil ;
2223import org .apache .calcite .rel .RelNode ;
2324import org .apache .calcite .rel .RelRoot ;
2425import org .apache .calcite .rel .type .RelDataType ;
2526import org .apache .calcite .rel .type .RelDataTypeField ;
2627import org .apache .calcite .runtime .Hook ;
2728import org .apache .calcite .sql .SqlExplainLevel ;
29+ import org .apache .calcite .sql .SqlOperator ;
30+ import org .apache .calcite .sql .SqlOperatorTable ;
2831import org .apache .calcite .sql .type .ReturnTypes ;
2932import org .apache .calcite .sql .type .SqlTypeName ;
33+ import org .apache .calcite .sql .util .ListSqlOperatorTable ;
3034import org .apache .calcite .sql .validate .SqlUserDefinedAggFunction ;
3135import org .apache .calcite .sql .validate .SqlUserDefinedFunction ;
3236import org .apache .logging .log4j .LogManager ;
@@ -273,8 +277,9 @@ private void buildResultSet(
273277 private void registerOpenSearchFunctions () {
274278 if (client instanceof OpenSearchNodeClient ) {
275279 SqlUserDefinedFunction geoIpFunction =
276- new GeoIpFunction (client .getNodeClient ()).toUDF (" GEOIP" );
280+ new GeoIpFunction (client .getNodeClient ()).toUDF (BuiltinFunctionName . GEOIP . name () );
277281 PPLFuncImpTable .INSTANCE .registerExternalOperator (BuiltinFunctionName .GEOIP , geoIpFunction );
282+ OperatorTable .addOperator (BuiltinFunctionName .GEOIP .name (), geoIpFunction );
278283 } else {
279284 logger .info (
280285 "Function [GEOIP] not registered: incompatible client type {}" ,
@@ -284,10 +289,37 @@ private void registerOpenSearchFunctions() {
284289 SqlUserDefinedAggFunction approxDistinctCountFunction =
285290 UserDefinedFunctionUtils .createUserDefinedAggFunction (
286291 DistinctCountApproxAggFunction .class ,
287- DISTINCT_COUNT_APPROX .toString (),
292+ BuiltinFunctionName . DISTINCT_COUNT_APPROX .name (),
288293 ReturnTypes .BIGINT_FORCE_NULLABLE ,
289294 null );
290295 PPLFuncImpTable .INSTANCE .registerExternalAggOperator (
291- DISTINCT_COUNT_APPROX , approxDistinctCountFunction );
296+ BuiltinFunctionName .DISTINCT_COUNT_APPROX , approxDistinctCountFunction );
297+ OperatorTable .addOperator (
298+ BuiltinFunctionName .DISTINCT_COUNT_APPROX .name (), approxDistinctCountFunction );
299+ }
300+
301+ /**
302+ * Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to
303+ * PPLBuiltinOperator.instance() or SqlStdOperatorTable.instance().
304+ */
305+ public static class OperatorTable extends ListSqlOperatorTable {
306+ private static final Supplier <OperatorTable > INSTANCE =
307+ Suppliers .memoize (() -> (OperatorTable ) new OperatorTable ().init ());
308+ // Use map instead of list to avoid duplicated elements if the class is initialized multiple
309+ // times
310+ private static final Map <String , SqlOperator > operators = new ConcurrentHashMap <>();
311+
312+ public static SqlOperatorTable instance () {
313+ return INSTANCE .get ();
314+ }
315+
316+ private ListSqlOperatorTable init () {
317+ setOperators (buildIndex (operators .values ()));
318+ return this ;
319+ }
320+
321+ public static synchronized void addOperator (String name , SqlOperator operator ) {
322+ operators .put (name , operator );
323+ }
292324 }
293325}
0 commit comments