Files
songboxBot/main.py
ポキ d57590617d feat(echo): implement album download functionality
Add support for downloading albums in the echo command, including artwork and all tracks. The implementation fetches album details, downloads artwork, and processes each song in the album, cleaning up temporary files after sending.
2025-06-07 12:25:22 -07:00

98 lines
3.5 KiB
Python

from pyrogram import Client, filters, types
import re
import httpx
import os
from songbox import SongboxClient
import json
async def download_file(url, filename):
    """Fetch *url* over HTTP and store the response body in *filename*.

    ``raise_for_status()`` is called on the response, so an error status
    propagates to the caller as an exception and nothing is written.

    Returns *filename* unchanged so the call can be used inline.
    """
    async with httpx.AsyncClient() as http:
        reply = await http.get(url)
        reply.raise_for_status()
        payload = reply.content

    # The body is already fully buffered in memory; dump it to disk as-is.
    with open(filename, 'wb') as out:
        out.write(payload)

    return filename
# Bot settings; the code in this file reads the 'songboxToken' and 'botToken'
# keys from it.
with open("config.json", "r") as config_file:
    config = json.load(config_file)

# Songbox API client, authenticated with the token stored in the config.
songbox = SongboxClient()
songbox.set_token(config['songboxToken'])

# NOTE(review): api_id / api_hash are hard-coded here while both tokens come
# from config.json -- consider moving them into the config as well.
app = Client(
    "songbox",
    api_id=6,
    api_hash="eb06d4abfb49dc3eeb1aeb98ae0f581e",
    bot_token=config['botToken'],
)
@app.on_message(filters.command('start'))
async def start(_, message: types.Message):
    """Answer the ``start`` command with a fixed greeting."""
    greeting = "Hello!"
    await message.reply(greeting)
# Matches the "/songs/<id>", "/albums/<id>" or "/artists/<id>" part of a link:
# group 1 is the content type, group 2 the numeric id.
_CONTENT_LINK_RE = re.compile(r'/(songs|albums|artists)/(\d+)')

# Path separators, control characters and other characters that are not
# allowed in file names on common platforms.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


def _artwork_filename(name):
    """Return a local artwork file name derived from *name*.

    Song and album names come from the remote API and may contain characters
    such as ``/`` that would make ``open()`` fail or write outside the
    working directory, so those characters are replaced with ``_``.
    """
    return f"{_UNSAFE_FILENAME_CHARS_RE.sub('_', str(name))}_artwork.jpg"


def _remove_if_exists(path):
    """Delete *path* if it is set and present on disk."""
    if path and os.path.exists(path):
        os.remove(path)


async def _send_audio(message, song, artwork_file):
    """Download one song, reply with it as audio, then delete the local copy."""
    audio_file = None
    try:
        # NOTE(review): this songbox call is not awaited, so it presumably
        # blocks the event loop while downloading -- confirm.
        audio_file = songbox.music.download_song(song['url_original'])
        await message.reply_audio(
            audio=audio_file,
            performer=song['artist_name'],
            title=song['name'],
            thumb=artwork_file,
        )
    finally:
        # Clean up even when the download or the upload failed.
        _remove_if_exists(audio_file)


async def _send_song(message, content_id):
    """Reply with the artwork/info card and the audio of a single song."""
    song = songbox.music.get_song(content_id)
    artwork_file = _artwork_filename(song['name'])
    try:
        await download_file(song['artwork_url'], artwork_file)
        minutes, seconds = divmod(int(song['duration']), 60)
        await message.reply_photo(
            photo=artwork_file,
            caption=(
                f"**Album:** {song['album_name']}\n"
                f"**Artist:** {song['artist_name']}\n"
                f"**Song:** {song['name']}\n"
                f"**Duration:** {minutes}:{seconds:02}"
            )
        )
        await _send_audio(message, song, artwork_file)
    finally:
        _remove_if_exists(artwork_file)


async def _send_album(message, content_id):
    """Reply with the album artwork/info card followed by every track."""
    album = songbox.music.get_album(content_id)
    artwork_file = _artwork_filename(album['name'])
    try:
        await download_file(album['artwork_url'], artwork_file)
        await message.reply_photo(
            photo=artwork_file,
            caption=(
                f"**Album:** {album['name']} ({album['released_date']})\n"
                f"**Artist:** {album['artist_name']}\n"
                f"**Tracks:** {len(album['songs'])}\n"
                f"**Share:** https://on.lavafoshi.mv/albums/{album['id']}"
            )
        )
        # The artwork doubles as the thumbnail of every track, so it is only
        # removed after the whole album has been sent (or sending failed).
        for song in album['songs']:
            await _send_audio(message, song, artwork_file)
    finally:
        _remove_if_exists(artwork_file)


@app.on_message(filters.text)
async def echo(_, message: types.Message):
    """Handle text messages that contain a song, album or artist link.

    The message text is searched for ``/songs/<id>``, ``/albums/<id>`` or
    ``/artists/<id>``. Songs and albums are downloaded and sent back as an
    artwork photo plus audio file(s); artists only get a "not supported"
    reply. Messages without such a link are ignored.

    Temporary artwork and audio files are removed even if sending fails;
    the underlying exception still propagates.
    """
    match = _CONTENT_LINK_RE.search(message.text)
    if match is None:
        return

    content_type, content_id = match.groups()
    if content_type == 'songs':
        await _send_song(message, content_id)
    elif content_type == 'albums':
        await _send_album(message, content_id)
    elif content_type == 'artists':
        await message.reply("downloading from artists does not support yet")
# NOTE(review): assumed to be module-level startup code (runs once, before
# app.run()) rather than part of the echo handler -- confirm.
# Refresh the Songbox token and write it back to the config file.
new_token = songbox.auth.refresh_token()
config['songboxToken'] = new_token
with open("config.json", "w") as config_out:
    json.dump(config, config_out, indent=4)

# Start the bot.
app.run()