"""Telegram bot that replies to Songbox share links with the linked music.

A message containing ``/songs/<id>`` or ``/albums/<id>`` is answered with the
artwork, a short caption and the audio file(s). ``/artists/<id>`` links are
recognised but not supported yet.
"""

import contextlib
import json
import os
import re
import tempfile

import httpx
from pyrogram import Client, filters, types
from songbox import SongboxClient

CONFIG_PATH = "config.json"

# Matches the "/<kind>/<numeric id>" part of a share link.
LINK_PATTERN = re.compile(r'/(songs|albums|artists)/(\d+)')


async def download_file(url: str, filename: str) -> str:
    """Download *url* and write the response body to *filename*.

    Args:
        url: Address to fetch with an HTTP GET request.
        filename: Path of the file to (over)write with the response body.

    Returns:
        The *filename* that was written.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx or 5xx.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
    with open(filename, 'wb') as f:
        f.write(response.content)
    return filename


with open(CONFIG_PATH, "r") as f:
    config = json.load(f)

songbox = SongboxClient()
songbox.set_token(config['songboxToken'])

# NOTE(review): api_id / api_hash are hard-coded here; consider moving them
# into config.json next to the bot token.
app = Client(
    "songbox",
    api_id=6,
    api_hash="eb06d4abfb49dc3eeb1aeb98ae0f581e",
    bot_token=config['botToken'],
)


def _remove_quietly(path) -> None:
    """Delete *path*, ignoring the case where it no longer exists."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)


async def _fetch_artwork(url: str) -> str:
    """Download artwork from *url* into a unique temporary file; return its path."""
    # A generated name is used instead of the song/album title: titles come
    # from the remote service and may contain path separators, and two
    # concurrent requests for the same title would overwrite each other.
    fd, path = tempfile.mkstemp(suffix="_artwork.jpg")
    os.close(fd)
    try:
        await download_file(url, path)
    except BaseException:
        _remove_quietly(path)
        raise
    return path


async def _send_audio(message: types.Message, song: dict, thumb: str) -> None:
    """Download one song's audio, send it as a reply, then delete the file."""
    # NOTE(review): the songbox calls are made without ``await``; if they
    # perform network I/O they stall the event loop while running.
    audio_file = songbox.music.download_song(song['url_original'])
    try:
        await message.reply_audio(
            audio=audio_file,
            performer=song['artist_name'],
            title=song['name'],
            thumb=thumb,
        )
    finally:
        _remove_quietly(audio_file)


async def _send_song(message: types.Message, content_id: str) -> None:
    """Reply with the artwork, details and audio of a single song."""
    song = songbox.music.get_song(content_id)
    artwork_file = await _fetch_artwork(song['artwork_url'])
    try:
        duration_seconds = int(song['duration'])
        await message.reply_photo(
            photo=artwork_file,
            caption=(
                f"**Album:** {song['album_name']}\n"
                f"**Artist:** {song['artist_name']}\n"
                f"**Song:** {song['name']}\n"
                f"**Duration:** {duration_seconds // 60}:{duration_seconds % 60:02}"
            )
        )
        await _send_audio(message, song, artwork_file)
    finally:
        # Runs even when sending fails, so temporary files do not pile up.
        _remove_quietly(artwork_file)


async def _send_album(message: types.Message, content_id: str) -> None:
    """Reply with the album artwork and details, then every track's audio."""
    album = songbox.music.get_album(content_id)
    artwork_file = await _fetch_artwork(album['artwork_url'])
    try:
        await message.reply_photo(
            photo=artwork_file,
            caption=(
                f"**Album:** {album['name']} ({album['released_date']})\n"
                f"**Artist:** {album['artist_name']}\n"
                f"**Tracks:** {len(album['songs'])}\n"
                f"**Share:** https://on.lavafoshi.mv/albums/{album['id']}"
            )
        )
        for song in album['songs']:
            await _send_audio(message, song, artwork_file)
    finally:
        _remove_quietly(artwork_file)


def _refresh_and_store_token() -> None:
    """Refresh the Songbox token and persist it to the config file."""
    config['songboxToken'] = songbox.auth.refresh_token()
    # Write to a sibling file and swap it in, so an interrupted write cannot
    # leave a truncated config.json behind.
    tmp_path = CONFIG_PATH + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(config, f, indent=4)
    os.replace(tmp_path, CONFIG_PATH)


@app.on_message(filters.command('start'))
async def start(_, message: types.Message):
    """Greet the user in response to the /start command."""
    await message.reply("Hello!")


@app.on_message(filters.text)
async def echo(_, message: types.Message):
    """Handle a text message that contains a songs/albums/artists link.

    Messages without a recognised link are ignored.
    """
    match = LINK_PATTERN.search(message.text)
    if not match:
        return

    content_type, content_id = match.groups()
    if content_type == 'songs':
        await _send_song(message, content_id)
    elif content_type == 'albums':
        await _send_album(message, content_id)
    else:  # 'artists' is the only other alternative in LINK_PATTERN
        await message.reply("downloading from artists does not support yet")

    # NOTE(review): the original file's indentation was lost; the token
    # refresh is assumed to run after each handled link - confirm.
    _refresh_and_store_token()


app.run()