1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import httplib, urlparse, base64, urllib
16 import os, threading, numbers, socket
17 from datetime import datetime, timedelta
18 try: import simplejson as json
19 except ImportError: import json
20
# Default URL (host and port) of a scalaris node. A non-empty
# SCALARIS_JSON_URL environment variable takes precedence over the
# built-in default; an unset or empty variable falls back to localhost.
DEFAULT_URL = os.environ.get('SCALARIS_JSON_URL') or 'http://localhost:8000'
# Path to the JSON-RPC page.
DEFAULT_PATH = '/jsonrpc.yaws'
30 """
31 Abstracts connections to scalaris using JSON
32 """
33
35 """
36 Creates a JSON connection to the given URL using the given TCP timeout
37 """
38 try:
39 uri = urlparse.urlparse(url)
40 self._conn = httplib.HTTPConnection(uri.hostname, uri.port,
41 timeout = timeout)
42 except Exception as instance:
43 raise ConnectionError(instance)
44
def callp(self, path, function, params, retry_if_bad_status = True):
    """
    Calls the given function with the given parameters via the JSON
    interface of scalaris, using the given path.

    Convenience wrapper around call() that takes the path as its first
    argument; returns whatever call() returns.
    """
    return self.call(function, params, path = path,
                     retry_if_bad_status = retry_if_bad_status)
47
def call(self, function, params, path = DEFAULT_PATH, retry_if_bad_status = True):
    """
    Calls the given function with the given parameters via the JSON
    interface of scalaris.

    The request is sent as a JSON-RPC 2.0 object via POST to the given path
    and the 'result' member of the decoded response is returned.
    If the server answers with a bad status line, the connection is closed
    and the call is repeated once (unless retry_if_bad_status is False).
    Every failure is reported as a ConnectionError; the connection is
    closed before the exception is raised.
    """
    request = {'jsonrpc': '2.0',
               'method': function,
               'params': params,
               'id': 0}
    # Set up front so that the error handlers below can always report
    # whatever had been received when the failure occurred.
    data = None
    response = None
    try:
        params_json = json.dumps(request, separators=(',',':'))
        headers = {"Content-type": "application/json; charset=utf-8"}
        # no need to quote - we already encode to json:
        self._conn.request("POST", path, urllib.quote(params_json), headers)
        response = self._conn.getresponse()
        data = response.read().decode('utf-8')
        if not 200 <= response.status < 300:
            raise ConnectionError(data, response = response)
        return json.loads(data)['result']
    except httplib.BadStatusLine as instance:
        # close the connection and retry once with a fresh request
        self.close()
        if retry_if_bad_status:
            return self.call(function, params, path = path, retry_if_bad_status = False)
        raise ConnectionError(data, response = response, error = instance)
    except ConnectionError:
        # already the right exception type - only clean up
        self.close()
        raise
    except Exception as instance:
        # wrap anything else (socket errors, malformed JSON, missing
        # 'result' member, ...) so callers only deal with ConnectionError
        self.close()
        raise ConnectionError(data, response = response, error = instance)
87
88 @staticmethod
90 """
91 Encodes the value to the form required by the scalaris JSON API
92 """
93 if isinstance(value, bytearray):
94 return {'type': 'as_bin', 'value': (base64.b64encode(bytes(value))).decode('ascii')}
95 else:
96 return {'type': 'as_is', 'value': value}
97
98 @staticmethod
100 """
101 Decodes the value from the scalaris JSON API form to a native type
102 """
103 if ('type' not in value) or ('value' not in value):
104 raise UnknownError(value)
105 if value['type'] == 'as_bin':
106 return bytearray(base64.b64decode(value['value'].encode('ascii')))
107 else:
108 return value['value']
109
110
111
112 @staticmethod
114 """
115 Processes the result of some Scalaris operation and raises a
116 TimeoutError if found.
117 """
118 if result == {'status': 'fail', 'reason': 'timeout'}:
119 raise TimeoutError(result)
120
121
122
123 @staticmethod
125 """
126 Processes the result of a read operation.
127 Returns the read value on success.
128 Raises the appropriate exception if the operation failed.
129 """
130 if isinstance(result, dict) and 'status' in result and len(result) == 2:
131 if result['status'] == 'ok' and 'value' in result:
132 return JSONConnection.decode_value(result['value'])
133 elif result['status'] == 'fail' and 'reason' in result:
134 if result['reason'] == 'timeout':
135 raise TimeoutError(result)
136 elif result['reason'] == 'not_found':
137 raise NotFoundError(result)
138 raise UnknownError(result)
139
140
141
142 @staticmethod
144 """
145 Processes the result of a write operation.
146 Raises the appropriate exception if the operation failed.
147 """
148 if isinstance(result, dict):
149 if result == {'status': 'ok'}:
150 return None
151 elif result == {'status': 'fail', 'reason': 'timeout'}:
152 raise TimeoutError(result)
153 raise UnknownError(result)
154
155
156
157
158 @staticmethod
160 """
161 Processes the result of a commit operation.
162 Raises the appropriate exception if the operation failed.
163 """
164 if isinstance(result, dict) and 'status' in result:
165 if result == {'status': 'ok'}:
166 return None
167 elif result['status'] == 'fail' and 'reason' in result:
168 if len(result) == 2 and result['reason'] == 'timeout':
169 raise TimeoutError(result)
170 elif len(result) == 3 and result['reason'] == 'abort' and 'keys' in result:
171 raise AbortError(result, result['keys'])
172 raise UnknownError(result)
173
174
175
176 @staticmethod
178 """
179 Processes the result of a add_del_on_list operation.
180 Raises the appropriate exception if the operation failed.
181 """
182 if isinstance(result, dict) and 'status' in result:
183 if result == {'status': 'ok'}:
184 return None
185 elif result['status'] == 'fail' and 'reason' in result:
186 if len(result) == 2:
187 if result['reason'] == 'timeout':
188 raise TimeoutError(result)
189 elif result['reason'] == 'not_a_list':
190 raise NotAListError(result)
191 raise UnknownError(result)
192
193
194
195 @staticmethod
197 """
198 Processes the result of a add_on_nr operation.
199 Raises the appropriate exception if the operation failed.
200 """
201 if isinstance(result, dict) and 'status' in result:
202 if result == {'status': 'ok'}:
203 return None
204 elif result['status'] == 'fail' and 'reason' in result:
205 if len(result) == 2:
206 if result['reason'] == 'timeout':
207 raise TimeoutError(result)
208 elif result['reason'] == 'not_a_number':
209 raise NotANumberError(result)
210 raise UnknownError(result)
211
212
213
214
215 @staticmethod
217 """
218 Processes the result of a test_and_set operation.
219 Raises the appropriate exception if the operation failed.
220 """
221 if isinstance(result, dict) and 'status' in result:
222 if result == {'status': 'ok'}:
223 return None
224 elif result['status'] == 'fail' and 'reason' in result:
225 if len(result) == 2:
226 if result['reason'] == 'timeout':
227 raise TimeoutError(result)
228 elif result['reason'] == 'not_found':
229 raise NotFoundError(result)
230 elif result['reason'] == 'key_changed' and 'value' in result and len(result) == 3:
231 raise KeyChangedError(result, JSONConnection.decode_value(result['value']))
232 raise UnknownError(result)
233
234
235
236 @staticmethod
238 """
239 Processes the result of a delete operation.
240 Returns the tuple
241 (<success (True | 'timeout')>, <number of deleted items>, <detailed results>) on success.
242 Raises the appropriate exception if the operation failed.
243 """
244 if isinstance(result, dict) and 'ok' in result and 'results' in result:
245 if 'failure' not in result:
246 return (True, result['ok'], result['results'])
247 elif result['failure'] == 'timeout':
248 return ('timeout', result['ok'], result['results'])
249 raise UnknownError(result)
250
251
252 @staticmethod
254 """
255 Creates a new DeleteResult from the given result list.
256 """
257 ok = 0
258 locks_set = 0
259 undefined = 0
260 if isinstance(result, list):
261 for element in result:
262 if element == 'ok':
263 ok += 1
264 elif element == 'locks_set':
265 locks_set += 1
266 elif element == 'undef':
267 undefined += 1
268 else:
269 raise UnknownError('Unknown reason ' + element + 'in ' + result)
270 return DeleteResult(ok, locks_set, undefined)
271 raise UnknownError('Unknown result ' + result)
272
273
274
275
276 @staticmethod
278 """
279 Processes the result of a req_list operation of the Transaction class.
280 Returns the tuple (<tlog>, <result>) on success.
281 Raises the appropriate exception if the operation failed.
282 """
283 if 'tlog' not in result or 'results' not in result or \
284 not isinstance(result['results'], list):
285 raise UnknownError(result)
286 return (result['tlog'], result['results'])
287
288
289
290 @staticmethod
292 """
293 Processes the result of a req_list operation of the TransactionSingleOp class.
294 Returns <result> on success.
295 Raises the appropriate exception if the operation failed.
296 """
297 if not isinstance(result, list):
298 raise UnknownError(result)
299 return result
300
301
302 @staticmethod
304 """
305 Processes the result of a api_vm/get_version operation.
306 Raises the appropriate exception if the operation failed.
307 """
308 if isinstance(result, dict) and 'status' in result and 'value' in result:
309 if result['status'] == 'ok':
310 return result['value']
311 raise UnknownError(result)
312
313
314
315
316
317
318
319
320
321
322 @staticmethod
324 """
325 Processes the result of a api_vm/get_info operation.
326 Raises the appropriate exception if the operation failed.
327 """
328 if isinstance(result, dict) and 'status' in result and 'value' in result:
329 value = result['value']
330 if result['status'] == 'ok' and \
331 'scalaris_version' in value and 'erlang_version' in value and \
332 'mem_total' in value and 'uptime' in value and \
333 'erlang_node' in value and 'ip' in value and \
334 'port' in value and 'yaws_port' in value:
335 try:
336 return ScalarisVM.GetInfoResult(value['scalaris_version'],
337 value['erlang_version'],
338 int(value['mem_total']),
339 int(value['uptime']),
340 value['erlang_node'],
341 value['ip'],
342 int(value['port']),
343 int(value['yaws_port']))
344 except:
345 pass
346 raise UnknownError(result)
347
348
349 @staticmethod
351 """
352 Processes the result of a api_vm/number_of_nodes operation.
353 Raises the appropriate exception if the operation failed.
354 """
355 if isinstance(result, dict) and 'status' in result and 'value' in result:
356 if result['status'] == 'ok':
357 try:
358 return int(result['value'])
359 except:
360 pass
361 raise UnknownError(result)
362
363
364 @staticmethod
366 """
367 Processes the result of a api_vm/get_nodes operation.
368 Raises the appropriate exception if the operation failed.
369 """
370 if isinstance(result, dict) and 'status' in result and 'value' in result:
371 if result['status'] == 'ok' and isinstance(result['value'], list):
372 return result['value']
373 raise UnknownError(result)
374
375
376 @staticmethod
378 """
379 Processes the result of a api_vm/add_nodes operation.
380 Raises the appropriate exception if the operation failed.
381 """
382 if isinstance(result, dict) and 'status' in result and 'ok' in result and 'failed' in result:
383 if result['status'] == 'ok' and isinstance(result['ok'], list) and isinstance(result['failed'], list):
384 return (result['ok'], result['failed'])
385 raise UnknownError(result)
386
387
388 @staticmethod
390 """
391 Processes the result of a api_vm/shutdown_node and api_vm/kill_node operations.
392 Raises the appropriate exception if the operation failed.
393 """
394 if result == {'status': 'ok'}:
395 return True
396 if result == {'status': 'not_found'}:
397 return False
398 raise UnknownError(result)
399
400
401 @staticmethod
403 """
404 Processes the result of a api_vm/shutdown_nodes and api_vm/kill_nodes operations.
405 Raises the appropriate exception if the operation failed.
406 """
407 if isinstance(result, dict) and 'status' in result and 'ok' in result and \
408 result['status'] == 'ok' and isinstance(result['ok'], list):
409 return result['ok']
410 raise UnknownError(result)
411
412
413 @staticmethod
415 """
416 Processes the result of a api_vm/shutdown_nodes_by_name and api_vm/kill_nodes_by_name operations.
417 Raises the appropriate exception if the operation failed.
418 """
419 if isinstance(result, dict) and 'status' in result and 'ok' in result and 'not_found' in result:
420 if result['status'] == 'ok' and isinstance(result['ok'], list) and isinstance(result['not_found'], list):
421 return (result['ok'], result['not_found'])
422 raise UnknownError(result)
423
424
425 @staticmethod
427 """
428 Processes the result of a api_vm/shutdown_vm and api_vm/kill_vm operations.
429 Raises the appropriate exception if the operation failed.
430 """
431 if result == {'status': 'ok'}:
432 return None
433 raise UnknownError(result)
434
435
436
437
438
439
440 @staticmethod
442 """
443 Processes the result of a api_vm/get_other_vms operation.
444 Raises the appropriate exception if the operation failed.
445 """
446 if isinstance(result, dict) and 'status' in result and 'value' in result:
447 value = result['value']
448 if result['status'] == 'ok' and isinstance(value, list):
449 vms = []
450 try:
451 for vm in value:
452 if 'erlang_node' in vm and 'ip' in vm and \
453 'port' in vm and 'yaws_port' in vm:
454 vms.append('http://' + vm['ip'] + ':' + str(int(vm['yaws_port'])))
455 else:
456 raise UnknownError(result)
457 return vms
458 except:
459 pass
460 raise UnknownError(result)
461
462
463 @staticmethod
465 if isinstance(result, dict) and 'status' in result:
466 if result['status'] == 'ok':
467 return True
468 else:
469 return False
470 raise UnknownError(result)
471
472
473
474
475 @staticmethod
477 if isinstance(result, dict) and 'status' in result:
478 if result['status'] == 'ok' and 'value' in result and isinstance(result['value'], int):
479 return result['value']
480
481 if result['status'] == 'error' and 'reason' in result:
482 if result['reason'] == 'resp_timeout':
483 raise TimeoutError(result)
484 elif result['reason'] == 'autoscale_false':
485 raise ConfigError(result)
486 raise UnknownError(result)
487
488
489
490
491 @staticmethod
493 if isinstance(result, dict) and 'status' in result:
494 if result['status'] == 'ok':
495 return result['status']
496
497 if result['status'] == 'error':
498 if result['reason'] == 'locked':
499 raise LockError(result)
500 elif result['reason'] == 'resp_timeout':
501 raise TimeoutError(result)
502 elif result['reason'] == 'autoscale_false':
503 raise ConfigError(result)
504
505 return UnknownError(result)
506
507
508
509
510 @staticmethod
512 if isinstance(result, dict) and 'status' in result:
513 if result['status'] == 'ok':
514 return result['status']
515
516 if result['status'] == 'error':
517 if result['reason'] == 'not_locked':
518 raise LockError(result)
519 elif result['reason'] == 'resp_timeout':
520 raise TimeoutError(result)
521 elif result['reason'] == 'autoscale_false':
522 raise ConfigError(result)
523
524 return UnknownError(result)
525
526
527 @staticmethod
529 """
530 Processes the result of a nop operation.
531 Raises the appropriate exception if the operation failed.
532 """
533 if result != 'ok':
534 raise UnknownError(result)
535
536 @staticmethod
538 """
539 Returns a new ReqList object allowing multiple parallel requests for
540 the Transaction class.
541 """
542 return _JSONReqListTransaction(other)
543
544 @staticmethod
546 """
547 Returns a new ReqList object allowing multiple parallel requests for
548 the TransactionSingleOp class.
549 """
550 return _JSONReqListTransactionSingleOp(other)
551
554
556 """Base class for errors in the scalaris package."""
557
559 """
560 Exception that is thrown if the commit of a write operation on a scalaris
561 ring fails.
562 """
563
def __init__(self, raw_result, failed_keys):
    """
    Creates the exception from the raw result of the failed operation and
    the keys reported with it.
    """
    self.raw_result = raw_result
    self.failed_keys = failed_keys
568 return repr(self.raw_result)
569
571 """
572 Exception that is thrown if an operation on a scalaris ring fails because
573 a connection does not exist or has been disconnected.
574 """
575
def __init__(self, raw_result, response = None, error = None):
    """
    Creates the exception from the raw data that was received (may be
    None), the HTTP response object (if one was obtained) and the
    underlying error (if the failure was caused by another exception).
    """
    self.raw_result = raw_result
    self.response = response
    self.error = error
581 result_str = ''
582 if self.response is not None:
583 result_str += 'status: ' + str(self.response.status)
584 result_str += ', reason: ' + self.response.reason + '\n'
585 if self.error is not None:
586 result_str += 'error: ' + repr(self.error) + '\n'
587 result_str += 'data: ' + repr(self.raw_result)
588 return result_str
589
591 """
592 Exception that is thrown if a test_and_set operation on a scalaris ring
593 fails because the old value did not match the expected value.
594 """
595
def __init__(self, raw_result, old_value):
    """
    Creates the exception from the raw result of the failed operation and
    the (decoded) value that was found at the key instead of the expected
    one.
    """
    self.raw_result = raw_result
    self.old_value = old_value
600 return repr(self.raw_result) + ', old value: ' + repr(self.old_value)
601
603 """
604 Exception that is thrown if a delete operation on a scalaris ring fails
605 because no scalaris node was found.
606 """
607
609 self.raw_result = raw_result
611 return repr(self.raw_result)
612
614 """
615 Exception that is thrown if a read operation on a scalaris ring fails
616 because the key did not exist before.
617 """
618
620 self.raw_result = raw_result
622 return repr(self.raw_result)
623
625 """
626 Exception that is thrown if a add_del_on_list operation on a scalaris ring
627 fails because the participating values are not lists.
628 """
629
631 self.raw_result = raw_result
633 return repr(self.raw_result)
634
636 """
637 Exception that is thrown if an add_on_nr operation on a scalaris ring
638 fails because the participating values are not numbers.
639 """
640
642 self.raw_result = raw_result
644 return repr(self.raw_result)
645
647 """
648 Exception that is thrown if a read or write operation on a scalaris ring
649 fails due to a timeout.
650 """
651
653 self.raw_result = raw_result
655 return repr(self.raw_result)
656
658 """
659 Exception that is thrown if a autoscale operation fails, because it was not
660 configured correctly.
661 """
663 self.raw_result = raw_result
665 return repr(self.raw_result)
666
668 """
669 Exception that is thrown if a autoscale lock/unlock operation fails,
670 because of a wrong lock state, i.e. lock when is already locked or unlock
671 when not locked.
672 """
674 self.raw_result = raw_result
676 return repr(self.raw_result)
677
679 """
680 Generic exception that is thrown during operations on a scalaris ring, e.g.
681 if an unknown result has been returned.
682 """
683
685 self.raw_result = raw_result
687 return repr(self.raw_result)
688
690 """
691 Stores the result of a delete operation.
692 """
def __init__(self, ok, locks_set, undefined):
    """
    Stores the three counters of a delete operation: the number of 'ok',
    'locks_set' and 'undef' replies.
    """
    self.ok = ok
    self.locks_set = locks_set
    self.undefined = undefined
697
699 """
700 Implements a simple (thread-safe) connection pool for Scalaris connections.
701 """
702
704 """
705 Create a new connection pool with the given maximum number of connections.
706 """
707 self._max_connections = max_connections
708 self._available_conns = []
709 self._checked_out_sema = threading.BoundedSemaphore(value=max_connections)
710 self._wait_cond = threading.Condition()
711
713 """
714 Creates a new connection for the pool. Override this to use some other
715 connection class than JSONConnection.
716 """
717 return JSONConnection()
718
720 """
721 Gets a connection from the pool. Creates a new connection if necessary.
722 Returns <tt>None</tt> if the maximum number of connections has already
723 been hit.
724 """
725 conn = None
726 if self._max_connections == 0:
727 conn = self._new_connection()
728 elif self._checked_out_sema.acquire(False):
729 try:
730 conn = self._available_conns.pop(0)
731 except IndexError:
732 conn = self._new_connection()
733 return conn
734
736 """
737 Tries to get a valid connection from the pool waiting at most
738 the given timeout. If timeout is an integer, it will be interpreted as
739 a number of milliseconds. Alternatively, timeout can be given as a
740 datetime.timedelta. Creates a new connection if necessary
741 and the maximum number of connections has not been hit yet.
742 If the timeout is hit and no connection is available, <tt>None</tt> is
743 returned.
744 """
745 if timeout == None:
746 return self._get_connection()
747 else:
748 if isinstance(timeout, numbers.Integral ):
749 timeout = timedelta(milliseconds=timeout)
750 start = datetime.now()
751 while True:
752 conn = self._get_connection()
753 if not conn is None:
754 return conn
755 self._wait_cond.wait(timeout.microseconds / 1000.0)
756 end = datetime.now()
757 if end - start > timeout:
758 return None
759
761 """
762 Puts the given connection back into the pool.
763 """
764 self._available_conns.append(connection)
765 self._checked_out_sema.release()
766 self._wait_cond.notify_all()
767
769 """
770 Close all connections to scalaris.
771 """
772 for conn in self._available_conns:
773 conn.close()
774 self._available_conns = []
775
777 """
778 Single write or read operations on scalaris.
779 """
780
782 """
783 Create a new object using the given connection
784 """
785 if conn is None:
786 conn = JSONConnection()
787 self._conn = conn
788
790 """
791 Returns a new ReqList object allowing multiple parallel requests.
792 """
793 return self._conn.new_req_list_tso(other)
794
796 """
797 Issues multiple parallel requests to scalaris; each will be committed.
798 NOTE: The execution order of multiple requests on the same key is
799 undefined!
800 Request lists can be created using new_req_list().
801 The returned list has the following form:
802 [{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
803 {'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}].
804 Elements of this list can be processed with process_result_read() and
805 process_result_write().
806 """
807 result = self._conn.callp('/api/tx.yaws', 'req_list_commit_each', [reqlist.get_requests()])
808 result = self._conn.process_result_req_list_tso(result)
809 return result
810
812 """
813 Processes a result element from the list returned by req_list() which
814 originated from a read operation.
815 Returns the read value on success.
816 Raises the appropriate exceptions if a failure occurred during the
817 operation.
818 Beware: lists of (small) integers may be (falsely) returned as a string -
819 use str_to_list() to convert such strings.
820 """
821 return self._conn.process_result_read(result)
822
824 """
825 Processes a result element from the list returned by req_list() which
826 originated from a write operation.
827 Raises the appropriate exceptions if a failure occurred during the
828 operation.
829 """
830 self._conn.check_fail_abort(result)
831 return self._conn.process_result_write(result)
832
834 """
835 Processes a result element from the list returned by req_list() which
836 originated from a add_del_on_list operation.
837 Raises the appropriate exceptions if a failure occurred during the
838 operation.
839 """
840 self._conn.check_fail_abort(result)
841 self._conn.process_result_add_del_on_list(result)
842
844 """
845 Processes a result element from the list returned by req_list() which
846 originated from a add_on_nr operation.
847 Raises the appropriate exceptions if a failure occurred during the
848 operation.
849 """
850 self._conn.check_fail_abort(result)
851 self._conn.process_result_add_on_nr(result)
852
854 """
855 Processes a result element from the list returned by req_list() which
856 originated from a test_and_set operation.
857 Raises the appropriate exceptions if a failure occurred during the
858 operation.
859 """
860 self._conn.check_fail_abort(result)
861 self._conn.process_result_test_and_set(result)
862
def read(self, key):
    """
    Read the value at key.

    Issues a 'read' call on '/api/tx.yaws' and returns whatever the
    connection's process_result_read() makes of the reply (that method is
    also the one raising an exception if the operation failed).
    Beware: lists of (small) integers may be (falsely) returned as a string -
    use str_to_list() to convert such strings.
    """
    raw_result = self._conn.callp('/api/tx.yaws', 'read', [key])
    return self._conn.process_result_read(raw_result)
871
872 - def write(self, key, value):
880
882 """
883 Changes the list stored at the given key, i.e. first adds all items in
884 to_add then removes all items in to_remove.
885 Both, to_add and to_remove, must be lists.
886 Assumes an empty list if no value exists at key.
887 """
888 result = self._conn.callp('/api/tx.yaws', 'add_del_on_list', [key, to_add, to_remove])
889 self._conn.check_fail_abort(result)
890 self._conn.process_result_add_del_on_list(result)
891
893 """
894 Changes the number stored at the given key, i.e. adds some value.
895 Assumes 0 if no value exists at key.
896 """
897 result = self._conn.callp('/api/tx.yaws', 'add_on_nr', [key, to_add])
898 self._conn.check_fail_abort(result)
899 self._conn.process_result_add_on_nr(result)
900
902 """
903 Atomic test and set, i.e. if the old value at key is old_value, then
904 write new_value.
905 """
906 old_value = self._conn.encode_value(old_value)
907 new_value = self._conn.encode_value(new_value)
908 result = self._conn.callp('/api/tx.yaws', 'test_and_set', [key, old_value, new_value])
909 self._conn.check_fail_abort(result)
910 self._conn.process_result_test_and_set(result)
911
def nop(self, value):
    """
    No operation (may be used for measuring the JSON overhead).

    The value is encoded by the connection, sent with a 'nop' call to
    '/api/tx.yaws' and the reply is handed to the connection's
    process_result_nop() for checking. Returns nothing.
    """
    encoded = self._conn.encode_value(value)
    reply = self._conn.callp('/api/tx.yaws', 'nop', [encoded])
    self._conn.process_result_nop(reply)
919
921 """
922 Close the connection to scalaris
923 (it will automatically be re-opened on the next request).
924 """
925 self._conn.close()
926
928 """
929 Write or read operations on scalaris inside a transaction.
930 """
931
933 """
934 Create a new object using the given connection
935 """
936 if conn is None:
937 conn = JSONConnection()
938 self._conn = conn
939 self._tlog = None
940
942 """
943 Returns a new ReqList object allowing multiple parallel requests.
944 """
945 return self._conn.new_req_list_t(other)
946
948 """
949 Issues multiple parallel requests to scalaris.
950 Request lists can be created using new_req_list().
951 The returned list has the following form:
952 [{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
953 {'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}].
954 Elements of this list can be processed with process_result_read() and
955 process_result_write().
956 A commit (at the end of the request list) will be automatically checked
957 for its success.
958 """
959 if self._tlog is None:
960 result = self._conn.callp('/api/tx.yaws', 'req_list', [reqlist.get_requests()])
961 else:
962 result = self._conn.callp('/api/tx.yaws', 'req_list', [self._tlog, reqlist.get_requests()])
963 (tlog, result) = self._conn.process_result_req_list_t(result)
964 self._tlog = tlog
965 if reqlist.is_commit():
966 self._process_result_commit(result[-1])
967
968 self._tlog = None
969 return result
970
972 """
973 Processes a result element from the list returned by req_list() which
974 originated from a read operation.
975 Returns the read value on success.
976 Raises the appropriate exceptions if a failure occurred during the
977 operation.
978 Beware: lists of (small) integers may be (falsely) returned as a string -
979 use str_to_list() to convert such strings.
980 """
981 return self._conn.process_result_read(result)
982
984 """
985 Processes a result element from the list returned by req_list() which
986 originated from a write operation.
987 Raises the appropriate exceptions if a failure occurred during the
988 operation.
989 """
990 return self._conn.process_result_write(result)
991
993 """
994 Processes a result element from the list returned by req_list() which
995 originated from a add_del_on_list operation.
996 Raises the appropriate exceptions if a failure occurred during the
997 operation.
998 """
999 self._conn.process_result_add_del_on_list(result)
1000
1002 """
1003 Processes a result element from the list returned by req_list() which
1004 originated from a add_on_nr operation.
1005 Raises the appropriate exceptions if a failure occurred during the
1006 operation.
1007 """
1008 self._conn.process_result_add_on_nr(result)
1009
1011 """
1012 Processes a result element from the list returned by req_list() which
1013 originated from a test_and_set operation.
1014 Raises the appropriate exceptions if a failure occurred during the
1015 operation.
1016 """
1017 self._conn.process_result_test_and_set(result)
1018
1020 """
1021 Processes a result element from the list returned by req_list() which
1022 originated from a commit operation.
1023 Raises the appropriate exceptions if a failure occurred during the
1024 operation.
1025 """
1026 return self._conn.process_result_commit(result)
1027
1029 """
1030 Issues a commit operation to scalaris validating the previously
1031 created operations inside the transaction.
1032 """
1033 result = self.req_list(self.new_req_list().add_commit())[0]
1034 self._process_result_commit(result)
1035
1036 self._tlog = None
1037
1039 """
1040 Aborts all previously created operations inside the transaction.
1041 """
1042 self._tlog = None
1043
def read(self, key):
    """
    Issues a read operation to scalaris, adds it to the current
    transaction and returns the result.

    The read is sent as a single-element request list via req_list() and
    its only result element is evaluated by process_result_read().
    Beware: lists of (small) integers may be (falsely) returned as a string -
    use str_to_list() to convert such strings.
    """
    reqlist = self.new_req_list().add_read(key)
    result = self.req_list(reqlist)[0]
    return self.process_result_read(result)
1053
1054 - def write(self, key, value):
1061
1063 """
1064 Issues a add_del_on_list operation to scalaris and adds it to the
1065 current transaction.
1066 Changes the list stored at the given key, i.e. first adds all items in
1067 to_add then removes all items in to_remove.
1068 Both, to_add and to_remove, must be lists.
1069 Assumes an empty list if no value exists at key.
1070 """
1071 result = self.req_list(self.new_req_list().add_add_del_on_list(key, to_add, to_remove))[0]
1072 self.process_result_add_del_on_list(result)
1073
1075 """
1076 Issues a add_on_nr operation to scalaris and adds it to the
1077 current transaction.
1078 Changes the number stored at the given key, i.e. adds some value.
1079 Assumes 0 if no value exists at key.
1080 """
1081 result = self.req_list(self.new_req_list().add_add_on_nr(key, to_add))[0]
1082 self.process_result_add_on_nr(result)
1083
1085 """
1086 Issues a test_and_set operation to scalaris and adds it to the
1087 current transaction.
1088 Atomic test and set, i.e. if the old value at key is old_value, then
1089 write new_value.
1090 """
1091 result = self.req_list(self.new_req_list().add_test_and_set(key, old_value, new_value))[0]
1092 self.process_result_test_and_set(result)
1093
def nop(self, value):
    """
    No operation (may be used for measuring the JSON overhead).

    The value is encoded by the connection, sent with a 'nop' call to
    '/api/tx.yaws' and the reply is handed to the connection's
    process_result_nop() for checking. Returns nothing.
    """
    encoded = self._conn.encode_value(value)
    reply = self._conn.callp('/api/tx.yaws', 'nop', [encoded])
    self._conn.process_result_nop(reply)
1101
1103 """
1104 Close the connection to scalaris
1105 (it will automatically be re-opened on the next request).
1106 """
1107 self._conn.close()
1108
1110 """
1111 Generic request list.
1112 """
1113
1115 """
1116 Create a new object using a JSON connection.
1117 """
1118 self._requests = []
1119 self._is_commit = False
1120 if other is not None:
1121 self.extend(other)
1122
1124 """
1125 Adds a read operation to the request list.
1126 """
1127 if (self._is_commit):
1128 raise RuntimeError("No further request supported after a commit!")
1129 self._requests.append({'read': key})
1130 return self
1131
1133 """
1134 Adds a write operation to the request list.
1135 """
1136 if (self._is_commit):
1137 raise RuntimeError("No further request supported after a commit!")
1138 self._requests.append({'write': {key: JSONConnection.encode_value(value)}})
1139 return self
1140
1142 """
1143 Adds a add_del_on_list operation to the request list.
1144 """
1145 if (self._is_commit):
1146 raise RuntimeError("No further request supported after a commit!")
1147 self._requests.append({'add_del_on_list': {'key': key, 'add': to_add, 'del': to_remove}})
1148 return self
1149
1151 """
1152 Adds a add_on_nr operation to the request list.
1153 """
1154 if (self._is_commit):
1155 raise RuntimeError("No further request supported after a commit!")
1156 self._requests.append({'add_on_nr': {key: to_add}})
1157 return self
1158
1160 """
1161 Adds a test_and_set operation to the request list.
1162 """
1163 if (self._is_commit):
1164 raise RuntimeError("No further request supported after a commit!")
1165 self._requests.append({'test_and_set': {'key': key, 'old': old_value, 'new': new_value}})
1166 return self
1167
1169 """
1170 Adds a commit operation to the request list.
1171 """
1172 if (self._is_commit):
1173 raise RuntimeError("Only one commit per request list allowed!")
1174 self._requests.append({'commit': ''})
1175 self._is_commit = True
1176 return self
1177
1179 """
1180 Gets the collected requests.
1181 """
1182 return self._requests
1183
1185 """
1186 Returns whether the request list contains a commit or not.
1187 """
1188 return self._is_commit
1189
1191 """
1192 Checks whether the request list is empty.
1193 """
1194 return self._requests == []
1195
1197 """
1198 Gets the number of requests in the list.
1199 """
1200 return len(self._requests)
1201
1203 """
1204 Adds all requests of the other request list to the end of this list.
1205 """
1206 self._requests.extend(other._requests)
1207 return self
1208
1210 """
1211 Request list for use with Transaction.req_list().
1212 """
1213
1216
1218 """
1219 Request list for use with TransactionSingleOp.req_list() which does not
1220 support commits.
1221 """
1222
1225
1227 """
1228 Adds a commit operation to the request list.
1229 """
1230 raise RuntimeError("No commit allowed in TransactionSingleOp.req_list()!")
1231
1233 """
1234 Non-transactional operations on the replicated DHT of scalaris
1235 """
1236
1238 """
1239 Create a new object using the given connection.
1240 """
1241 if conn is None:
1242 conn = JSONConnection()
1243 self._conn = conn
1244
1245
1246
def delete(self, key, timeout = 2000):
    """
    Tries to delete the value at the given key.

    Calls the 'delete' function at '/api/rdht.yaws' with [key, timeout] and
    lets the connection's process_result_delete() split the reply into
    (success, ok, results).

    Returns the 'ok' element when success is True.
    Raises TimeoutError when success is 'timeout' and UnknownError for any
    other outcome; both carry the raw reply.

    WARNING: This function can lead to inconsistent data (e.g. deleted items
    can re-appear). Also when re-creating an item the version before the
    delete can re-appear.
    """
    result = self._conn.callp('/api/rdht.yaws', 'delete', [key, timeout])
    (success, ok, results) = self._conn.process_result_delete(result)
    # Stored before the outcome is checked, so the detailed results are kept
    # on the object even when this call ends up raising.
    self._lastDeleteResult = results
    # 'success' is True, the string 'timeout', or something else, hence the
    # explicit comparison instead of a plain truthiness test.
    if success == True:
        return ok
    if success == 'timeout':
        raise TimeoutError(result)
    raise UnknownError(result)
1264
1266 """
1267 Returns the result of the last call to delete().
1268
1269 NOTE: This function traverses the result list returned by scalaris and
1270 therefore takes some time to process. It is advised to store the returned
1271 result object once generated.
1272 """
1273 return self._conn.create_delete_result(self._lastDeleteResult)
1274
def nop(self, value):
    """
    No operation (may be used for measuring the JSON overhead).

    The value is encoded with the connection's encode_value(), sent as the
    single parameter of the 'nop' function at '/api/rdht.yaws', and the
    reply is handed to the connection's process_result_nop().

    Nothing is returned; whatever process_result_nop() raises propagates.
    """
    encoded = self._conn.encode_value(value)
    reply = self._conn.callp('/api/rdht.yaws', 'nop', [encoded])
    self._conn.process_result_nop(reply)
1282
1284 """
1285 Close the connection to scalaris
1286 (it will automatically be re-opened on the next request).
1287 """
1288 self._conn.close()
1289
1291 """
1292 API for using routing tables
1293 """
1294
1296 """
1297 Create a new object using the given connection.
1298 """
1299 if conn is None:
1300 conn = JSONConnection()
1301 self._conn = conn
1302
1304 result = self._conn.callp('/api/rt.yaws', 'get_replication_factor', [])
1305 if isinstance(result, dict) and 'status' in result and len(result) == 2 and result['status'] == 'ok' and 'value' in result:
1306 return result['value']
1307 else:
1308 raise UnknownError(result)
1309
1311 """
1312 Provides methods to interact with a specific Scalaris (Erlang) VM.
1313 """
1314
def __init__(self, scalarisVersion, erlangVersion, memTotal, uptime,
             erlangNode, ip, port, yawsPort):
    """
    Store the given VM information values as attributes of the same name.

    No validation or conversion is done; each argument is kept as passed.
    """
    self.scalarisVersion = scalarisVersion
    self.erlangVersion = erlangVersion
    self.memTotal = memTotal
    self.uptime = uptime
    self.erlangNode = erlangNode
    self.ip = ip
    self.port = port
    self.yawsPort = yawsPort
1326
1328 """
1329 Create a new object using the given connection.
1330 """
1331 if conn is None:
1332 conn = JSONConnection()
1333 self._conn = conn
1334
1336 """
1337 Gets the version of the Scalaris VM of the current connection.
1338 """
1339 result = self._conn.callp('/api/vm.yaws', 'get_version', [])
1340 return self._conn.process_result_vm_get_version(result)
1341
1343 """
1344 Gets some information about the VM and Scalaris.
1345 """
1346 result = self._conn.callp('/api/vm.yaws', 'get_info', [])
1347 return self._conn.process_result_vm_get_info(result)
1348
1350 """
1351 Gets the number of nodes in the Scalaris VM of the current connection.
1352 """
1353 result = self._conn.callp('/api/vm.yaws', 'number_of_nodes', [])
1354 return self._conn.process_result_vm_get_number_of_nodes(result)
1355
1357 """
1358 Gets the names of the nodes in the Scalaris VM of the current connection.
1359 """
1360 result = self._conn.callp('/api/vm.yaws', 'get_nodes', [])
1361 return self._conn.process_result_vm_get_nodes(result)
1362
1364 """
1365 Adds Scalaris nodes to the Scalaris VM of the current connection.
1366 """
1367 result = self._conn.callp('/api/vm.yaws', 'add_nodes', [number])
1368 return self._conn.process_result_vm_add_nodes(result)
1369
1371 """
1372 Shuts down the given node (graceful leave) in the Scalaris VM of the current connection.
1373 """
1374 result = self._conn.callp('/api/vm.yaws', 'shutdown_node', [name])
1375 return self._conn.process_result_vm_delete_node(result)
1376
1378 """
1379 Kills the given node in the Scalaris VM of the current connection.
1380 """
1381 result = self._conn.callp('/api/vm.yaws', 'kill_node', [name])
1382 return self._conn.process_result_vm_delete_node(result)
1383
1385 """
1386 Shuts down the given number of nodes (graceful leave) in the Scalaris VM of the current connection.
1387 """
1388 result = self._conn.callp('/api/vm.yaws', 'shutdown_nodes', [number])
1389 return self._conn.process_result_vm_delete_nodes(result)
1390
1392 """
1393 Kills the given number of nodes in the Scalaris VM of the current connection.
1394 """
1395 result = self._conn.callp('/api/vm.yaws', 'kill_nodes', [number])
1396 return self._conn.process_result_vm_delete_nodes(result)
1397
1399 """
1400 Shuts down the given nodes (graceful leave) in the Scalaris VM of the current connection.
1401 """
1402 result = self._conn.callp('/api/vm.yaws', 'shutdown_nodes_by_name', [names])
1403 return self._conn.process_result_vm_delete_nodes(result)
1404
1406 """
1407 Kills the given nodes in the Scalaris VM of the current connection.
1408 """
1409 result = self._conn.callp('/api/vm.yaws', 'kill_nodes_by_name', [names])
1410 return self._conn.process_result_vm_delete_nodes(result)
1411
1413 """
1414 Retrieves additional nodes from the Scalaris VM of the current
1415 connection for use as URLs in JSONConnection.
1416 """
1417 if maxVMs <= 0:
1418 raise ValueError("max must be an integer > 0")
1419 result = self._conn.callp('/api/vm.yaws', 'get_other_vms', [maxVMs])
1420 return self._conn.process_result_vm_get_other_vms(result)
1421
1423 """
1424 Tells the Scalaris VM of the current connection to shut down gracefully.
1425 """
1426 result = self._conn.callp('/api/vm.yaws', 'shutdown_vm', [])
1427 return self._conn.process_result_vm_delete_vm(result)
1428
1430 """
1431 Kills the Scalaris VM of the current connection.
1432 """
1433 result = self._conn.callp('/api/vm.yaws', 'kill_vm', [])
1434 return self._conn.process_result_vm_delete_vm(result)
1435
def nop(self, value):
    """
    No operation (may be used for measuring the JSON overhead).

    The value is encoded with the connection's encode_value(), sent as the
    single parameter of the 'nop' function at '/api/vm.yaws', and the reply
    is handed to the connection's process_result_nop().

    Nothing is returned; whatever process_result_nop() raises propagates.
    """
    encoded = self._conn.encode_value(value)
    reply = self._conn.callp('/api/vm.yaws', 'nop', [encoded])
    self._conn.process_result_nop(reply)
1443
1445 """
1446 Close the connection to scalaris
1447 (it will automatically be re-opened on the next request).
1448 """
1449 self._conn.close()
1450
1452 """
1453 Provides methods to interact with autoscale API.
1454 """
1455
1456 api = '/api/autoscale.yaws'
1457
1458 """
1459 Create a new object using the given connection.
1460 """
1462 if conn is None:
1463 conn = JSONConnection()
1464 self._conn = conn
1465
1468
1471
1474
1477
1478 """ API calls """
1482
1486
1490
1494
1496 """
1497 Close the connection to scalaris
1498 (it will automatically be re-opened on the next request).
1499 """
1500 self._conn.close()
1501
1503 """
1504 Converts a string to a list of integers.
1505 If the expected value of a read operation is a list, the returned value
1506 could be (mistakenly) a string if it is a list of integers.
1507 """
1508 if (isinstance(value, str) or isinstance(value, unicode)):
1509 chars = list(value)
1510 return [ord(char) for char in chars]
1511 else:
1512 return value
1513