cdc-placement-website-backend/CDC_Backend/APIs/utils.py

391 lines
15 KiB
Python
Raw Permalink Normal View History

2021-12-17 17:15:56 +05:30
import datetime
2022-05-02 17:16:56 +05:30
import json
2021-10-15 20:47:23 +05:30
import logging
import os
import random
2021-12-16 23:05:04 +05:30
import re
2021-10-15 20:47:23 +05:30
import string
import sys
import traceback
2021-10-15 20:47:23 +05:30
from os import path, remove
2022-05-02 17:16:56 +05:30
import background_task
2021-12-17 19:23:32 +05:30
import jwt
2022-05-02 17:16:56 +05:30
import pdfkit
import pytz
2022-05-02 17:16:56 +05:30
import requests as rq
2021-10-15 20:47:23 +05:30
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.forms.models import model_to_dict
2021-10-15 20:47:23 +05:30
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.utils import timezone
2021-10-15 20:47:23 +05:30
from google.auth.transport import requests
from google.oauth2 import id_token
from rest_framework import status
from rest_framework.response import Response
from .constants import *
2022-07-28 18:01:55 +05:30
from .models import User, PrePlacementOffer, PlacementApplication, Placement, Student
2021-10-15 20:47:23 +05:30
logger = logging.getLogger('db')
def precheck(required_data=None):
if required_data is None:
required_data = []
def decorator(view_func):
def wrapper_func(request, *args, **kwargs):
try:
request_data = None
if request.method == 'GET':
request_data = request.GET
elif request.method == 'POST':
request_data = request.data
if not len(request_data):
request_data = request.POST
if len(request_data):
for i in required_data:
if i not in request_data:
return Response({'action': "Pre check", 'message': str(i) + " Not Found"},
status=status.HTTP_400_BAD_REQUEST)
else:
return Response({'action': "Pre check", 'message': "Message Data not Found"},
status=status.HTTP_400_BAD_REQUEST)
return view_func(request, *args, **kwargs)
except:
logger.warning("Pre check: " + str(sys.exc_info()))
2021-12-12 20:13:47 +05:30
return Response({'action': "Pre check", 'message': "Something went wrong"},
2021-10-15 20:47:23 +05:30
status=status.HTTP_400_BAD_REQUEST)
return wrapper_func
return decorator
def isAuthorized(allowed_users=None):
if allowed_users is None:
allowed_users = []
def decorator(view_func):
def wrapper_func(request, *args, **kwargs):
try:
headers = request.META
if 'HTTP_AUTHORIZATION' in headers:
token_id = headers['HTTP_AUTHORIZATION'][7:]
idinfo = id_token.verify_oauth2_token(token_id, requests.Request(), CLIENT_ID, clock_skew_in_seconds=60)
2021-10-15 20:47:23 +05:30
email = idinfo[EMAIL]
user = get_object_or_404(User, email=email)
if user:
user.last_login_time = timezone.now()
2022-05-02 17:16:56 +05:30
user.save()
2021-10-15 20:47:23 +05:30
if len(set(user.user_type).intersection(set(allowed_users))) or allowed_users == '*':
return view_func(request, user.id, user.email, user.user_type, *args, **kwargs)
else:
raise PermissionError("Access Denied. You are not allowed to use this service")
else:
raise PermissionError("Authorization Header Not Found")
2021-12-12 20:13:47 +05:30
except PermissionError:
return Response({'action': "Is Authorized?", 'message': 'Access Denied'},
2021-10-15 20:47:23 +05:30
status=status.HTTP_401_UNAUTHORIZED)
except Http404:
return Response({'action': "Is Authorized?", 'message': "User Not Found. Contact CDC for more details"},
status=status.HTTP_404_NOT_FOUND)
except ValueError as e:
2021-12-12 20:13:47 +05:30
logger.error("Problem with Google Oauth2.0 " + str(e))
return Response({'action': "Is Authorized?", 'message': 'Problem with Google Sign In'},
2021-10-15 20:47:23 +05:30
status=status.HTTP_401_UNAUTHORIZED)
except:
logger.warning("Is Authorized? " + str(sys.exc_info()))
return Response(
{'action': "Is Authorized?", 'message': "Something went wrong. Contact CDC for more details"},
status=status.HTTP_400_BAD_REQUEST)
2021-10-15 20:47:23 +05:30
return wrapper_func
return decorator
def generateRandomString():
try:
N = 15
2021-12-17 17:15:56 +05:30
res = ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=N))
2021-10-15 20:47:23 +05:30
return res
except:
return False
def saveFile(file, location):
prefix = generateRandomString()
file_name = prefix + "_" + file.name.strip()
2021-10-15 20:47:23 +05:30
2021-12-16 23:05:04 +05:30
file_name = re.sub(r'[\\/:*?"<>|]', '_', file_name)
2021-10-15 20:47:23 +05:30
if not path.isdir(location):
2021-12-16 23:05:04 +05:30
os.makedirs(location)
2021-10-15 20:47:23 +05:30
destination_path = location + str(file_name)
2021-10-15 20:47:23 +05:30
if path.exists(destination_path):
remove(destination_path)
with open(destination_path, 'wb+') as destination:
for chunk in file.chunks():
destination.write(chunk)
return file_name
2022-07-28 18:01:55 +05:30
@background_task.background(schedule=2)
def sendEmail(email_to, subject, data, template, attachment_jnf_response=None):
2021-10-15 20:47:23 +05:30
try:
if not isinstance(data, dict):
data = json.loads(data)
2021-10-15 20:47:23 +05:30
html_content = render_to_string(template, data) # render with dynamic value
text_content = strip_tags(html_content)
email_from = settings.EMAIL_HOST_USER
if type(email_to) is list:
recipient_list = [str(email) for email in email_to]
else:
recipient_list = [str(email_to), ]
2021-10-15 20:47:23 +05:30
msg = EmailMultiAlternatives(subject, text_content, email_from, recipient_list)
msg.attach_alternative(html_content, "text/html")
if attachment_jnf_response:
2022-09-27 12:36:47 +05:30
# logger.info(attachment_jnf_response)
pdf = pdfkit.from_string(attachment_jnf_response['html'], False,
2022-05-02 17:16:56 +05:30
options={"--enable-local-file-access": "", '--dpi': '96'})
msg.attach(attachment_jnf_response['name'], pdf, 'application/pdf')
2021-10-15 20:47:23 +05:30
msg.send()
return True
except:
logger.error("Send Email: " + str(sys.exc_info()))
return False
2021-10-15 20:47:23 +05:30
def PlacementApplicationConditions(student, placement):
try:
selected_companies = PlacementApplication.objects.filter(student=student, selected=True)
selected_companies_PSU = [i for i in selected_companies if i.placement.tier == 'psu']
PPO = PrePlacementOffer.objects.filter(student=student, accepted=True)
2022-07-28 18:01:55 +05:30
# find length of PPO
if len(selected_companies) + len(PPO) >= MAX_OFFERS_PER_STUDENT:
2021-10-15 20:47:23 +05:30
raise PermissionError("Max Applications Reached for the Season")
if len(selected_companies_PSU) > 0:
raise PermissionError('Selected for PSU Can\'t apply anymore')
if placement.tier == 'psu':
return True, "Conditions Satisfied"
for i in selected_companies:
if int(i.placement.tier) < int(placement.tier):
2022-10-08 13:20:41 +05:30
return False, "Can't apply for this tier"
for i in PPO:
if int(i.placement.tier) < int(placement.tier):
2021-10-15 20:47:23 +05:30
return False, "Can't apply for this tier"
if student.degree != 'bTech' and not placement.rs_eligible:
raise PermissionError("Can't apply for this placement")
2021-10-15 20:47:23 +05:30
return True, "Conditions Satisfied"
except PermissionError as e:
return False, e
except:
logger.warning("Utils - PlacementApplicationConditions: " + str(sys.exc_info()))
return False, "_"
def getTier(compensation_gross, is_psu=False):
try:
if is_psu:
return True, 'psu'
if compensation_gross < 0:
raise ValueError("Negative Compensation")
elif compensation_gross < 600000: # Tier 7 If less than 600,000
return True, "7"
# Tier 6 If less than 800,000 and greater than or equal to 600,000
elif compensation_gross < 800000:
return True, "6"
# Tier 5 If less than 1,000,000 and greater than or equal to 800,000
elif compensation_gross < 1000000:
return True, "5"
# Tier 4 If less than 1,200,000 and greater than or equal to 1,000,000
elif compensation_gross < 1200000:
return True, "4"
# Tier 3 If less than 1,500,000 and greater than or equal to 1,200,000
elif compensation_gross < 1500000:
return True, "3"
# Tier 2 If less than 1,800,000 and greater than or equal to 1,500,000
elif compensation_gross < 1800000:
return True, "2"
# Tier 1 If greater than or equal to 1,800,000
elif compensation_gross >= 1800000:
return True, "1"
else:
raise ValueError("Invalid Compensation")
except ValueError as e:
logger.warning("Utils - getTier: " + str(sys.exc_info()))
return False, e
except:
logger.warning("Utils - getTier: " + str(sys.exc_info()))
return False, "_"
2021-12-17 17:15:56 +05:30
def generateOneTimeVerificationLink(email, opening_id, opening_type):
try:
token_payload = {
"email": email,
"opening_id": opening_id,
"opening_type": opening_type,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=EMAIL_VERIFICATION_TOKEN_TTL)
}
token = jwt.encode(token_payload, os.environ.get("EMAIL_VERIFICATION_SECRET_KEY"), algorithm="HS256")
link = LINK_TO_EMAIl_VERIFICATION_API.format(token=token)
return True, link
except:
logger.warning("Utils - generateOneTimeVerificationLink: " + str(sys.exc_info()))
return False, "_"
2022-04-10 23:12:02 +05:30
2022-05-02 17:16:56 +05:30
2022-04-10 23:12:02 +05:30
def verify_recaptcha(request):
try:
data = {
'secret': settings.RECAPTCHA_SECRET_KEY,
'response': request
}
r = rq.post('https://www.google.com/recaptcha/api/siteverify', data=data)
result = r.json()
if not result['success']:
logger.warning("Utils - verify_recaptcha: " + str(result))
2022-04-10 23:12:02 +05:30
return result['success']
except:
# get exception line number
2022-04-10 23:12:02 +05:30
logger.warning("Utils - verify_recaptcha: " + str(sys.exc_info()))
return False, "_"
2022-05-02 17:16:56 +05:30
def opening_description_table_html(opening):
# check typing of opening
if isinstance(opening, Placement):
details = model_to_dict(opening, fields=[field.name for field in Placement._meta.fields],
exclude=EXCLUDE_IN_PDF)
# check typing of opening is query dict
else: # if isinstance(opening, QueryDict):
details = opening
keys = list(details.keys())
newdetails = {}
for key in keys:
if isinstance(details[key], list):
details[key] = {"details": details[key], "type": ["list"]}
if key in SPECIAL_FORMAT_IN_PDF:
if key == 'website':
details[key] = {"details": details[key], "type": ["link"]}
else:
details[key] = {"details": [item for item in details[key]["details"]], "type": ["list", "link"],
2022-05-02 17:16:56 +05:30
"link": PDF_FILES_SERVING_ENDPOINT + opening.id + "/"}
new_key = key.replace('_', ' ')
if new_key.endswith(' names'):
new_key = new_key[:-6]
new_key = new_key.capitalize()
newdetails[new_key] = details[key]
imagepath = os.path.abspath('./templates/image.png')
data = {
2022-05-02 17:16:56 +05:30
"data": newdetails,
"imgpath": imagepath
}
2022-05-02 17:16:56 +05:30
return render_to_string(COMPANY_JNF_RESPONSE_TEMPLATE, data)
2022-07-28 18:01:55 +05:30
def placement_eligibility_filters(student, placements):
try:
filtered_placements = []
for placement in placements.iterator():
if PlacementApplicationConditions(student, placement)[0]:
filtered_placements.append(placement)
return filtered_placements
except:
logger.warning("Utils - placement_eligibility_filters: " + str(sys.exc_info()))
return placements
@background_task.background(schedule=2)
def send_opening_notifications(placement_id):
try:
placement = get_object_or_404(Placement, id=placement_id)
students = Student.objects.all()
for student in students.iterator():
if student.branch in placement.allowed_branch:
if student.degree == 'bTech' or placement.rs_eligible is True:
if PlacementApplicationConditions(student, placement)[0]:
try:
student_user = get_object_or_404(User, id=student.id)
subject = NOTIFY_STUDENTS_OPENING_TEMPLATE_SUBJECT.format(
company_name=placement.company_name)
deadline_datetime = placement.deadline_datetime.astimezone(pytz.timezone('Asia/Kolkata'))
data = {
"company_name": placement.company_name,
"opening_type": 'Placement',
"designation": placement.designation,
"deadline": deadline_datetime.strftime("%A, %-d %B %Y, %-I:%M %p"),
"link": PLACEMENT_OPENING_URL.format(id=placement.id)
}
sendEmail(student_user.email, subject, data, NOTIFY_STUDENTS_OPENING_TEMPLATE)
except Http404:
logger.warning('Utils - send_opening_notifications: user not found : ' + student.id)
except Exception as e:
logger.warning('Utils - send_opening_notifications: For Loop' + str(e))
2022-07-28 18:01:55 +05:30
except:
logger.warning('Utils - send_opening_notifications: ' + str(sys.exc_info()))
return False
def exception_email(opening):
opening = opening.dict()
data = {
"designation": opening["designation"],
"opening_type": PLACEMENT,
"company_name": opening["company_name"],
}
pdfhtml = opening_description_table_html(opening)
name = opening["company_name"] + '_jnf_response.pdf'
attachment_jnf_respone = {
"name": name,
"html": pdfhtml,
}
sendEmail(CDC_MAIl_ADDRESS, COMPANY_OPENING_ERROR_TEMPLATE.format(company_name=opening["company_name"]), data,
COMPANY_OPENING_SUBMITTED_TEMPLATE, attachment_jnf_respone)
def store_all_files(request):
files = request.FILES
data = request.data
# save all the files
if files:
# company details pdf
for file in files.getlist(COMPANY_DETAILS_PDF):
file_location = STORAGE_DESTINATION_COMPANY_ATTACHMENTS + "temp" + '/'
saveFile(file, file_location)
# compensation details pdf
for file in files.getlist(COMPENSATION_DETAILS_PDF):
file_location = STORAGE_DESTINATION_COMPANY_ATTACHMENTS + "temp" + '/'
saveFile(file, file_location)
# selection procedure details pdf
for file in files.getlist(SELECTION_PROCEDURE_DETAILS_PDF):
file_location = STORAGE_DESTINATION_COMPANY_ATTACHMENTS + "temp" + '/'
saveFile(file, file_location)
# description pdf
for file in files.getlist(DESCRIPTION_PDF):
file_location = STORAGE_DESTINATION_COMPANY_ATTACHMENTS + "temp" + '/'
saveFile(file, file_location)