stt.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import asyncio
  2. import io
  3. from typing import Dict
  4. from openai import OpenAI
  5. from common.config.tokenizer_manage_config import TokenizerManage
  6. from models_provider.base_model_provider import MaxKBBaseModel
  7. from models_provider.impl.base_stt import BaseSpeechToText
  8. def custom_get_token_ids(text: str):
  9. tokenizer = TokenizerManage.get_tokenizer()
  10. return tokenizer.encode(text)
  11. class SiliconCloudSpeechToText(MaxKBBaseModel, BaseSpeechToText):
  12. api_base: str
  13. api_key: str
  14. model: str
  15. params: dict
  16. def __init__(self, **kwargs):
  17. super().__init__(**kwargs)
  18. self.api_key = kwargs.get('api_key')
  19. self.api_base = kwargs.get('api_base')
  20. self.params = kwargs.get('params')
  21. @staticmethod
  22. def new_instance(model_type, model_name, model_credential: Dict[str, object], **model_kwargs):
  23. optional_params = {}
  24. if 'max_tokens' in model_kwargs and model_kwargs['max_tokens'] is not None:
  25. optional_params['max_tokens'] = model_kwargs['max_tokens']
  26. if 'temperature' in model_kwargs and model_kwargs['temperature'] is not None:
  27. optional_params['temperature'] = model_kwargs['temperature']
  28. return SiliconCloudSpeechToText(
  29. model=model_name,
  30. api_base=model_credential.get('api_base'),
  31. api_key=model_credential.get('api_key'),
  32. params=model_kwargs,
  33. **optional_params,
  34. )
  35. @staticmethod
  36. def is_cache_model():
  37. return False
  38. def check_auth(self):
  39. client = OpenAI(
  40. base_url=self.api_base,
  41. api_key=self.api_key
  42. )
  43. response_list = client.models.with_raw_response.list()
  44. # print(response_list)
  45. def speech_to_text(self, audio_file):
  46. client = OpenAI(
  47. base_url=self.api_base,
  48. api_key=self.api_key
  49. )
  50. audio_data = audio_file.read()
  51. buffer = io.BytesIO(audio_data)
  52. buffer.name = "file.mp3" # this is the important line
  53. filter_params = {k: v for k, v in self.params.items() if k not in {'model_id', 'use_local', 'streaming'}}
  54. transcription_params = {
  55. 'model': self.model,
  56. 'file': buffer,
  57. 'language': 'zh'
  58. }
  59. res = client.audio.transcriptions.create(**transcription_params,extra_body=filter_params)
  60. return res.text