Nesta série, estamos mostrando como implementamos uma skill para Alexa que reproduz os Drops da EximiaCo. Ela está disponível no Marketplace da Alexa.
No post anterior, indicamos que a primeira interação com uma skill da Alexa deve retornar uma mensagem de boas-vindas e também instruções de utilização, proporcionando uma experiência agradável para os usuários.
A sequência, em nosso projeto, são dois intents principais para seleção e reprodução de episódios. O primeiro, PlayLatestEpisode, faz com que a Alexa reproduza o episódio mais recente. O segundo, SearchEpisode, permite pesquisar um episódio baseado em um tema específico. Além disso, é necessário ativar a interface de AudioPlayer da Alexa que fornece intents nativos para avançar, retroceder, parar e continuar.
Para a execução da mídia, construímos uma abstração chamada Player, que permite tratar eventos relacionados e comandar a reprodução.
class Player(object):
def __init__(self, handler_input):
self.episodes_provider = EpisodesProvider()
self.handler_input = handler_input
self.state = PlayerState(handler_input.attributes_manager, self.episodes_provider)
def reset(self):
self.state.set_offset(0)
# user interaction
def play(self, episode):
if episode is None:
return False
self.handler_input.response_builder.add_directive(
PlayDirective(
play_behavior=PlayBehavior.REPLACE_ALL,
audio_item=AudioItem(
stream=Stream(
token=episode["pub"],
url=episode["address"],
offset_in_milliseconds=self.state.get_offset(),
expected_previous_token=None),
metadata=None))
).set_should_end_session(True)
return True
def play_latest(self):
episode = self.episodes_provider.get_latest()
return self.play(episode)
def is_playing_episode(self):
current_episode = self.state.get_current_episode()
if current_episode is None:
return False
return True
def stop(self):
self.handler_input.response_builder.add_directive(StopDirective())
def resume(self):
self.play(self.state.get_current_episode())
def previous(self, jump_current_episode=True):
current_episode = self.state.get_current_episode()
if current_episode is None:
return False
self.reset()
if jump_current_episode:
self.disable_repeat()
episode = self.episodes_provider.get_previous(current_episode)
if episode is None and self.state.get_loop():
episode = self.episodes_provider.get_latest()
return self.play(episode)
return self.play(current_episode)
def next(self):
current_episode = self.state.get_current_episode()
if current_episode is None:
return False
self.reset()
self.disable_repeat()
episode = self.episodes_provider.get_next(current_episode)
if episode is None and self.state.get_loop():
episode = self.episodes_provider.get_first()
return self.play(episode)
# handlers
def handle_playback_started(self):
current_token = self.handler_input.request_envelope.request.token
self.state.set_token(current_token)
self.state.set_current_episode(current_token)
def handle_playback_nearly_finished(self):
current_episode = self.state.get_current_episode()
if current_episode is None:
return
previous_episode = current_episode
if not self.state.get_repeat():
previous_episode = self.episodes_provider.get_previous(current_episode)
if previous_episode is None and self.state.get_loop():
previous_episode = self.episodes_provider.get_latest()
if previous_episode is None:
return
self.handler_input.response_builder.add_directive(
PlayDirective(
play_behavior=PlayBehavior.ENQUEUE,
audio_item=AudioItem(
stream=Stream(
token=previous_episode["pub"],
url=previous_episode["address"],
offset_in_milliseconds=0,
expected_previous_token=current_episode["pub"]),
metadata=None))
).set_should_end_session(True)
def handle_playback_finished(self):
self.reset()
def handle_playback_stopped(self):
millis = self.handler_input.request_envelope.request.offset_in_milliseconds
print("handle_playback_stopped: {} millis".format(millis))
self.state.set_offset(millis)
# playback state
def enable_repeat(self):
self.state.set_repeat(True)
def disable_repeat(self):
self.state.set_repeat(False)
def enable_loop(self):
self.state.set_loop(True)
def disable_loop(self):
self.state.set_loop(False)
Explicando: no intent PlayLatestEpisode, será obtido o episódio mais novo da base de dados para ser passado como parâmetro no PlayDirective, instruindo a Alexa a iniciar a reprodução do aúdio (MP3).
Um ponto muito importante na execução, é o Token enviado a esta diretiva. Ele garante a ordem em que os episódios são executados, para que a Alexa não confunda os episódios em determinadas situações, principalmente quando mandamos avançar e retroceder.
class StartLatestEpisodeHandler(AbstractRequestHandler):
def can_handle(self, handler_input):
return ask_utils.is_intent_name("PlayLatestEpisode")(handler_input)
def handle(self, handler_input):
player = Player(handler_input)
player.play_latest()
return handler_input.response_builder.response
Já a implementação do intent SearchEpisode é um pouco mais complexa. Primeiramente, é utilizado o conteúdo slot episode para buscar o episódio na base de dados. Se forem retornados diversos resultados, os mesmos são armazenados na sessão da skill, e como resposta é solicitado ao usuário um refinamento da pesquisa. Quando é recebida a resposta do usuário no segundo estágio da pesquisa, utilizamos a biblioteca Whoosh do Python para selecionar o episódio que mais se aproxima da solicitação do usuário, e iniciamos a reprodução.
class SearchEpisodeHandler(AbstractRequestHandler):
def can_handle(self, handler_input):
return ask_utils.is_intent_name("SearchEpisode")(handler_input)
def handle(self, handler_input):
player = Player(handler_input)
provider = EpisodesProvider()
search_term = ask_utils.get_slot_value(handler_input, "episode")
session_attrs = handler_input.attributes_manager.session_attributes
if len(session_attrs) == 0:
self.search_episodes(handler_input, player, provider, search_term, session_attrs)
else:
self.filter_session_episodes(handler_input, player, provider, search_term, session_attrs)
return handler_input.response_builder.response
def filter_session_episodes(self, handler_input, player, provider, search_term, session_attrs):
episodes_to_seek = session_attrs["episodes_to_seek"]
if not os.path.exists("/tmp/indexepisodes"):
os.mkdir("/tmp/indexepisodes")
schema = Schema(title=(TEXT(stored=True)))
ix = create_in("/tmp/indexepisodes", schema)
writer = ix.writer()
for ep in episodes_to_seek:
writer.add_document(title=ep)
writer.commit()
title = None
with ix.searcher() as searcher:
query = QueryParser("title", ix.schema).parse(search_term)
episode_titles = searcher.search(query)
if len(episode_titles) > 0:
title = episode_titles[0]["title"]
if title is not None:
episodes = provider.search(title)
player.play(episodes[0])
else:
speak_out = "Nenhum dos episódios sugeridos fala sobre {}. "
"Qual dos episódio sugeridos anteriormente você gostaria de ouvir?"
.format(search_term)
handler_input.response_builder.speak(speak_out).ask(speak_out)
def search_episodes(self, handler_input, player, provider, search_term, session_attrs):
episodes = provider.search(search_term)
episodes_count = 0 if episodes is None else len(episodes)
print("episodes_count: {}", episodes_count)
if episodes_count == 0:
speak_out = "Não encontrei nenhum episódio sobre {}. Qual episódio você gostaria de ouvir?".format(
search_term)
handler_input.response_builder.speak(speak_out).ask(speak_out)
elif episodes_count == 1:
player.play(episodes[0])
else:
speak_out = "Encontrei {} episódios sobre {}. São eles:<break strength="strong"/>".format(
len(episodes), search_term)
session_attrs["episodes_to_seek"] = []
ix = 0
for ep in episodes:
title = ep["title"]
session_attrs["episodes_to_seek"].append(title)
ix = ix + 1
speak_out = "{} {},<break strength="medium"/>{}<break strength="strong"/>; "
.format(speak_out, ix, title)
speak_out = "{}. Qual destes episódios você quer ouvir?".format(speak_out)
handler_input.response_builder.speak(speak_out).ask(speak_out)
Para a navegação entre os episódios, podemos usar os intents nativos da Alexa, como Amazon.NextIntent (abaixo) e o Amazon.PreviousIntent, que são ativados quando solicitamos para avançar ou retroceder, respectivamente.
class NextEpisodeHandler(AbstractRequestHandler):
def can_handle(self, handler_input):
return ask_utils.is_intent_name("AMAZON.NextIntent")(handler_input)
def handle(self, handler_input):
player = Player(handler_input)
if not player.is_playing_episode():
speak_out = 'Não posso iniciar o próximo episódio, pois nenhum episódio está sendo reproduzido no momento. '
'Para mais informações, basta me pedir ajuda. Qual episódio você gostaria de ouvir?'
handler_input.response_builder.speak(speak_out).ask(speak_out)
else:
if not player.next():
speak_out = 'Chegamos ao final da playlist, continue ouvindo este ou selecione outro episódio'
handler_input.response_builder.speak(speak_out)
return handler_input.response_builder.response
Pronto!
No próximo post, será demonstrada a gestão do estado do player, e também como os eventos do AudioPlayer são manipulados.
Para quem ainda não testou a nossa skill, lembramos que ela está disponível no Marketplace da Alexa. Gostaríamos muito de conhecer sua opinião.