Add multi-batch change transaction support for datasync consumers
Otherwise it was possible for the rattail transaction to fall out of sync from consumer, if it was committing more often than us.
This commit is contained in:
parent
d7cb42a98b
commit
295f2d53c4
|
@ -45,9 +45,26 @@ class DataSyncConsumer(object):
|
|||
This method is called when the consumer thread is first started.
|
||||
"""
|
||||
|
||||
def begin_transaction(self):
|
||||
"""
|
||||
Called just before the consumer is asked to process changes, possibly
|
||||
via multiple batches.
|
||||
"""
|
||||
|
||||
def process_changes(self, session, changes):
|
||||
"""
|
||||
Process (consume) a set of changes.
|
||||
Process (consume) a batch of changes.
|
||||
"""
|
||||
|
||||
def rollback_transaction(self):
|
||||
"""
|
||||
Called when any batch of changes failed to process.
|
||||
"""
|
||||
|
||||
def commit_transaction(self):
|
||||
"""
|
||||
Called just after the consumer has successfully finished processing
|
||||
changes, possibly via multiple batches.
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
@ -201,14 +201,15 @@ def consume_changes(profile, consumers):
|
|||
session = Session()
|
||||
for consumer in consumers:
|
||||
|
||||
changes = session.query(model.DataSyncChange).filter(
|
||||
model.DataSyncChange.source == profile.key,
|
||||
model.DataSyncChange.consumer == consumer.key)\
|
||||
.order_by(model.DataSyncChange.obtained)\
|
||||
.all()
|
||||
changes = session.query(model.DataSyncChange)\
|
||||
.filter(model.DataSyncChange.source == profile.key)\
|
||||
.filter(model.DataSyncChange.consumer == consumer.key)\
|
||||
.order_by(model.DataSyncChange.obtained)\
|
||||
.all()
|
||||
|
||||
if changes:
|
||||
log.debug("found {0} changes to process".format(len(changes)))
|
||||
log.debug("found {} changes to process".format(len(changes)))
|
||||
consumer.begin_transaction()
|
||||
|
||||
# Process (consume) changes in batches, according to timestamp.
|
||||
batch = []
|
||||
|
@ -235,14 +236,17 @@ def consume_changes(profile, consumers):
|
|||
log.debug("processed {} changes in {} batches{}".format(
|
||||
changecount, batchcount, " (with errors)" if error else ""))
|
||||
if error:
|
||||
consumer.rollback_transaction()
|
||||
break
|
||||
else:
|
||||
consumer.commit_transaction()
|
||||
|
||||
if error:
|
||||
session.rollback()
|
||||
session.close()
|
||||
break
|
||||
|
||||
session.commit()
|
||||
session.close()
|
||||
else:
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
time.sleep(profile.consumer_delay)
|
||||
|
|
Loading…
Reference in a new issue