Python bulk processor
If you want to ingest multiple records into Xata, using bulk requests is the most efficient way. The BulkProcessor, a helper of the Python SDK, aims to make the process even simpler by abstracting away the complexity of juggling concurrent workers, chunking data, and maintaining queues.
You can use the BulkProcessor to, for example, ingest a CSV file into Xata, or to read documents from a message queue and delegate the ingestion to the processor.
Two methods are available to put data in the processing queue:
- bp.put_records(":table", ":records") to add multiple records
- bp.put_record(":table", ":record") to add only one record

Using the multi-record option, bp.put_records(":table", ":records"), is more efficient, as it requires less locking of the internal data structures.
If needed, you can tweak the processor's behavior with these options:
- thread_pool_size: how many data queue workers should be deployed (default: 4)
- batch_size: how many records per table should be pushed as one batch (default: 25)
- flush_interval: after how many seconds the per-table queue should be flushed (default: 5 seconds)
- processing_timeout: cooldown period between batches (default: 0.025 seconds)
If data is arriving slowly, e.g. fewer than one record per second, it's reasonable to deploy fewer threads and to decrease the batch_size and flush_interval to get documents in faster, as in the sketch below.
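A minimal sketch of such a tuned processor, assuming the options above are passed as keyword arguments to the BulkProcessor constructor:

from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()

# Tuned for a slow trickle of records: fewer workers, smaller batches,
# and a shorter flush interval so records reach Xata sooner instead of
# waiting for a full batch. The values shown are illustrative choices,
# not recommendations.
bp = BulkProcessor(
    client,
    thread_pool_size=2,  # fewer data queue workers (default: 4)
    batch_size=10,       # push smaller batches (default: 25)
    flush_interval=1,    # flush the per-table queue every second (default: 5)
)

The examples below show both ingestion patterns: pushing a list of records at once, and streaming single records from a queue.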
from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()
bp = BulkProcessor(client)

# The dict keys match the columns in the destination table "Users"
data = [
    {"name": "Max Musterman", "email": "max@acme.co"},
    {"name": "Ida von Klammer", "email": "ida@acme.co"},
    # ... more records
    {"name": "Mia Diaz", "email": "mia@acme.co"},
]

# Add the records to the processor
bp.put_records("Users", data)

# Ensure the processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
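The same approach fits the CSV use case mentioned above. A minimal sketch, assuming a hypothetical file users.csv whose header row matches the column names of the destination table "Users":

import csv

from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()
bp = BulkProcessor(client)

# "users.csv" is a hypothetical file; its header row is assumed to
# match the column names of the destination table "Users".
with open("users.csv", newline="") as f:
    for row in csv.DictReader(f):
        # csv.DictReader yields each row as a dict of column name -> value
        bp.put_record("Users", row)

# Block until all queued records have been pushed
bp.flush_queue()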
from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()
bp = BulkProcessor(client)

# Subscribe to a queue and continuously read messages;
# "queue" stands in for your message queue client.
while queue.subscribed():
    msg = queue.read()

    # Read the destination table and the record data from the message
    table_name = msg["table"]
    record = msg["data"]

    # Add the record to the processor
    bp.put_record(table_name, record)

# Ensure the processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
The BulkProcessor, just like the client, uses the logging package and will emit logs.
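The logs go through the standard logging package, so you can surface them like any other Python logs, for example:

import logging

# Show INFO-level log output, including messages emitted by the SDK
logging.basicConfig(level=logging.INFO)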