From cfec246f045a8ba767f4e7354dcfed569cfa8452 Mon Sep 17 00:00:00 2001 From: ico Date: Tue, 2 Jan 2024 14:02:43 +0100 Subject: [PATCH] adding ip restrictions to login process --- py4web/utils/auth.py | 83 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/py4web/utils/auth.py b/py4web/utils/auth.py index 14b52b62f..507d175ca 100644 --- a/py4web/utils/auth.py +++ b/py4web/utils/auth.py @@ -8,6 +8,7 @@ import time import urllib import uuid +import json from pydal.validators import ( CRYPT, @@ -36,7 +37,75 @@ [ ] Lock account after x failed login attempts. [ ] Force new password every x days. """ - +BLCOKED_IP_MAX_COUNT=5 +# timeout for blocked IP: if BLOCKED_IP_TIMEOUT=0 blocking never expired +BLOCKED_IP_TIMEOUT=5*60 # reenable blocked ip after this time in seconds +FILE_BLCOKED_IP_COUNT="host_ip_blocked.txt" +FILE_HOST_IP_DENY="host_ip_deny.txt" +FILE_HOST_IP_ALLOW="host_ip_allow.txt" +host_ip_allow=[] +host_ip_deny=[] +host_ip_blocked={} +try: + with open(FILE_HOST_IP_ALLOW, "r") as fp: + host_ip_allow = json.load(fp) +except: + pass +try: + with open(FILE_HOST_IP_DENY, "r") as fp: + host_ip_deny = json.load(fp) +except: + pass + +try: + with open(FILE_BLCOKED_IP_COUNT, "r") as fp: + host_ip_blocked = json.load(fp) +except: + pass + +def get_timestamp(): # returns timestamp in seconds from a specific time in the past + return int((datetime.datetime.utcnow()-datetime.datetime(1,1,1,1,1,1)).total_seconds()) + +def register_failed_login_ip(): # called if error in credentials + client_ip=request.remote_addr + blocked_ip=None + if host_ip_blocked: + blocked_ip = host_ip_blocked[client_ip] + else: # start new registration with timestamp in seconds + host_ip_blocked[client_ip]={'count':0,'last_failed':get_timestamp()} + blocked_ip = host_ip_blocked[client_ip] + blocked_ip['last_failed']=get_timestamp() # update with the last failed time + print(request.path) + if blocked_ip['count']< BLCOKED_IP_MAX_COUNT: + blocked_ip['count']+=1 + with open(FILE_BLCOKED_IP_COUNT, "w") as fp: + json.dump(host_ip_blocked, fp) + + # called after successfule credentials validation to check if not blocked on too many attempts + # or black or white listed +def ip_allow(): + client_ip=request.remote_addr + is_this_ip_allowd=True + if host_ip_blocked: + blocked_ip =host_ip_blocked[client_ip] + if blocked_ip: #this ip is found in the blocked list + if blocked_ip['count']>=BLCOKED_IP_MAX_COUNT: # check the failed login count for this ip + if get_timestamp()-blocked_ip['last_failed']>BLOCKED_IP_TIMEOUT: #however if timeout from first fail, then reset the counter and timestamp. + del host_ip_blocked[client_ip] + with open(FILE_BLCOKED_IP_COUNT, "w") as fp: + json.dump(host_ip_blocked, fp) + is_this_ip_allowd = True + else: + is_this_ip_allowd=False + if host_ip_allow: #check if allowed list exist + if not( client_ip in host_ip_allow):# check if listed as allowed ip + is_this_ip_allowd=False + if host_ip_deny: #check if deny list exist.This will override the allow list if exist + if client_ip in host_ip_deny: # check if listed as denyed ip + is_this_ip_allowd=False + + + return is_this_ip_allowd def b16e(text): """convert unicode to b16 unicode""" @@ -198,6 +267,7 @@ class Auth(Fixture): "account_is_blocked": "Account is blocked", "account_needs_to_be_approved": "Account needs to be approved", "invalid_credentials": "Invalid Credentials", + "invalid_ip": "Invalid credentials", "invalid_token": "invalid token", "password_doesnt_match": "Password doesn't match", "invalid_current_password": "invalid current password", @@ -657,10 +727,15 @@ def login(self, email, password): user = db(field == value).select().first() if user and not (CRYPT()(password)[0] == user.password): user = None - + err_msg = "invalid_credentials" + if user: + if not ip_allow(): + user = None + err_msg="invalid_ip" ## this woill display Invalid credentials insted od invalid Credentials (lower case c) # then check for possible login blockers + if not user: - error = "invalid_credentials" + error = err_msg elif (user["action_token"] or "").startswith("pending-registration:"): error = "registration_is_pending" elif user["action_token"] == "account-blocked": @@ -670,6 +745,7 @@ def login(self, email, password): # return the error or the user if error: + register_failed_login_ip() return (None, self.param.messages["errors"].get(error, error)) return (user, None) @@ -1544,6 +1620,7 @@ def login_buttons(self): ] ) + return dict(buttons=top_buttons, combined_div=combined_div) def login(self, model=False):