import os
import time
import math
import asyncio
import pymysql
import aiohttp
from pyrogram import Client, filters, idle
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from aiohttp import web

# ================= Configuration ================= #
# SECURITY: the string literals below are credentials committed to source.
# Every setting can be overridden by an environment variable of the same name;
# the literals are kept only as backward-compatible fallbacks and should be
# rotated and removed once the deployment provides them via the environment.
API_ID = os.environ.get("API_ID") or "29706288"
API_HASH = os.environ.get("API_HASH") or "32e67c7cffb983305c7b4fa6e52353cc"
BOT_TOKEN = os.environ.get("BOT_TOKEN") or "8430997589:AAGBXSdwdsXPAc7m8ywmXv0U6MuMDm01CIg"

DB_HOST = os.environ.get("DB_HOST") or "localhost"
DB_USER = os.environ.get("DB_USER") or "jvaebqrj_akbot"
DB_PASS = os.environ.get("DB_PASS") or "Aa@8ijn9okm"
DB_NAME = os.environ.get("DB_NAME") or "jvaebqrj_abyssdata"

# Web server port for Keep-Alive; an unset or empty PORT variable falls back to 8080
PORT = int(os.environ.get("PORT") or 8080)
# ================================================= #

# Bot client, authenticated with the API credentials and bot token defined above
app = Client(
    "abyss_bot",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
)

# One asyncio.Lock per user id, so each user has a single video in flight at a time
user_locks = dict()
# The asyncio Task currently working for each user id; looked up when a cancel is requested
active_tasks = dict()

# --- Database Helpers ---
def get_db():
    """Open and return a new PyMySQL connection built from the DB_* settings."""
    connection_settings = {
        "host": DB_HOST,
        "user": DB_USER,
        "password": DB_PASS,
        "database": DB_NAME,
    }
    return pymysql.connect(**connection_settings)

def init_db():
    """Create the `abyss_users` table if it does not already exist.

    Opens a fresh connection via get_db(), runs the CREATE TABLE IF NOT EXISTS
    statement, commits, and prints a confirmation line on success.

    Raises:
        Whatever get_db(), the statement, or the commit raises; the connection
        is closed before the exception propagates.
    """
    db = get_db()
    try:
        with db.cursor() as cursor:
            cursor.execute("""
        CREATE TABLE IF NOT EXISTS `abyss_users` (
          `user_id` BIGINT NOT NULL,
          `api_key` VARCHAR(255) NOT NULL,
          PRIMARY KEY (`user_id`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """)
        db.commit()
    finally:
        # Release the connection even when the statement or commit fails
        db.close()
    print("Database check complete: 'abyss_users' table is ready.")

def save_user(user_id, api_key):
    """Insert or overwrite the stored API key for a user.

    Args:
        user_id: Value for the `user_id` primary-key column.
        api_key: Value for the `api_key` column.

    Raises:
        Whatever get_db(), the statement, or the commit raises; the connection
        is closed before the exception propagates.
    """
    db = get_db()
    try:
        with db.cursor() as cursor:
            # REPLACE overwrites any existing row with the same primary key
            cursor.execute("REPLACE INTO abyss_users (user_id, api_key) VALUES (%s, %s)", (user_id, api_key))
        db.commit()
    finally:
        # Release the connection even when the statement or commit fails
        db.close()

def get_user_key(user_id):
    """Look up the stored API key for a user.

    Args:
        user_id: Value to match against the `user_id` column.

    Returns:
        The `api_key` column of the matching row, or None when no row matches.

    Raises:
        Whatever get_db() or the query raises; the connection is closed before
        the exception propagates.
    """
    db = get_db()
    try:
        with db.cursor() as cursor:
            cursor.execute("SELECT api_key FROM abyss_users WHERE user_id = %s", (user_id,))
            result = cursor.fetchone()
    finally:
        # Release the connection even when the query fails
        db.close()
    return result[0] if result else None

def delete_user(user_id):
    """Delete a user's row (and with it the stored API key), if one exists.

    Args:
        user_id: Value to match against the `user_id` column.

    Raises:
        Whatever get_db(), the statement, or the commit raises; the connection
        is closed before the exception propagates.
    """
    db = get_db()
    try:
        with db.cursor() as cursor:
            cursor.execute("DELETE FROM abyss_users WHERE user_id = %s", (user_id,))
        db.commit()
    finally:
        # Release the connection even when the statement or commit fails
        db.close()

# --- Abyss API Helpers ---
async def verify_abyss_api(api_key):
    """Hit the /v1/about endpoint to check whether an API key is accepted.

    Args:
        api_key: The key to verify; it comes straight from user input.

    Returns:
        True when the endpoint answers with HTTP 200, otherwise False.

    Raises:
        aiohttp.ClientError / asyncio.TimeoutError on network failures.
    """
    async with aiohttp.ClientSession() as session:
        # Send the key through `params` so it is URL-encoded; splicing it into
        # the URL text would let characters such as `&` or `#` alter the query.
        async with session.get("https://api.abyss.to/v1/about", params={"key": api_key}) as resp:
            return resp.status == 200

async def get_custom_domain(api_key):
    """Fetch the `domainEmbed` value from the user's resources listing, if available.

    Args:
        api_key: The user's API key.

    Returns:
        The value of the `domainEmbed` key in the JSON response, or None when
        the request does not return HTTP 200, the body is not a JSON object, or
        the key is absent.

    Raises:
        aiohttp.ClientError / asyncio.TimeoutError on network failures, and
        whatever `resp.json()` raises for an undecodable body.
    """
    # Query values go through `params` so the key is URL-encoded rather than
    # spliced verbatim into the URL text.
    query = {"key": api_key, "maxResults": "1"}
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.abyss.to/v1/resources", params=query) as resp:
            if resp.status == 200:
                data = await resp.json()
                # A JSON array or scalar has no .get(); treat it as "no domain"
                if isinstance(data, dict):
                    return data.get("domainEmbed")
    return None

# --- Progress String Formatter ---
def format_bytes(size):
    """Render a byte count as a short human-readable string such as ``1.5 KB``.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then rounded to two decimals.

    Args:
        size: Number of bytes (int or float).

    Returns:
        A string of the form "<number> <unit>" with unit in B, KB, MB, GB, TB.
    """
    step = 2**10
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    index = 0
    # `>=` so exactly 1024 becomes "1.0 KB"; the index cap keeps values beyond
    # the TB range from running off the end of the unit table.
    while size >= step and index < len(units) - 1:
        size /= step
        index += 1
    return f"{round(size, 2)} {units[index]}"

async def progress_bar(current, total, msg, action, start_time, last_update_time, user_id):
    """Update the single status message with a progress bar, speed and ETA.

    Edits are throttled to one every 3 seconds (except for the final call
    where ``current == total``) to stay clear of Telegram's FloodWait limits.

    Args:
        current: Bytes transferred so far.
        total: Total bytes expected; 0 or less is treated as an unknown size.
        msg: Message object whose ``edit_text`` is awaited with the new text.
        action: Headline shown on the first line of the status text.
        start_time: ``time.time()`` value taken when the transfer began.
        last_update_time: One-element list holding the time of the last edit;
            updated in place so the throttle state survives between calls.
        user_id: Embedded in the cancel button's callback data.
    """
    now = time.time()

    # Only edit message every 3 seconds to avoid FloodWait
    if (now - last_update_time[0]) < 3 and current != total:
        return

    last_update_time[0] = now
    elapsed_time = now - start_time

    # Guard against a zero (or backwards-clock) interval before dividing
    if elapsed_time <= 0:
        elapsed_time = 0.1

    speed = current / elapsed_time

    # With an unknown or zero total there is no meaningful ratio; show 0%
    # instead of dividing by zero.
    if total > 0:
        fraction = min(max(current / total, 0), 1)
    else:
        fraction = 0
    percentage = fraction * 100

    # ETA is only computable once some data has moved and the total is known
    if speed > 0 and total >= current:
        eta = round((total - current) / speed)
        eta_str = time.strftime("%H:%M:%S", time.gmtime(eta))
    else:
        eta_str = "--:--:--"

    bar_length = 15
    filled_len = math.floor(bar_length * fraction)
    bar = '█' * filled_len + '░' * (bar_length - filled_len)

    text = (
        f"**🔄 {action}**\n"
        f"[{bar}] {percentage:.1f}%\n"
        f"**📦 Done:** {format_bytes(current)} / {format_bytes(total)}\n"
        f"**🚀 Speed:** {format_bytes(speed)}/s\n"
        f"**⏳ ETA:** {eta_str}"
    )

    # Inline keyboard button attached directly to the active update payload
    reply_markup = InlineKeyboardMarkup([
        [InlineKeyboardButton("❌ Cancel Process", callback_data=f"cancel_{user_id}")]
    ])

    try:
        await msg.edit_text(text, reply_markup=reply_markup)
    except Exception:
        # Deliberately best-effort: a failed cosmetic edit (FloodWait, identical
        # text) must not abort the transfer that is reporting progress.
        pass

# --- Custom Async Generator for Uploading ---
async def file_sender(file_path, msg, action, start_time, last_update_time, user_id):
    """Stream a file in 512 KB pieces, reporting progress before each piece is yielded.

    The remaining arguments after ``file_path`` are forwarded unchanged to
    progress_bar together with the running byte count and the file size.
    """
    block_bytes = 512 * 1024
    file_size = os.path.getsize(file_path)
    sent_so_far = 0

    with open(file_path, 'rb') as stream:
        # read() returns b"" at end of file, which is the sentinel that stops iter()
        for piece in iter(lambda: stream.read(block_bytes), b""):
            sent_so_far += len(piece)
            await progress_bar(sent_so_far, file_size, msg, action, start_time, last_update_time, user_id)
            yield piece

# ================= Telegram Handlers ================= #

@app.on_message(filters.command("start"))
async def start_cmd(client, message: Message):
    """Answer /start with instructions for connecting an Abyss.to account."""
    greeting = "Hello! Send `/login YOUR_API_KEY` to connect your Abyss.to account."
    await message.reply_text(greeting)

@app.on_message(filters.command("login"))
async def login_cmd(client, message: Message):
    """Handle `/login <key>`: verify the key with Abyss.to, then store it.

    Verification and storage are guarded separately so the user is told which
    step failed; a network problem is no longer reported as a database error.
    """
    if len(message.command) < 2:
        await message.reply_text("⚠️ Usage: `/login YOUR_API_KEY`")
        return

    api_key = message.command[1]
    wait_msg = await message.reply_text("🔄 Verifying API key...")

    # Step 1: ask the Abyss API whether the key is accepted
    try:
        is_valid = await verify_abyss_api(api_key)
    except Exception as e:
        await wait_msg.edit_text(f"❌ Could not verify the API key: `{str(e)}`")
        return

    if not is_valid:
        await wait_msg.edit_text("❌ Invalid API Key. Please check and try again.")
        return

    # Step 2: persist the key; only failures here are database errors
    try:
        save_user(message.from_user.id, api_key)
    except Exception as e:
        await wait_msg.edit_text(f"❌ Database error: `{str(e)}`")
        return

    await wait_msg.edit_text("✅ Successfully logged in! You can now send me videos to upload.")

@app.on_message(filters.command("logout"))
async def logout_cmd(client, message: Message):
    """Handle `/logout`: delete the sender's stored API key and confirm.

    A failure while deleting is reported to the user instead of being left
    unanswered, matching how `/login` reports database errors.
    """
    try:
        delete_user(message.from_user.id)
    except Exception as e:
        await message.reply_text(f"❌ Database error: `{str(e)}`")
        return

    await message.reply_text("👋 Logged out. Your API key has been deleted from the database.")

# Strong references to in-flight upload jobs so a running job is never left
# without a referent; each job removes itself when it finishes.
_video_jobs = set()


async def _process_video(client, message, user_id, api_key, queue_msg):
    """Download one media message from Telegram and upload it to Abyss.to.

    Runs as its own asyncio Task. It waits for the user's queue lock, records
    itself in ``active_tasks`` so the cancel button can find it, and reports
    every outcome by editing ``queue_msg``.

    Args:
        client: The pyrogram client used to download the media.
        message: The incoming message carrying the video/document.
        user_id: Id of the sender; keys ``user_locks`` and ``active_tasks``.
        api_key: The sender's stored Abyss API key.
        queue_msg: The bot's reply that is edited to show status.
    """
    # Wait for turn in queue
    async with user_locks[user_id]:
        # Track this task as the active worker so the cancel button can reach it
        active_tasks[user_id] = asyncio.current_task()

        reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("❌ Cancel Process", callback_data=f"cancel_{user_id}")]])
        status_msg = queue_msg
        file_path = None

        try:
            status_msg = await queue_msg.edit_text("🔄 Preparing to download...", reply_markup=reply_markup)

            start_time = time.time()
            last_update_time = [0]

            # 1. Download File from Telegram
            file_path = await client.download_media(
                message,
                progress=progress_bar,
                progress_args=(status_msg, "Downloading from Telegram...", start_time, last_update_time, user_id)
            )

            if not file_path:
                await status_msg.edit_text("❌ Failed to download media.")
                return

            await status_msg.edit_text("🔄 Connecting to Abyss Server...", reply_markup=reply_markup)

            # 2. Upload to Abyss.to
            upload_url = f"https://up.abyss.to/{api_key}"
            start_time = time.time()
            last_update_time = [0]

            # aiohttp's default total timeout is 5 minutes, which would abort
            # large uploads; lift the overall cap but keep a connect timeout.
            upload_timeout = aiohttp.ClientTimeout(total=None, sock_connect=30)

            async with aiohttp.ClientSession(timeout=upload_timeout) as session:
                form = aiohttp.FormData()
                form.add_field('file',
                               file_sender(file_path, status_msg, "Uploading to Abyss.to...", start_time, last_update_time, user_id),
                               filename=os.path.basename(file_path))

                async with session.post(upload_url, data=form) as response:
                    if response.status != 200:
                        await status_msg.edit_text(f"❌ Upload failed. Server returned: {response.status}")
                        return
                    res_json = await response.json()

            slug = res_json.get("slug") if isinstance(res_json, dict) else None
            if not slug:
                # Without a slug the link would read ".../None"; report it instead
                await status_msg.edit_text("❌ Upload failed. Server response did not include a video slug.")
                return

            # 3. Retrieve Domain Configuration
            # The upload already succeeded, so a failed lookup only means the
            # default player domain is used for the link.
            try:
                domain = await get_custom_domain(api_key)
            except asyncio.CancelledError:
                raise
            except Exception:
                domain = None

            if domain:
                final_link = f"https://{domain}/{slug}"
            else:
                final_link = f"https://abyssplayer.com/{slug}"

            await status_msg.edit_text(f"✅ **Upload Complete!**\n\n🔗 **Link:** {final_link}")

        except asyncio.CancelledError:
            # Raised inside this task when the cancel button calls task.cancel()
            try:
                await status_msg.edit_text("❌ **Process Cancelled**\nThe download/upload process was terminated by the user.")
            except Exception:
                pass
            raise  # Re-raise so this task ends as cancelled and releases the queue lock

        except Exception as e:
            await status_msg.edit_text(f"❌ An error occurred: `{str(e)}`")

        finally:
            # Cleanup tracking assignment
            active_tasks.pop(user_id, None)
            # Cleanup local temporary file to save storage space
            if file_path and os.path.exists(file_path):
                os.remove(file_path)


@app.on_message(filters.video | filters.document)
async def handle_video(client, message: Message):
    """Accept a video/document from a logged-in user and queue it for upload.

    The handler only validates the sender and posts the initial status reply;
    the transfer itself is started as a separate asyncio Task
    (``_process_video``). Because that task, not the one running this handler,
    is what gets stored in ``active_tasks``, a cancel request ends only the
    upload job and never the task that invoked the handler.
    """
    user_id = message.from_user.id

    try:
        api_key = get_user_key(user_id)
    except Exception as e:
        await message.reply_text(f"❌ Database error: `{str(e)}`")
        return

    if not api_key:
        await message.reply_text("⚠️ You need to login first. Send `/login YOUR_API_KEY`.")
        return

    # Ensure queue lock exists for user
    if user_id not in user_locks:
        user_locks[user_id] = asyncio.Lock()

    # Check if queue is locked (meaning they already have a video processing)
    if user_locks[user_id].locked():
        queue_msg = await message.reply_text("🕒 Added to your queue. Waiting for previous uploads to finish...")
    else:
        queue_msg = await message.reply_text("🕒 Processing started...")

    job = asyncio.create_task(_process_video(client, message, user_id, api_key, queue_msg))
    _video_jobs.add(job)
    job.add_done_callback(_video_jobs.discard)

# ================= Cancellation Button Handler ================= #
@app.on_callback_query(filters.regex(r"^cancel_(\d+)$"))
async def handle_cancellation(client, callback_query: CallbackQuery):
    """Handle a press of the cancel button whose callback data is `cancel_<user id>`.

    Only the user whose id is embedded in the callback data may cancel; the
    task registered for that id in ``active_tasks`` is asked to cancel if it
    is still running.
    """
    owner_id = int(callback_query.matches[0].group(1))

    # Reject clicks from anyone other than the owner of the process
    if owner_id != callback_query.from_user.id:
        await callback_query.answer("⚠️ You can only cancel your own tasks!", show_alert=True)
        return

    running = active_tasks.get(owner_id)

    if not running or running.done():
        await callback_query.answer("⚠️ No active running task found for this process.", show_alert=True)
        return

    # Request cancellation; CancelledError is raised inside the task
    running.cancel()
    await callback_query.answer("🛑 Terminating download/upload stream...", show_alert=True)

# ================= Keep-Alive Web Server ================= #
async def web_handler(request):
    """Reply to a keep-alive request with a fixed plain-text body and HTTP 200."""
    body = "Bot is running smoothly!"
    return web.Response(status=200, text=body)

async def run_web_server():
    """Start the keep-alive HTTP site on 0.0.0.0:PORT, serving web_handler at `/`."""
    keepalive_app = web.Application()
    keepalive_app.router.add_get('/', web_handler)

    runner = web.AppRunner(keepalive_app)
    await runner.setup()

    await web.TCPSite(runner, '0.0.0.0', PORT).start()
    print(f"Web server started on port {PORT}")

# ================= Startup Routine ================= #
async def main():
    """Run the startup sequence: database check, keep-alive server, then the bot.

    Blocks in ``idle()`` until it returns, then stops the client. The stop is
    in a ``finally`` so the client is shut down even when ``idle()`` exits
    with an exception.
    """
    init_db()
    await run_web_server()
    print("Starting bot...")
    await app.start()
    try:
        await idle()
    finally:
        # Only reached after a successful start(), so stop() is always paired
        await app.stop()

if __name__ == "__main__":
    # NOTE(review): presumably this must stay on the loop returned by
    # get_event_loop() because `app` was created at import time — confirm
    # before switching to asyncio.run(), which creates a new loop.
    asyncio.get_event_loop().run_until_complete(main())