# File-viewer metadata: 155 lines, 5.1 KiB, Python
import os
|
|
import sys
|
|
import string
|
|
import random
|
|
import uuid
|
|
|
|
from flask import Flask, session, render_template_string, render_template, redirect, url_for, flash, request, current_app
|
|
from flask_security import Security, current_user, auth_required, hash_password, SQLAlchemySessionUserDatastore
|
|
from flask_login import login_user, logout_user
|
|
from flask_font_awesome import FontAwesome
|
|
|
|
from flask_dance.contrib.google import make_google_blueprint, google
|
|
from urllib.parse import urlencode
|
|
|
|
from flask_mailman import Mail
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from .config import Config, ProductionConfig, DevelopmentConfig
|
|
from dotenv import load_dotenv
|
|
|
|
from .database import init_db
|
|
from .models import User, Role
|
|
|
|
from .views import views_bp
|
|
from .airtableget import airtable_bp
|
|
|
|
# Module-level Font Awesome extension instance; it is bound to the
# application inside create_app() via fa.init_app(app).
fa = FontAwesome()
|
|
|
|
def create_app():
    """Build and configure the Flask application.

    Configuration is loaded from ``DevelopmentConfig`` when ``--debug`` is
    present in ``sys.argv`` and from ``ProductionConfig`` otherwise. The
    Google OAuth blueprint is only created and registered (under ``/login``)
    in the latter case.

    Side effects:
        * Rebinds the module-level ``db_session`` to the value returned by
          ``init_db()``; ``define_routes`` and ``create_users`` read it.
        * Prints the effective ``DEBUG`` setting.

    Returns:
        The configured ``Flask`` instance.
    """
    global db_session

    app = Flask(__name__, instance_relative_config=True)
    app.config['MAX_CONTENT_LENGTH'] = 20 * 1024 * 1024  # 20 MiB

    debug_requested = "--debug" in sys.argv
    app.config.from_object(
        DevelopmentConfig if debug_requested else ProductionConfig
    )
    print("Debug Mode " + str(app.config['DEBUG']))

    if not debug_requested:
        google_blueprint = create_google_blueprint(app)
        app.register_blueprint(google_blueprint, url_prefix="/login")

    @app.context_processor
    def context_processor():
        """Expose ``google_login_url`` as a callable to every template."""

        def google_login_url():
            # In debug mode this returns None instead of building the URL.
            if app.debug:
                print("sorry, in debug mode, no google login available")
                return None
            return url_for("google.login")

        return dict(google_login_url=google_login_url)

    with app.app_context():
        db_session = init_db()

    user_datastore = SQLAlchemySessionUserDatastore(db_session, User, Role)
    app.security = Security(app, user_datastore)

    # Only the initialisation side effect is needed; the instance itself was
    # never used afterwards, so it is not bound to a local name.
    Mail(app)

    # Honour one hop of X-Forwarded-For/-Proto/-Host/-Prefix headers.
    app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

    app.register_blueprint(views_bp)
    app.register_blueprint(airtable_bp)

    with app.app_context():
        create_users(app, user_datastore)

    fa.init_app(app)
    define_routes(app)
    return app
|
|
|
|
def define_routes(app):
    """Register the ``/authorized`` and ``/logout`` routes on *app*.

    Args:
        app: The Flask application. The views read ``app.config`` and
            ``app.security.datastore`` and use the module-level
            ``db_session`` to commit newly created users.
    """

    def is_email_authorized(email):
        """Return True if *email* is the first item of an ALLOWED_USERS entry."""
        return any(entry[0] == email for entry in app.config['ALLOWED_USERS'])

    # The decorator alone registers the rule; the original's additional
    # app.add_url_rule('/authorized', ...) call only added a duplicate.
    @app.route('/authorized')
    def authorized():
        """Complete a Google sign-in and log the matching local user in.

        Returns a 404 in debug mode, redirects to the Google login when no
        OAuth token is present, and otherwise redirects to ``views.home``
        (with a flashed error if the token lookup fails or the email is
        not in ``ALLOWED_USERS``).
        """
        if app.debug:
            return "Not available in development mode", 404
        if not google.authorized:
            return redirect(url_for("google.login"))

        resp = google.get("/oauth2/v3/tokeninfo")
        if not resp.ok:
            flash("Failed to authenticate with Google.", "danger")
            return redirect(url_for("views.home"))

        idinfo = resp.json()
        email = idinfo.get('email')
        if not email:
            # Without an email claim there is nothing to match a user on;
            # treat it as a failed authentication instead of a KeyError.
            flash("Failed to authenticate with Google.", "danger")
            return redirect(url_for("views.home"))
        # Fall back to the email when the response carries no 'name'.
        name = idinfo.get('name', email)

        if not is_email_authorized(email):
            flash("Your email is not authorized.", "danger")
            return redirect(url_for("views.home"))

        user = app.security.datastore.find_user(email=email)
        if not user:
            # The account gets a random password that is never shown to
            # anyone. Use Werkzeug's default hash method: the bare 'sha256'
            # method name is not accepted by Werkzeug 3.x.
            hashed_password = generate_password_hash(generate_random_password())
            user = app.security.datastore.create_user(
                email=email, username=name, password=hashed_password
            )
            db_session.commit()

        login_user(user)
        session['logged_in_with_google'] = True
        return redirect(url_for("views.home"))

    @app.route("/logout")
    @auth_required()
    def logout():
        """Log the current user out and redirect to ``views.home``."""
        logout_user()
        return redirect(url_for("views.home"))
|
|
|
|
def create_google_blueprint(app):
    """Create the Google OAuth blueprint from *app*'s configuration.

    Args:
        app: Flask application whose config provides
            ``GOOGLE_OAUTH_CLIENT_ID`` and ``GOOGLE_OAUTH_CLIENT_SECRET``.

    Returns:
        The blueprint returned by ``make_google_blueprint``, configured to
        redirect to ``/authorized``.
    """
    config = app.config
    oauth_scopes = [
        "https://www.googleapis.com/auth/userinfo.email",
        "openid",
    ]
    return make_google_blueprint(
        client_id=config['GOOGLE_OAUTH_CLIENT_ID'],
        client_secret=config['GOOGLE_OAUTH_CLIENT_SECRET'],
        scope=oauth_scopes,
        redirect_url="/authorized",
    )
|
|
|
|
def generate_random_password(length=12):
    """Return a random password of *length* characters.

    Characters are drawn from ASCII letters, digits and punctuation using
    the ``secrets`` module, so the result does not depend on the state of
    the ``random`` module's generator.

    Args:
        length: Number of characters to generate (default 12). A length of
            0 yields an empty string.

    Returns:
        The generated password as a ``str``.
    """
    # Imported locally so this function does not rely on a module-level
    # import that the rest of the file does not have.
    import secrets

    characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(characters) for _ in range(length))
|
|
|
|
def create_users(app, user_datastore):
    """Ensure every ``ALLOWED_USERS`` entry has a role and a user record.

    Each entry in ``app.config['ALLOWED_USERS']`` is unpacked as
    ``(email, role_name)``. A missing role is created and committed; a
    missing user is created with that role and a randomly generated
    password, then committed. Existing users are left untouched.

    Args:
        app: Flask application providing ``ALLOWED_USERS`` in its config.
        user_datastore: Datastore used for all role/user lookups and
            creation. ``create_app`` passes the same object it hands to
            ``Security``.
    """
    for useremail, userrole in app.config['ALLOWED_USERS']:
        existing_user = user_datastore.find_user(email=useremail)

        role = user_datastore.find_role(userrole)
        if not role:
            role = user_datastore.create_role(name=userrole)
            db_session.add(role)
            db_session.commit()

        if existing_user:
            continue

        # Use Werkzeug's default hash method: the bare 'sha256' method name
        # is not accepted by Werkzeug 3.x.
        user_datastore.create_user(
            email=useremail,
            roles=[role],
            password=generate_password_hash(generate_random_password()),
        )
        db_session.commit()
|
|
|
|
|
|
# Module-level Flask instance, built when this module is imported.
app = create_app()

if __name__ == "__main__":
    app.run()