Source code for oneworldsync.utils

"""
Utility functions for the 1WorldSync Content1 API client
"""

import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Union

# Set up logging
# NOTE(review): basicConfig() runs at import time and configures the root
# logger at INFO level — confirm this side effect is intended for a library
# module, since it affects the importing application's logging.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

def format_timestamp(dt=None):
    """
    Format a datetime object as a timestamp for the 1WorldSync API

    Args:
        dt (datetime, optional): Datetime object to format. Defaults to
            current UTC time. Timezone-aware values are converted to UTC
            before formatting; naive values are formatted as-is (i.e. they
            are taken to already be expressed in UTC).

    Returns:
        str: Timestamp formatted as ``YYYY-MM-DDTHH:MM:SSZ``
    """
    if dt is None:
        dt = datetime.now(timezone.utc)
    elif isinstance(dt, datetime) and dt.utcoffset() is not None:
        # The trailing 'Z' asserts UTC, so an aware datetime in another zone
        # must be shifted to UTC first; otherwise the wall-clock digits of the
        # local zone would be mislabelled as UTC.
        dt = dt.astimezone(timezone.utc)
    return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
def parse_timestamp(timestamp_str):
    """
    Parse a timestamp string from the 1WorldSync API

    Args:
        timestamp_str (str): Timestamp string shaped like
            ``YYYY-MM-DDTHH:MM:SSZ`` (the trailing ``Z`` is matched literally)

    Returns:
        datetime: Parsed datetime object. No tzinfo is attached, so the
            result is a naive datetime.

    Raises:
        ValueError: If the string does not match the expected layout.
    """
    api_layout = '%Y-%m-%dT%H:%M:%SZ'
    parsed = datetime.strptime(timestamp_str, api_layout)
    return parsed
def pretty_print_json(data):
    """
    Write JSON-serialisable data to standard output, indented by two spaces

    Args:
        data (dict): JSON data to print
    """
    rendered = json.dumps(data, indent=2)
    print(rendered)
def extract_nested_value(data, path, default=None):
    """
    Walk a nested dict/list structure along a sequence of keys

    Args:
        data (dict): Structure to extract from
        path (list): Keys to follow in order. Dict levels are read with
            ``.get``; list levels require an in-range, non-negative int.
        default: Value to return if the path cannot be followed

    Returns:
        The value at the end of the path, or ``default`` when a step is
        missing, resolves to ``None``, or does not fit the container type.
    """
    node = data
    try:
        for step in path:
            if isinstance(node, dict):
                node = node.get(step)
            else:
                # Only lists may be indexed, and only by a valid position.
                usable_index = (
                    isinstance(node, list)
                    and isinstance(step, int)
                    and 0 <= step < len(node)
                )
                if not usable_index:
                    return default
                node = node[step]

            # A None at any level is treated the same as a missing key.
            if node is None:
                return default
    except (KeyError, IndexError, TypeError):
        return default
    return node
def get_nested_dict_value(data: Dict, path: str, default: Any = None) -> Any:
    """
    Extract a value from a nested dictionary using a dot-separated path

    Args:
        data (dict): Dictionary to extract from. Anything empty or not a
            dict yields ``default``.
        path (str): Dot-separated path
            (e.g., "item.tradeItemInformation.0.tradeItemDescriptionModule").
            A purely numeric segment indexes into a list when the current
            level is a list; otherwise it is used as a dict key.
        default: Value to return if path doesn't exist

    Returns:
        The value at the path or the default value. A ``None`` found at any
        level is treated as missing.
    """
    if not data or not isinstance(data, dict):
        return default

    current = data
    for part in path.split('.'):
        # isdecimal() (not isdigit()) guards the int() call: isdigit() also
        # accepts characters such as superscripts that int() cannot parse,
        # which would raise ValueError instead of returning the default.
        if isinstance(current, list) and part.isdecimal():
            index = int(part)
            if index >= len(current):
                return default
            current = current[index]
        elif isinstance(current, dict):
            current = current.get(part)
        else:
            return default

        if current is None:
            return default

    return current
def _first_truthy_value(entries):
    """Return the first truthy ``'value'`` found among *entries*, else None."""
    for entry in entries:
        if entry.get('value'):
            return entry.get('value')
    return None


def _collect_identifiers(item, extracted_data):
    """Fill ``gtin`` and ``item_id`` from the item identification section."""
    identifiers = get_nested_dict_value(
        item, 'itemIdentificationInformation.itemIdentifier', [])
    for identifier in identifiers:
        if identifier.get('itemIdType', {}).get('value') == 'GTIN':
            extracted_data['gtin'] = identifier.get('itemId', '')
            break

    reference = get_nested_dict_value(
        item, 'itemIdentificationInformation.itemReferenceIdInformation', {})
    extracted_data['item_id'] = reference.get('itemReferenceId', '')


def _collect_descriptions(info, extracted_data):
    """Fill brand name, product name and description from one trade item entry."""
    descriptions = get_nested_dict_value(
        info, 'tradeItemDescriptionModule.tradeItemDescriptionInformation', [])
    for desc in descriptions:
        brand_info = desc.get('brandNameInformation', {})
        if brand_info:
            extracted_data['brand_name'] = brand_info.get('brandName', '')

        # Every regulated name entry is visited, so a later entry that has a
        # value overwrites an earlier one.
        for reg_name in desc.get('regulatedProductName', []):
            product_name = _first_truthy_value(
                get_nested_dict_value(reg_name, 'statement.values', []))
            if product_name:
                extracted_data['product_name'] = product_name

        description = _first_truthy_value(
            get_nested_dict_value(desc, 'additionalTradeItemDescription.values', []))
        if description:
            extracted_data['description'] = description


def _collect_images(info, extracted_data):
    """Append PRODUCT_IMAGE file references and record the primary image URL."""
    file_module = get_nested_dict_value(info, 'referencedFileDetailInformationModule', {})
    for file_header in file_module.get('referencedFileHeader', []):
        file_type = get_nested_dict_value(file_header, 'referencedFileTypeCode.value', '')
        if file_type != 'PRODUCT_IMAGE':
            continue

        uri = file_header.get('uniformResourceIdentifier', '')
        # The primary flag is compared against the string 'true'.
        is_primary = get_nested_dict_value(file_header, 'isPrimaryFile.value', '') == 'true'
        extracted_data['images'].append({
            'url': uri,
            'is_primary': is_primary
        })

        # Set primary image as the main image URL
        if is_primary and uri:
            extracted_data['image_url'] = uri


def _collect_dimensions(info, extracted_data):
    """Fill height/width/depth (value + unit) from the measurement groups."""
    groups = get_nested_dict_value(info, 'tradeItemMeasurementsModuleGroup', [])
    for group in groups:
        measurements = get_nested_dict_value(
            group, 'tradeItemMeasurementsModule.tradeItemMeasurements', {})
        if not measurements:
            continue
        for axis in ('height', 'width', 'depth'):
            reading = measurements.get(axis, {})
            if reading:
                extracted_data['dimensions'][axis] = {
                    'value': reading.get('value', ''),
                    'unit': reading.get('qual', '')
                }


def _collect_ingredients(info, extracted_data):
    """Fill ``ingredients`` from the food and beverage ingredient statements."""
    for module in get_nested_dict_value(info, 'foodAndBeverageIngredientModule', []):
        for statement in get_nested_dict_value(module, 'ingredientStatement', []):
            ingredients = _first_truthy_value(
                get_nested_dict_value(statement, 'statement.values', []))
            if ingredients:
                extracted_data['ingredients'] = ingredients


def _collect_country_of_origin(info, extracted_data):
    """Fill ``country_of_origin`` with the first non-empty country code."""
    place_module = get_nested_dict_value(info, 'placeOfItemActivityModule', {})
    countries = get_nested_dict_value(
        place_module, 'placeOfProductActivity.countryOfOrigin', [])
    for country in countries:
        country_code = get_nested_dict_value(country, 'countryCode.value', '')
        if country_code:
            extracted_data['country_of_origin'] = country_code
            return


def _collect_gpc(item, extracted_data):
    """Fill ``gpc_code`` and, by component type, ``category``/``subcategory``."""
    for category in get_nested_dict_value(item, 'productCategory', []):
        scheme = get_nested_dict_value(category, 'productCategoryScheme.value', '')
        if scheme != 'GPC':
            continue
        for code in category.get('productCategoryCodes', []):
            gpc_code = get_nested_dict_value(code, 'productCategoryCode.value', '')
            if not gpc_code:
                continue
            extracted_data['gpc_code'] = gpc_code

            # Try to get category component
            component = get_nested_dict_value(code, 'productCategoryComponent.value', '')
            if component == 'BRICK':
                extracted_data['category'] = gpc_code
            elif component == 'SEGMENT':
                extracted_data['subcategory'] = gpc_code


def extract_product_data(product_data: Dict) -> Dict:
    """
    Extract relevant product data from a 1WorldSync product object
    based on the Swagger Search schema.

    The sections are read in a fixed order: identifiers, then for each
    trade item entry its descriptions, images, dimensions, ingredients and
    country of origin, and finally the GPC categories. Any exception raised
    along the way is logged and stops further extraction; whatever was
    collected up to that point is still returned.

    Args:
        product_data (dict): A product data dictionary from the 1WorldSync Search REST API

    Returns:
        dict: A dictionary with structured product data. Every key is always
            present; keys that were not found keep their empty default. The
            ``manufacturer`` and ``allergen_info`` keys are never filled by
            this function.
    """
    # Initialize with default values
    extracted_data = {
        'gtin': '',
        'brand_name': '',
        'product_name': '',
        'description': '',
        'manufacturer': '',
        'image_url': '',
        'category': '',
        'subcategory': '',
        'gpc_code': '',
        'ingredients': '',
        'dimensions': {},
        'country_of_origin': '',
        'allergen_info': [],
        'item_id': '',
        'images': []
    }

    try:
        item = product_data.get('item', {})

        _collect_identifiers(item, extracted_data)

        # A falsy lookup result is treated as "no trade item information".
        trade_item_info = get_nested_dict_value(item, 'tradeItemInformation', [])
        for info in trade_item_info or []:
            _collect_descriptions(info, extracted_data)
            _collect_images(info, extracted_data)
            _collect_dimensions(info, extracted_data)
            _collect_ingredients(info, extracted_data)
            _collect_country_of_origin(info, extracted_data)

        _collect_gpc(item, extracted_data)
    except Exception as e:
        logger.error(f"Error extracting product data: {e}")

    return extracted_data
def extract_search_results(search_results: Dict) -> Dict:
    """
    Extract structured data from search results

    Args:
        search_results (dict): Search results from the API

    Returns:
        dict: Structured search results with two keys:
            ``metadata`` (``response_code``, ``response_message``,
            ``total_results`` as an int, ``next_cursor``) and ``products``
            (one ``extract_product_data`` result per entry in ``results``).

    Raises:
        ValueError: If ``totalNumOfResults`` is present but not numeric.
    """
    # A key that is present but null/empty must behave like a missing key;
    # int(None) and iterating over None would otherwise raise TypeError.
    raw_total = search_results.get('totalNumOfResults')
    raw_results = search_results.get('results')

    return {
        'metadata': {
            'response_code': search_results.get('responseCode'),
            'response_message': search_results.get('responseMessage'),
            'total_results': int(raw_total or 0),
            'next_cursor': search_results.get('nextCursorMark')
        },
        'products': [extract_product_data(product) for product in raw_results or []]
    }
def get_primary_image(product_data: Dict) -> str:
    """
    Get the primary image URL from product data

    Lookup order: the already-extracted ``image_url``; then the first
    image flagged ``is_primary`` that has a URL; then the first image that
    has a URL at all.

    Args:
        product_data (dict): Product data

    Returns:
        str: Primary image URL or empty string if not found
    """
    # First check if we already extracted the image URL
    main_url = product_data.get('image_url')
    if main_url:
        return main_url

    # 'images' may be absent or explicitly None; entries without a usable
    # URL are ignored so they cannot mask a later image that has one.
    images = product_data.get('images') or []
    with_url = [image for image in images if image.get('url')]

    for image in with_url:
        if image.get('is_primary'):
            return image['url']

    # If no primary image, return the first image if available
    return with_url[0]['url'] if with_url else ''
def format_dimensions(dimensions: Dict) -> str:
    """
    Render a dimensions dictionary as a single readable line

    Height, width and depth are emitted in that order. An entry is skipped
    unless both its ``value`` and its ``unit`` are truthy.

    Args:
        dimensions (dict): Dimensions dictionary

    Returns:
        str: Formatted dimensions string, e.g. ``"Height: 10 CM, Width: 5 CM"``;
            empty when there is nothing to show
    """
    if not dimensions:
        return ''

    candidates = (
        (dim_name, dimensions.get(dim_name, {}))
        for dim_name in ('height', 'width', 'depth')
    )
    return ', '.join(
        f"{dim_name.capitalize()}: {dim['value']} {dim['unit']}"
        for dim_name, dim in candidates
        if dim and dim.get('value') and dim.get('unit')
    )