From 9a16296416ab2c801aa2f5920cc1790d554ec1ce Mon Sep 17 00:00:00 2001 From: kqlio67 Date: Thu, 26 Sep 2024 21:38:04 +0300 Subject: feat(g4f/client/async_client.py): enhance image generation and logging --- g4f/client/async_client.py | 359 ++++++++++++++++++++++++--------------------- 1 file changed, 192 insertions(+), 167 deletions(-) (limited to 'g4f/client/async_client.py') diff --git a/g4f/client/async_client.py b/g4f/client/async_client.py index 2fe4640b..9caa74b2 100644 --- a/g4f/client/async_client.py +++ b/g4f/client/async_client.py @@ -1,32 +1,37 @@ from __future__ import annotations +import os import time import random import string +import logging import asyncio -import base64 -from aiohttp import ClientSession, BaseConnector - -from .types import Client as BaseClient -from .types import ProviderType, FinishReason -from .stubs import ChatCompletion, ChatCompletionChunk, ImagesResponse, Image -from .types import AsyncIterResponse, ImageProvider -from .image_models import ImageModels -from .helper import filter_json, find_stop, filter_none, cast_iter_async -from .service import get_last_provider, get_model_and_provider -from ..Provider import ProviderUtils -from ..typing import Union, Messages, AsyncIterator, ImageType -from ..errors import NoImageResponseError, ProviderNotFoundError -from ..requests.aiohttp import get_connector +from typing import Union, AsyncIterator +from ..providers.base_provider import AsyncGeneratorProvider +from ..image import ImageResponse, to_image, to_data_uri +from ..typing import Messages, ImageType +from ..providers.types import BaseProvider, ProviderType, FinishReason from ..providers.conversation import BaseConversation -from ..image import ImageResponse as ImageProviderResponse, ImageDataResponse +from ..image import ImageResponse as ImageProviderResponse +from ..errors import NoImageResponseError +from .stubs import ChatCompletion, ChatCompletionChunk, Image, ImagesResponse +from .image_models import ImageModels +from .types import IterResponse, ImageProvider +from .types import Client as BaseClient +from .service import get_model_and_provider, get_last_provider +from .helper import find_stop, filter_json, filter_none +from ..models import ModelUtils +from ..Provider import IterListProvider +from .helper import cast_iter_async try: - anext + anext # Python 3.8+ except NameError: - async def anext(iter): - async for chunk in iter: - return chunk + async def anext(aiter): + try: + return await aiter.__anext__() + except StopAsyncIteration: + raise StopIteration async def iter_response( response: AsyncIterator[str], @@ -34,11 +39,12 @@ async def iter_response( response_format: dict = None, max_tokens: int = None, stop: list = None -) -> AsyncIterResponse: +) -> AsyncIterator[Union[ChatCompletion, ChatCompletionChunk]]: content = "" finish_reason = None completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28)) - count: int = 0 + idx = 0 + async for chunk in response: if isinstance(chunk, FinishReason): finish_reason = chunk.reason @@ -46,18 +52,26 @@ async def iter_response( elif isinstance(chunk, BaseConversation): yield chunk continue + content += str(chunk) - count += 1 - if max_tokens is not None and count >= max_tokens: + idx += 1 + + if max_tokens is not None and idx >= max_tokens: finish_reason = "length" - first, content, chunk = find_stop(stop, content, chunk) + + first, content, chunk = find_stop(stop, content, chunk if stream else None) + if first != -1: finish_reason = "stop" + if stream: yield ChatCompletionChunk(chunk, None, completion_id, int(time.time())) + if finish_reason is not None: break + finish_reason = "stop" if finish_reason is None else finish_reason + if stream: yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time())) else: @@ -66,12 +80,12 @@ async def iter_response( content = filter_json(content) yield ChatCompletion(content, finish_reason, completion_id, int(time.time())) -async def iter_append_model_and_provider(response: AsyncIterResponse) -> AsyncIterResponse: +async def iter_append_model_and_provider(response: AsyncIterator) -> AsyncIterator: last_provider = None async for chunk in response: last_provider = get_last_provider(True) if last_provider is None else last_provider chunk.model = last_provider.get("model") - chunk.provider = last_provider.get("name") + chunk.provider = last_provider.get("name") yield chunk class AsyncClient(BaseClient): @@ -80,59 +94,32 @@ class AsyncClient(BaseClient): provider: ProviderType = None, image_provider: ImageProvider = None, **kwargs - ): + ) -> None: super().__init__(**kwargs) self.chat: Chat = Chat(self, provider) - self.images: Images = Images(self, image_provider) - -def create_response( - messages: Messages, - model: str, - provider: ProviderType = None, - stream: bool = False, - proxy: str = None, - max_tokens: int = None, - stop: list[str] = None, - api_key: str = None, - **kwargs -): - has_asnyc = hasattr(provider, "create_async_generator") - if has_asnyc: - create = provider.create_async_generator - else: - create = provider.create_completion - response = create( - model, messages, - stream=stream, - **filter_none( - proxy=proxy, - max_tokens=max_tokens, - stop=stop, - api_key=api_key - ), - **kwargs - ) - if not has_asnyc: - response = cast_iter_async(response) - return response + self._images: Images = Images(self, image_provider) -class Completions(): - def __init__(self, client: AsyncClient, provider: ProviderType = None): - self.client: AsyncClient = client + @property + def images(self) -> Images: + return self._images + +class Completions: + def __init__(self, client: 'AsyncClient', provider: ProviderType = None): + self.client: 'AsyncClient' = client self.provider: ProviderType = provider - def create( + async def create( self, messages: Messages, model: str, provider: ProviderType = None, stream: bool = False, proxy: str = None, + response_format: dict = None, max_tokens: int = None, stop: Union[list[str], str] = None, api_key: str = None, - response_format: dict = None, - ignored : list[str] = None, + ignored: list[str] = None, ignore_working: bool = False, ignore_stream: bool = False, **kwargs @@ -143,133 +130,171 @@ class Completions(): stream, ignored, ignore_working, - ignore_stream + ignore_stream, ) + stop = [stop] if isinstance(stop, str) else stop - response = create_response( - messages, model, - provider, stream, - proxy=self.client.get_proxy() if proxy is None else proxy, - max_tokens=max_tokens, - stop=stop, - api_key=self.client.api_key if api_key is None else api_key, + + response = provider.create_completion( + model, + messages, + stream=stream, + **filter_none( + proxy=self.client.get_proxy() if proxy is None else proxy, + max_tokens=max_tokens, + stop=stop, + api_key=self.client.api_key if api_key is None else api_key + ), **kwargs ) - response = iter_response(response, stream, response_format, max_tokens, stop) - response = iter_append_model_and_provider(response) - return response if stream else anext(response) -class Chat(): + if isinstance(response, AsyncIterator): + response = iter_response(response, stream, response_format, max_tokens, stop) + response = iter_append_model_and_provider(response) + return response if stream else await anext(response) + else: + response = cast_iter_async(response) + response = iter_response(response, stream, response_format, max_tokens, stop) + response = iter_append_model_and_provider(response) + return response if stream else await anext(response) + + + +class Chat: completions: Completions def __init__(self, client: AsyncClient, provider: ProviderType = None): self.completions = Completions(client, provider) -async def iter_image_response( - response: AsyncIterator, - response_format: str = None, - connector: BaseConnector = None, - proxy: str = None -) -> Union[ImagesResponse, None]: +async def iter_image_response(response: AsyncIterator) -> Union[ImagesResponse, None]: + logging.info("Starting iter_image_response") async for chunk in response: + logging.info(f"Processing chunk: {chunk}") if isinstance(chunk, ImageProviderResponse): - if response_format == "b64_json": - async with ClientSession( - connector=get_connector(connector, proxy), - cookies=chunk.options.get("cookies") - ) as session: - async def fetch_image(image): - async with session.get(image) as response: - return base64.b64encode(await response.content.read()).decode() - images = await asyncio.gather(*[fetch_image(image) for image in chunk.get_list()]) - return ImagesResponse([Image(None, image, chunk.alt) for image in images], int(time.time())) - return ImagesResponse([Image(image, None, chunk.alt) for image in chunk.get_list()], int(time.time())) - elif isinstance(chunk, ImageDataResponse): - return ImagesResponse([Image(None, image, chunk.alt) for image in chunk.get_list()], int(time.time())) - -def create_image(provider: ProviderType, prompt: str, model: str = "", **kwargs) -> AsyncIterator: + logging.info("Found ImageProviderResponse") + return ImagesResponse([Image(image) for image in chunk.get_list()]) + + logging.warning("No ImageProviderResponse found in the response") + return None + +async def create_image(client: AsyncClient, provider: ProviderType, prompt: str, model: str = "", **kwargs) -> AsyncIterator: + logging.info(f"Creating image with provider: {provider}, model: {model}, prompt: {prompt}") + if isinstance(provider, type) and provider.__name__ == "You": kwargs["chat_mode"] = "create" else: - prompt = f"create a image with: {prompt}" - return provider.create_async_generator( + prompt = f"create an image with: {prompt}" + + response = await provider.create_completion( model, [{"role": "user", "content": prompt}], stream=True, + proxy=client.get_proxy(), **kwargs ) + + logging.info(f"Response from create_completion: {response}") + return response -class Images(): - def __init__(self, client: AsyncClient, provider: ImageProvider = None): - self.client: AsyncClient = client +class Images: + def __init__(self, client: 'AsyncClient', provider: ImageProvider = None): + self.client: 'AsyncClient' = client self.provider: ImageProvider = provider self.models: ImageModels = ImageModels(client) - def get_provider(self, model: str, provider: ProviderType = None): - if isinstance(provider, str): - if provider in ProviderUtils.convert: - provider = ProviderUtils.convert[provider] + async def generate(self, prompt: str, model: str = None, **kwargs) -> ImagesResponse: + logging.info(f"Starting asynchronous image generation for model: {model}, prompt: {prompt}") + provider = self.models.get(model, self.provider) + if provider is None: + raise ValueError(f"Unknown model: {model}") + + logging.info(f"Provider: {provider}") + + if isinstance(provider, IterListProvider): + if provider.providers: + provider = provider.providers[0] + logging.info(f"Using first provider from IterListProvider: {provider}") + else: + raise ValueError(f"IterListProvider for model {model} has no providers") + + if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider): + logging.info("Using AsyncGeneratorProvider") + messages = [{"role": "user", "content": prompt}] + async for response in provider.create_async_generator(model, messages, **kwargs): + if isinstance(response, ImageResponse): + return self._process_image_response(response) + elif isinstance(response, str): + image_response = ImageResponse([response], prompt) + return self._process_image_response(image_response) + elif hasattr(provider, 'create'): + logging.info("Using provider's create method") + async_create = asyncio.iscoroutinefunction(provider.create) + if async_create: + response = await provider.create(prompt) else: - raise ProviderNotFoundError(f'Provider not found: {provider}') + response = provider.create(prompt) + + if isinstance(response, ImageResponse): + return self._process_image_response(response) + elif isinstance(response, str): + image_response = ImageResponse([response], prompt) + return self._process_image_response(image_response) + elif hasattr(provider, 'create_completion'): + logging.info("Using provider's create_completion method") + response = await create_image(provider, prompt, model, **kwargs) + async for chunk in response: + if isinstance(chunk, ImageProviderResponse): + logging.info("Found ImageProviderResponse") + return ImagesResponse([Image(image) for image in chunk.get_list()]) else: - provider = self.models.get(model, self.provider) - return provider + raise ValueError(f"Provider {provider} does not support image generation") - async def generate( - self, - prompt, - model: str = "", - provider: ProviderType = None, - response_format: str = None, - connector: BaseConnector = None, - proxy: str = None, - **kwargs - ) -> ImagesResponse: - provider = self.get_provider(model, provider) - if hasattr(provider, "create_async_generator"): - response = create_image( - provider, - prompt, - **filter_none( - response_format=response_format, - connector=connector, - proxy=self.client.get_proxy() if proxy is None else proxy, - ), - **kwargs - ) + logging.error(f"Unexpected response type: {type(response)}") + raise NoImageResponseError(f"Unexpected response type: {type(response)}") + + def _process_image_response(self, response: ImageResponse) -> ImagesResponse: + processed_images = [] + for image_data in response.get_list(): + if image_data.startswith('http://') or image_data.startswith('https://'): + processed_images.append(Image(url=image_data)) + else: + image = to_image(image_data) + file_name = self._save_image(image) + processed_images.append(Image(url=file_name)) + return ImagesResponse(processed_images) + + def _save_image(self, image: 'PILImage') -> str: + os.makedirs('generated_images', exist_ok=True) + file_name = f"generated_images/image_{int(time.time())}.png" + image.save(file_name) + return file_name + + async def create_variation(self, image: Union[str, bytes], model: str = None, **kwargs) -> ImagesResponse: + provider = self.models.get(model, self.provider) + if provider is None: + raise ValueError(f"Unknown model: {model}") + + if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider): + messages = [{"role": "user", "content": "create a variation of this image"}] + image_data = to_data_uri(image) + async for response in provider.create_async_generator(model, messages, image=image_data, **kwargs): + if isinstance(response, ImageResponse): + return self._process_image_response(response) + elif isinstance(response, str): + image_response = ImageResponse([response], "Image variation") + return self._process_image_response(image_response) + elif hasattr(provider, 'create_variation'): + if asyncio.iscoroutinefunction(provider.create_variation): + response = await provider.create_variation(image, **kwargs) + else: + response = provider.create_variation(image, **kwargs) + + if isinstance(response, ImageResponse): + return self._process_image_response(response) + elif isinstance(response, str): + image_response = ImageResponse([response], "Image variation") + return self._process_image_response(image_response) else: - response = await provider.create_async(prompt) - return ImagesResponse([Image(image) for image in response.get_list()]) - image = await iter_image_response(response, response_format, connector, proxy) - if image is None: - raise NoImageResponseError() - return image - - async def create_variation( - self, - image: ImageType, - model: str = None, - response_format: str = None, - connector: BaseConnector = None, - proxy: str = None, - **kwargs - ): - provider = self.get_provider(model, provider) - result = None - if hasattr(provider, "create_async_generator"): - response = provider.create_async_generator( - "", - [{"role": "user", "content": "create a image like this"}], - stream=True, - image=image, - **filter_none( - response_format=response_format, - connector=connector, - proxy=self.client.get_proxy() if proxy is None else proxy, - ), - **kwargs - ) - result = iter_image_response(response, response_format, connector, proxy) - if result is None: - raise NoImageResponseError() - return result + raise ValueError(f"Provider {provider} does not support image variation") + + raise NoImageResponseError("Failed to create image variation") -- cgit v1.2.3