Commit aab37983 authored by harshavardhan.c's avatar harshavardhan.c

New Module implemented for generating login-token.

parents
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQClilTaeHq6Zc+kWHCNl1O0btGRm7ct3O5zqWx1mwwLUWH14eft
Hi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULfENhwd/D7P3mnoRlktPT2t+tt
RRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw2hcqOYe/NGTkmm1PswIDAQAB
AoGAZPARR1l5NBkKYGKQ1rU0E+wSmx+AtVVmjF39RUSyNmB8Q+poebwSgsr58IKt
T6Yq6Tjyl0UAZTGmferCK0xJJrqyP0hMn4nNNut+acWMKyt+9YrA2FO+r5Jb9JuT
SK35xXnM4aZLGppgWJxRzctpIz+qkf6oLRSZme0AuiqcwYECQQDY+QDL3wbWplRW
bze0DsZRMkDAkNY5OCydvjte4SR/mmAzsrpNrS5NztWbaaQrefoPbsdYBPbd8rS7
C/s/0L1zAkEAw1EC5zt2STuhkcKLa/tL+bk8WHHHtf19aC9kBj1TvWBFh+JojWCo
86iK5fLcHzhyQx5Qi3E9LG2HvOWhS1iUwQJAKbEHHyWW2c4SLJ2oVXf1UYrXeGkc
UNhjclgobl3StpZCYAy60cwyNo9E6l0NR7FjhG2j7lzd1t4ZLkvqFmQU0wJATLPe
yQIwBLh3Te+xoxlQD+Tvzuf3/v9qpWSfClhBL4jEJYYDeynvj6iry3whd91J+hPI
m8o/tNfay5L+UcGawQJAAtbqQc7qidFq+KQYLnv5gPRYlX/vNM+sWstUAqvWdMze
JYUoTHKgiXnSZ4mizI6/ovsBOMJTb6o1OJCKQtYylw==
-----END RSA PRIVATE KEY-----
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQClilTaeHq6Zc+kWHCNl1O0btGR
m7ct3O5zqWx1mwwLUWH14eftHi5wIbOYh79JQ9BO2OA4UjPq31uwmJ96Okl0OULf
ENhwd/D7P3mnoRlktPT2t+ttRRrKvx3wNpOy/3nBsXnNt8EKxyA7k9vbqLbv9pGw
2hcqOYe/NGTkmm1PswIDAQAB
-----END PUBLIC KEY-----
import uuid
from datetime import datetime, timedelta
import redis
import typer
from typer import Typer
from jwt_util import Secrets, JWT
app = Typer(
name="Generating Token",
)
jwt = JWT()
def create_token(user_id, ip, token, age=Secrets.LOCK_OUT_TIME_MINS):
"""
This method is to create a cookie
"""
try:
uid = str(uuid.uuid4()).replace("-", "")
payload = {"ip": ip, "user_id": user_id, "token": token, "uid": uid, "age": age}
exp = datetime.utcnow() + timedelta(minutes=age)
_extras = {"iss": Secrets.issuer, "exp": exp}
_payload = {**payload, **_extras}
new_token = jwt.encode(_payload)
# Add session to redis
login_db.set(uid, new_token)
login_db.expire(uid, timedelta(minutes=age))
return uid
except Exception as e:
print(e)
raise
Redis_host: str = typer.prompt("Enter Redis Host to continue", default='192.168.0.220')
Redis_port: int = int(typer.prompt("Enter Redis Port to continue", default='6379'))
Redis_db: int = int(typer.prompt("Enter Redis DB to continue", default='9'))
user_details: str = typer.prompt("Enter User Id to continue", default='user_099')
ip_details: str = typer.prompt("Enter Ip to continue", default='127.0.0.1')
token_age_in_days: int = int(typer.prompt("Enter Token Age in days to continue", default='2'))
login_db = redis.Redis(
host=Redis_host,
port=Redis_port,
db=Redis_db,
decode_responses=True,
)
age_in_minutes = token_age_in_days * 24 * 60
token_details = create_token(user_id=user_details, ip=ip_details, token=Secrets.token, age=age_in_minutes)
typer.echo(typer.style(f"Token Generated for {token_age_in_days} days is {token_details}", fg=typer.colors.GREEN))
import os
import jwt
from jwt.exceptions import (
InvalidSignatureError,
ExpiredSignatureError,
MissingRequiredClaimError,
)
class KeyPath:
public = os.path.join("data", "keys", "public")
private = os.path.join("data", "keys", "private")
class Secrets:
LOCK_OUT_TIME_MINS = 30
leeway_in_mins = 10
unique_key = "45c37939-0f75"
token = "8674cd1d-2578-4a62-8ab7-d3ee5f9a"
issuer = "ilens"
alg = "RS256"
class AuthenticationError(Exception):
"""
JWT Authentication Error
"""
class ErrorMessages:
ERROR001 = "Authentication Failed. Please verify token"
ERROR002 = "Signature Expired"
ERROR003 = "Signature Not Valid"
class JWT:
def __init__(self):
self.max_login_age = Secrets.LOCK_OUT_TIME_MINS
self.issuer = Secrets.issuer
self.alg = Secrets.alg
self.public = KeyPath.public
self.private = KeyPath.private
def encode(self, payload):
try:
with open(self.private, "r") as f:
key = f.read()
return jwt.encode(payload, key, algorithm=self.alg)
except Exception as e:
print(f"Exception while encoding JWT: {str(e)}")
raise
finally:
f.close()
def decode(self, token):
try:
with open(self.public, "r") as f:
key = f.read()
return jwt.decode(token, key, algorithms=self.alg)
except Exception as e:
print(f"Exception while encoding JWT: {str(e)}")
raise
finally:
f.close()
def validate(self, token):
try:
with open(self.public, "r") as f:
key = f.read()
payload = jwt.decode(
token,
key,
algorithms=self.alg,
leeway=Secrets.leeway_in_mins,
options={"require": ["exp", "iss"]},
)
return payload
except InvalidSignatureError:
raise AuthenticationError(ErrorMessages.ERROR003)
except ExpiredSignatureError:
raise AuthenticationError(ErrorMessages.ERROR002)
except MissingRequiredClaimError:
raise AuthenticationError(ErrorMessages.ERROR002)
except Exception as e:
print(f"Exception while validating JWT: {str(e)}")
raise
finally:
f.close()
PyJWT==2.1.0
pyjwt[crypto]
redis~=3.5.3
typer==0.4.0
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment