diff --git a/apps/beeswax/src/beeswax/server/dbms.py b/apps/beeswax/src/beeswax/server/dbms.py
index 6defed37743..009340a88d0 100644
--- a/apps/beeswax/src/beeswax/server/dbms.py
+++ b/apps/beeswax/src/beeswax/server/dbms.py
@@ -15,20 +15,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import re
 import json
-import time
 import logging
+import re
 import threading
+import time
 from builtins import object

+from azure.abfs import abfspath
 from django.core.cache import caches
-from django.urls import reverse
 from django.utils.encoding import force_str
 from django.utils.translation import gettext as _
 from kazoo.client import KazooClient

-from azure.abfs import abfspath
 from beeswax.common import apply_natural_sort, is_compute
 from beeswax.conf import (
   APPLY_NATURAL_SORT_MAX,
@@ -66,8 +65,7 @@
   hiveserver2_use_ssl,
 )
 from beeswax.models import QUERY_TYPES, Compute, QueryHistory
-from desktop.conf import CLUSTER_ID, has_connectors
-from desktop.lib.django_util import format_preserving_redirect
+from desktop.conf import has_connectors
 from desktop.lib.exceptions_renderable import PopupException
 from desktop.lib.parameterization import substitute_variables
 from desktop.lib.view_util import location_to_url
@@ -96,15 +94,28 @@ def reset_ha():


 def get_zk_hs2():
-  hiveservers = None
+  hiveservers = []
   zk = KazooClient(hosts=libzookeeper_conf.ENSEMBLE.get(), read_only=True)
   zk.start(timeout=ZOOKEEPER_CONN_TIMEOUT.get())
   znode = HIVE_DISCOVERY_HIVESERVER2_ZNODE.get()
+
   if zk.exists(znode):
-    LOG.debug("Selecting up Hive server via the following node {0}".format(znode))
+    LOG.info("Selecting Hive server via the following node {0}".format(znode))
     hiveservers = zk.get_children(znode)
-    if hiveservers and 'sequence' in hiveservers[0]:
-      hiveservers.sort(key=lambda x: re.findall(r'sequence=\d+', x)[0])
+
+    # Filter nodes that match the expected pattern before sorting
+    sequence_nodes = [x for x in hiveservers if re.search(r'sequence=\d+', x)]
+    LOG.info("Selecting Hive server via the following sequence_nodes {0}".format(sequence_nodes))
+
+    if sequence_nodes:
+      # Sort the filtered list based on the sequence number
+      sequence_nodes.sort(key=lambda x: int(re.findall(r'sequence=(\d+)', x)[0]))
+      hiveservers = sequence_nodes
+    else:
+      LOG.warning(r"No nodes matching 'sequence=\d+' found under {0}".format(znode))
+  else:
+    LOG.error("ZooKeeper node {0} does not exist.".format(znode))
+
   zk.stop()
   return hiveservers

@@ -134,20 +145,20 @@ def get(user, query_server=None, cluster=None):
       from impala.dbms import ImpalaDbms
       from impala.server import ImpalaServerClient
       DBMS_CACHE[user.id][query_server['server_name']] = ImpalaDbms(
-          HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
-          QueryHistory.SERVER_TYPE[1][0]
+        HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
+        QueryHistory.SERVER_TYPE[1][0]
       )
     elif query_server['server_name'] == 'hms':
       from beeswax.server.hive_metastore_server import HiveMetastoreClient
       DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
-          HiveMetastoreClient(query_server, user),
-          QueryHistory.SERVER_TYPE[1][0]
+        HiveMetastoreClient(query_server, user),
+        QueryHistory.SERVER_TYPE[1][0]
       )
     else:
       from beeswax.server.hive_server2_lib import HiveServerClient
       DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
-          HiveServerClientCompatible(HiveServerClient(query_server, user)),
-          QueryHistory.SERVER_TYPE[1][0]
+        HiveServerClientCompatible(HiveServerClient(query_server, user)),
+        QueryHistory.SERVER_TYPE[1][0]
       )
   elif RESET_HS2_QUERY_SERVER:
     from beeswax.server.hive_server2_lib import HiveServerClient, HiveServerClientCompatible
@@ -191,10 +202,10 @@ def get_query_server_config(name='beeswax', connector=None):
           cache.set(
             "llap",
             json.dumps({
-                "host": llap_servers["addresses"][0]["host"],
-                "port": llap_servers["addresses"][0]["port"]
-              }),
-              CACHE_TIMEOUT.get()
+              "host": llap_servers["addresses"][0]["host"],
+              "port": llap_servers["addresses"][0]["port"]
+            }),
+            CACHE_TIMEOUT.get()
           )
         else:
           LOG.error("Hive LLAP endpoint not found, reverting to config values")
@@ -282,45 +293,45 @@ def get_query_server_config(name='beeswax', connector=None):
   elif name == 'hms':
     kerberos_principal = get_hiveserver2_kerberos_principal(HIVE_SERVER_HOST.get())
     query_server = {
-        'server_name': 'hms',
-        'server_host': HIVE_METASTORE_HOST.get() if not cluster_config else cluster_config.get('server_host'),
-        'server_port': HIVE_METASTORE_PORT.get(),
-        'principal': kerberos_principal,
-        'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
-        'auth_username': AUTH_USERNAME.get(),
-        'auth_password': AUTH_PASSWORD.get(),
-        'use_sasl': HIVE_USE_SASL.get()
+      'server_name': 'hms',
+      'server_host': HIVE_METASTORE_HOST.get(),
+      'server_port': HIVE_METASTORE_PORT.get(),
+      'principal': kerberos_principal,
+      'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
+      'auth_username': AUTH_USERNAME.get(),
+      'auth_password': AUTH_PASSWORD.get(),
+      'use_sasl': HIVE_USE_SASL.get()
     }
   else:
     kerberos_principal = get_hiveserver2_kerberos_principal(activeEndpoint["host"])
     query_server = {
-        'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
-        'server_host': activeEndpoint["host"],
-        'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
-        'principal': kerberos_principal,
-        'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
-          'protocol': 'https' if hiveserver2_use_ssl() else 'http',
-          'host': activeEndpoint["host"],
-          'port': activeEndpoint["port"],
-          'end_point': hiveserver2_thrift_http_path()
-        },
-        'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
-        'auth_username': AUTH_USERNAME.get(),
-        'auth_password': AUTH_PASSWORD.get(),
-        'use_sasl': HIVE_USE_SASL.get(),
-        'close_sessions': CLOSE_SESSIONS.get(),
-        'has_session_pool': has_session_pool(),
-        'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
-      }
+      'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
+      'server_host': activeEndpoint["host"],
+      'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
+      'principal': kerberos_principal,
+      'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
+        'protocol': 'https' if hiveserver2_use_ssl() else 'http',
+        'host': activeEndpoint["host"],
+        'port': activeEndpoint["port"],
+        'end_point': hiveserver2_thrift_http_path()
+      },
+      'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
+      'auth_username': AUTH_USERNAME.get(),
+      'auth_password': AUTH_PASSWORD.get(),
+      'use_sasl': HIVE_USE_SASL.get(),
+      'close_sessions': CLOSE_SESSIONS.get(),
+      'has_session_pool': has_session_pool(),
+      'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
+    }

   if name == 'sparksql':  # Extends Hive as very similar
     from spark.conf import SQL_SERVER_HOST as SPARK_SERVER_HOST, SQL_SERVER_PORT as SPARK_SERVER_PORT, USE_SASL as SPARK_USE_SASL

     query_server.update({
-        'server_name': 'sparksql',
-        'server_host': SPARK_SERVER_HOST.get(),
-        'server_port': SPARK_SERVER_PORT.get(),
-        'use_sasl': SPARK_USE_SASL.get()
+      'server_name': 'sparksql',
+      'server_host': SPARK_SERVER_HOST.get(),
+      'server_port': SPARK_SERVER_PORT.get(),
+      'use_sasl': SPARK_USE_SASL.get()
     })

   if not query_server.get('dialect'):
@@ -361,28 +372,28 @@ def get_query_server_config_via_connector(connector):
     auth_password = dbms_conf.AUTH_PASSWORD.get()

   return {
-      'is_compute': True,
-      'dialect': compute['dialect'],
-      'server_name': compute_name,
-      'server_host': server_host,
-      'server_port': server_port,
-      # For connectors/computes, the auth details are not available
-      # from configs and needs patching before submitting requests
-      'principal': 'TODO',
-      'auth_username': compute['options'].get('auth_username', auth_username),
-      'auth_password': compute['options'].get('auth_password', auth_password),
-
-      'impersonation_enabled': impersonation_enabled,
-      'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
-      'SESSION_TIMEOUT_S': 15 * 60,
-      'querycache_rows': 1000,
-      'QUERY_TIMEOUT_S': 15 * 60,
-      'transport_mode': compute['options'].get('transport_mode', 'http'),
-      'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
-
-      'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
-      'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
-      'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
+    'is_compute': True,
+    'dialect': compute['dialect'],
+    'server_name': compute_name,
+    'server_host': server_host,
+    'server_port': server_port,
+    # For connectors/computes, the auth details are not available
+    # from configs and needs patching before submitting requests
+    'principal': 'TODO',
+    'auth_username': compute['options'].get('auth_username', auth_username),
+    'auth_password': compute['options'].get('auth_password', auth_password),
+
+    'impersonation_enabled': impersonation_enabled,
+    'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
+    'SESSION_TIMEOUT_S': 15 * 60,
+    'querycache_rows': 1000,
+    'QUERY_TIMEOUT_S': 15 * 60,
+    'transport_mode': compute['options'].get('transport_mode', 'http'),
+    'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
+
+    'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
+    'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
+    'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
   }


@@ -412,7 +423,7 @@ def __init__(self, client, server_type):
     self.client = client
     self.server_type = server_type
     self.server_name = self.client.query_server.get('dialect') if self.client.query_server['server_name'].isdigit() \
-        else self.client.query_server['server_name']
+      else self.client.query_server['server_name']

   @classmethod
   def to_matching_wildcard(cls, identifier=None):
@@ -500,10 +511,10 @@ def _get_tables_via_sparksql(self, database, table_names='*'):

       # We get back: database | tableName | isTemporary
       return [{
-          'name': row[1],
-          'type': 'VIEW' if row[2] else 'TABLE',
-          'comment': ''
-        }
+        'name': row[1],
+        'type': 'VIEW' if row[2] else 'TABLE',
+        'comment': ''
+      }
         for row in result.rows()
       ]
     else:
@@ -515,10 +526,10 @@ def get_table(self, database, table_name):
     except QueryServerException as e:
       LOG.debug("Seems like %s.%s could be a Kudu table" % (database, table_name))
       if 'java.lang.ClassNotFoundException' in e.message and [
-          prop
-          for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
-          if 'KuduStorageHandler' in prop[0]
-        ]:
+        prop
+        for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
+        if 'KuduStorageHandler' in prop[0]
+      ]:
         query_server = get_query_server_config('impala')
         db = get(self.client.user, query_server)
         table = db.get_table(database, table_name)
@@ -616,9 +627,9 @@ def execute_statement(self, hql):

   def fetch(self, query_handle, start_over=False, rows=None):
     no_start_over_support = [
-        config_variable
-        for config_variable in self.get_default_configuration(False)
-        if config_variable.key == 'support_start_over' and config_variable.value == 'false'
+      config_variable
+      for config_variable in self.get_default_configuration(False)
+      if config_variable.key == 'support_start_over' and config_variable.value == 'false'
     ]
     if no_start_over_support:
       start_over = False
@@ -816,28 +827,28 @@ def get_table_columns_stats(self, database, table, column):
         return self._extract_impala_column(data)
       else:
        return [
-            {'col_name': data[2][0]},
-            {'data_type': data[2][1]},
-            {'min': data[2][2]},
-            {'max': data[2][3]},
-            {'num_nulls': data[2][4]},
-            {'distinct_count': data[2][5]},
-            {'avg_col_len': data[2][6]},
-            {'max_col_len': data[2][7]},
-            {'num_trues': data[2][8]},
-            {'num_falses': data[2][9]}
+          {'col_name': data[2][0]},
+          {'data_type': data[2][1]},
+          {'min': data[2][2]},
+          {'max': data[2][3]},
+          {'num_nulls': data[2][4]},
+          {'distinct_count': data[2][5]},
+          {'avg_col_len': data[2][6]},
+          {'max_col_len': data[2][7]},
+          {'num_trues': data[2][8]},
+          {'num_falses': data[2][9]}
         ]
     else:
       return []

   def _extract_impala_column(self, col):
     return [
-        {'col_name': col[0]},
-        {'data_type': col[1]},
-        {'distinct_count': col[2]},
-        {'num_nulls': col[3]},
-        {'max_col_len': col[4]},
-        {'avg_col_len': col[5]},
+      {'col_name': col[0]},
+      {'data_type': col[1]},
+      {'distinct_count': col[2]},
+      {'num_nulls': col[3]},
+      {'max_col_len': col[4]},
+      {'avg_col_len': col[5]},
     ]

   def get_table_properties(self, database, table, property_name=None):
@@ -876,7 +887,7 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None):
       GROUP BY %(column)s
       ORDER BY ct DESC
       LIMIT %(limit)s''' % {
-        'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
+      'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
     }

     query = hql_query(hql)
@@ -1034,7 +1045,6 @@ def create_table_as_a_select(self, request, query_history, target_database, targ
       except Exception as double_trouble:
         LOG.exception('Failed to drop table "%s" as well: %s' % (target_table, double_trouble))
         raise ex
-      url = format_preserving_redirect(request, reverse('metastore:index'))

     return query_history

@@ -1107,17 +1117,17 @@ def execute_and_watch(self, query, design=None, query_history=None):
     hql_query = query.hql_query
     if query_history is None:
       query_history = QueryHistory.build(
-          owner=self.client.user,
-          query=hql_query,
-          server_host='%(server_host)s' % self.client.query_server,
-          server_port='%(server_port)d' % self.client.query_server,
-          server_name='%(server_name)s' % self.client.query_server,
-          server_type=self.server_type,
-          last_state=QueryHistory.STATE.submitted.value,
-          design=design,
-          notify=query.query.get('email_notify', False),
-          query_type=query.query['type'],
-          statement_number=0
+        owner=self.client.user,
+        query=hql_query,
+        server_host='%(server_host)s' % self.client.query_server,
+        server_port='%(server_port)d' % self.client.query_server,
+        server_name='%(server_name)s' % self.client.query_server,
+        server_type=self.server_type,
+        last_state=QueryHistory.STATE.submitted.value,
+        design=design,
+        notify=query.query.get('email_notify', False),
+        query_type=query.query['type'],
+        statement_number=0
       )
       query_history.save()

@@ -1226,9 +1236,6 @@ def get_functions(self, prefix=None, database=None):
     '''
     Not using self.client.get_functions() as pretty limited. More comments there.
     '''
-    result = None
-
-    function_filter = "'%s*'" % prefix if prefix else ''

     if self.client.query_server['dialect'] == 'impala':
       if database is None:
@@ -1286,7 +1293,7 @@ def get_primary_keys(self, database_name, table_name, catalog_name=None):
     )

   def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, parent_table_name=None, foreign_catalog_name=None,
-      foreign_database_name=None, foreign_table_name=None):
+    foreign_database_name=None, foreign_table_name=None):
     return self.client.get_foreign_keys(
       parent_catalog_name=parent_catalog_name,
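
As a quick sanity check of the one behavioral change in this patch, the HiveServer2 selection in `get_zk_hs2()`, here is a standalone sketch (the znode names are invented for illustration). The old code built the sort key with `re.findall(...)[0]` on unfiltered children, so a single child without a `sequence=` marker raised `IndexError`, and matching names were compared as strings rather than by their sequence number; the new code filters first and compares integers:

```python
import re

# Hypothetical children of the HS2 discovery znode, as returned by zk.get_children()
hiveservers = [
  'serverUri=hs2-b.example.com:10001;version=3.1.3;sequence=0000000012',
  'serverUri=hs2-a.example.com:10001;version=3.1.3;sequence=0000000003',
  'leader',  # no sequence marker; the old sort key would raise IndexError here
]

# Patched logic: keep only children carrying a sequence, then order them numerically
sequence_nodes = [x for x in hiveservers if re.search(r'sequence=\d+', x)]
sequence_nodes.sort(key=lambda x: int(re.findall(r'sequence=(\d+)', x)[0]))

print(sequence_nodes[0])  # the hs2-a entry, i.e. the lowest sequence number
```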