Python bulk processor

If you want to ingest multiple records in Xata, using bulk is the most efficient way. The BulkProcessor, a helper of the Python SDK, aims to make the process even simpler by abstracting away any complexity of juggling concurrent workers or chunking data, or maintaining queues.

You can use the BulkProcessor to e.g. ingest a CSV file into Xata or read documents from a queue and delegate the ingestion to the processor.

Two methods are available to put data in the processing queue:

  • bp.put_records(":table", ":records") to add multiple records
  • bp.put_record(":table", ":record") to add only one record

Using the option of multiples, bp.add_records(":table", ":records") is more efficient as it requires less locking of the internal data structures.

You can tweak the processor to your needs if necessary:

  • thread_pool_size: How many data queue workers should be deployed (default: 4)
  • batch_size: How many records per table should be pushed as batch (default: 25)
  • flush_interval: After how many seconds should the per table queue be flushed (default: 5 seconds)
  • processing_timeout: Cooldown period between batches (default: 0.025 seconds)

If data is coming slowly, e.g. > 1 record / second, it's reasonable to have fewer threads deployed and decrease the batch_size and flush_interval to get documents in faster.

from xata.client import XataClient
from xata.helpers import BulkProcessor
 
client = XataClient()
bp = BulkProcessor(client)
 
# The dict keys match the columns in the destination table "Users"
data = [
   {"name": "Max Musterman", "email": "max@acme.co"},
   {"name": "Ida von Klammer", "email": "ida@acme.co"},
   # ... more records
   {"name": "Mia Diaz", "email": "mia@acme.co"},
]
# Add records to processor
bp.put_records("Users", data)
# Ensure the Processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
from xata.client import XataClient
from xata.helpers import BulkProcessor
 
client = XataClient()
bp = BulkProcessor(client)
 
# Sub to a queue and continously read messages
while queue.subscribed():
   msg = queue.read()
 
   # Reading destination and data from queue
   table_name = msg["table"]
   record = msg["data"]
 
   # Add records to processor
   bp.put_record(table_name, record)
 
# Ensure the Processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()

The BulkProcessor just like the client use the logging package and will emit logs.