241 changes: 124 additions & 117 deletions apps/beeswax/src/beeswax/server/dbms.py
@@ -15,20 +15,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import json
import time
import logging
import re
import threading
import time
from builtins import object

from azure.abfs import abfspath
from django.core.cache import caches
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.translation import gettext as _
from kazoo.client import KazooClient

from azure.abfs import abfspath
from beeswax.common import apply_natural_sort, is_compute
from beeswax.conf import (
APPLY_NATURAL_SORT_MAX,
@@ -66,8 +65,7 @@
hiveserver2_use_ssl,
)
from beeswax.models import QUERY_TYPES, Compute, QueryHistory
from desktop.conf import CLUSTER_ID, has_connectors
from desktop.lib.django_util import format_preserving_redirect
from desktop.conf import has_connectors
from desktop.lib.exceptions_renderable import PopupException
from desktop.lib.parameterization import substitute_variables
from desktop.lib.view_util import location_to_url
@@ -96,15 +94,28 @@ def reset_ha():


def get_zk_hs2():
hiveservers = None
hiveservers = []
zk = KazooClient(hosts=libzookeeper_conf.ENSEMBLE.get(), read_only=True)
zk.start(timeout=ZOOKEEPER_CONN_TIMEOUT.get())
znode = HIVE_DISCOVERY_HIVESERVER2_ZNODE.get()

if zk.exists(znode):
LOG.debug("Selecting up Hive server via the following node {0}".format(znode))
LOG.info("Selecting Hive server via the following node {0}".format(znode))
hiveservers = zk.get_children(znode)
if hiveservers and 'sequence' in hiveservers[0]:
hiveservers.sort(key=lambda x: re.findall(r'sequence=\d+', x)[0])

# Filter nodes that match the expected pattern before sorting
sequence_nodes = [x for x in hiveservers if re.search(r'sequence=\d+', x)]
LOG.info("Selecting Hive server via the following sequence_nodes {0}".format(sequence_nodes))

if sequence_nodes:
# Sort the filtered list based on the sequence number
sequence_nodes.sort(key=lambda x: int(re.findall(r'sequence=(\d+)', x)[0]))
hiveservers = sequence_nodes
else:
LOG.warning(r"No nodes matching 'sequence=\d+' found under {0}".format(znode))
else:
LOG.error("ZooKeeper node {0} does not exist.".format(znode))

zk.stop()
return hiveservers
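For context, a minimal standalone sketch of the new selection logic in `get_zk_hs2()`. The znode names are illustrative (HiveServer2 service discovery appends a `sequence=NNNN` marker on registration); the point of the change is to filter out entries without a sequence marker and to sort on the numeric value instead of the raw string match:

```python
import re

# Hypothetical child znodes as returned by zk.get_children(znode).
hiveservers = [
    'serverUri=hs2-b.example.com:10000;version=3.1.3;sequence=0000000007',
    'serverUri=hs2-a.example.com:10000;version=3.1.3;sequence=0000000002',
    'stale-entry-without-sequence',
]

# Keep only entries carrying a sequence marker, then sort on the numeric
# value rather than the matched substring, as the new code above does.
sequence_nodes = [x for x in hiveservers if re.search(r'sequence=\d+', x)]
sequence_nodes.sort(key=lambda x: int(re.findall(r'sequence=(\d+)', x)[0]))

print(sequence_nodes[0])  # lowest sequence number wins: the hs2-a entry
```

Converting to `int` also guards against any non-zero-padded sequence values, where a lexicographic sort would misorder e.g. `sequence=10` before `sequence=9`.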

@@ -134,20 +145,20 @@ def get(user, query_server=None, cluster=None):
from impala.dbms import ImpalaDbms
from impala.server import ImpalaServerClient
DBMS_CACHE[user.id][query_server['server_name']] = ImpalaDbms(
HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
QueryHistory.SERVER_TYPE[1][0]
HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
QueryHistory.SERVER_TYPE[1][0]
)
elif query_server['server_name'] == 'hms':
from beeswax.server.hive_metastore_server import HiveMetastoreClient
DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
HiveMetastoreClient(query_server, user),
QueryHistory.SERVER_TYPE[1][0]
HiveMetastoreClient(query_server, user),
QueryHistory.SERVER_TYPE[1][0]
)
else:
from beeswax.server.hive_server2_lib import HiveServerClient
DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
HiveServerClientCompatible(HiveServerClient(query_server, user)),
QueryHistory.SERVER_TYPE[1][0]
HiveServerClientCompatible(HiveServerClient(query_server, user)),
QueryHistory.SERVER_TYPE[1][0]
)
elif RESET_HS2_QUERY_SERVER:
from beeswax.server.hive_server2_lib import HiveServerClient, HiveServerClientCompatible
@@ -191,10 +202,10 @@ def get_query_server_config(name='beeswax', connector=None):
cache.set(
"llap",
json.dumps({
"host": llap_servers["addresses"][0]["host"],
"port": llap_servers["addresses"][0]["port"]
}),
CACHE_TIMEOUT.get()
"host": llap_servers["addresses"][0]["host"],
"port": llap_servers["addresses"][0]["port"]
}),
CACHE_TIMEOUT.get()
)
else:
LOG.error("Hive LLAP endpoint not found, reverting to config values")
@@ -282,45 +293,45 @@ def get_query_server_config(name='beeswax', connector=None):
elif name == 'hms':
kerberos_principal = get_hiveserver2_kerberos_principal(HIVE_SERVER_HOST.get())
query_server = {
'server_name': 'hms',
'server_host': HIVE_METASTORE_HOST.get() if not cluster_config else cluster_config.get('server_host'),
'server_port': HIVE_METASTORE_PORT.get(),
'principal': kerberos_principal,
'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),
'use_sasl': HIVE_USE_SASL.get()
'server_name': 'hms',
'server_host': HIVE_METASTORE_HOST.get(),
'server_port': HIVE_METASTORE_PORT.get(),
'principal': kerberos_principal,
'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),
'use_sasl': HIVE_USE_SASL.get()
}
else:
kerberos_principal = get_hiveserver2_kerberos_principal(activeEndpoint["host"])
query_server = {
'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
'server_host': activeEndpoint["host"],
'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
'principal': kerberos_principal,
'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
'protocol': 'https' if hiveserver2_use_ssl() else 'http',
'host': activeEndpoint["host"],
'port': activeEndpoint["port"],
'end_point': hiveserver2_thrift_http_path()
},
'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),
'use_sasl': HIVE_USE_SASL.get(),
'close_sessions': CLOSE_SESSIONS.get(),
'has_session_pool': has_session_pool(),
'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
}
'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
'server_host': activeEndpoint["host"],
'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
'principal': kerberos_principal,
'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
'protocol': 'https' if hiveserver2_use_ssl() else 'http',
'host': activeEndpoint["host"],
'port': activeEndpoint["port"],
'end_point': hiveserver2_thrift_http_path()
},
'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),
'use_sasl': HIVE_USE_SASL.get(),
'close_sessions': CLOSE_SESSIONS.get(),
'has_session_pool': has_session_pool(),
'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
}

if name == 'sparksql': # Extends Hive as very similar
from spark.conf import SQL_SERVER_HOST as SPARK_SERVER_HOST, SQL_SERVER_PORT as SPARK_SERVER_PORT, USE_SASL as SPARK_USE_SASL

query_server.update({
'server_name': 'sparksql',
'server_host': SPARK_SERVER_HOST.get(),
'server_port': SPARK_SERVER_PORT.get(),
'use_sasl': SPARK_USE_SASL.get()
'server_name': 'sparksql',
'server_host': SPARK_SERVER_HOST.get(),
'server_port': SPARK_SERVER_PORT.get(),
'use_sasl': SPARK_USE_SASL.get()
})

if not query_server.get('dialect'):
@@ -361,28 +372,28 @@ def get_query_server_config_via_connector(connector):
auth_password = dbms_conf.AUTH_PASSWORD.get()

return {
'is_compute': True,
'dialect': compute['dialect'],
'server_name': compute_name,
'server_host': server_host,
'server_port': server_port,
# For connectors/computes, the auth details are not available
# from configs and needs patching before submitting requests
'principal': 'TODO',
'auth_username': compute['options'].get('auth_username', auth_username),
'auth_password': compute['options'].get('auth_password', auth_password),

'impersonation_enabled': impersonation_enabled,
'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
'SESSION_TIMEOUT_S': 15 * 60,
'querycache_rows': 1000,
'QUERY_TIMEOUT_S': 15 * 60,
'transport_mode': compute['options'].get('transport_mode', 'http'),
'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),

'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
'is_compute': True,
'dialect': compute['dialect'],
'server_name': compute_name,
'server_host': server_host,
'server_port': server_port,
# For connectors/computes, the auth details are not available
# from configs and needs patching before submitting requests
'principal': 'TODO',
'auth_username': compute['options'].get('auth_username', auth_username),
'auth_password': compute['options'].get('auth_password', auth_password),

'impersonation_enabled': impersonation_enabled,
'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
'SESSION_TIMEOUT_S': 15 * 60,
'querycache_rows': 1000,
'QUERY_TIMEOUT_S': 15 * 60,
'transport_mode': compute['options'].get('transport_mode', 'http'),
'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),

'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
}
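One detail worth noting in the dict above: connector options may arrive as real booleans or as their string forms, so flags are normalized through `str(...).upper() == 'TRUE'`. A short sketch of why plain truthiness would not work:

```python
# Connector options may hold real booleans or their string forms.
options = {'use_sasl': 'true', 'close_sessions': False}

# Plain truthiness would treat any non-empty string (even 'false') as True,
# so both representations are funneled through one uppercase comparison.
use_sasl = str(options.get('use_sasl', True)).upper() == 'TRUE'
close_sessions = str(options.get('close_sessions', True)).upper() == 'TRUE'

print(use_sasl, close_sessions)  # True False
```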


@@ -412,7 +423,7 @@ def __init__(self, client, server_type):
self.client = client
self.server_type = server_type
self.server_name = self.client.query_server.get('dialect') if self.client.query_server['server_name'].isdigit() \
else self.client.query_server['server_name']
else self.client.query_server['server_name']
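The conditional above covers connector-backed servers, whose `server_name` is a numeric connector id rather than a dialect name; a minimal illustration with made-up values:

```python
# With connectors, server_name is the connector's numeric id, so the
# human-readable dialect is preferred when one is available.
query_server = {'server_name': '42', 'dialect': 'hive'}

server_name = query_server.get('dialect') if query_server['server_name'].isdigit() \
    else query_server['server_name']

print(server_name)  # hive
```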

@classmethod
def to_matching_wildcard(cls, identifier=None):
@@ -500,10 +511,10 @@ def _get_tables_via_sparksql(self, database, table_names='*'):

# We get back: database | tableName | isTemporary
return [{
'name': row[1],
'type': 'VIEW' if row[2] else 'TABLE',
'comment': ''
}
'name': row[1],
'type': 'VIEW' if row[2] else 'TABLE',
'comment': ''
}
for row in result.rows()
]
else:
@@ -515,10 +526,10 @@ def get_table(self, database, table_name):
except QueryServerException as e:
LOG.debug("Seems like %s.%s could be a Kudu table" % (database, table_name))
if 'java.lang.ClassNotFoundException' in e.message and [
prop
for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
if 'KuduStorageHandler' in prop[0]
]:
prop
for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
if 'KuduStorageHandler' in prop[0]
]:
query_server = get_query_server_config('impala')
db = get(self.client.user, query_server)
table = db.get_table(database, table_name)
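The fallback above re-routes Kudu tables to Impala when Hive cannot load their storage handler. A condensed, self-contained restatement of the detection predicate (`looks_like_kudu_table` is a hypothetical helper; the exception class is a stand-in for beeswax's own):

```python
class QueryServerException(Exception):  # stand-in for beeswax's exception type
    def __init__(self, message):
        self.message = message

def looks_like_kudu_table(error, storage_handler_rows):
    # Hive does not ship the Kudu storage handler, so it raises a
    # ClassNotFoundException; the storage_handler property confirms it.
    return (
        'java.lang.ClassNotFoundException' in error.message
        and any('KuduStorageHandler' in row[0] for row in storage_handler_rows)
    )

err = QueryServerException('java.lang.ClassNotFoundException: ...KuduStorageHandler')
rows = [['org.apache.hadoop.hive.kudu.KuduStorageHandler']]
print(looks_like_kudu_table(err, rows))  # True -> re-route the lookup to Impala
```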
@@ -616,9 +627,9 @@ def execute_statement(self, hql):

def fetch(self, query_handle, start_over=False, rows=None):
no_start_over_support = [
config_variable
for config_variable in self.get_default_configuration(False)
if config_variable.key == 'support_start_over' and config_variable.value == 'false'
config_variable
for config_variable in self.get_default_configuration(False)
if config_variable.key == 'support_start_over' and config_variable.value == 'false'
]
if no_start_over_support:
start_over = False
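The guard above downgrades `start_over` when the server advertises `support_start_over=false`. A self-contained sketch of that check (`ConfigVariable` is a stand-in for the Thrift config objects returned by `get_default_configuration()`):

```python
from collections import namedtuple

ConfigVariable = namedtuple('ConfigVariable', ['key', 'value'])  # stand-in

server_config = [
    ConfigVariable('support_start_over', 'false'),
    ConfigVariable('another_flag', 'true'),
]

start_over = True
# Any matching entry means the server cannot rewind result sets, so every
# fetch must continue from the current offset instead of starting over.
if any(cv.key == 'support_start_over' and cv.value == 'false' for cv in server_config):
    start_over = False

print(start_over)  # False
```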
@@ -816,28 +827,28 @@ def get_table_columns_stats(self, database, table, column):
return self._extract_impala_column(data)
else:
return [
{'col_name': data[2][0]},
{'data_type': data[2][1]},
{'min': data[2][2]},
{'max': data[2][3]},
{'num_nulls': data[2][4]},
{'distinct_count': data[2][5]},
{'avg_col_len': data[2][6]},
{'max_col_len': data[2][7]},
{'num_trues': data[2][8]},
{'num_falses': data[2][9]}
{'col_name': data[2][0]},
{'data_type': data[2][1]},
{'min': data[2][2]},
{'max': data[2][3]},
{'num_nulls': data[2][4]},
{'distinct_count': data[2][5]},
{'avg_col_len': data[2][6]},
{'max_col_len': data[2][7]},
{'num_trues': data[2][8]},
{'num_falses': data[2][9]}
]
else:
return []

def _extract_impala_column(self, col):
return [
{'col_name': col[0]},
{'data_type': col[1]},
{'distinct_count': col[2]},
{'num_nulls': col[3]},
{'max_col_len': col[4]},
{'avg_col_len': col[5]},
{'col_name': col[0]},
{'data_type': col[1]},
{'distinct_count': col[2]},
{'num_nulls': col[3]},
{'max_col_len': col[4]},
{'avg_col_len': col[5]},
]
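For reference, both branches above map a positional stats row into a list of single-key dicts. A standalone sketch of the Hive case, with a made-up `data[2]` row from `DESCRIBE FORMATTED db.tbl col`:

```python
# Hypothetical stats row (data[2]) from 'DESCRIBE FORMATTED db.tbl col' on Hive.
row = ['total_amount', 'double', '0.0', '950.3', '12', '4821', '', '', '', '']

keys = ['col_name', 'data_type', 'min', 'max', 'num_nulls',
        'distinct_count', 'avg_col_len', 'max_col_len', 'num_trues', 'num_falses']

# Produces the same shape as the hand-written list in get_table_columns_stats().
stats = [{key: value} for key, value in zip(keys, row)]
print(stats[0], stats[4])  # {'col_name': 'total_amount'} {'num_nulls': '12'}
```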

def get_table_properties(self, database, table, property_name=None):
@@ -876,7 +887,7 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None):
GROUP BY %(column)s
ORDER BY ct DESC
LIMIT %(limit)s''' % {
'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
}

query = hql_query(hql)
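With illustrative arguments (`database='default'`, `table='web_logs'`, `column='app'`, a prefix of `'ch'`, `limit=30`), the template above renders to a plain top-terms query; the `WHERE` clause below is a guess at what the elided `prefix_match` branch expands to:

```python
# Hypothetical rendering of the get_top_terms() HQL template.
params = {
    'database': 'default', 'table': 'web_logs', 'column': 'app',
    'prefix_match': "WHERE CAST(app AS STRING) LIKE 'ch%'", 'limit': 30,
}
hql = '''SELECT %(column)s, COUNT(*) AS ct
FROM `%(database)s`.`%(table)s` %(prefix_match)s
GROUP BY %(column)s
ORDER BY ct DESC
LIMIT %(limit)s''' % params
print(hql)
```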
@@ -1034,7 +1045,6 @@ def create_table_as_a_select(self, request, query_history, target_database, targ
except Exception as double_trouble:
LOG.exception('Failed to drop table "%s" as well: %s' % (target_table, double_trouble))
raise ex
url = format_preserving_redirect(request, reverse('metastore:index'))

return query_history

@@ -1107,17 +1117,17 @@ def execute_and_watch(self, query, design=None, query_history=None):
hql_query = query.hql_query
if query_history is None:
query_history = QueryHistory.build(
owner=self.client.user,
query=hql_query,
server_host='%(server_host)s' % self.client.query_server,
server_port='%(server_port)d' % self.client.query_server,
server_name='%(server_name)s' % self.client.query_server,
server_type=self.server_type,
last_state=QueryHistory.STATE.submitted.value,
design=design,
notify=query.query.get('email_notify', False),
query_type=query.query['type'],
statement_number=0
owner=self.client.user,
query=hql_query,
server_host='%(server_host)s' % self.client.query_server,
server_port='%(server_port)d' % self.client.query_server,
server_name='%(server_name)s' % self.client.query_server,
server_type=self.server_type,
last_state=QueryHistory.STATE.submitted.value,
design=design,
notify=query.query.get('email_notify', False),
query_type=query.query['type'],
statement_number=0
)
query_history.save()

@@ -1226,9 +1236,6 @@ def get_functions(self, prefix=None, database=None):
'''
Not using self.client.get_functions() as pretty limited. More comments there.
'''
result = None

function_filter = "'%s*'" % prefix if prefix else ''

if self.client.query_server['dialect'] == 'impala':
if database is None:
@@ -1286,7 +1293,7 @@ def get_primary_keys(self, database_name, table_name, catalog_name=None):
)

def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, parent_table_name=None, foreign_catalog_name=None,
foreign_database_name=None, foreign_table_name=None):
foreign_database_name=None, foreign_table_name=None):

return self.client.get_foreign_keys(
parent_catalog_name=parent_catalog_name,