summaryrefslogtreecommitdiffstats
path: root/g4f
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--g4f/api/__init__.py281
1 files changed, 138 insertions, 143 deletions
diff --git a/g4f/api/__init__.py b/g4f/api/__init__.py
index 8ea61dba..17951339 100644
--- a/g4f/api/__init__.py
+++ b/g4f/api/__init__.py
@@ -1,167 +1,162 @@
-from fastapi import FastAPI, Response, Request
-from fastapi.middleware.cors import CORSMiddleware
-from typing import List, Union, Any, Dict, AnyStr
-from ._tokenizer import tokenize
-import g4f
+from fastapi import FastAPI, Response, Request
+from typing import List, Union, Any, Dict, AnyStr
+from ._tokenizer import tokenize
+from .. import BaseProvider
+
import time
import json
import random
import string
import uvicorn
import nest_asyncio
+import g4f
-app = FastAPI()
-nest_asyncio.apply()
-
-origins = [
- "http://localhost",
- "http://localhost:1337",
-]
-
-app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
-)
-
-JSONObject = Dict[AnyStr, Any]
-JSONArray = List[Any]
-JSONStructure = Union[JSONArray, JSONObject]
-
-@app.get("/")
-async def read_root():
- return Response(content=json.dumps({"info": "G4F API"}, indent=4), media_type="application/json")
-
-@app.get("/v1")
-async def read_root_v1():
- return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json")
-
-@app.get("/v1/models")
-async def models():
- model_list = [{
- 'id': model,
- 'object': 'model',
- 'created': 0,
- 'owned_by': 'g4f'} for model in g4f.Model.__all__()]
-
- return Response(content=json.dumps({
- 'object': 'list',
- 'data': model_list}, indent=4), media_type="application/json")
-
-@app.get("/v1/models/{model_name}")
-async def model_info(model_name: str):
- try:
- model_info = (g4f.ModelUtils.convert[model_name])
-
- return Response(content=json.dumps({
- 'id': model_name,
- 'object': 'model',
- 'created': 0,
- 'owned_by': model_info.base_provider
- }, indent=4), media_type="application/json")
- except:
- return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), media_type="application/json")
-
-@app.post("/v1/chat/completions")
-async def chat_completions(request: Request, item: JSONStructure = None):
-
- item_data = {
- 'model': 'gpt-3.5-turbo',
- 'stream': False,
- }
-
- item_data.update(item or {})
- model = item_data.get('model')
- stream = item_data.get('stream')
- messages = item_data.get('messages')
-
- try:
- response = g4f.ChatCompletion.create(model=model, stream=stream, messages=messages)
- except:
- return Response(content=json.dumps({"error": "An error occurred while generating the response."}, indent=4), media_type="application/json")
-
- completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
- completion_timestamp = int(time.time())
-
- if not stream:
- prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages]))
- completion_tokens, _ = tokenize(response)
-
- json_data = {
- 'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion',
- 'created': completion_timestamp,
- 'model': model,
- 'choices': [
- {
- 'index': 0,
- 'message': {
- 'role': 'assistant',
- 'content': response,
- },
- 'finish_reason': 'stop',
- }
- ],
- 'usage': {
- 'prompt_tokens': prompt_tokens,
- 'completion_tokens': completion_tokens,
- 'total_tokens': prompt_tokens + completion_tokens,
- },
- }
-
- return Response(content=json.dumps(json_data, indent=4), media_type="application/json")
-
- def streaming():
- try:
- for chunk in response:
- completion_data = {
+class Api:
+ def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
+ list_ignored_providers: List[Union[str, BaseProvider]] = None) -> None:
+ self.engine = engine
+ self.debug = debug
+ self.sentry = sentry
+ self.list_ignored_providers = list_ignored_providers
+
+ self.app = FastAPI()
+ nest_asyncio.apply()
+
+ JSONObject = Dict[AnyStr, Any]
+ JSONArray = List[Any]
+ JSONStructure = Union[JSONArray, JSONObject]
+
+ @self.app.get("/")
+ async def read_root():
+ return Response(content=json.dumps({"info": "g4f API"}, indent=4), media_type="application/json")
+
+ @self.app.get("/v1")
+ async def read_root_v1():
+ return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json")
+
+ @self.app.get("/v1/models")
+ async def models():
+ model_list = [{
+ 'id': model,
+ 'object': 'model',
+ 'created': 0,
+ 'owned_by': 'g4f'} for model in g4f.Model.__all__()]
+
+ return Response(content=json.dumps({
+ 'object': 'list',
+ 'data': model_list}, indent=4), media_type="application/json")
+
+ @self.app.get("/v1/models/{model_name}")
+ async def model_info(model_name: str):
+ try:
+ model_info = (g4f.ModelUtils.convert[model_name])
+
+ return Response(content=json.dumps({
+ 'id': model_name,
+ 'object': 'model',
+ 'created': 0,
+ 'owned_by': model_info.base_provider
+ }, indent=4), media_type="application/json")
+ except:
+ return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), media_type="application/json")
+
+ @self.app.post("/v1/chat/completions")
+ async def chat_completions(request: Request, item: JSONStructure = None):
+ item_data = {
+ 'model': 'gpt-3.5-turbo',
+ 'stream': False,
+ }
+
+ item_data.update(item or {})
+ model = item_data.get('model')
+ stream = item_data.get('stream')
+ messages = item_data.get('messages')
+
+ try:
+ response = g4f.ChatCompletion.create(model=model, stream=stream, messages=messages)
+ except:
+ return Response(content=json.dumps({"error": "An error occurred while generating the response."}, indent=4), media_type="application/json")
+
+ completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
+ completion_timestamp = int(time.time())
+
+ if not stream:
+ prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages]))
+ completion_tokens, _ = tokenize(response)
+
+ json_data = {
'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion.chunk',
+ 'object': 'chat.completion',
'created': completion_timestamp,
'model': model,
'choices': [
{
'index': 0,
- 'delta': {
- 'content': chunk,
+ 'message': {
+ 'role': 'assistant',
+ 'content': response,
},
- 'finish_reason': None,
+ 'finish_reason': 'stop',
}
],
+ 'usage': {
+ 'prompt_tokens': prompt_tokens,
+ 'completion_tokens': completion_tokens,
+ 'total_tokens': prompt_tokens + completion_tokens,
+ },
}
- content = json.dumps(completion_data, separators=(',', ':'))
- yield f'data: {content}\n\n'
- time.sleep(0.03)
-
- end_completion_data = {
- 'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion.chunk',
- 'created': completion_timestamp,
- 'model': model,
- 'choices': [
- {
- 'index': 0,
- 'delta': {},
- 'finish_reason': 'stop',
+ return Response(content=json.dumps(json_data, indent=4), media_type="application/json")
+
+ def streaming():
+ try:
+ for chunk in response:
+ completion_data = {
+ 'id': f'chatcmpl-{completion_id}',
+ 'object': 'chat.completion.chunk',
+ 'created': completion_timestamp,
+ 'model': model,
+ 'choices': [
+ {
+ 'index': 0,
+ 'delta': {
+ 'content': chunk,
+ },
+ 'finish_reason': None,
+ }
+ ],
+ }
+
+ content = json.dumps(completion_data, separators=(',', ':'))
+ yield f'data: {content}\n\n'
+ time.sleep(0.03)
+
+ end_completion_data = {
+ 'id': f'chatcmpl-{completion_id}',
+ 'object': 'chat.completion.chunk',
+ 'created': completion_timestamp,
+ 'model': model,
+ 'choices': [
+ {
+ 'index': 0,
+ 'delta': {},
+ 'finish_reason': 'stop',
+ }
+ ],
}
- ],
- }
- content = json.dumps(end_completion_data, separators=(',', ':'))
- yield f'data: {content}\n\n'
+ content = json.dumps(end_completion_data, separators=(',', ':'))
+ yield f'data: {content}\n\n'
- except GeneratorExit:
- pass
+ except GeneratorExit:
+ pass
- return Response(content=json.dumps(streaming(), indent=4), media_type="application/json")
+ return Response(content=json.dumps(streaming(), indent=4), media_type="application/json")
-@app.post("/v1/completions")
-async def completions():
- return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")
+ @self.app.post("/v1/completions")
+ async def completions():
+ return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")
-def run(ip, thread_quantity):
- split_ip = ip.split(":")
- uvicorn.run(app, host=split_ip[0], port=int(split_ip[1]), use_colors=False, workers=thread_quantity)
+ def run(self, ip, thread_quantity):
+ split_ip = ip.split(":")
+ uvicorn.run(self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False, workers=thread_quantity)