stt.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import io
  2. from typing import Dict
  3. from openai import OpenAI
  4. from common.config.tokenizer_manage_config import TokenizerManage
  5. from models_provider.base_model_provider import MaxKBBaseModel
  6. from models_provider.impl.base_stt import BaseSpeechToText
  7. def custom_get_token_ids(text: str):
  8. tokenizer = TokenizerManage.get_tokenizer()
  9. return tokenizer.encode(text)
  10. class XInferenceSpeechToText(MaxKBBaseModel, BaseSpeechToText):
  11. api_base: str
  12. api_key: str
  13. model: str
  14. params: dict
  15. def __init__(self, **kwargs):
  16. super().__init__(**kwargs)
  17. self.api_key = kwargs.get('api_key')
  18. self.api_base = kwargs.get('api_base')
  19. self.model = kwargs.get('model')
  20. self.params = kwargs.get('params')
  21. @staticmethod
  22. def is_cache_model():
  23. return False
  24. @staticmethod
  25. def new_instance(model_type, model_name, model_credential: Dict[str, object], **model_kwargs):
  26. optional_params = {}
  27. if 'max_tokens' in model_kwargs and model_kwargs['max_tokens'] is not None:
  28. optional_params['max_tokens'] = model_kwargs['max_tokens']
  29. if 'temperature' in model_kwargs and model_kwargs['temperature'] is not None:
  30. optional_params['temperature'] = model_kwargs['temperature']
  31. return XInferenceSpeechToText(
  32. model=model_name,
  33. api_base=model_credential.get('api_base'),
  34. api_key=model_credential.get('api_key'),
  35. params=model_kwargs,
  36. **optional_params,
  37. )
  38. def check_auth(self):
  39. client = OpenAI(
  40. base_url=self.api_base,
  41. api_key=self.api_key
  42. )
  43. response_list = client.models.with_raw_response.list()
  44. # print(response_list)
  45. def speech_to_text(self, audio_file):
  46. client = OpenAI(
  47. base_url=self.api_base,
  48. api_key=self.api_key
  49. )
  50. audio_data = audio_file.read()
  51. buffer = io.BytesIO(audio_data)
  52. buffer.name = "file.mp3" # this is the important line
  53. filter_params = {k: v for k, v in self.params.items() if k not in {'model_id', 'use_local', 'streaming'}}
  54. transcription_params = {
  55. 'model': self.model,
  56. 'file': buffer,
  57. 'language': 'zh',
  58. **filter_params
  59. }
  60. res = client.audio.transcriptions.create(**transcription_params)
  61. return res.text