Source code for oneworldsync.content1_client

"""
1WorldSync Content1 API Client

This module provides a client for interacting with the 1WorldSync Content1 API.
"""

import json
import logging
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Union

import requests

from .content1_auth import Content1HMACAuth
from .criteria import ProductCriteria, DateRangeCriteria, SortField
from .exceptions import APIError, AuthenticationError
from .models import Content1ProductResults, Content1HierarchyResults


class Content1Client:
    """
    Client for the 1WorldSync Content1 API

    This class provides methods for interacting with the 1WorldSync Content1 API,
    handling authentication, request construction, and response parsing.
    """

    # Diagnostics go through logging instead of print() so that applications
    # embedding this client decide whether and where they are shown.
    _logger = logging.getLogger(__name__)

    def __init__(self, app_id=None, secret_key=None, gln=None, api_url=None, timeout=30):
        """
        Initialize the 1WorldSync Content1 API client

        Args:
            app_id (str, optional): The application ID provided by 1WorldSync.
                If None, will try to get from ONEWORLDSYNC_APP_ID environment variable.
            secret_key (str, optional): The secret key provided by 1WorldSync.
                If None, will try to get from ONEWORLDSYNC_SECRET_KEY environment variable.
            gln (str, optional): Global Location Number for the user.
                If None, will try to get from ONEWORLDSYNC_USER_GLN environment variable.
            api_url (str, optional): The API URL to use. If None, will try to get from
                ONEWORLDSYNC_CONTENT1_API_URL environment variable.
                Defaults to production API if not specified.
            timeout (int, optional): Request timeout in seconds. Defaults to 30.

        Raises:
            ValueError: If the app ID or the secret key is available neither as a
                parameter nor as an environment variable.
        """
        # Explicit arguments win; environment variables are the fallback
        self.app_id = app_id or os.environ.get('ONEWORLDSYNC_APP_ID')
        self.secret_key = secret_key or os.environ.get('ONEWORLDSYNC_SECRET_KEY')
        self.gln = gln or os.environ.get('ONEWORLDSYNC_USER_GLN')

        default_api_url = 'https://content1-api.1worldsync.com'
        self.api_url = api_url or os.environ.get('ONEWORLDSYNC_CONTENT1_API_URL', default_api_url)

        # Request paths start with '/', so drop a trailing slash from the base URL
        if self.api_url.endswith('/'):
            self.api_url = self.api_url[:-1]

        # Validate required parameters before building the auth helper
        if not self.app_id or not self.secret_key:
            raise ValueError("ONEWORLDSYNC_APP_ID and ONEWORLDSYNC_SECRET_KEY must be provided either as parameters or environment variables")

        self.auth = Content1HMACAuth(self.app_id, self.secret_key, self.gln)
        self.timeout = timeout

    @staticmethod
    def _normalize_criteria(criteria):
        """Return criteria as a request body: {} for None, build() output for ProductCriteria, otherwise the value itself."""
        if criteria is None:
            return {}
        if isinstance(criteria, ProductCriteria):
            return criteria.build()
        return criteria

    def _fetch_for_market(self, criteria, target_market, page_size):
        """Apply the optional target market filter to a ProductCriteria, then fetch products."""
        if target_market:
            criteria.with_target_market(target_market)
        return self.fetch_products(criteria, page_size)

    def _make_request(self, method, path, query_params=None, data=None):
        """
        Make a request to the 1WorldSync Content1 API

        Args:
            method (str): HTTP method (GET, POST, etc.)
            path (str): API endpoint path
            query_params (dict, optional): Query parameters. Defaults to None.
                The dict passed in is not modified.
            data (dict, optional): Request body data, sent as JSON. Defaults to None.

        Returns:
            dict: API response parsed as JSON ({} for a 204 No Content response)

        Raises:
            AuthenticationError: If the API answers with status 401
            APIError: If the API returns any other status >= 400, or if the
                request itself fails (reported with status code 0)
        """
        # Work on a copy so adding the timestamp never leaks into the caller's dict
        params = dict(query_params) if query_params else {}
        params['timestamp'] = self.auth.generate_timestamp()

        # Build the URI (path + query parameters). The auth headers are generated
        # from this exact string, so parameters are emitted in sorted order and
        # the same string is used for the actual request URL below.
        query_string = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
        uri = f"{path}?{query_string}"

        headers = self.auth.generate_auth_headers(uri)
        url = f"{self.api_url}{uri}"

        try:
            response = requests.request(
                method,
                url,
                json=data,
                headers=headers,
                timeout=self.timeout
            )

            status = response.status_code

            if status == 401:
                # Headers are deliberately left out of the log: they carry
                # the authentication material for this request.
                self._logger.debug(
                    "Authentication error details: Status %s, Response: %s, Request URL: %s",
                    status, response.text, url
                )
                raise AuthenticationError(f"Authentication failed: {response.text}")

            if status >= 400:
                self._logger.debug(
                    "API error details: Status %s, Response: %s",
                    status, response.text
                )
                raise APIError(status, response.text, response)

            # 204 No Content has no body to parse
            if status == 204:
                return {}

            return response.json()

        except requests.exceptions.RequestException as e:
            # Chain explicitly so the underlying transport error stays visible
            raise APIError(0, str(e)) from e

    def count_products(self, criteria=None):
        """
        Count products using the Content1 API

        Args:
            criteria (dict or ProductCriteria, optional): Search criteria. Defaults to empty dict.

        Returns:
            int: Count of products matching the criteria (0 if the response has no 'count' key)
        """
        body = self._normalize_criteria(criteria)
        response = self._make_request('POST', '/V1/product/count', data=body)
        return response.get('count', 0)

    def fetch_products(self, criteria=None, page_size=1000):
        """
        Fetch products using the Content1 API

        Args:
            criteria (dict or ProductCriteria, optional): Search criteria. Defaults to empty dict.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        body = self._normalize_criteria(criteria)
        response = self._make_request(
            'POST',
            '/V1/product/fetch',
            query_params={'pageSize': page_size},
            data=body
        )
        return Content1ProductResults(response)

    def fetch_hierarchies(self, criteria=None, page_size=1000):
        """
        Fetch product hierarchies using the Content1 API

        Args:
            criteria (dict or ProductCriteria, optional): Search criteria. Defaults to empty dict.
            page_size (int, optional): Number of hierarchies to return per page. Defaults to 1000.

        Returns:
            Content1HierarchyResults: Hierarchy fetch results
        """
        body = self._normalize_criteria(criteria)
        response = self._make_request(
            'POST',
            '/V1/product/hierarchy',
            query_params={'pageSize': page_size},
            data=body
        )
        return Content1HierarchyResults(response)

    def fetch_products_by_gtin(self, gtins, page_size=1000):
        """
        Fetch products by GTIN

        Args:
            gtins (list): List of GTINs to fetch
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        return self.fetch_products({'gtin': gtins}, page_size)

    def fetch_products_by_ip_gln(self, ip_gln, page_size=1000):
        """
        Fetch products by Information Provider GLN

        Args:
            ip_gln (str): Information Provider GLN
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        return self.fetch_products({'ipGln': ip_gln}, page_size)

    def fetch_products_by_target_market(self, target_market, page_size=1000):
        """
        Fetch products by target market

        Args:
            target_market (str): Target market code (e.g., 'US')
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        return self.fetch_products({'targetMarket': target_market}, page_size)

    def fetch_next_page(self, previous_response, page_size=1000, original_criteria=None):
        """
        Fetch the next page of products using the searchAfter value from a previous response

        Args:
            previous_response (dict or Content1ProductResults): Previous response from fetch_products
            page_size (int, optional): Number of products to return per page. Defaults to 1000.
            original_criteria (dict or ProductCriteria, optional): Original search criteria to preserve.
                Defaults to None. The object passed in is not modified.

        Returns:
            Content1ProductResults: Next page of product fetch results

        Raises:
            ValueError: If previous_response is a dict without a 'searchAfter' key
        """
        if isinstance(previous_response, Content1ProductResults):
            search_after = previous_response.search_after
        else:
            if 'searchAfter' not in previous_response:
                raise ValueError("Previous response does not contain searchAfter value")
            search_after = previous_response['searchAfter']

        # Always copy: build() may hand back the criteria object's own dict, and
        # a caller-supplied dict must not pick up the paging cursor either.
        criteria = dict(self._normalize_criteria(original_criteria))
        criteria['searchAfter'] = search_after

        return self.fetch_products(criteria, page_size)

    def fetch_products_by_date_range(self, from_date, to_date, target_market=None, page_size=1000):
        """
        Fetch products by last modified date range

        Args:
            from_date (str): Start date in YYYY-MM-DD format
            to_date (str): End date in YYYY-MM-DD format
            target_market (str, optional): Target market code (e.g., 'US'). Defaults to None.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        criteria = ProductCriteria()
        criteria.with_last_modified_date(DateRangeCriteria.between(from_date, to_date))
        return self._fetch_for_market(criteria, target_market, page_size)

    def fetch_products_last_30_days(self, target_market=None, page_size=1000):
        """
        Fetch products modified in the last 30 days

        Args:
            target_market (str, optional): Target market code (e.g., 'US'). Defaults to None.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        criteria = ProductCriteria()
        criteria.with_last_modified_date(DateRangeCriteria.last_30_days())
        return self._fetch_for_market(criteria, target_market, page_size)

    def fetch_products_by_brand(self, brand_name, target_market=None, page_size=1000):
        """
        Fetch products by brand name

        Args:
            brand_name (str): Brand name to search for
            target_market (str, optional): Target market code (e.g., 'US'). Defaults to None.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        criteria = ProductCriteria().with_brand_name(brand_name)
        return self._fetch_for_market(criteria, target_market, page_size)

    def fetch_products_by_gpc_code(self, gpc_code, target_market=None, page_size=1000):
        """
        Fetch products by GPC code

        Args:
            gpc_code (str): GPC code to search for
            target_market (str, optional): Target market code (e.g., 'US'). Defaults to None.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        criteria = ProductCriteria().with_gpc_code(gpc_code)
        return self._fetch_for_market(criteria, target_market, page_size)

    def fetch_products_by_upc(self, upc_code, target_market=None, page_size=1000):
        """
        Fetch products by UPC code

        Args:
            upc_code (str): UPC code to search for
            target_market (str, optional): Target market code (e.g., 'US'). Defaults to None.
            page_size (int, optional): Number of products to return per page. Defaults to 1000.

        Returns:
            Content1ProductResults: Product fetch results
        """
        criteria = ProductCriteria().with_upc_code(upc_code)
        return self._fetch_for_market(criteria, target_market, page_size)