dashd/backend.py

233 lines
8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
dashd backend - user auth + settings persistence
"""
import os
import json
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta
from functools import wraps
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import jwt
# Location of the SQLite database file; override with the DASHD_DB env var.
DB_PATH = os.getenv('DASHD_DB', '/data/dashd.db')
# Key used to sign and verify tokens. When DASHD_SECRET is not set, a fresh
# random key is generated each time this module is loaded.
JWT_SECRET = os.getenv('DASHD_SECRET', secrets.token_hex(32))
# Number of days added to "now" to form a token's `exp` claim.
JWT_EXPIRY_DAYS = 30
def get_db():
    """Open and return a new SQLite connection to ``DB_PATH``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    indexed by column name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the database directory and the ``users``/``settings`` tables.

    Safe to call repeatedly: the directory is created with ``exist_ok`` and
    both tables use ``CREATE TABLE IF NOT EXISTS``.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename such as "dashd.db" has an empty dirname, and
    # os.makedirs('') raises FileNotFoundError - only create real directories.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = get_db()
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS settings (
                user_id INTEGER PRIMARY KEY,
                services TEXT DEFAULT '[]',
                card_positions TEXT DEFAULT '{}',
                card_sizes TEXT DEFAULT '{}',
                grid_size TEXT DEFAULT '{}',
                machines TEXT DEFAULT '[]',
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when a statement fails.
        conn.close()
def hash_password(password):
    """Hash *password* with PBKDF2-HMAC-SHA256 under a fresh random salt.

    Returns a string of the form ``"<salt>:<hex digest>"`` where the salt is
    32 hex characters and the digest is derived with 100000 iterations.
    """
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        salt.encode(),
        100000,
    )
    return f'{salt}:{digest.hex()}'
def verify_password(password, stored):
    """Check *password* against a ``"<salt>:<hex digest>"`` record.

    The digest is recomputed with PBKDF2-HMAC-SHA256 (100000 iterations)
    using the salt taken from *stored*.

    Returns True when the recomputed digest matches, False otherwise -
    including when *stored* does not contain a ``:`` separator.
    """
    salt, separator, expected = stored.partition(':')
    if not separator:
        # Malformed record: reject instead of raising on tuple unpacking.
        return False
    check = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
    # Constant-time comparison so response timing does not reveal how much of
    # the digest matched. Compared as bytes because compare_digest only
    # accepts ASCII-only str arguments.
    return secrets.compare_digest(check.hex().encode(), expected.encode())
def create_token(user_id, username):
    """Build an HS256-signed JWT identifying the given user.

    The payload carries ``user_id``, ``username`` and an ``exp`` claim set
    ``JWT_EXPIRY_DAYS`` days from now. Returns whatever ``jwt.encode``
    returns for that payload.
    """
    # Imported locally because the module header only brings in `datetime`
    # and `timedelta`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC
    # timestamp, which represents the same instant.
    expires_at = datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRY_DAYS)
    payload = {
        'user_id': user_id,
        'username': username,
        'exp': expires_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm='HS256')
def verify_token(token):
    """Decode and validate an HS256 JWT signed with ``JWT_SECRET``.

    Returns the decoded payload dict, or None when the token is rejected
    (bad signature, expired, malformed, ...).
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Only token-validation failures mean "not authenticated"; a bare
        # `except:` would also swallow KeyboardInterrupt and SystemExit.
        return None
class APIHandler(BaseHTTPRequestHandler):
    """JSON-over-HTTP handler for the dashd auth and settings endpoints.

    Routes:
        POST /api/auth/register   create a user and reply with a token
        POST /api/auth/login      check credentials and reply with a token
        POST /api/settings/save   store the caller's settings (Bearer token)
        GET  /api/settings/load   return the caller's settings (Bearer token)
        GET  /api/auth/verify     report whether the Bearer token is valid
        GET  /health              liveness probe

    Every response carries the same permissive CORS headers.
    """

    def _send_cors_headers(self):
        """Emit the CORS headers shared by every response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')

    def send_json(self, data, status=200):
        """Send *data* serialized as JSON with HTTP status *status*."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 200 and the CORS headers."""
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()

    def get_user(self):
        """Return the token payload from the ``Authorization`` header.

        Returns None when the header is missing, is not a ``Bearer`` token,
        or ``verify_token`` rejects the token.
        """
        auth = self.headers.get('Authorization', '')
        if auth.startswith('Bearer '):
            return verify_token(auth[len('Bearer '):])
        return None

    def read_body(self):
        """Parse the request body as a JSON object.

        Returns:
            The decoded dict, or ``{}`` when the request has no body.

        Raises:
            ValueError: if ``Content-Length`` is not an integer, or the body
                is not valid UTF-8 JSON, or the JSON value is not an object.
        """
        length = int(self.headers.get('Content-Length', 0))
        # A negative length would make rfile.read() block until EOF.
        if length <= 0:
            return {}
        body = json.loads(self.rfile.read(length).decode())
        if not isinstance(body, dict):
            raise ValueError('request body must be a JSON object')
        return body

    def _read_body_or_400(self):
        """Return the parsed body, or send a 400 response and return None."""
        try:
            return self.read_body()
        except ValueError:
            # Covers bad Content-Length, undecodable bytes, invalid JSON and
            # non-object bodies (all ValueError or subclasses of it).
            self.send_json({'error': 'invalid JSON body'}, 400)
            return None

    def do_POST(self):
        """Dispatch POST requests to the register, login and save routes."""
        path = urlparse(self.path).path
        if path == '/api/auth/register':
            data = self._read_body_or_400()
            if data is not None:
                self._register(data)
        elif path == '/api/auth/login':
            data = self._read_body_or_400()
            if data is not None:
                self._login(data)
        elif path == '/api/settings/save':
            # Authenticate before reading the body, as unauthenticated
            # callers get 401 regardless of what they sent.
            user = self.get_user()
            if not user:
                return self.send_json({'error': 'unauthorized'}, 401)
            data = self._read_body_or_400()
            if data is not None:
                self._save_settings(user, data)
        else:
            self.send_json({'error': 'not found'}, 404)

    def _register(self, data):
        """Create a user plus an empty settings row and reply with a token."""
        username = data.get('username', '')
        password = data.get('password', '')
        # Non-string JSON values (numbers, lists, null) cannot be credentials.
        if not isinstance(username, str) or not isinstance(password, str):
            return self.send_json({'error': 'username and password required'}, 400)
        username = username.strip().lower()
        if not username or not password:
            return self.send_json({'error': 'username and password required'}, 400)
        if len(username) < 3:
            return self.send_json({'error': 'username too short'}, 400)
        if len(password) < 6:
            return self.send_json({'error': 'password too short'}, 400)

        conn = get_db()
        try:
            cur = conn.execute(
                'INSERT INTO users (username, password_hash) VALUES (?, ?)',
                (username, hash_password(password))
            )
            user_id = cur.lastrowid
            conn.execute('INSERT INTO settings (user_id) VALUES (?)', (user_id,))
            conn.commit()
        except sqlite3.IntegrityError:
            return self.send_json({'error': 'username taken'}, 400)
        finally:
            conn.close()

        token = create_token(user_id, username)
        self.send_json({'token': token, 'username': username})

    def _login(self, data):
        """Check the supplied credentials and reply with a token or 401."""
        username = data.get('username', '')
        password = data.get('password', '')
        if not isinstance(username, str) or not isinstance(password, str):
            return self.send_json({'error': 'invalid credentials'}, 401)
        username = username.strip().lower()

        conn = get_db()
        try:
            user = conn.execute(
                'SELECT id, password_hash FROM users WHERE username = ?', (username,)
            ).fetchone()
        finally:
            conn.close()

        if user and verify_password(password, user['password_hash']):
            token = create_token(user['id'], username)
            self.send_json({'token': token, 'username': username})
        else:
            self.send_json({'error': 'invalid credentials'}, 401)

    def _save_settings(self, user, data):
        """Overwrite the settings row of the authenticated *user*.

        Keys absent from *data* are stored as their empty defaults.
        """
        conn = get_db()
        try:
            conn.execute('''
                UPDATE settings SET
                    services = ?,
                    card_positions = ?,
                    card_sizes = ?,
                    grid_size = ?,
                    machines = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE user_id = ?
            ''', (
                json.dumps(data.get('services', [])),
                json.dumps(data.get('cardPositions', {})),
                json.dumps(data.get('cardSizes', {})),
                json.dumps(data.get('gridSize', {})),
                json.dumps(data.get('machines', [])),
                user['user_id']
            ))
            conn.commit()
        finally:
            conn.close()
        self.send_json({'success': True})

    def do_GET(self):
        """Dispatch GET requests to the load, verify and health routes."""
        path = urlparse(self.path).path
        if path == '/api/settings/load':
            user = self.get_user()
            if not user:
                return self.send_json({'error': 'unauthorized'}, 401)
            self._load_settings(user)
        elif path == '/api/auth/verify':
            user = self.get_user()
            if user:
                self.send_json({'valid': True, 'username': user['username']})
            else:
                self.send_json({'valid': False}, 401)
        elif path == '/health':
            self.send_json({'status': 'ok'})
        else:
            self.send_json({'error': 'not found'}, 404)

    def _load_settings(self, user):
        """Reply with the stored settings of *user*, or ``{}`` if none exist."""
        conn = get_db()
        try:
            settings = conn.execute(
                'SELECT * FROM settings WHERE user_id = ?', (user['user_id'],)
            ).fetchone()
        finally:
            conn.close()

        if not settings:
            return self.send_json({})
        self.send_json({
            'services': json.loads(settings['services']),
            'cardPositions': json.loads(settings['card_positions']),
            'cardSizes': json.loads(settings['card_sizes']),
            'gridSize': json.loads(settings['grid_size']),
            'machines': json.loads(settings['machines'])
        })

    def log_message(self, format, *args):
        """Suppress the base class's per-request logging."""
        pass  # silent logging
def main():
    """Initialise the database, then serve the API on all interfaces.

    The listening port comes from the PORT environment variable and falls
    back to 8086. Blocks in ``serve_forever``.
    """
    init_db()
    listen_port = int(os.environ.get('PORT', 8086))
    httpd = HTTPServer(('0.0.0.0', listen_port), APIHandler)
    print(f'dashd backend running on port {listen_port}')
    httpd.serve_forever()


if __name__ == '__main__':
    main()