Module scalaris_bench
[hide private]
[frames] | no frames]

Source Code for Module scalaris_bench

  1  # Copyright 2011-2015 Zuse Institute Berlin 
  2  # 
  3  #    Licensed under the Apache License, Version 2.0 (the "License"); 
  4  #    you may not use this file except in compliance with the License. 
  5  #    You may obtain a copy of the License at 
  6  # 
  7  #        http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  #    Unless required by applicable law or agreed to in writing, software 
 10  #    distributed under the License is distributed on an "AS IS" BASIS, 
 11  #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  #    See the License for the specific language governing permissions and 
 13  #    limitations under the License. 
 14   
 15  import scalaris 
 16  from datetime import datetime 
 17  from threading import Thread 
 18  import time, threading 
 19  import random, string 
 20  import os, sys, traceback 
 21   
 22  _BENCH_DATA_SIZE = 1000 
 23  """The size of a single data item that is send to scalaris.""" 
 24  _benchTime = 0 
 25  """This is used to create different erlang keys for each run.""" 
 26  _PERCENT_TO_REMOVE = 5 
 27  """Cut 5% off of both ends of the result list.""" 
 28  _TESTRUNS = 1; 
 29  """Number of test runs (accumulates results over all test runs).""" 
 30   
 31  if 'SCALARIS_JSON_URLS' in os.environ and os.environ['SCALARIS_JSON_URLS'] != '': 
 32      DEFAULT_URLS = os.environ['SCALARIS_JSON_URLS'].split() 
 33  else: 
 34      DEFAULT_URLS = [scalaris.DEFAULT_URL] 
 35   
36 -def minibench(operations, threads_per_node, benchmarks):
37 """ 38 Default minimal benchmark. 39 40 Tests some strategies for writing key/value pairs to scalaris: 41 1) writing binary objects (random data, size = _BENCH_DATA_SIZE) 42 2) writing string objects (random data, size = _BENCH_DATA_SIZE) 43 each with the given number of consecutive operations and parallel 44 threads per Scalaris node, 45 * first using a new Transaction or TransactionSingleOp for each test, 46 * then using a new Transaction or TransactionSingleOp but re-using a single connection, 47 * and finally re-using a single Transaction or TransactionSingleOp object. 48 """ 49 # The time when the (whole) benchmark suite was started. 50 # This is used to create different erlang keys for each run. 51 _benchTime = _getCurrentMillis() 52 53 parallel_runs = len(DEFAULT_URLS) * threads_per_node; 54 print 'Number of available nodes: ' + str(len(DEFAULT_URLS)) 55 print '-> Using ' + str(parallel_runs) + ' parallel instances per test run...' 56 sys.stdout.flush() 57 58 print 'Benchmark of scalaris.TransactionSingleOp:' 59 sys.stdout.flush() 60 test_types = ['binary', 'string'] 61 test_types_str = ['B', 'S'] 62 columns = ['TransactionSingleOp.write(string, bytearray)', 63 'TransactionSingleOp.write(string, string)'] 64 test_bench = [TransSingleOpBench1, TransSingleOpBench2, TransSingleOpBench3] 65 rows = ['separate connection', 're-use connection', 're-use object'] 66 test_group = 'transsinglebench'; 67 results = _getResultArray(rows, columns) 68 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 69 test_types_str, test_bench, test_group, 1, operations, parallel_runs) 70 71 print '-----' 72 print 'Benchmark of scalaris.Transaction:' 73 sys.stdout.flush() 74 test_types = ['binary', 'string'] 75 test_types_str = ['B', 'S'] 76 columns = ['Transaction.write(string, bytearray)', 77 'Transaction.write(string, string)'] 78 test_bench = [TransBench1, TransBench2, TransBench3] 79 rows = ['separate connection', 're-use connection', 're-use object'] 80 test_group = 'transbench'; 81 results = _getResultArray(rows, columns) 82 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 83 test_types_str, test_bench, test_group, 1, operations, parallel_runs) 84 85 print '-----' 86 print 'Benchmark incrementing an integer key (read+write):' 87 sys.stdout.flush() 88 test_types = ['int'] 89 test_types_str = ['I'] 90 columns = ['Transaction.add_add_on_nr(string, int)'] 91 test_bench = [TransIncrementBench1, TransIncrementBench2, TransIncrementBench3] 92 rows = ['separate connection', 're-use connection', 're-use object'] 93 test_group = 'transbench_inc'; 94 results = _getResultArray(rows, columns) 95 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 96 test_types_str, test_bench, test_group, 7, operations, parallel_runs) 97 98 print '-----' 99 print 'Benchmark read 5 + write 5:' 100 sys.stdout.flush() 101 test_types = ['binary', 'string'] 102 test_types_str = ['B', 'S'] 103 columns = ['Transaction.read(string) + Transaction.write(string, binary)', 104 'Transaction.read(string) + Transaction.write(string, string)'] 105 test_bench = [TransRead5Write5Bench1, TransRead5Write5Bench2, TransRead5Write5Bench3] 106 rows = ['separate connection', 're-use connection', 're-use object'] 107 test_group = 'transbench_r5w5'; 108 results = _getResultArray(rows, columns) 109 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 110 test_types_str, test_bench, test_group, 10, operations, parallel_runs) 111 112 print '-----' 113 print 'Benchmark appending to a String list (read+write):' 114 sys.stdout.flush() 115 test_types = ['string'] 116 test_types_str = ['S'] 117 columns = ['Transaction.add_add_del_on_list(string, [string], [])'] 118 test_bench = [TransAppendToListBench1, TransAppendToListBench2, TransAppendToListBench3] 119 rows = ['separate connection', 're-use connection', 're-use object'] 120 test_group = 'transbench_append'; 121 results = _getResultArray(rows, columns) 122 _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 123 test_types_str, test_bench, test_group, 16, operations, parallel_runs)
124
125 -class BenchRunnable(Thread):
126 """ 127 Abstract base class of a test run that is to be run in a thread. 128 """ 129
130 - def __init__(self, key, value, operations):
131 """ 132 Create a new runnable. 133 """ 134 Thread.__init__(self) 135 self._key = key 136 self._value = value 137 self._operations = operations 138 139 self._shouldStop = False 140 self._timeAtStart = 0 141 self._speed = -1
142
143 - def _testBegin(self):
144 """ 145 Call this method when a benchmark is started. 146 Sets the time the benchmark was started. 147 """ 148 self._timeAtStart = _getCurrentMillis()
149
150 - def _testEnd(self, testRuns):
151 """ 152 Call this method when a benchmark is finished. 153 Calculates the time the benchmark took and the number of transactions 154 performed during this time. 155 """ 156 timeTaken = _getCurrentMillis() - self._timeAtStart 157 speed = (testRuns * 1000) / timeTaken 158 return speed
159
160 - def pre_init(self, j = None):
161 """ 162 Will be called before the benchmark starts with all possible 163 variations of "j" in the operation() call. 164 "j" with None is the overall initialisation run at first. 165 """ 166 pass
167
168 - def init(self):
169 """ 170 Will be called at the start of the benchmark. 171 """ 172 pass
173
174 - def cleanup(self):
175 """ 176 Will be called before the end of the benchmark. 177 """ 178 pass
179
180 - def operation(self, j):
181 """ 182 The operation to execute during the benchmark. 183 """ 184 pass
185
186 - def run(self):
187 threading.currentThread().name = "BenchRunnable-" + self._key 188 retry = 0 189 while (retry < 3) and (not self._shouldStop): 190 try: 191 self.pre_init() 192 for j in xrange(self._operations): 193 self.pre_init(j) 194 self._testBegin() 195 self.init() 196 for j in xrange(self._operations): 197 self.operation(j) 198 self.cleanup() 199 self._speed = self._testEnd(self._operations) 200 break 201 except: 202 # _printException() 203 pass 204 retry += 1
205
206 - def getSpeed(self):
207 return self._speed
208
209 - def shouldStop(self):
210 self._shouldStop = True
211
212 -class BenchRunnable2(BenchRunnable):
213 - def __init__(self, key, value, operations):
214 """ 215 Create a new runnable. 216 """ 217 BenchRunnable.__init__(self, key, value, operations)
218
219 - def init(self):
220 """ 221 Will be called at the start of the benchmark. 222 """ 223 self._connection = _getConnection()
224
225 - def cleanup(self):
226 """ 227 Will be called before the end of the benchmark. 228 """ 229 self._connection.close()
230
231 -class TransSingleOpBench1(BenchRunnable):
232 """ 233 Performs a benchmark writing objects using a new TransactionSingleOp object 234 for each test. 235 """
236 - def __init__(self, key, value, operations):
237 BenchRunnable.__init__(self, key, value, operations)
238
239 - def operation(self, j):
240 tx = scalaris.TransactionSingleOp(conn = _getConnection()) 241 tx.write(self._key + '_' + str(j), self._value) 242 tx.close_connection()
243
244 -class TransSingleOpBench2(BenchRunnable2):
245 """ 246 Performs a benchmark writing objects using a new TransactionSingleOp but 247 re-using a single connection for each test. 248 """
249 - def __init__(self, key, value, operations):
250 BenchRunnable2.__init__(self, key, value, operations)
251
252 - def operation(self, j):
253 tx = scalaris.TransactionSingleOp(conn = self._connection) 254 tx.write(self._key + '_' + str(j), self._value)
255
256 -class TransSingleOpBench3(BenchRunnable):
257 """ 258 Performs a benchmark writing objects using a single TransactionSingleOp 259 object for all tests. 260 """
261 - def __init__(self, key, value, operations):
262 BenchRunnable.__init__(self, key, value, operations)
263
264 - def init(self):
265 self._tx = scalaris.TransactionSingleOp(conn = _getConnection())
266
267 - def cleanup(self):
268 self._tx.close_connection()
269
270 - def operation(self, j):
271 self._tx.write(self._key + '_' + str(j), self._value)
272
273 -class TransBench1(BenchRunnable):
274 """ 275 Performs a benchmark writing objects using a new Transaction for each test. 276 """
277 - def __init__(self, key, value, operations):
278 BenchRunnable.__init__(self, key, value, operations)
279
280 - def operation(self, j):
281 tx = scalaris.Transaction(conn = _getConnection()) 282 tx.write(self._key + '_' + str(j), self._value) 283 tx.commit() 284 tx.close_connection()
285
286 -class TransBench2(BenchRunnable2):
287 """ 288 Performs a benchmark writing objects using a new Transaction but re-using a 289 single connection for each test. 290 """
291 - def __init__(self, key, value, operations):
292 BenchRunnable2.__init__(self, key, value, operations)
293
294 - def operation(self, j):
295 tx = scalaris.Transaction(conn = self._connection) 296 tx.write(self._key + '_' + str(j), self._value) 297 tx.commit()
298
299 -class TransBench3(BenchRunnable):
300 """ 301 Performs a benchmark writing objects using a single Transaction object 302 for all tests. 303 """
304 - def __init__(self, key, value, operations):
305 BenchRunnable.__init__(self, key, value, operations)
306
307 - def init(self):
308 self._tx = scalaris.Transaction(conn = _getConnection())
309
310 - def cleanup(self):
311 self._tx.close_connection()
312
313 - def operation(self, j):
314 self._tx.write(self._key + '_' + str(j), self._value) 315 self._tx.commit()
316
317 -class TransIncrementBench(BenchRunnable):
318 """ 319 Performs a benchmark writing integer numbers on a single key and 320 increasing them. 321 Provides convenience methods for the full increment benchmark 322 implementations. 323 """
324 - def __init__(self, key, value, operations):
325 BenchRunnable.__init__(self, key, value, operations)
326
327 - def pre_init(self, j = None):
328 tx_init = scalaris.Transaction(conn = _getConnection()) 329 tx_init.write(self._key, 0) 330 tx_init.commit() 331 tx_init.close_connection()
332
333 - def operation2(self, tx, j):
334 reqs = tx.new_req_list() 335 reqs.add_add_on_nr(self._key, 1).add_commit() 336 # value_old = tx.read(self._key) 337 # reqs.add_write(key, value_old + 1).add_commit() 338 results = tx.req_list(reqs) 339 # tx.process_result_write(results[0]) 340 tx.process_result_add_on_nr(results[0])
341
342 -class TransIncrementBench1(TransIncrementBench):
343 """ 344 Performs a benchmark writing integer numbers on a single key and 345 increasing them using a new Transaction for each test. 346 """
347 - def __init__(self, key, value, operations):
348 TransIncrementBench.__init__(self, key, value, operations)
349
350 - def operation(self, j):
351 tx = scalaris.Transaction(conn = _getConnection()) 352 self.operation2(tx, j) 353 tx.close_connection()
354
355 -class TransIncrementBench2(TransIncrementBench):
356 """ 357 Performs a benchmark writing integer numbers on a single key and 358 increasing them using a new Transaction but re-using a single 359 connection for each test. 360 """
361 - def __init__(self, key, value, operations):
362 TransIncrementBench.__init__(self, key, value, operations)
363
364 - def init(self):
365 self._connection = _getConnection()
366
367 - def cleanup(self):
368 self._connection.close()
369
370 - def operation(self, j):
371 tx = scalaris.Transaction(conn = self._connection) 372 self.operation2(tx, j)
373
374 -class TransIncrementBench3(TransIncrementBench):
375 """ 376 Performs a benchmark writing objects using a single Transaction 377 object for all tests. 378 """
379 - def __init__(self, key, value, operations):
380 TransIncrementBench.__init__(self, key, value, operations)
381
382 - def init(self):
383 self._tx = scalaris.Transaction(conn = _getConnection())
384
385 - def cleanup(self):
386 self._tx.close_connection()
387
388 - def operation(self, j):
389 self.operation2(self._tx, j)
390
391 -class TransReadXWriteXBench(BenchRunnable):
392 """ 393 Performs a benchmark reading X values and overwriting them afterwards 394 inside a transaction. 395 Provides convenience methods for the full read-x, write-x benchmark 396 implementations. 397 """
398 - def __init__(self, key, value, nr_keys, operations):
399 BenchRunnable.__init__(self, key, value, operations) 400 self._keys = [] 401 self._value_write = [] 402 for i in xrange(nr_keys): 403 self._keys.append(key + "_" + str(i)) 404 self._value_write.append(_getRandom(_BENCH_DATA_SIZE, type(value).__name__))
405
406 - def pre_init(self, j = None):
407 value_init = [] 408 for i in xrange(len(self._keys)): 409 value_init.append(_getRandom(_BENCH_DATA_SIZE, type(self._value).__name__)) 410 411 tx_init = scalaris.Transaction(conn = _getConnection()) 412 reqs = tx_init.new_req_list() 413 for i in xrange(len(self._keys)): 414 reqs.add_write(self._keys[i], value_init[i]) 415 reqs.add_commit() 416 results = tx_init.req_list(reqs) 417 for i in xrange(len(self._keys)): 418 tx_init.process_result_write(results[i]) 419 tx_init.close_connection()
420
421 - def operation2(self, tx, j):
422 reqs = tx.new_req_list() 423 # read old values into the transaction 424 for i in xrange(len(self._keys)): 425 reqs.add_read(self._keys[i]) 426 reqs.add_commit() 427 results = tx.req_list(reqs) 428 for i in xrange(len(self._keys)): 429 tx.process_result_read(results[i]) 430 431 # write new values... 432 reqs = tx.new_req_list() 433 for i in xrange(len(self._keys)): 434 value = self._value_write[j % len(self._value_write)] 435 reqs.add_write(self._keys[i], value) 436 reqs.add_commit() 437 results = tx.req_list(reqs) 438 for i in xrange(len(self._keys)): 439 tx.process_result_write(results[i])
440
441 -class TransRead5Write5Bench1(TransReadXWriteXBench):
442 """ 443 Performs a benchmark reading 5 values and overwriting them afterwards 444 inside a transaction using a new Transaction for each test. 445 """
446 - def __init__(self, key, value, operations):
447 TransReadXWriteXBench.__init__(self, key, value, 5, operations)
448
449 - def operation(self, j):
450 tx = scalaris.Transaction(conn = _getConnection()) 451 self.operation2(tx, j) 452 tx.close_connection()
453
454 -class TransRead5Write5Bench2(TransReadXWriteXBench):
455 """ 456 Performs a benchmark reading 5 values and overwriting them afterwards 457 inside a transaction using a new Transaction but re-using a single 458 connection for each test. 459 """
460 - def __init__(self, key, value, operations):
461 TransReadXWriteXBench.__init__(self, key, value, 5, operations)
462
463 - def init(self):
464 self._connection = _getConnection()
465
466 - def cleanup(self):
467 self._connection.close()
468
469 - def operation(self, j):
470 tx = scalaris.Transaction(conn = self._connection) 471 self.operation2(tx, j)
472
473 -class TransRead5Write5Bench3(TransReadXWriteXBench):
474 """ 475 Performs a benchmark reading 5 values and overwriting them afterwards 476 inside a transaction using a single Transaction object for all tests. 477 """
478 - def __init__(self, key, value, operations):
479 TransReadXWriteXBench.__init__(self, key, value, 5, operations)
480
481 - def init(self):
482 self._tx = scalaris.Transaction(conn = _getConnection())
483
484 - def cleanup(self):
485 self._tx.close_connection()
486
487 - def operation(self, j):
488 self.operation2(self._tx, j)
489
490 -class TransAppendToListBench(BenchRunnable):
491 """ 492 Performs a benchmark adding values to a list inside a transaction. 493 Provides convenience methods for the full append-to-list benchmark 494 implementations. 495 """
496 - def __init__(self, key, value, nr_keys, operations):
497 BenchRunnable.__init__(self, key, value, operations) 498 self._value_init = [] 499 for _i in xrange(nr_keys): 500 self._value_init.append(_getRandom(_BENCH_DATA_SIZE, 'string'))
501
502 - def pre_init(self, j = None):
503 if j is None: 504 return 505 tx_init = scalaris.Transaction(conn = _getConnection()) 506 reqs = tx_init.new_req_list() 507 reqs.add_write(self._key + '_' + str(j), self._value_init).add_commit() 508 results = tx_init.req_list(reqs) 509 tx_init.process_result_write(results[0]) 510 tx_init.close_connection()
511
512 - def operation2(self, tx, j):
513 reqs = tx.new_req_list() 514 reqs.add_add_del_on_list(self._key + '_' + str(j), [self._value], []).add_commit() 515 # read old list into the transaction 516 # list = scalaris.str_to_list(tx.read(self._key + '_' + str(j))) 517 # write new list ... 518 # list.append(self._value) 519 # reqs.add_write(self._key + '_' + str(j), list).add_commit()) 520 521 results = tx.req_list(reqs) 522 # tx.process_result_write(results[0]) 523 tx.process_result_add_del_on_list(results[0])
524
525 -class TransAppendToListBench1(TransAppendToListBench):
526 """ 527 Performs a benchmark adding values to a list inside a transaction 528 using a new Transaction for each test. 529 """
530 - def __init__(self, key, value, operations):
531 TransAppendToListBench.__init__(self, key, value, 5, operations)
532
533 - def operation(self, j):
534 tx = scalaris.Transaction(conn = _getConnection()) 535 self.operation2(tx, j) 536 tx.close_connection()
537
538 -class TransAppendToListBench2(TransAppendToListBench):
539 """ 540 Performs a benchmark adding values to a list inside a transaction using a 541 new Transaction but re-using a single connection for each test. 542 """
543 - def __init__(self, key, value, operations):
544 TransAppendToListBench.__init__(self, key, value, 5, operations)
545
546 - def init(self):
547 self._connection = _getConnection()
548
549 - def cleanup(self):
550 self._connection.close()
551
552 - def operation(self, j):
553 tx = scalaris.Transaction(conn = self._connection) 554 self.operation2(tx, j)
555
556 -class TransAppendToListBench3(TransAppendToListBench):
557 """ 558 Performs a benchmark adding values to a list inside a transaction using a 559 single Transaction object for all tests. 560 """
561 - def __init__(self, key, value, operations):
562 TransAppendToListBench.__init__(self, key, value, 5, operations)
563
564 - def init(self):
565 self._tx = scalaris.Transaction(conn = _getConnection())
566
567 - def cleanup(self):
568 self._tx.close_connection()
569
570 - def operation(self, j):
571 self.operation2(self._tx, j)
572
573 -def _getCurrentMillis():
574 """ 575 Gets the number of milliseconds since epoch. 576 """ 577 now = datetime.now() 578 return int(time.mktime(now.timetuple())) * 1000 + (now.microsecond // 1000)
579
580 -def _testBegin():
581 """ 582 Call this method when a benchmark is started. 583 Sets the time the benchmark was started. 584 """ 585 global _timeAtStart 586 _timeAtStart = _getCurrentMillis()
587
588 -def _testEnd(testruns):
589 """ 590 Call this method when a benchmark is finished. 591 Calculates the time the benchmark took and the number of transactions 592 performed during this time. 593 Returns the number of achieved transactions per second. 594 """ 595 global _timeAtStart 596 timeTaken = _getCurrentMillis() - _timeAtStart 597 speed = (testruns * 1000) // timeTaken 598 return speed
599
600 -def _getConnection():
601 url = random.choice(DEFAULT_URLS) 602 return scalaris.JSONConnection(url = url)
603
604 -def _getResultArray(rows, columns):
605 """ 606 Returns a pre-initialized results array with values <tt>-1</tt>. 607 """ 608 results = {} 609 for row in rows: 610 results[row] = {} 611 for column in columns: 612 results[row][column] = -1 613 return results
614
615 -def _getRandom(size, mytype):
616 """ 617 Creates an random string or binary object from <size> random characters/bytes. 618 """ 619 if mytype == 'int': 620 return random.randint(0, 2147483647) 621 elif mytype == 'string' or mytype == 'str': 622 return ''.join(random.choice(string.printable) for _x in xrange(size)) 623 elif mytype == 'binary': 624 return bytearray(random.randrange(0, 256) for _x in xrange(size))
625
626 -def _integrateResults(results, i, worker, failed):
627 """ 628 Integrates the workers' results into the result array. 629 """ 630 try: 631 for bench_thread in worker: 632 if failed >= 3: 633 bench_thread.shouldStop() 634 try: 635 while(bench_thread.isAlive()): # non-blocking join so we are able to receive CTRL-C 636 bench_thread.join(1) 637 except RuntimeError: 638 pass 639 else: 640 try: 641 while(bench_thread.isAlive()): # non-blocking join so we are able to receive CTRL-C 642 bench_thread.join(1) 643 speed = bench_thread.getSpeed() 644 except RuntimeError: 645 speed = -1 646 647 if speed < 0: 648 failed += 1 649 else: 650 results[i] += speed 651 return failed 652 except KeyboardInterrupt: 653 print 'CTRL-C received, aborting...' 654 for bench_thread in worker: 655 bench_thread.shouldStop() 656 sys.exit(1)
657
658 -def _getAvgSpeed(results):
659 """ 660 Calculates the average number of transactions per second from the results 661 of executing 10 transactions per test run. Will remove the top and bottom 662 _PERCENT_TO_REMOVE percent of the sorted results array. 663 Returns the average number of transactions per second. 664 """ 665 results.sort() 666 toRemove = int((len(results) * _PERCENT_TO_REMOVE) // 100); 667 avgSpeed = 0; 668 for i in xrange(toRemove, (len(results) - toRemove)): 669 avgSpeed += results[i] 670 671 avgSpeed //= len(results) - 2 * toRemove 672 return avgSpeed
673
674 -def _runBenchAndPrintResults(benchmarks, results, columns, rows, test_types, 675 test_types_str, test_bench, test_group, 676 first_bench_id, operations, parallel_runs):
677 """ 678 Runs the given benchmarks and prints a results table. 679 """ 680 # assume non-empty results dict: 681 for test in xrange(len(results) * len(results[list(results.keys())[0]])): 682 try: 683 i = test % len(results); 684 j = test // len(results); 685 if (test + first_bench_id) in benchmarks: 686 results[rows[i]][columns[j]] = _runBench(operations, 687 _getRandom(_BENCH_DATA_SIZE, test_types[j]), 688 test_group + "_" + test_types_str[j] + "_" + str(i + 1), 689 test_bench[i], parallel_runs) 690 time.sleep(1) 691 else: 692 results[rows[i]][columns[j]] = -2 693 except Exception: # do not catch SystemExit 694 _printException() 695 696 _printResults(columns, rows, results, operations, parallel_runs)
697
698 -def _runBench(operations, value, name, clazz, parallel_runs):
699 """ 700 Runs the given benchmark. 701 """ 702 key = str(_benchTime) + name 703 results = [-1]*_TESTRUNS 704 705 for i in xrange(_TESTRUNS): 706 worker = [] 707 for thread in xrange(parallel_runs): 708 new_worker = clazz(key + '_' + str(i) + '_' + str(thread), value, operations) 709 worker.append(new_worker) 710 new_worker.start() 711 failed = 0 712 failed = _integrateResults(results, i, worker, failed); 713 if failed >= 3: 714 return -1 715 716 return _getAvgSpeed(results)
717
718 -def _printResults(columns, rows, results, operations, parallel_runs):
719 """ 720 Prints a result table. 721 """ 722 print 'Concurrent threads: ' + str(parallel_runs) + ', each using ' + str(operations) + ' transactions' 723 colLen = 25 724 emptyFirstColumn = ''.join([' ']*colLen) 725 print emptyFirstColumn + '\tspeed (transactions / second)' 726 print emptyFirstColumn, 727 i = 1 728 for column in columns: 729 print '\t(' + str(i) + ')', 730 i += 1 731 print '' 732 for row in rows: 733 print row + ''.join([' ']*(colLen - len(row))), 734 for column in columns: 735 value = results[row][column] 736 if (value == -2): 737 print '\tn/a', 738 elif (value == -1): 739 print '\tfailed', 740 else: 741 print '\t' + str(int(value)), 742 print '' 743 744 i = 1 745 for column in columns: 746 print '(' + str(i) + ') ' + column 747 i += 1 748 sys.stdout.flush()
749
750 -def _printException():
751 mytype, message, trace = sys.exc_info() 752 print str(mytype) + str(message) 753 traceback.print_tb(trace)
754
755 -def run_from_cmd(argv):
756 nr_operations = 500 757 threads_per_node = 10 758 allBenchs = False 759 if (len(argv) == 1): 760 allBenchs = True 761 elif (len(argv) == 2): 762 allBenchs = True 763 nr_operations = int(argv[1]) 764 elif (len(argv) == 3): 765 allBenchs = True 766 nr_operations = int(argv[1]) 767 threads_per_node = int(argv[2]) 768 elif (len(argv) >= 4): 769 nr_operations = int(argv[1]) 770 threads_per_node = int(argv[2]) 771 benchmarks = [] 772 for i in xrange(3, len(argv)): 773 if argv[i] == 'all': 774 allBenchs = True 775 else: 776 benchmarks.append(int(argv[i])) 777 if allBenchs: 778 benchmarks = xrange(1, 19, 1) 779 minibench(nr_operations, threads_per_node, benchmarks)
780 781 if __name__ == "__main__": 782 run_from_cmd(sys.argv) 783