235 lines
7.6 KiB
Python
Raw Normal View History

from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin
import psycopg2
import logging
from dotenv import load_dotenv
import os
from werkzeug.security import generate_password_hash, check_password_hash
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Load environment variables from .env file
load_dotenv()
# Set up logging
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY') or 'supersecretkey'
# Flask-Login configuration
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
# Rate Limiting configuration
limiter = Limiter(get_remote_address, app=app)
# Database configuration
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
TWILIO_PHONE_NUMBER = os.getenv('TWILIO_PHONE_NUMBER')
# Log environment variables
logging.debug(f"DB_HOST: {DB_HOST}, DB_NAME: {DB_NAME}, DB_USER: {DB_USER}, TWILIO_PHONE_NUMBER: {TWILIO_PHONE_NUMBER}")
class User(UserMixin):
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password_hash = password
@staticmethod
def get_by_username(username):
conn = get_db_connection()
if not conn:
return None
try:
cur = conn.cursor()
cur.execute("SELECT id, username, password FROM users WHERE username = %s", (username,))
user = cur.fetchone()
cur.close()
return User(user[0], user[1], user[2]) if user else None
except Exception as e:
logging.error(f"Error in get_by_username: {e}")
return None
finally:
conn.close()
@staticmethod
def get_by_id(user_id):
conn = get_db_connection()
if not conn:
return None
try:
cur = conn.cursor()
cur.execute("SELECT id, username, password FROM users WHERE id = %s", (user_id,))
user = cur.fetchone()
cur.close()
return User(user[0], user[1], user[2]) if user else None
except Exception as e:
logging.error(f"Error in get_by_id: {e}")
return None
finally:
conn.close()
@staticmethod
def create(username, password):
password_hash = generate_password_hash(password)
logging.debug(f"Creating user: {username}")
conn = get_db_connection()
if not conn:
return None
try:
cur = conn.cursor()
cur.execute("INSERT INTO users (username, password) VALUES (%s, %s) RETURNING id", (username, password_hash))
user_id = cur.fetchone()[0]
conn.commit()
cur.close()
return User(user_id, username, password_hash)
except Exception as e:
logging.error(f"Error creating user: {e}")
return None
finally:
conn.close()
def check_password(self, password):
return check_password_hash(self.password_hash, password)
@login_manager.user_loader
def load_user(user_id):
return User.get_by_id(user_id)
# Connect to the database
def get_db_connection():
try:
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST)
logging.debug("Database connection established")
return conn
except psycopg2.Error as e:
logging.error(f"Error connecting to the database: {e}")
return None
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_server_error(e):
logging.error(f"Server error: {e}")
return "Internal server error occurred", 500
@app.route('/api/messages', methods=['GET'])
def get_messages():
conn = get_db_connection()
if not conn:
return jsonify({"error": "Failed to connect to the database"}), 500
try:
cur = conn.cursor()
cur.execute("SELECT sender, recipient, message FROM messages ORDER BY id DESC")
messages = cur.fetchall()
cur.close()
messages_list = [{"sender": msg[0].strip() if msg[0] else None, "recipient": msg[1], "message": msg[2]} for msg in messages]
return jsonify(messages_list)
except Exception as e:
logging.error(f"Error fetching messages: {e}")
return jsonify({"error": "Failed to fetch messages"}), 500
finally:
conn.close()
@app.route('/sms', methods=['POST'])
def handle_sms():
data = request.form
sender = data.get('From').strip()
body = data.get('Body')
recipient = TWILIO_PHONE_NUMBER
conn = get_db_connection()
if not conn:
return "Failed to connect to the database", 500
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO messages (sender, recipient, message) VALUES (%s, %s, %s)",
[sender, recipient, body]
)
conn.commit()
cur.close()
return "SMS received", 200
except psycopg2.Error as e:
logging.error(f"Error inserting message: {e}")
return "Failed to save SMS", 500
finally:
conn.close()
@app.route('/messages', methods=['GET'])
@login_required
def messages_page():
search_query = request.args.get('search', '')
conn = get_db_connection()
if not conn:
return "Failed to connect to the database", 500
try:
cur = conn.cursor()
if search_query:
cur.execute(
"SELECT sender, recipient, message FROM messages WHERE sender ILIKE %s OR recipient ILIKE %s OR message ILIKE %s ORDER BY id DESC",
(f'%{search_query}%', f'%{search_query}%', f'%{search_query}%')
)
else:
cur.execute("SELECT sender, recipient, message FROM messages ORDER BY id DESC")
messages = cur.fetchall()
cur.close()
messages_list = [{"sender": msg[0].strip() if msg[0] else None, "recipient": msg[1], "message": msg[2]} for msg in messages]
return render_template('messages.html', messages=messages_list)
except Exception as e:
logging.error(f"Error fetching messages for rendering: {e}")
return "Failed to fetch messages", 500
finally:
conn.close()
@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.get_by_username(username)
if user and user.check_password(password):
login_user(user)
flash('Logged in successfully.')
return redirect(url_for('messages_page'))
else:
flash('Invalid username or password.')
return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if User.get_by_username(username):
flash('Username already exists.')
else:
user = User.create(username, password)
if user:
flash('User created successfully.')
return redirect(url_for('login'))
else:
flash('Error creating user.')
return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Logged out successfully.')
return redirect(url_for('login'))
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)