Introduction
The Engine defines primitive operations for concurrently accessing and modifying data while maintaining consistency. Because it is desirable to build more complex routines from these primitives while still maintaining consistency in concurrent environments, we need an AtomicOperation protocol at the Engine level that ConcourseServer can use to safely provide complex routines to clients.
AtomicOperation
- AtomicOperation is a BufferedStore that transports to the Engine (or to a Transaction).
- AtomicOperation uses just-in-time locking, so it never blocks other operations before it commits.
- When committing, AtomicOperation grabs locks to create a fence around the resources on which it will operate. It does not release these locks until it is done committing.
- AtomicOperation listens for version changes to the resources that it intends to lock. If it is notified of a version change, it fails immediately.
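The properties above can be illustrated with a small, self-contained sketch. It is not the Engine's implementation: ToyStore and ToyAtomicOperation are hypothetical names, a single store-wide lock stands in for the per-resource fence, and the version check is performed at commit time rather than through a change listener.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Toy illustration only; all names here are hypothetical. */
class AtomicOperationSketch {

    /** Stand-in for the Engine: values plus a version counter per key. */
    static class ToyStore {
        final Map<String, String> data = new HashMap<>();
        final Map<String, Long> versions = new HashMap<>();
        final ReentrantLock lock = new ReentrantLock();

        String read(String key) {
            lock.lock();
            try { return data.get(key); } finally { lock.unlock(); }
        }

        void write(String key, String value) {
            lock.lock();
            try {
                data.put(key, value);
                versions.merge(key, 1L, Long::sum); // every write bumps the version
            } finally { lock.unlock(); }
        }
    }

    /** Buffers writes and only touches the store when commit() is called. */
    static class ToyAtomicOperation {
        private final ToyStore store;
        private final Map<String, Long> observed = new HashMap<>();       // versions seen
        private final Map<String, String> buffer = new LinkedHashMap<>(); // pending writes

        ToyAtomicOperation(ToyStore store) { this.store = store; }

        // Remembers the version of a key the first time the operation touches it.
        // The caller must hold store.lock.
        private void observe(String key) {
            observed.putIfAbsent(key, store.versions.getOrDefault(key, 0L));
        }

        String read(String key) {
            store.lock.lock(); // brief latch for a consistent snapshot, not a resource lock
            try {
                observe(key);
                return buffer.containsKey(key) ? buffer.get(key) : store.data.get(key);
            } finally { store.lock.unlock(); }
        }

        void write(String key, String value) {
            store.lock.lock();
            try { observe(key); } finally { store.lock.unlock(); }
            buffer.put(key, value); // buffered; the store is untouched until commit
        }

        boolean commit() {
            store.lock.lock(); // the "fence": held until the commit is done
            try {
                for (Map.Entry<String, Long> e : observed.entrySet()) {
                    long current = store.versions.getOrDefault(e.getKey(), 0L);
                    if (current != e.getValue()) {
                        return false; // a version changed underneath us, so fail
                    }
                }
                buffer.forEach(store::write); // the lock is reentrant
                return true;
            } finally { store.lock.unlock(); }
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.write("name", "a");
        ToyAtomicOperation op = new ToyAtomicOperation(store);
        op.write("name", "b");
        store.write("name", "c");               // conflicting change before commit
        System.out.println(op.commit());        // false
        System.out.println(store.read("name")); // c
    }
}
```

A caller that wants the operation to succeed eventually would rebuild it and retry after a failed commit, which is the pattern the built-in functions follow.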
TODO: may have to move durability up from Transaction to AtomicOperation
Built-In Atomic Functions
All built-in atomic functions must be defined at the Server level and may only mutate data using the Engine-recognized primitive operations defined in AtomicOperation.
Link
- How do we determine that a link is valid?
- Ping the record? This tells us whether the record currently has any data. What about records with no current data but some historical data? Should we be able to link to those?
- Pinging is not a primitive operation
Revert
REVERT key IN record TO timestamp
    void revert(String key, long record, long timestamp) {
        AtomicOperation operation = null;
        // Retry until an operation is built and commits successfully
        while (operation == null || !operation.commit()) {
            operation = doRevert(key, record, timestamp,
                    transaction != null ? transactions.get(transaction) : engine);
        }
    }

    AtomicOperation doRevert(String key, long record, long timestamp, Compoundable store) {
        AtomicOperation operation = AtomicOperation.start(store);
        Set<TObject> past = operation.fetch(key, record, timestamp);
        Set<TObject> present = operation.fetch(key, record);
        // Values that appear in exactly one of the two states are the ones that must change
        Set<TObject> xor = Sets.symmetricDifference(past, present);
        try {
            for (TObject value : xor) {
                if (present.contains(value)) {
                    operation.remove(key, value, record); // added after timestamp
                } else {
                    operation.add(key, value, record); // removed after timestamp
                }
            }
            return operation;
        } catch (AtomicStateException e) {
            return null; // tells the caller to retry
        }
    }
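To see why the symmetric difference is enough, here is a minimal sketch that uses only java.util sets in place of TObject values and AtomicOperation. The class name RevertSketch and its helpers are hypothetical; the point is only that removing the values present now but absent in the past, and adding the values present in the past but absent now, reproduces the past state.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: plain sets stand in for the values stored for a key in a record. */
class RevertSketch {

    /** Returns the elements of a that are not in b. */
    static <T> Set<T> difference(Set<T> a, Set<T> b) {
        Set<T> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    /** Applies the removes and adds that a revert would issue against the present state. */
    static <T> Set<T> revert(Set<T> past, Set<T> present) {
        Set<T> toRemove = difference(present, past); // values added since the timestamp
        Set<T> toAdd = difference(past, present);    // values removed since the timestamp
        Set<T> result = new LinkedHashSet<>(present);
        result.removeAll(toRemove);
        result.addAll(toAdd);
        return result;
    }

    public static void main(String[] args) {
        Set<String> past = Set.of("a", "b");
        Set<String> present = Set.of("b", "c");
        System.out.println(difference(present, past));          // [c]
        System.out.println(difference(past, present));          // [a]
        System.out.println(revert(past, present).equals(past)); // true
    }
}
```

Values common to both states ("b" in the example) are never touched, so a revert issues no redundant writes for them.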
Clear
CLEAR key IN record
    boolean clear(String key, long record) {
        AtomicOperation operation = null;
        // Retry until the operation commits successfully
        while (operation == null || !operation.commit()) {
            operation = doClear(key, record,
                    transaction != null ? transactions.get(transaction) : engine);
        }
        return true;
    }

    AtomicOperation doClear(String key, long record, Store store) {
        AtomicOperation operation = AtomicOperation.start(store);
        Set<TObject> values = operation.fetch(key, record);
        // Remove every value currently stored for key in record
        for (TObject value : values) {
            operation.remove(key, value, record);
        }
        return operation;
    }
Set
SET key AS value IN record
    // NOTE: CANNOT use the clear() method; the removes must be done in terms of
    // the same atomic operation!
    boolean set(String key, TObject value, long record) {
        AtomicOperation operation = null;
        // Retry until the operation commits successfully
        while (operation == null || !operation.commit()) {
            operation = doSet(key, value, record,
                    transaction != null ? transactions.get(transaction) : engine);
        }
        return true;
    }

    AtomicOperation doSet(String key, TObject value, long record, Store store) {
        AtomicOperation operation = AtomicOperation.start(store);
        Set<TObject> values = operation.fetch(key, record);
        // Remove the existing values, then add the new one, all in one operation
        for (TObject v : values) {
            operation.remove(key, v, record);
        }
        operation.add(key, value, record);
        return operation;
    }
Verify And Swap
See http://en.wikipedia.org/wiki/Compare-and-swap
VERIFY key AS value1 IN record AND SWAP WITH value2
    // NOTE: cannot do automatic retry for this method, so the caller must handle that
    boolean verifyAndSwap(String key, TObject expected, long record, TObject replacement) {
        // Resolve the destination store the same way the other built-in functions do
        AtomicOperation operation = AtomicOperation.start(
                transaction != null ? transactions.get(transaction) : engine);
        if (operation.verify(key, expected, record)) {
            operation.remove(key, expected, record);
            operation.add(key, replacement, record);
            return operation.commit();
        } else {
            return false;
        }
    }
Get And Increment
See http://en.wikipedia.org/wiki/Fetch-and-add
GET key AS value IN record AND INCREMENT
The first value for key in record (get returns the first value of a fetch) must be a Number, otherwise this method should throw an exception.
TODO: how to handle floats and doubles?
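    Number getAndIncrement(String key, long record) {
        while (true) {
            // get() returns the first value of a fetch; it must be a Number
            Number value = get(key, record);
            // Integer increment only; floats and doubles are still open (see TODO).
            // Number/TObject conversion is omitted.
            Number incremented = value.longValue() + 1;
            // Retry if another writer changed the value between the get and the swap
            if (verifyAndSwap(key, value, record, incremented)) {
                return value;
            }
        }
    }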
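The relationship between the last two functions (a fetch-and-add built as a retry loop around a compare-and-swap) can be tried out with standard library classes. The sketch below is an analogy rather than the Server's code: GetAndIncrementSketch is a hypothetical class, a ConcurrentHashMap stands in for the record, and ConcurrentMap.replace(key, oldValue, newValue) plays the role of verifyAndSwap. It handles Long values only and leaves the float and double question open.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative only: a map stands in for the values stored in a record. */
class GetAndIncrementSketch {
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<>();

    void put(String key, Object value) { values.put(key, value); }

    Object get(String key) { return values.get(key); }

    /** Swaps in the replacement only if the current value equals the expected one. */
    boolean verifyAndSwap(String key, Object expected, Object replacement) {
        return values.replace(key, expected, replacement);
    }

    /** Returns the value seen before the increment, retrying on contention. */
    long getAndIncrement(String key) {
        while (true) {
            Object value = values.get(key);
            if (!(value instanceof Long)) {
                throw new IllegalStateException("value for " + key + " is not a Long");
            }
            long current = (Long) value;
            // If another thread changed the value first, the swap fails and we loop
            if (verifyAndSwap(key, value, current + 1)) {
                return current;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GetAndIncrementSketch record = new GetAndIncrementSketch();
        record.put("count", 0L);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    record.getAndIncrement("count");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(record.get("count")); // 4000
    }
}
```

Because a failed swap changes nothing, the loop can retry safely, which is why the retry lives in getAndIncrement and not inside verifyAndSwap.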