stt.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import asyncio
  2. import io
  3. from typing import Dict
  4. from openai import OpenAI
  5. from common.config.tokenizer_manage_config import TokenizerManage
  6. from models_provider.base_model_provider import MaxKBBaseModel
  7. from models_provider.impl.base_stt import BaseSpeechToText
  8. def custom_get_token_ids(text: str):
  9. tokenizer = TokenizerManage.get_tokenizer()
  10. return tokenizer.encode(text)
  11. class DockerAISpeechToText(MaxKBBaseModel, BaseSpeechToText):
  12. api_base: str
  13. api_key: str
  14. model: str
  15. params: dict
  16. @staticmethod
  17. def is_cache_model():
  18. return False
  19. def __init__(self, **kwargs):
  20. super().__init__(**kwargs)
  21. self.api_key = kwargs.get('api_key')
  22. self.api_base = kwargs.get('api_base')
  23. self.model = kwargs.get('model')
  24. self.params = kwargs.get('params')
  25. @staticmethod
  26. def new_instance(model_type, model_name, model_credential: Dict[str, object], **model_kwargs):
  27. optional_params = {}
  28. if 'max_tokens' in model_kwargs and model_kwargs['max_tokens'] is not None:
  29. optional_params['max_tokens'] = model_kwargs['max_tokens']
  30. if 'temperature' in model_kwargs and model_kwargs['temperature'] is not None:
  31. optional_params['temperature'] = model_kwargs['temperature']
  32. return DockerAISpeechToText(
  33. model=model_name,
  34. api_base=model_credential.get('api_base'),
  35. api_key=model_credential.get('api_key'),
  36. params = model_kwargs,
  37. **optional_params,
  38. )
  39. def check_auth(self):
  40. client = OpenAI(
  41. base_url=self.api_base,
  42. api_key=self.api_key
  43. )
  44. response_list = client.models.with_raw_response.list()
  45. # print(response_list)
  46. def speech_to_text(self, audio_file):
  47. client = OpenAI(
  48. base_url=self.api_base,
  49. api_key=self.api_key
  50. )
  51. audio_data = audio_file.read()
  52. buffer = io.BytesIO(audio_data)
  53. buffer.name = "file.mp3" # this is the important line
  54. filter_params = {k: v for k,v in self.params.items() if k not in {'model_id','use_local','streaming'}}
  55. transcription_params = {
  56. 'model': self.model,
  57. 'file': buffer,
  58. 'language': 'zh'
  59. }
  60. res = client.audio.transcriptions.create(**transcription_params,extra_body=filter_params)
  61. return res.text