Package SimPy :: Module Simulation
[hide private]
[frames] | no frames]

Source Code for Module SimPy.Simulation

   1  #!/usr/bin/env python 
   2  from SimPy.Lister import * 
   3  import bisect 
   4  import types 
   5  import sys 
   6  import new 
   7  import random 
   8  import inspect 
   9   
  10  # $Revision: 1.1.1.64 $ $Date: 2007/01/18 14:34:11 $ kgm 
  11  """Simulation 1.8 Implements SimPy Processes, Resources, Buffers, and the backbone simulation  
  12  scheduling by coroutine calls. Provides data collection through classes  
  13  Monitor and Tally. 
  14  Based on generators (Python 2.3 and later) 
  15   
  16  LICENSE: 
  17  Copyright (C) 2002,2005,2006,2007  Klaus G. Muller, Tony Vignaux 
  18  mailto: kgmuller@xs4all.nl and Tony.Vignaux@vuw.ac.nz 
  19   
  20      This library is free software; you can redistribute it and/or 
  21      modify it under the terms of the GNU Lesser General Public 
  22      License as published by the Free Software Foundation; either 
  23      version 2.1 of the License, or (at your option) any later version. 
  24   
  25      This library is distributed in the hope that it will be useful, 
  26      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  27      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  28      Lesser General Public License for more details. 
  29   
  30      You should have received a copy of the GNU Lesser General Public 
  31      License along with this library; if not, write to the Free Software 
  32      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  33  END OF LICENSE 
  34   
  35  **Change history:** 
  36   
  37      Started out as SiPy 0.9 
  38       
  39      5/9/2002: SiPy 0.9.1 
  40       
  41          - Addition of '_cancel' method in class Process and supporting '_unpost' method in  
  42            class __Evlist. 
  43           
  44          - Removal of redundant 'Action' method in class Process. 
  45           
  46      12/9/2002: 
  47       
  48          - Addition of resource class 
  49           
  50          - Addition of "_request" and "_release" coroutine calls 
  51           
  52      15/9/2002: moved into SimPy package 
  53       
  54      16/9/2002: 
  55          - Resource attributes fully implemented (resources can now have more 
  56            than 1 shareable resource units) 
  57           
  58      17/9/2002: 
  59       
  60          - corrected removal from waitQ (Vignaux) 
  61           
  62      17/9/2002: 
  63       
  64          - added test for queue discipline in "test_demo()". Must be FIFO 
  65           
  66      26/9/02: Version 0.2.0 
  67       
  68          - cleaned up code; more consistent naming 
  69           
  70          - prefixed all Simulation-private variable names with "_". 
  71           
  72          - prefixed all class-private variable names with "__". 
  73           
  74          - made normal exit quiet (but return message from scheduler() 
  75           
  76      28/9/02: 
  77       
  78          - included stopSimulation() 
  79           
  80      15/10/02: Simulation version 0.3 
  81       
  82          - Version printout now only if __TESTING 
  83           
  84          - "_stop" initialized to True by module load, and set to False in  
  85        initialize() 
  86           
  87          - Introduced 'simulate(until=0)' instead of 'scheduler(till=0)'.  
  88        Left 'scheduler()' in for backward compatibility, but marked 
  89        as deprecated. 
  90           
  91          - Added attribute "name" to class Process; default=="a_process" 
  92           
  93          - Changed Resource constructor to  
  94        'def __init__(self,capacity=1,name="a_resource",unitName="units"'. 
  95           
  96      13/11/02: Simulation version 0.6 
  97       
  98          - Major changes to class Resource: 
  99           
 100              - Added two queue types for resources, FIFO (default) and PriorityQ 
 101               
 102              - Changed constructor to allow selection of queue type. 
 103               
 104              - Introduced preemption of resources (to be used with PriorityQ 
 105                queue type) 
 106               
 107              - Changed constructor of class Resource to allow selection of preemption 
 108               
 109              - Changes to class Process to support preemption of service 
 110               
 111              - Cleaned up 'simulate' by replacing series of if-statements by dispatch table. 
 112   
 113      19/11/02: Simulation version 0.6.1 
 114          - Changed priority schemes so that higher values of Process  
 115            attribute "priority" represent higher priority. 
 116   
 117      20/11/02: Simulation version 0.7 
 118          - Major change of priority approach: 
 119   
 120              - Priority set by "yield request,self,res,priority" 
 121   
 122              - Priority of a Process instance associated with a specific  
 123                resource 
 124   
 125      25/11/02: Simulation version 0.7.1 
 126   
 127          - Code cleanup and optimization 
 128   
 129          - Made process attributes remainService and preempted private  
 130           (_remainService and _preempted) 
 131   
 132      11/12/2002: First process interrupt implementation 
 133   
 134          - Addition of user methods 'interrupt' and 'resume' 
 135   
 136          - Significant code cleanup to maintain process state 
 137   
 138      20/12/2002: Changes to "interrupt"; addition of boolean methods to show 
 139                       process states 
 140   
 141      16/3/2003: Changed hold (allowing posting events past _endtime) 
 142       
 143      18/3/2003: Changed _nextev to prevent _t going past _endtime 
 144   
 145      23/3/2003: Introduced new interrupt construct; deleted 'resume' method 
 146       
 147      25/3/2003: Expanded interrupt construct: 
 148       
 149          - Made 'interrupt' a method  of Process 
 150   
 151          - Added 'interruptCause' as an attribute of an interrupted process 
 152   
 153          - Changed definition of 'active' to  
 154           'self._nextTime <> None and not self._inInterrupt' 
 155   
 156          - Cleaned up test_interrupt function 
 157   
 158      30/3/2003: Modification of 'simulate': 
 159   
 160          - error message if 'initialize' not called (fatal) 
 161   
 162          - error message if no process scheduled (warning) 
 163   
 164          - Ensured that upon exit from 'simulate', now() == _endtime is  
 165            always valid 
 166   
 167      2/04/2003: 
 168   
 169          - Modification of 'simulate': leave _endtime alone (undid change 
 170            of 30 Mar 03) 
 171   
 172          - faster '_unpost' 
 173   
 174      3/04/2003: Made 'priority' private ('_priority') 
 175   
 176      4/04/2003: Catch activation of non-generator error 
 177   
 178      5/04/2003: Added 'interruptReset()' function to Process. 
 179   
 180      7/04/2003: Changed '_unpost' to ensure that process has 
 181                     _nextTime == None (is passive) afterwards. 
 182   
 183      8/04/2003: Changed _hold to allow for 'yield hold,self'  
 184                     (equiv to 'yield hold,self,0') 
 185   
 186      10/04/2003: Changed 'cancel' syntax to 'Process().cancel(victim)' 
 187   
 188      12/5/2003: Changed eventlist handling from dictionary to bisect 
 189       
 190      9/6/2003: - Changed eventlist handling from pure dictionary to bisect- 
 191                  sorted "timestamps" list of keys, resulting in greatly  
 192                  improved performance for models with large 
 193                  numbers of event notices with differing event times. 
 194                  ========================================================= 
 195                  This great change was suggested by Prof. Simon Frost.  
 196                  Thank you, Simon! This version 1.3 is dedicated to you! 
 197                  ========================================================= 
 198                - Added import of Lister which supports well-structured  
 199                  printing of all attributes of Process and Resource instances. 
 200   
 201      Oct 2003: Added monitored Resource instances (Monitors for activeQ and waitQ) 
 202   
 203      13 Dec 2003: Merged in Monitor and Histogram 
 204   
 205      27 Feb 2004: Repaired bug in activeQ monitor of class Resource. Now actMon 
 206                   correctly records departures from activeQ. 
 207                    
 208      19 May 2004: Added erroneously omitted Histogram class. 
 209   
 210      5 Sep 2004: Added SimEvents synchronization constructs 
 211       
 212      17 Sep 2004: Added waituntil synchronization construct 
 213       
 214      01 Dec 2004: SimPy version 1.5 
 215                   Changes in this module: Repaired SimEvents bug re proc.eventsFired 
 216                    
 217      12 Jan 2005: SimPy version 1.5.1 
 218                   Changes in this module: Monitor objects now have a default name 
 219                                           'a_Monitor' 
 220                                            
 221      29 Mar 2005: Start SimPy 1.6: compound "yield request" statements 
 222       
 223      05 Jun 2005: Fixed bug in _request method -- waitMon did not work properly in 
 224                   preemption case 
 225                    
 226      09 Jun 2005: Added test in 'activate' to see whether 'initialize()' was called first. 
 227       
 228      23 Aug 2005: - Added Tally data collection class 
 229                   - Adjusted Resource to work with Tally 
 230                   - Redid function allEventNotices() (returns prettyprinted string with event 
 231                     times and names of process instances 
 232                   - Added function allEventTimes (returns event times of all scheduled events) 
 233                    
 234      16 Mar 2006: - Added Store and Level classes 
 235                   - Added 'yield get' and 'yield put' 
 236                    
 237      10 May 2006: - Repaired bug in Store._get method 
 238                   - Repaired Level to allow initialBuffered have float value 
 239                   - Added type test for Level get parameter 'nrToGet' 
 240                    
 241      06 Jun 2006: - To improve pretty-printed output of 'Level' objects, changed attribute 
 242                     _nrBuffered to nrBuffered (synonym for amount property) 
 243                   - To improve pretty-printed output of 'Store' objects, added attribute 
 244                     buffered (which refers to _theBuffer) 
 245                      
 246      25 Aug 2006: - Start of version 1.8 
 247                   - made 'version' public 
 248                   - corrected condQ initialization bug 
 249                    
 250      30 Sep 2006: - Introduced checks to ensure capacity of a Buffer > 0 
 251                   - Removed from __future__ import (so Python 2.3 or later needed) 
 252                   
 253      15 Oct 2006: - Added code to register all Monitors and all Tallies in variables 
 254                     'allMonitors' and 'allTallies' 
 255                   - Added function 'startCollection' to activate Monitors and Tallies at a 
 256                     specified time (e.g. after warmup period) 
 257                   - Moved all test/demo programs to after 'if __name__=="__main__":'. 
 258                   
 259      17 Oct 2006: - Added compound 'put' and 'get' statements for Level and Store. 
 260       
 261      18 Oct 2006: - Repaired bug: self.eventsFired now gets set after an event fires 
 262                     in a compound yield get/put with a waitevent clause (reneging case). 
 263                      
 264      21 Oct 2006: - Introduced Store 'yield get' with a filter function. 
 265                   
 266      22 Oct 2006: - Repaired bug in prettyprinting of Store objects (the buffer  
 267                     content==._theBuffer was not shown) by changing ._theBuffer  
 268                     to .theBuffer. 
 269                   
 270      04 Dec 2006: - Added printHistogram method to Tally and Monitor (generates 
 271                     table-form histogram) 
 272                       
 273      07 Dec 2006: - Changed the __str__ method of Histogram to print a table (like printHistogram) 
 274  """ 
 275   
 276  __TESTING=False 
 277  version=__version__="1.8 $Revision: 1.1.1.64 $ $Date: 2007/01/18 14:34:11 $" 
 278  if __TESTING:  
 279      print "SimPy.Simulation %s" %__version__, 
 280      if __debug__: 
 281          print "__debug__ on" 
 282      else: 
 283          print 
 284   
 285  # yield keywords 
 286  hold=1 
 287  passivate=2 
 288  request=3 
 289  release=4 
 290  waitevent=5 
 291  queueevent=6 
 292  waituntil=7 
 293  get=8 
 294  put=9 
 295   
 296  _endtime=0 
 297  _t=0 
 298  _e=None 
 299  _stop=True 
 300  _wustep=False #controls per event stepping for waituntil construct; not for user API 
 301  True=1 
 302  False=0 
 303  condQ=[] 
 304  allMonitors=[] 
 305  allTallies=[] 
 306   
307 -def initialize():
308 global _e,_t,_stop,condQ,allMonitors,allTallies 309 _e=__Evlist() 310 _t=0 311 _stop=False 312 condQ=[] 313 allMonitors=[] 314 allTallies=[]
315
316 -def now():
317 return _t
318
319 -def stopSimulation():
320 """Application function to stop simulation run""" 321 global _stop 322 _stop=True
323
324 -def _startWUStepping():
325 """Application function to start stepping through simulation for waituntil construct.""" 326 global _wustep 327 _wustep=True
328
329 -def _stopWUStepping():
330 """Application function to stop stepping through simulation.""" 331 global _wustep 332 _wustep=False
333
334 -class Simerror(Exception):
335 - def __init__(self,value):
336 self.value=value
337
338 - def __str__(self):
339 return `self.value`
340
341 -class FatalSimerror(Simerror):
342 - def __init__(self,value):
343 Simerror.__init__(self,value) 344 self.value=value
345
346 -class Process(Lister):
347 """Superclass of classes which may use generator functions"""
348 - def __init__(self,name="a_process"):
349 #the reference to this Process instances single process (==generator) 350 self._nextpoint=None 351 self.name=name 352 self._nextTime=None #next activation time 353 self._remainService=0 354 self._preempted=0 355 self._priority={} 356 self._getpriority={} 357 self._putpriority={} 358 self._terminated= False 359 self._inInterrupt= False 360 self.eventsFired=[] #events waited/queued for occurred
361
362 - def active(self):
363 return self._nextTime <> None and not self._inInterrupt
364
365 - def passive(self):
366 return self._nextTime is None and not self._terminated
367
368 - def terminated(self):
369 return self._terminated
370
371 - def interrupted(self):
372 return self._inInterrupt and not self._terminated
373
374 - def queuing(self,resource):
375 return self in resource.waitQ
376 # TODO: Review whether this should include waiting for a buffer; or throw away?
377 - def cancel(self,victim):
378 """Application function to cancel all event notices for this Process instance;(should be all event 379 notices for the _generator_).""" 380 _e._unpost(whom=victim)
381
382 - def _hold(self,a):
383 if len(a[0]) == 3: 384 delay=abs(a[0][2]) 385 else: 386 delay=0 387 who=a[1] 388 self.interruptLeft=delay 389 self._inInterrupt=False 390 self.interruptCause=None 391 _e._post(what=who,at=_t+delay)
392
393 - def _passivate(self,a):
394 a[0][1]._nextTime=None
395
396 - def interrupt(self,victim):
397 """Application function to interrupt active processes""" 398 # can't interrupt terminated/passive/interrupted process 399 if victim.active(): 400 victim.interruptCause=self # self causes interrupt 401 left=victim._nextTime-_t 402 victim.interruptLeft=left # time left in current 'hold' 403 victim._inInterrupt=True 404 reactivate(victim) 405 return left 406 else: #victim not active -- can't interrupt 407 return None
408
409 - def interruptReset(self):
410 """ 411 Application function for an interrupt victim to get out of 412 'interrupted' state. 413 """ 414 self._inInterrupt= False
415
416 - def acquired(self,res):
417 """Multi-functional test for reneging for 'request' and 'get': 418 (1)If res of type Resource: 419 Tests whether resource res was acquired when proces reactivated. 420 If yes, the parallel wakeup process is killed. 421 If not, process is removed from res.waitQ (reneging). 422 (2)If res of type Store: 423 Tests whether item(s) gotten from Store res. 424 If yes, the parallel wakeup process is killed. 425 If no, process is removed from res.getQ 426 (3)If res of type Level: 427 Tests whether units gotten from Level res. 428 If yes, the parallel wakeup process is killed. 429 If no, process is removed from res.getQ. 430 """ 431 if isinstance(res,Resource): 432 test=self in res.activeQ 433 if test: 434 self.cancel(self._holder) 435 else: 436 res.waitQ.remove(self) 437 if res.monitored: 438 res.waitMon.observe(len(res.waitQ),t=now()) 439 return test 440 elif isinstance(res,Store): 441 test=len(self.got) 442 if test: 443 self.cancel(self._holder) 444 self.cancel(self._holder) 445 else: 446 res.getQ.remove(self) 447 if res.monitored: 448 res.getQMon.observe(len(res.getQ),t=now()) 449 return test 450 elif isinstance(res,Level): 451 test=not (self.got is None) 452 if test: 453 self.cancel(self._holder) 454 else: 455 res.getQ.remove(self) 456 if res.monitored: 457 res.getQMon.observe(len(res.getQ),t=now()) 458 return test
459
460 - def stored(self,buffer):
461 """Test for reneging for 'yield put . . .' compound statement (Level and 462 Store. Returns True if not reneged. 463 If self not in buffer.putQ, kill wakeup process, else take self out of 464 buffer.putQ (reneged)""" 465 test=self in buffer.putQ 466 if test: #reneged 467 buffer.putQ.remove(self) 468 if buffer.monitored: 469 buffer.putQMon.observe(len(buffer.putQ),t=now()) 470 else: 471 self.cancel(self._holder) 472 return not test
473
474 -def allEventNotices():
475 """Returns string with eventlist as 476 t1: [procname,procname2] 477 t2: [procname4,procname5, . . . ] 478 . . . . 479 """ 480 ret="" 481 for t in _e.timestamps: 482 ret+="%s:%s\n"%(t,[ev.who.name for ev in _e.events[t]]) 483 return ret[:-1]
484
485 -def allEventTimes():
486 """Returns list of all times for which events are scheduled. 487 """ 488 return _e.timestamps
489
490 -class __Evlist:
491 """Defines event list and operations on it"""
492 - def __init__(self):
493 #has the structure {<time1>:(ev notice1, ev notice2, . . ), 494 # <time2>:(ev notice 3,...),..} 495 self.events={} 496 #always sorted list of event times (keys for self.events) 497 self.timestamps=[]
498
499 - def _post(self,what,at,prior=False):
500 """Post an event notice for process what for time at""" 501 # event notices are _Action instances 502 if at < _t: 503 raise Simerror("Attempt to schedule event in the past") 504 if at in self.events: 505 if prior: 506 #before all other event notices at this time 507 self.events[at][0:0]= [what] 508 else: 509 self.events[at].append(what) 510 else: # first event notice at this time 511 self.events[at]=[what] 512 bisect.insort(self.timestamps,at) 513 what.who._nextTime=at
514
515 - def _unpost(self,whom):
516 """ 517 Search through all event notices at whom's event time and remove whom's 518 event notice if whom is a suspended process 519 """ 520 thistime=whom._nextTime 521 if thistime is None: #check if whom was actually active 522 return 523 else: 524 thislist=self.events[thistime] 525 for n in thislist: 526 if n.who==whom: 527 self.events[thistime].remove(n) 528 whom._nextTime = None 529 if not self.events[thistime]: 530 # no other ev notices for thistime 531 del(self.events[thistime]) 532 item_delete_point = bisect.bisect(self.timestamps, 533 thistime) 534 del self.timestamps[item_delete_point-1]
535
536 - def _nextev(self):
537 """Retrieve next event from event list""" 538 global _t, _stop 539 if not self.events: raise Simerror("No more events at time %s" %now()) 540 earliest=self.timestamps[0] 541 _t=max(earliest,_t) 542 temp=self.events[earliest] #list of actions for this time _t 543 tempev=temp[0] #first action 544 del(self.events[earliest][0]) 545 if self.events[earliest]==[]: 546 del(self.events[earliest]) #delete empty list of actions 547 del(self.timestamps[0]) 548 if _t > _endtime: 549 _t = _endtime 550 _stop = True 551 return (None,) 552 tempwho=tempev.who 553 try: 554 tt=tempwho._nextpoint.next() 555 except StopIteration: 556 tempwho._nextpoint=None 557 tempwho._terminated=True 558 tempwho._nextTime=None 559 tt=None 560 tempev=tempwho 561 return tt,tempev
562
563 -class _Action:
564 """Structure (who=process owner, generator=process)"""
565 - def __init__(self,who,generator):
566 self.who=who 567 self.process=generator
568
569 - def __str__(self):
570 return "Action %s %s" %((self.who.name,self.process))
571
572 -def activate(object,process,at="undefined",delay="undefined",prior=False):
573 """Application function to activate passive process.""" 574 if _e is None: 575 raise FatalSimerror\ 576 ("Fatal error: simulation is not initialized (call initialize() first)") 577 if not (type(process) == types.GeneratorType): 578 raise FatalSimerror("Activating function which"+ 579 " is not a generator (contains no 'yield')") 580 if not object._terminated and not object._nextTime: 581 #store generator reference in object; needed for reactivation 582 object._nextpoint=process 583 if at=="undefined": 584 at=_t 585 if delay=="undefined": 586 zeit=max(_t,at) 587 else: 588 zeit=max(_t,_t+delay) 589 _e._post(_Action(who=object,generator=process),at=zeit,prior=prior)
590
591 -def reactivate(obj,at="undefined",delay="undefined",prior=False):
592 """Application function to reactivate a process which is active, 593 suspended or passive.""" 594 # Object may be active, suspended or passive 595 if not obj._terminated: 596 a=Process("SimPysystem") 597 a.cancel(obj) 598 # object now passive 599 if at=="undefined": 600 at=_t 601 if delay=="undefined": 602 zeit=max(_t,at) 603 else: 604 zeit=max(_t,_t+delay) 605 _e._post(_Action(who=obj,generator=obj._nextpoint),at=zeit, 606 prior=prior)
607
608 -class Histogram(list):
609 """ A histogram gathering and sampling class""" 610
611 - def __init__(self,name = '',low=0.0,high=100.0,nbins=10):
612 list.__init__(self) 613 self.name = name 614 self.low = float(low) 615 self.high = float(high) 616 self.nbins = nbins 617 self.binsize=(self.high-self.low)/nbins 618 self._nrObs=0 619 self._sum=0 620 self[:] =[[low+(i-1)*self.binsize,0] for i in range(self.nbins+2)]
621
622 - def addIn(self,y):
623 """ add a value into the correct bin""" 624 self._nrObs+=1 625 self._sum+=y 626 b = int((y-self.low+self.binsize)/self.binsize) 627 if b < 0: b = 0 628 if b > self.nbins+1: b = self.nbins+1 629 assert 0 <= b <=self.nbins+1,'Histogram.addIn: b out of range: %s'%b 630 self[b][1]+=1
631
632 - def __str__(self):
633 histo=self 634 ylab="value" 635 nrObs=self._nrObs 636 width=len(str(nrObs)) 637 res=[] 638 res.append("<Histogram %s:"%self.name) 639 res.append("\nNumber of observations: %s"%nrObs) 640 if nrObs: 641 su=self._sum 642 cum=histo[0][1] 643 fmt="%s" 644 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 645 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 646 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 647 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 648 l1width=len(("%s <= "%fmt)%histo[1][0]) 649 res.append(line1\ 650 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 651 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 652 ) 653 for i in range(1,len(histo)-1): 654 cum+=histo[i][1] 655 res.append(line\ 656 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 657 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 658 ) 659 cum+=histo[-1][1] 660 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 661 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 662 lnwidth=len(("<%s"%fmt)%histo[1][0]) 663 res.append(linen\ 664 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 665 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 666 ) 667 res.append("\n>") 668 return " ".join(res)
669
670 -def startCollection(when=0.0,monitors=None,tallies=None):
671 """Starts data collection of all designated Monitor and Tally objects 672 (default=all) at time 'when'. 673 """ 674 class Starter(Process): 675 def collect(self,monitors,tallies): 676 for m in monitors: 677 print m.name 678 m.reset() 679 for t in tallies: 680 t.reset() 681 yield hold,self
682 if monitors is None: 683 monitors=allMonitors 684 if tallies is None: 685 tallies=allTallies 686 s=Starter() 687 activate(s,s.collect(monitors=monitors,tallies=tallies),at=when) 688
689 -class Monitor(list):
690 """ Monitored variables 691 692 A Class for monitored variables, that is, variables that allow one 693 to gather simple statistics. A Monitor is a subclass of list and 694 list operations can be performed on it. An object is established 695 using m= Monitor(name = '..'). It can be given a 696 unique name for use in debugging and in tracing and ylab and tlab 697 strings for labelling graphs. 698 """
699 - def __init__(self,name='a_Monitor',ylab='y',tlab='t'):
700 list.__init__(self) 701 self.startTime = 0.0 702 self.name = name 703 self.ylab = ylab 704 self.tlab = tlab 705 allMonitors.append(self)
706
707 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
708 """Sets histogram parameters. 709 Must be called before call to getHistogram""" 710 if name=='': 711 histname=self.name 712 else: 713 histname=name 714 self.histo=Histogram(name=histname,low=low,high=high,nbins=nbins)
715
716 - def observe(self,y,t=None):
717 """record y and t""" 718 if t is None: t = now() 719 self.append([t,y])
720
721 - def tally(self,y):
722 """ deprecated: tally for backward compatibility""" 723 self.observe(y,0)
724
725 - def accum(self,y,t=None):
726 """ deprecated: accum for backward compatibility""" 727 self.observe(y,t)
728
729 - def reset(self,t=None):
730 """reset the sums and counts for the monitored variable """ 731 self[:]=[] 732 if t is None: t = now() 733 self.startTime = t
734
735 - def tseries(self):
736 """ the series of measured times""" 737 return list(zip(*self)[0])
738
739 - def yseries(self):
740 """ the series of measured values""" 741 return list(zip(*self)[1])
742
743 - def count(self):
744 """ deprecated: the number of observations made """ 745 return self.__len__()
746
747 - def total(self):
748 """ the sum of the y""" 749 if self.__len__()==0: return 0 750 else: 751 sum = 0.0 752 for i in range(self.__len__()): 753 sum += self[i][1] 754 return sum # replace by sum() later
755
756 - def mean(self):
757 """ the simple average of the monitored variable""" 758 try: return 1.0*self.total()/self.__len__() 759 except: print 'SimPy: No observations for mean'
760
761 - def var(self):
762 """ the sample variance of the monitored variable """ 763 n = len(self) 764 tot = self.total() 765 ssq=0.0 766 ##yy = self.yseries() 767 for i in range(self.__len__()): 768 ssq += self[i][1]**2 # replace by sum() eventually 769 try: return (ssq - float(tot*tot)/n)/n 770 except: print 'SimPy: No observations for sample variance'
771
772 - def timeAverage(self,t=None):
773 """ the time-average of the monitored variable. 774 775 If t is used it is assumed to be the current time, 776 otherwise t = now() 777 """ 778 N = self.__len__() 779 if N == 0: 780 print 'SimPy: No observations for timeAverage' 781 return None 782 783 if t is None: t = now() 784 sum = 0.0 785 tlast = self.startTime 786 #print 'DEBUG: timave ',t,tlast 787 ylast = 0.0 788 for i in range(N): 789 ti,yi = self[i] 790 sum += ylast*(ti-tlast) 791 tlast = ti 792 ylast = yi 793 sum += ylast*(t-tlast) 794 T = t - self.startTime 795 if T == 0: 796 print 'SimPy: No elapsed time for timeAverage' 797 return None 798 #print 'DEBUG: timave ',sum,t,T 799 return sum/float(T)
800
801 - def histogram(self,low=0.0,high=100.0,nbins=10):
802 """ A histogram of the monitored y data values. 803 """ 804 h = Histogram(name=self.name,low=low,high=high,nbins=nbins) 805 ys = self.yseries() 806 for y in ys: h.addIn(y) 807 return h
808
809 - def getHistogram(self):
810 """Returns a histogram based on the parameters provided in 811 preceding call to setHistogram. 812 """ 813 ys = self.yseries() 814 h=self.histo 815 for y in ys: h.addIn(y) 816 return h
817
818 - def printHistogram(self,fmt="%s"):
819 """Returns formatted frequency distribution table string from Monitor. 820 Precondition: setHistogram must have been called. 821 fmt==format of bin range values 822 """ 823 try: 824 histo=self.getHistogram() 825 except: 826 raise FatalSimerror("histogramTable: call setHistogram first"\ 827 " for Monitor %s"%self.name) 828 ylab=self.ylab 829 nrObs=self.count() 830 width=len(str(nrObs)) 831 res=[] 832 res.append("\nHistogram for %s:"%histo.name) 833 res.append("\nNumber of observations: %s"%nrObs) 834 su=sum(self.yseries()) 835 cum=histo[0][1] 836 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 837 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 838 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 839 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 840 l1width=len(("%s <= "%fmt)%histo[1][0]) 841 res.append(line1\ 842 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 843 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 844 ) 845 for i in range(1,len(histo)-1): 846 cum+=histo[i][1] 847 res.append(line\ 848 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 849 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 850 ) 851 cum+=histo[-1][1] 852 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 853 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 854 lnwidth=len(("<%s"%fmt)%histo[1][0]) 855 res.append(linen\ 856 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 857 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 858 ) 859 return " ".join(res)
860
861 -class Tally:
862 - def __init__(self, name="a_Tally", ylab="y",tlab="t"):
863 self.name = name 864 self.ylab = ylab 865 self.tlab = tlab 866 self.reset() 867 self.startTime=0.0 868 self.histo=None 869 self._sum_of_squares=0 870 allTallies.append(self)
871
872 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
873 """Sets histogram parameters. 874 Must be called to prior to observations initiate data collection 875 for histogram. 876 """ 877 if name=='': 878 hname=self.name 879 else: 880 hname=name 881 self.histo=Histogram(name=hname,low=low,high=high,nbins=nbins)
882
883 - def observe(self, y, t=None):
884 if t is None: 885 t = now() 886 self._total += y 887 self._count += 1 888 self._integral += (t - self._last_timestamp) * self._last_observation 889 self._last_timestamp = t 890 self._last_observation = y 891 self._sum += y 892 self._sum_of_squares += y * y 893 if self.histo: 894 self.histo.addIn(y)
895
896 - def reset(self, t=None):
897 if t is None: 898 t = now() 899 self.startTime = t 900 self._last_timestamp = t 901 self._last_observation = 0.0 902 self._count = 0 903 self._total = 0.0 904 self._integral = 0.0 905 self._sum = 0.0 906 self._sum_of_squares = 0.0
907
908 - def count(self):
909 return self._count
910
911 - def total(self):
912 return self._total
913
914 - def mean(self):
915 return 1.0 * self._total / self._count
916
917 - def timeAverage(self,t=None):
918 if t is None: 919 t=now() 920 integ=self._integral+(t - self._last_timestamp) * self._last_observation 921 if (t > self.startTime): 922 return 1.0 * integ/(t - self.startTime) 923 else: 924 print 'SimPy: No elapsed time for timeAverage' 925 return None
926
927 - def var(self):
928 return 1.0 * (self._sum_of_squares - (1.0 * (self._sum * self._sum)\ 929 / self._count)) / (self._count)
930
931 - def __len__(self):
932 return self._count
933
934 - def __eq__(self, l):
935 return len(l) == self._count
936
937 - def getHistogram(self):
938 return self.histo
939
940 - def printHistogram(self,fmt="%s"):
941 """Returns formatted frequency distribution table string from Tally. 942 Precondition: setHistogram must have been called. 943 fmt==format of bin range values 944 """ 945 try: 946 histo=self.getHistogram() 947 except: 948 raise FatalSimerror("histogramTable: call setHistogram first"\ 949 " for Tally %s"%self.name) 950 ylab=self.ylab 951 nrObs=self.count() 952 width=len(str(nrObs)) 953 res=[] 954 res.append("\nHistogram for %s:"%histo.name) 955 res.append("\nNumber of observations: %s"%nrObs) 956 su=self.total() 957 cum=histo[0][1] 958 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 959 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 960 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 961 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 962 l1width=len(("%s <= "%fmt)%histo[1][0]) 963 res.append(line1\ 964 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 965 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 966 ) 967 for i in range(1,len(histo)-1): 968 cum+=histo[i][1] 969 res.append(line\ 970 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 971 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 972 ) 973 cum+=histo[-1][1] 974 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 975 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 976 lnwidth=len(("<%s"%fmt)%histo[1][0]) 977 res.append(linen\ 978 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 979 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 980 ) 981 return " ".join(res)
982
983 -class Queue(list):
984 - def __init__(self,res,moni):
985 if not moni is None: #moni==[]: 986 self.monit=True # True if a type of Monitor/Tally attached 987 else: 988 self.monit=False 989 self.moni=moni # The Monitor/Tally 990 self.resource=res # the resource/buffer this queue belongs to
991
992 - def enter(self,obj):
993 pass
994
995 - def leave(self):
996 pass
997
998 - def takeout(self,obj):
999 self.remove(obj) 1000 if self.monit: 1001 self.moni.observe(len(self),t=now())
1002
1003 -class FIFO(Queue):
1004 - def __init__(self,res,moni):
1005 Queue.__init__(self,res,moni)
1006
1007 - def enter(self,obj):
1008 self.append(obj) 1009 if self.monit: 1010 self.moni.observe(len(self),t=now())
1011
1012 - def enterGet(self,obj):
1013 self.enter(obj)
1014
1015 - def enterPut(self,obj):
1016 self.enter(obj)
1017
1018 - def leave(self):
1019 a= self.pop(0) 1020 if self.monit: 1021 self.moni.observe(len(self),t=now()) 1022 return a
1023
1024 -class PriorityQ(FIFO):
1025 """Queue is always ordered according to priority. 1026 Higher value of priority attribute == higher priority. 1027 """
1028 - def __init__(self,res,moni):
1029 FIFO.__init__(self,res,moni)
1030
1031 - def enter(self,obj):
1032 """Handles request queue for Resource""" 1033 if len(self): 1034 ix=self.resource 1035 if self[-1]._priority[ix] >= obj._priority[ix]: 1036 self.append(obj) 1037 else: 1038 z=0 1039 while self[z]._priority[ix] >= obj._priority[ix]: 1040 z += 1 1041 self.insert(z,obj) 1042 else: 1043 self.append(obj) 1044 if self.monit: 1045 self.moni.observe(len(self),t=now())
1046
1047 - def enterGet(self,obj):
1048 """Handles getQ in Buffer""" 1049 if len(self): 1050 ix=self.resource 1051 #print "priority:",[x._priority[ix] for x in self] 1052 if self[-1]._getpriority[ix] >= obj._getpriority[ix]: 1053 self.append(obj) 1054 else: 1055 z=0 1056 while self[z]._getpriority[ix] >= obj._getpriority[ix]: 1057 z += 1 1058 self.insert(z,obj) 1059 else: 1060 self.append(obj) 1061 if self.monit: 1062 self.moni.observe(len(self),t=now())
1063
1064 - def enterPut(self,obj):
1065 """Handles putQ in Buffer""" 1066 if len(self): 1067 ix=self.resource 1068 #print "priority:",[x._priority[ix] for x in self] 1069 if self[-1]._putpriority[ix] >= obj._putpriority[ix]: 1070 self.append(obj) 1071 else: 1072 z=0 1073 while self[z]._putpriority[ix] >= obj._putpriority[ix]: 1074 z += 1 1075 self.insert(z,obj) 1076 else: 1077 self.append(obj) 1078 if self.monit: 1079 self.moni.observe(len(self),t=now())
1080
1081 -class Resource(Lister):
1082 """Models shared, limited capacity resources with queuing; 1083 FIFO is default queuing discipline. 1084 """ 1085
1086 - def __init__(self,capacity=1,name="a_resource",unitName="units", 1087 qType=FIFO,preemptable=0,monitored=False,monitorType=Monitor):
1088 """ 1089 monitorType={Monitor(default)|Tally} 1090 """ 1091 self.name=name # resource name 1092 self.capacity=capacity # resource units in this resource 1093 self.unitName=unitName # type name of resource units 1094 self.n=capacity # uncommitted resource units 1095 self.monitored=monitored 1096 1097 if self.monitored: # Monitor waitQ, activeQ 1098 self.actMon=monitorType(name="Active Queue Monitor %s"%self.name, 1099 ylab="nr in queue",tlab="time") 1100 monact=self.actMon 1101 self.waitMon=monitorType(name="Wait Queue Monitor %s"%self.name, 1102 ylab="nr in queue",tlab="time") 1103 monwait=self.waitMon 1104 else: 1105 monwait=None 1106 monact=None 1107 self.waitQ=qType(self,monwait) 1108 self.preemptable=preemptable 1109 self.activeQ=qType(self,monact) 1110 self.priority_default=0
1111
1112 - def _request(self,arg):
1113 """Process request event for this resource""" 1114 object=arg[1].who 1115 if len(arg[0]) == 4: # yield request,self,resource,priority 1116 object._priority[self]=arg[0][3] 1117 else: # yield request,self,resource 1118 object._priority[self]=self.priority_default 1119 if self.preemptable and self.n == 0: # No free resource 1120 # test for preemption condition 1121 preempt=object._priority[self] > self.activeQ[-1]._priority[self] 1122 # If yes: 1123 if preempt: 1124 z=self.activeQ[-1] 1125 # suspend lowest priority process being served 1126 ##suspended = z 1127 # record remaining service time 1128 z._remainService = z._nextTime - _t 1129 Process().cancel(z) 1130 # remove from activeQ 1131 self.activeQ.remove(z) 1132 # put into front of waitQ 1133 self.waitQ.insert(0,z) 1134 # if self is monitored, update waitQ monitor 1135 if self.monitored: 1136 self.waitMon.observe(len(self.waitQ),now()) 1137 # record that it has been preempted 1138 z._preempted = 1 1139 # passivate re-queued process 1140 z._nextTime=None 1141 # assign resource unit to preemptor 1142 self.activeQ.enter(object) 1143 # post event notice for preempting process 1144 _e._post(_Action(who=object,generator=object._nextpoint), 1145 at=_t,prior=1) 1146 else: 1147 self.waitQ.enter(object) 1148 # passivate queuing process 1149 object._nextTime=None 1150 else: # treat non-preemption case 1151 if self.n == 0: 1152 self.waitQ.enter(object) 1153 # passivate queuing process 1154 object._nextTime=None 1155 else: 1156 self.n -= 1 1157 self.activeQ.enter(object) 1158 _e._post(_Action(who=object,generator=object._nextpoint), 1159 at=_t,prior=1)
1160
1161 - def _release(self,arg):
1162 """Process release request for this resource""" 1163 self.n += 1 1164 self.activeQ.remove(arg[1].who) 1165 if self.monitored: 1166 self.actMon.observe(len(self.activeQ),t=now()) 1167 #reactivate first waiting requestor if any; assign Resource to it 1168 if self.waitQ: 1169 object=self.waitQ.leave() 1170 self.n -= 1 #assign 1 resource unit to object 1171 self.activeQ.enter(object) 1172 # if resource preemptable: 1173 if self.preemptable: 1174 # if object had been preempted: 1175 if object._preempted: 1176 object._preempted = 0 1177 # reactivate object delay= remaining service time 1178 reactivate(object,delay=object._remainService) 1179 # else reactivate right away 1180 else: 1181 reactivate(object,delay=0,prior=1) 1182 # else: 1183 else: 1184 reactivate(object,delay=0,prior=1) 1185 _e._post(_Action(who=arg[1].who,generator=arg[1].who._nextpoint), 1186 at=_t,prior=1)
1187
1188 -class Buffer(Lister):
1189 """Abstract class for buffers 1190 Blocks a process when a put would cause buffer overflow or a get would cause 1191 buffer underflow. 1192 Default queuing discipline for blocked processes is FIFO.""" 1193 1194 priorityDefault=0
1195 - def __init__(self,name=None,capacity="unbounded",unitName="units", 1196 putQType=FIFO,getQType=FIFO, 1197 monitored=False,monitorType=Monitor,initialBuffered=None):
1198 if capacity=="unbounded": capacity=sys.maxint 1199 self.capacity=capacity 1200 self.name=name 1201 self.putQType=putQType 1202 self.getQType=getQType 1203 self.monitored=monitored 1204 self.initialBuffered=initialBuffered 1205 self.unitName=unitName 1206 if self.monitored: 1207 ## monitor for Producer processes' queue 1208 self.putQMon=monitorType(name="Producer Queue Monitor %s"%self.name, 1209 ylab="nr in queue",tlab="time") 1210 ## monitor for Consumer processes' queue 1211 self.getQMon=monitorType(name="Consumer Queue Monitor %s"%self.name, 1212 ylab="nr in queue",tlab="time") 1213 ## monitor for nr items in buffer 1214 self.bufferMon=monitorType(name="Buffer Monitor %s"%self.name, 1215 ylab="nr in buffer",tlab="time") 1216 else: 1217 self.putQMon=None 1218 self.getQMon=None 1219 self.bufferMon=None 1220 self.putQ=self.putQType(res=self,moni=self.putQMon) 1221 self.getQ=self.getQType(res=self,moni=self.getQMon) 1222 if self.monitored: 1223 self.putQMon.observe(y=len(self.putQ),t=now()) 1224 self.getQMon.observe(y=len(self.getQ),t=now()) 1225 self._putpriority={} 1226 self._getpriority={} 1227 1228 def _put(self): 1229 pass
1230 def _get(self): 1231 pass
1232
1233 -class Level(Buffer):
1234 """Models buffers for processes putting/getting un-distinguishable items. 1235 """
1236 - def getamount(self):
1237 return self.nrBuffered
1238
1239 - def gettheBuffer(self):
1240 return self.nrBuffered
1241 1242 theBuffer=property(gettheBuffer) 1243
1244 - def __init__(self,**pars):
1245 Buffer.__init__(self,**pars) 1246 if self.name is None: 1247 self.name="a_level" ## default name 1248 1249 if (type(self.capacity)!=type(1.0) and\ 1250 type(self.capacity)!=type(1)) or\ 1251 self.capacity<=0: 1252 raise FatalSimerror\ 1253 ("Level: capacity parameter not a positive number > 0: %s"\ 1254 %self.initialBuffered) 1255 1256 if type(self.initialBuffered)==type(1.0) or\ 1257 type(self.initialBuffered)==type(1): 1258 if self.initialBuffered>self.capacity: 1259 raise FatalSimerror("initialBuffered exceeds capacity") 1260 if self.initialBuffered>=0: 1261 self.nrBuffered=self.initialBuffered ## nr items initially in buffer 1262 ## buffer is just a counter (int type) 1263 else: 1264 raise FatalSimerror\ 1265 ("initialBuffered param of Level negative: %s"\ 1266 %self.initialBuffered) 1267 elif self.initialBuffered is None: 1268 self.initialBuffered=0 1269 self.nrBuffered=0 1270 else: 1271 raise FatalSimerror\ 1272 ("Level: wrong type of initialBuffered (parameter=%s)"\ 1273 %self.initialBuffered) 1274 if self.monitored: 1275 self.bufferMon.observe(y=self.amount,t=now())
1276 amount=property(getamount) 1277
1278 - def _put(self,arg):
1279 """Handles put requests for Level instances""" 1280 obj=arg[1].who 1281 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1282 obj._putpriority[self]=arg[0][4] 1283 whatToPut=arg[0][3] 1284 elif len(arg[0]) == 4: # yield get,self,buff,whattoput 1285 obj._putpriority[self]=Buffer.priorityDefault #default 1286 whatToPut=arg[0][3] 1287 else: # yield get,self,buff 1288 obj._putpriority[self]=Buffer.priorityDefault #default 1289 whatToPut=1 1290 if type(whatToPut)!=type(1) and type(whatToPut)!=type(1.0): 1291 raise FatalSimerror("Level: put parameter not a number") 1292 if not whatToPut>=0.0: 1293 raise FatalSimerror("Level: put parameter not positive number") 1294 whatToPutNr=whatToPut 1295 if whatToPutNr+self.amount>self.capacity: 1296 obj._nextTime=None #passivate put requestor 1297 obj._whatToPut=whatToPutNr 1298 self.putQ.enterPut(obj) #and queue, with size of put 1299 else: 1300 self.nrBuffered+=whatToPutNr 1301 if self.monitored: 1302 self.bufferMon.observe(y=self.amount,t=now()) 1303 # service any getters waiting 1304 # service in queue-order; do not serve second in queue before first 1305 # has been served 1306 while len(self.getQ) and self.amount>0: 1307 proc=self.getQ[0] 1308 if proc._nrToGet<=self.amount: 1309 proc.got=proc._nrToGet 1310 self.nrBuffered-=proc.got 1311 if self.monitored: 1312 self.bufferMon.observe(y=self.amount,t=now()) 1313 self.getQ.takeout(proc) # get requestor's record out of queue 1314 _e._post(_Action(who=proc,generator=proc._nextpoint), 1315 at=_t) # continue a blocked get requestor 1316 else: 1317 break 1318 _e._post(_Action(who=obj,generator=obj._nextpoint), 1319 at=_t,prior=1) # continue the put requestor
1320
1321 - def _get(self,arg):
1322 """Handles get requests for Level instances""" 1323 obj=arg[1].who 1324 obj.got=None 1325 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1326 obj._getpriority[self]=arg[0][4] 1327 nrToGet=arg[0][3] 1328 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1329 obj._getpriority[self]=Buffer.priorityDefault #default 1330 nrToGet=arg[0][3] 1331 else: # yield get,self,buff 1332 obj._getpriority[self]=Buffer.priorityDefault 1333 nrToGet=1 1334 if type(nrToGet)!=type(1.0) and type(nrToGet)!=type(1): 1335 raise FatalSimerror\ 1336 ("Level: get parameter not a number: %s"%nrToGet) 1337 if nrToGet<0: 1338 raise FatalSimerror\ 1339 ("Level: get parameter not positive number: %s"%nrToGet) 1340 if self.amount < nrToGet: 1341 obj._nrToGet=nrToGet 1342 self.getQ.enterGet(obj) 1343 # passivate queuing process 1344 obj._nextTime=None 1345 else: 1346 obj.got=nrToGet 1347 self.nrBuffered-=nrToGet 1348 if self.monitored: 1349 self.bufferMon.observe(y=self.amount,t=now()) 1350 _e._post(_Action(who=obj,generator=obj._nextpoint), 1351 at=_t,prior=1) 1352 # reactivate any put requestors for which space is now available 1353 # service in queue-order; do not serve second in queue before first 1354 # has been served 1355 while len(self.putQ): #test for queued producers 1356 proc=self.putQ[0] 1357 if proc._whatToPut+self.amount<=self.capacity: 1358 self.nrBuffered+=proc._whatToPut 1359 if self.monitored: 1360 self.bufferMon.observe(y=self.amount,t=now()) 1361 self.putQ.takeout(proc)#requestor's record out of queue 1362 _e._post(_Action(who=proc,generator=proc._nextpoint), 1363 at=_t) # continue a blocked put requestor 1364 else: 1365 break
1366
1367 -class Store(Buffer):
1368 """Models buffers for processes coupled by putting/getting distinguishable 1369 items. 1370 Blocks a process when a put would cause buffer overflow or a get would cause 1371 buffer underflow. 1372 Default queuing discipline for blocked processes is priority FIFO. 1373 """
1374 - def getnrBuffered(self):
1375 return len(self.theBuffer)
1376 nrBuffered=property(getnrBuffered) 1377
1378 - def getbuffered(self):
1379 return self.theBuffer
1380 buffered=property(getbuffered) 1381
1382 - def __init__(self,**pars):
1383 Buffer.__init__(self,**pars) 1384 self.theBuffer=[] 1385 if self.name is None: 1386 self.name="a_store" ## default name 1387 if type(self.capacity)!=type(1) or self.capacity<=0: 1388 raise FatalSimerror\ 1389 ("Store: capacity parameter not a positive integer > 0: %s"\ 1390 %self.initialBuffered) 1391 if type(self.initialBuffered)==type([]): 1392 if len(self.initialBuffered)>self.capacity: 1393 raise FatalSimerror("initialBuffered exceeds capacity") 1394 else: 1395 self.theBuffer[:]=self.initialBuffered##buffer==list of objects 1396 elif self.initialBuffered is None: 1397 self.theBuffer=[] 1398 else: 1399 raise FatalSimerror\ 1400 ("Store: initialBuffered not a list") 1401 if self.monitored: 1402 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1403 self._sort=None
1404 1405 1406
1407 - def addSort(self,sortFunc):
1408 """Adds buffer sorting to this instance of Store. It maintains 1409 theBuffer sorted by the sortAttr attribute of the objects in the 1410 buffer. 1411 The user-provided 'sortFunc' must look like this: 1412 1413 def mySort(self,par): 1414 tmplist=[(x.sortAttr,x) for x in par] 1415 tmplist.sort() 1416 return [x for (key,x) in tmplist] 1417 1418 """ 1419 1420 self._sort=new.instancemethod(sortFunc,self,self.__class__) 1421 self.theBuffer=self._sort(self.theBuffer)
1422
1423 - def _put(self,arg):
1424 """Handles put requests for Store instances""" 1425 obj=arg[1].who 1426 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1427 obj._putpriority[self]=arg[0][4] 1428 whatToPut=arg[0][3] 1429 elif len(arg[0]) == 4: # yield put,self,buff,whattoput 1430 obj._putpriority[self]=Buffer.priorityDefault #default 1431 whatToPut=arg[0][3] 1432 else: # error, whattoput missing 1433 raise FatalSimerror("Item to put missing in yield put stmt") 1434 if type(whatToPut)!=type([]): 1435 raise FatalSimerror("put parameter is not a list") 1436 whatToPutNr=len(whatToPut) 1437 if whatToPutNr+self.nrBuffered>self.capacity: 1438 obj._nextTime=None #passivate put requestor 1439 obj._whatToPut=whatToPut 1440 self.putQ.enterPut(obj) #and queue, with items to put 1441 else: 1442 self.theBuffer.extend(whatToPut) 1443 if not(self._sort is None): 1444 self.theBuffer=self._sort(self.theBuffer) 1445 if self.monitored: 1446 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1447 _e._post(_Action(who=obj,generator=obj._nextpoint), 1448 at=_t,prior=1) # continue the put requestor 1449 1450 # service any waiting getters 1451 # service in queue order: do not serve second in queue before first 1452 # has been served 1453 block=False 1454 buffQ=self.getQ 1455 for getter in buffQ: 1456 if self.nrBuffered>0 and len(self.getQ): 1457 proc=getter 1458 if inspect.isfunction(proc._nrToGet): 1459 movCand=proc._nrToGet(self.theBuffer) #predicate parameter 1460 if movCand: 1461 proc.got=movCand[:] 1462 for i in movCand: 1463 self.theBuffer.remove(i) 1464 self.getQ.takeout(proc) 1465 if self.monitored: 1466 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1467 _e._post(_Action(who=proc,generator=proc._nextpoint), 1468 at=_t) # continue a blocked get requestor 1469 else: #numerical parameter 1470 if not block and proc._nrToGet<=self.nrBuffered: 1471 nrToGet=proc._nrToGet 1472 proc.got=[] 1473 proc.got[:]=self.theBuffer[0:nrToGet] 1474 self.theBuffer[:]=self.theBuffer[nrToGet:] 1475 if self.monitored: 1476 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1477 # take this get requestor's record out of queue: 1478 self.getQ.takeout(proc) 1479 _e._post(_Action(who=proc,generator=proc._nextpoint), 1480 at=_t) # continue a blocked get requestor 1481 else: 1482 # block subsequent numerically specified get's in getQ 1483 # to prevent starvation of larger gets by smaller ones 1484 block=True 1485 else: 1486 break # either out of items in buffer or out of getters in getQ
1487
1488 - def _get(self,arg):
1489 """Handles get requests""" 1490 filtfunc=None 1491 obj=arg[1].who 1492 obj.got=[] # the list of items retrieved by 'get' 1493 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1494 obj._getpriority[self]=arg[0][4] 1495 if inspect.isfunction(arg[0][3]): 1496 filtfunc=arg[0][3] 1497 else: 1498 nrToGet=arg[0][3] 1499 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1500 obj._getpriority[self]=Buffer.priorityDefault #default 1501 if inspect.isfunction(arg[0][3]): 1502 filtfunc=arg[0][3] 1503 else: 1504 nrToGet=arg[0][3] 1505 else: # yield get,self,buff 1506 obj._getpriority[self]=Buffer.priorityDefault 1507 nrToGet=1 1508 if not filtfunc: #number specifies nr items to get 1509 if nrToGet<0: 1510 raise FatalSimerror\ 1511 ("Store: get parameter not positive number: %s"%nrToGet) 1512 if self.nrBuffered < nrToGet: 1513 obj._nrToGet=nrToGet 1514 self.getQ.enterGet(obj) 1515 # passivate/block queuing 'get' process 1516 obj._nextTime=None 1517 else: 1518 for i in range(nrToGet): 1519 obj.got.append(self.theBuffer.pop(0)) # move items from 1520 # buffer to requesting process 1521 if self.monitored: 1522 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1523 _e._post(_Action(who=obj,generator=obj._nextpoint), 1524 at=_t,prior=1) 1525 # reactivate any put requestors for which space is now available 1526 # serve in queue order: do not serve second in queue before first 1527 # has been served 1528 while len(self.putQ): 1529 proc=self.putQ[0] 1530 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1531 for i in proc._whatToPut: 1532 self.theBuffer.append(i) #move items to buffer 1533 if not(self._sort is None): 1534 self.theBuffer=self._sort(self.theBuffer) 1535 if self.monitored: 1536 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1537 self.putQ.takeout(proc) # dequeue requestor's record 1538 _e._post(_Action(who=proc,generator=proc._nextpoint), 1539 at=_t) # continue a blocked put requestor 1540 else: 1541 break 1542 else: # items to get determined by filtfunc 1543 movCand=filtfunc(self.theBuffer) 1544 if movCand: # get succeded 1545 _e._post(_Action(who=obj,generator=obj._nextpoint), 1546 at=_t,prior=1) 1547 obj.got=movCand[:] 1548 for item in movCand: 1549 self.theBuffer.remove(item) 1550 if self.monitored: 1551 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1552 # reactivate any put requestors for which space is now available 1553 # serve in queue order: do not serve second in queue before first 1554 # has been served 1555 while len(self.putQ): 1556 proc=self.putQ[0] 1557 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1558 for i in proc._whatToPut: 1559 self.theBuffer.append(i) #move items to buffer 1560 if not(self._sort is None): 1561 self.theBuffer=self._sort(self.theBuffer) 1562 if self.monitored: 1563 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1564 self.putQ.takeout(proc) # dequeue requestor's record 1565 _e._post(_Action(who=proc,generator=proc._nextpoint), 1566 at=_t) # continue a blocked put requestor 1567 else: 1568 break 1569 else: # get did not succeed, block 1570 obj._nrToGet=filtfunc 1571 self.getQ.enterGet(obj) 1572 # passivate/block queuing 'get' process 1573 obj._nextTime=None
1574 1575 1576 1577
1578 -class SimEvent(Lister):
1579 """Supports one-shot signalling between processes. All processes waiting for an event to occur 1580 get activated when its occurrence is signalled. From the processes queuing for an event, only 1581 the first gets activated. 1582 """
1583 - def __init__(self,name="a_SimEvent"):
1584 self.name=name 1585 self.waits=[] 1586 self.queues=[] 1587 self.occurred=False 1588 self.signalparam=None
1589
1590 - def signal(self,param=None):
1591 """Produces a signal to self; 1592 Fires this event (makes it occur). 1593 Reactivates ALL processes waiting for this event. (Cleanup waits lists 1594 of other events if wait was for an event-group (OR).) 1595 Reactivates the first process for which event(s) it is queuing for 1596 have fired. (Cleanup queues of other events if wait was for an event-group (OR).) 1597 """ 1598 self.signalparam=param 1599 if not self.waits and not self.queues: 1600 self.occurred=True 1601 else: 1602 #reactivate all waiting processes 1603 for p in self.waits: 1604 p[0].eventsFired.append(self) 1605 reactivate(p[0],prior=True) 1606 #delete waits entries for this process in other events 1607 for ev in p[1]: 1608 if ev!=self: 1609 if ev.occurred: 1610 p[0].eventsFired.append(ev) 1611 for iev in ev.waits: 1612 if iev[0]==p[0]: 1613 ev.waits.remove(iev) 1614 break 1615 self.waits=[] 1616 if self.queues: 1617 proc=self.queues.pop(0)[0] 1618 proc.eventsFired.append(self) 1619 reactivate(proc,prior=True)
1620
1621 - def _wait(self,par):
1622 """Consumes a signal if it has occurred, otherwise process 'proc' 1623 waits for this event. 1624 """ 1625 proc=par[0][1] #the process issuing the yield waitevent command 1626 proc.eventsFired=[] 1627 if not self.occurred: 1628 self.waits.append([proc,[self]]) 1629 proc._nextTime=None #passivate calling process 1630 else: 1631 proc.eventsFired.append(self) 1632 self.occurred=False 1633 _e._post(_Action(who=proc,generator=proc._nextpoint), 1634 at=_t,prior=1)
1635
1636 - def _waitOR(self,par):
1637 """Handles waiting for an OR of events in a tuple/list. 1638 """ 1639 proc=par[0][1] 1640 evlist=par[0][2] 1641 proc.eventsFired=[] 1642 anyoccur=False 1643 for ev in evlist: 1644 if ev.occurred: 1645 anyoccur=True 1646 proc.eventsFired.append(ev) 1647 ev.occurred=False 1648 if anyoccur: #at least one event has fired; continue process 1649 _e._post(_Action(who=proc,generator=proc._nextpoint), 1650 at=_t,prior=1) 1651 1652 else: #no event in list has fired, enter process in all 'waits' lists 1653 proc.eventsFired=[] 1654 proc._nextTime=None #passivate calling process 1655 for ev in evlist: 1656 ev.waits.append([proc,evlist])
1657
1658 - def _queue(self,par):
1659 """Consumes a signal if it has occurred, otherwise process 'proc' 1660 queues for this event. 1661 """ 1662 proc=par[0][1] #the process issuing the yield queueevent command 1663 proc.eventsFired=[] 1664 if not self.occurred: 1665 self.queues.append([proc,[self]]) 1666 proc._nextTime=None #passivate calling process 1667 else: 1668 proc.eventsFired.append(self) 1669 self.occurred=False 1670 _e._post(_Action(who=proc,generator=proc._nextpoint), 1671 at=_t,prior=1)
1672
1673 - def _queueOR(self,par):
1674 """Handles queueing for an OR of events in a tuple/list. 1675 """ 1676 proc=par[0][1] 1677 evlist=par[0][2] 1678 proc.eventsFired=[] 1679 anyoccur=False 1680 for ev in evlist: 1681 if ev.occurred: 1682 anyoccur=True 1683 proc.eventsFired.append(ev) 1684 ev.occurred=False 1685 if anyoccur: #at least one event has fired; continue process 1686 _e._post(_Action(who=proc,generator=proc._nextpoint), 1687 at=_t,prior=1) 1688 1689 else: #no event in list has fired, enter process in all 'waits' lists 1690 proc.eventsFired=[] 1691 proc._nextTime=None #passivate calling process 1692 for ev in evlist: 1693 ev.queues.append([proc,evlist])
1694 1695 ## begin waituntil functionality
1696 -def _test():
1697 """ 1698 Gets called by simulate after every event, as long as there are processes 1699 waiting in condQ for a condition to be satisfied. 1700 Tests the conditions for all waiting processes. Where condition satisfied, 1701 reactivates that process immediately and removes it from queue. 1702 """ 1703 global condQ 1704 rList=[] 1705 for el in condQ: 1706 if el.cond(): 1707 rList.append(el) 1708 reactivate(el) 1709 for i in rList: 1710 condQ.remove(i) 1711 1712 if not condQ: 1713 _stopWUStepping()
1714
1715 -def _waitUntilFunc(proc,cond):
1716 global condQ 1717 """ 1718 Puts a process 'proc' waiting for a condition into a waiting queue. 1719 'cond' is a predicate function which returns True if the condition is 1720 satisfied. 1721 """ 1722 if not cond(): 1723 condQ.append(proc) 1724 proc.cond=cond 1725 _startWUStepping() #signal 'simulate' that a process is waiting 1726 # passivate calling process 1727 proc._nextTime=None 1728 else: 1729 #schedule continuation of calling process 1730 _e._post(_Action(who=proc,generator=proc._nextpoint), 1731 at=_t,prior=1)
1732 1733 1734 ##end waituntil functionality 1735
1736 -def scheduler(till=0):
1737 """Schedules Processes/semi-coroutines until time 'till'. 1738 Deprecated since version 0.5. 1739 """ 1740 simulate(until=till) 1741
1742 -def holdfunc(a):
1743 a[0][1]._hold(a)
1744
1745 -def requestfunc(a):
1746 """Handles 'yield request,self,res' and 'yield (request,self,res),(<code>,self,par)'. 1747 <code> can be 'hold' or 'waitevent'. 1748 """ 1749 if type(a[0][0])==tuple: 1750 ## Compound yield request statement 1751 ## first tuple in ((request,self,res),(xx,self,yy)) 1752 b=a[0][0] 1753 ## b[2]==res (the resource requested) 1754 ##process the first part of the compound yield statement 1755 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1756 b[2]._request(arg=(b,a[1])) 1757 ##deal with add-on condition to command 1758 ##Trigger processes for reneging 1759 class _Holder(Process): 1760 """Provides timeout process""" 1761 def trigger(self,delay): 1762 yield hold,self,delay 1763 if not proc in b[2].activeQ: 1764 reactivate(proc)
1765 1766 class _EventWait(Process): 1767 """Provides event waiting process""" 1768 def trigger(self,event): 1769 yield waitevent,self,event 1770 if not proc in b[2].activeQ: 1771 a[1].who.eventsFired=self.eventsFired 1772 reactivate(proc) 1773 1774 #activate it 1775 proc=a[0][0][1] # the process to be woken up 1776 actCode=a[0][1][0] 1777 if actCode==hold: 1778 proc._holder=_Holder(name="RENEGE-hold for %s"%proc.name) 1779 ## the timeout delay 1780 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1781 elif actCode==waituntil: 1782 raise FatalSimerror("Illegal code for reneging: waituntil") 1783 elif actCode==waitevent: 1784 proc._holder=_EventWait(name="RENEGE-waitevent for %s"%proc.name) 1785 ## the event 1786 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1787 elif actCode==queueevent: 1788 raise FatalSimerror("Illegal code for reneging: queueevent") 1789 else: 1790 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1791 else: 1792 ## Simple yield request command 1793 a[0][2]._request(a) 1794
1795 -def releasefunc(a):
1796 a[0][2]._release(a)
1797
1798 -def passivatefunc(a):
1799 a[0][1]._passivate(a)
1800
1801 -def waitevfunc(a):
1802 #if waiting for one event only (not a tuple or list) 1803 evtpar=a[0][2] 1804 if isinstance(evtpar,SimEvent): 1805 a[0][2]._wait(a) 1806 # else, if waiting for an OR of events (list/tuple): 1807 else: #it should be a list/tuple of events 1808 # call _waitOR for first event 1809 evtpar[0]._waitOR(a)
1810
1811 -def queueevfunc(a):
1812 #if queueing for one event only (not a tuple or list) 1813 evtpar=a[0][2] 1814 if isinstance(evtpar,SimEvent): 1815 a[0][2]._queue(a) 1816 #else, if queueing for an OR of events (list/tuple): 1817 else: #it should be a list/tuple of events 1818 # call _queueOR for first event 1819 evtpar[0]._queueOR(a)
1820
1821 -def waituntilfunc(par):
1822 _waitUntilFunc(par[0][1],par[0][2])
1823
1824 -def getfunc(a):
1825 """Handles 'yield get,self,buffer,what,priority' and 1826 'yield (get,self,buffer,what,priority),(<code>,self,par)'. 1827 <code> can be 'hold' or 'waitevent'. 1828 """ 1829 if type(a[0][0])==tuple: 1830 ## Compound yield request statement 1831 ## first tuple in ((request,self,res),(xx,self,yy)) 1832 b=a[0][0] 1833 ## b[2]==res (the resource requested) 1834 ##process the first part of the compound yield statement 1835 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1836 b[2]._get(arg=(b,a[1])) 1837 ##deal with add-on condition to command 1838 ##Trigger processes for reneging 1839 class _Holder(Process): 1840 """Provides timeout process""" 1841 def trigger(self,delay): 1842 yield hold,self,delay 1843 #if not proc in b[2].activeQ: 1844 if proc in b[2].getQ: 1845 reactivate(proc)
1846 1847 class _EventWait(Process): 1848 """Provides event waiting process""" 1849 def trigger(self,event): 1850 yield waitevent,self,event 1851 if proc in b[2].getQ: 1852 a[1].who.eventsFired=self.eventsFired 1853 reactivate(proc) 1854 1855 #activate it 1856 proc=a[0][0][1] # the process to be woken up 1857 actCode=a[0][1][0] 1858 if actCode==hold: 1859 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1860 ## the timeout delay 1861 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1862 elif actCode==waituntil: 1863 raise FatalSimerror("Illegal code for reneging: waituntil") 1864 elif actCode==waitevent: 1865 proc._holder=_EventWait(proc.name) 1866 ## the event 1867 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1868 elif actCode==queueevent: 1869 raise FatalSimerror("Illegal code for reneging: queueevent") 1870 else: 1871 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1872 else: 1873 ## Simple yield request command 1874 a[0][2]._get(a) 1875 1876
1877 -def putfunc(a):
1878 """Handles 'yield put' (simple and compound hold/waitevent) 1879 """ 1880 if type(a[0][0])==tuple: 1881 ## Compound yield request statement 1882 ## first tuple in ((request,self,res),(xx,self,yy)) 1883 b=a[0][0] 1884 ## b[2]==res (the resource requested) 1885 ##process the first part of the compound yield statement 1886 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1887 b[2]._put(arg=(b,a[1])) 1888 ##deal with add-on condition to command 1889 ##Trigger processes for reneging 1890 class _Holder(Process): 1891 """Provides timeout process""" 1892 def trigger(self,delay): 1893 yield hold,self,delay 1894 #if not proc in b[2].activeQ: 1895 if proc in b[2].putQ: 1896 reactivate(proc)
1897 1898 class _EventWait(Process): 1899 """Provides event waiting process""" 1900 def trigger(self,event): 1901 yield waitevent,self,event 1902 if proc in b[2].putQ: 1903 a[1].who.eventsFired=self.eventsFired 1904 reactivate(proc) 1905 1906 #activate it 1907 proc=a[0][0][1] # the process to be woken up 1908 actCode=a[0][1][0] 1909 if actCode==hold: 1910 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1911 ## the timeout delay 1912 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1913 elif actCode==waituntil: 1914 raise FatalSimerror("Illegal code for reneging: waituntil") 1915 elif actCode==waitevent: 1916 proc._holder=_EventWait("RENEGE-waitevent for %s"%proc.name) 1917 ## the event 1918 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1919 elif actCode==queueevent: 1920 raise FatalSimerror("Illegal code for reneging: queueevent") 1921 else: 1922 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1923 else: 1924 ## Simple yield request command 1925 a[0][2]._put(a) 1926
1927 -def simulate(until=0):
1928 """Schedules Processes/semi-coroutines until time 'until'""" 1929 1930 """Gets called once. Afterwards, co-routines (generators) return by 1931 'yield' with a cargo: 1932 yield hold, self, <delay>: schedules the "self" process for activation 1933 after <delay> time units.If <,delay> missing, 1934 same as "yield hold,self,0" 1935 1936 yield passivate,self : makes the "self" process wait to be re-activated 1937 1938 yield request,self,<Resource>[,<priority>]: request 1 unit from <Resource> 1939 with <priority> pos integer (default=0) 1940 1941 yield release,self,<Resource> : release 1 unit to <Resource> 1942 1943 yield waitevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 1944 wait for one or more of several events 1945 1946 1947 yield queueevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 1948 queue for one or more of several events 1949 1950 yield waituntil,self,cond : wait for arbitrary condition 1951 1952 yield get,self,<buffer>[,<WhatToGet>[,<priority>]] 1953 get <WhatToGet> items from buffer (default=1); 1954 <WhatToGet> can be a pos integer or a filter function 1955 (Store only) 1956 1957 yield put,self,<buffer>[,<WhatToPut>[,priority]] 1958 put <WhatToPut> items into buffer (default=1); 1959 <WhatToPut> can be a pos integer (Level) or a list of objects 1960 (Store) 1961 1962 EXTENSIONS: 1963 Request with timeout reneging: 1964 yield (request,self,<Resource>),(hold,self,<patience>) : 1965 requests 1 unit from <Resource>. If unit not acquired in time period 1966 <patience>, self leaves waitQ (reneges). 1967 1968 Request with event-based reneging: 1969 yield (request,self,<Resource>),(waitevent,self,<eventlist>): 1970 requests 1 unit from <Resource>. If one of the events in <eventlist> occurs before unit 1971 acquired, self leaves waitQ (reneges). 1972 1973 Get with timeout reneging (for Store and Level): 1974 yield (get,self,<buffer>,nrToGet etc.),(hold,self,<patience>) 1975 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> in time period 1976 <patience>, self leaves <buffer>.getQ (reneges). 1977 1978 Get with event-based reneging (for Store and Level): 1979 yield (get,self,<buffer>,nrToGet etc.),(waitevent,self,<eventlist>) 1980 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> before one of 1981 the events in <eventlist> occurs, self leaves <buffer>.getQ (reneges). 1982 1983 1984 1985 Event notices get posted in event-list by scheduler after "yield" or by 1986 "activate"/"reactivate" functions. 1987 1988 """ 1989 global _endtime,_e,_stop,_t,_wustep 1990 _stop=False 1991 1992 if _e is None: 1993 raise FatalSimerror("Simulation not initialized") 1994 if _e.events == {}: 1995 message="SimPy: No activities scheduled" 1996 return message 1997 1998 _endtime=until 1999 message="SimPy: Normal exit" 2000 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2001 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2002 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2003 commandcodes=dispatch.keys() 2004 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2005 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2006 get:"get",put:"put"} 2007 while not _stop and _t<=_endtime: 2008 try: 2009 a=_e._nextev() 2010 if not a[0] is None: 2011 ## 'a' is tuple "(<yield command>, <action>)" 2012 if type(a[0][0])==tuple: 2013 ##allowing for yield (request,self,res),(waituntil,self,cond) 2014 command=a[0][0][0] 2015 else: 2016 command = a[0][0] 2017 if __debug__: 2018 if not command in commandcodes: 2019 raise FatalSimerror("Illegal command: yield %s"%command) 2020 dispatch[command](a) 2021 except FatalSimerror,error: 2022 print "SimPy: "+error.value 2023 sys.exit(1) 2024 except Simerror,error: 2025 message="SimPy: "+error.value 2026 _stop = True 2027 if _wustep: 2028 _test() 2029 _stopWUStepping() 2030 _e=None 2031 return message
2032 2033 2034 if __name__ == "__main__": 2035 print "SimPy.Simulation %s" %__version__ 2036 2037 ############# Test/demo functions #############
2038 - def test_demo():
2039 class Aa(Process): 2040 sequIn=[] 2041 sequOut=[] 2042 def __init__(self,holdtime,name): 2043 Process.__init__(self,name) 2044 self.holdtime=holdtime
2045 2046 def life(self,priority): 2047 for i in range(1): 2048 Aa.sequIn.append(self.name) 2049 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2050 len(rrr.activeQ) 2051 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2052 print "activeQ: ",[(k.name,k._priority[rrr]) \ 2053 for k in rrr.activeQ] 2054 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2055 "Inconsistent resource unit numbers" 2056 print now(),self.name,"requests 1 ", rrr.unitName 2057 yield request,self,rrr,priority 2058 print now(),self.name,"has 1 ",rrr.unitName 2059 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2060 len(rrr.activeQ) 2061 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2062 len(rrr.activeQ) 2063 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2064 "Inconsistent resource unit numbers" 2065 yield hold,self,self.holdtime 2066 print now(),self.name,"gives up 1",rrr.unitName 2067 yield release,self,rrr 2068 Aa.sequOut.append(self.name) 2069 print now(),self.name,"has released 1 ",rrr.unitName 2070 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2071 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2072 len(rrr.activeQ) 2073 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2074 "Inconsistent resource unit numbers" 2075 2076 class Observer(Process): 2077 def __init__(self): 2078 Process.__init__(self) 2079 2080 def observe(self,step,processes,res): 2081 while now()<11: 2082 for i in processes: 2083 print " %s %s: act:%s, pass:%s, term: %s,interr:%s, qu:%s"\ 2084 %(now(),i.name,i.active(),i.passive(),i.terminated()\ 2085 ,i.interrupted(),i.queuing(res)) 2086 print 2087 yield hold,self,step 2088 2089 print"\n+++test_demo output" 2090 print "****First case == priority queue, resource service not preemptable" 2091 initialize() 2092 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2093 preemptable=0) 2094 procs=[] 2095 for i in range(10): 2096 z=Aa(holdtime=i,name="Car "+str(i)) 2097 procs.append(z) 2098 activate(z,z.life(priority=i)) 2099 o=Observer() 2100 activate(o,o.observe(1,procs,rrr)) 2101 a=simulate(until=10000) 2102 print a 2103 print "Input sequence: ",Aa.sequIn 2104 print "Output sequence: ",Aa.sequOut 2105 2106 print "\n****Second case == priority queue, resource service preemptable" 2107 initialize() 2108 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2109 preemptable=1) 2110 procs=[] 2111 for i in range(10): 2112 z=Aa(holdtime=i,name="Car "+str(i)) 2113 procs.append(z) 2114 activate(z,z.life(priority=i)) 2115 o=Observer() 2116 activate(o,o.observe(1,procs,rrr)) 2117 Aa.sequIn=[] 2118 Aa.sequOut=[] 2119 a=simulate(until=10000) 2120 print a 2121 print "Input sequence: ",Aa.sequIn 2122 print "Output sequence: ",Aa.sequOut 2123
2124 - def test_interrupt():
2125 class Bus(Process): 2126 def __init__(self,name): 2127 Process.__init__(self,name)
2128 2129 def operate(self,repairduration=0): 2130 print now(),">> %s starts" %(self.name) 2131 tripleft = 1000 2132 while tripleft > 0: 2133 yield hold,self,tripleft 2134 if self.interrupted(): 2135 print "interrupted by %s" %self.interruptCause.name 2136 print "%s: %s breaks down " %(now(),self.name) 2137 tripleft=self.interruptLeft 2138 self.interruptReset() 2139 print "tripleft ",tripleft 2140 reactivate(br,delay=repairduration) # breakdowns only during operation 2141 yield hold,self,repairduration 2142 print now()," repaired" 2143 else: 2144 break # no breakdown, ergo bus arrived 2145 print now(),"<< %s done" %(self.name) 2146 2147 class Breakdown(Process): 2148 def __init__(self,myBus): 2149 Process.__init__(self,name="Breakdown "+myBus.name) 2150 self.bus=myBus 2151 2152 def breakBus(self,interval): 2153 2154 while True: 2155 yield hold,self,interval 2156 if self.bus.terminated(): break 2157 self.interrupt(self.bus) 2158 2159 print"\n\n+++test_interrupt" 2160 initialize() 2161 b=Bus("Bus 1") 2162 activate(b,b.operate(repairduration=20)) 2163 br=Breakdown(b) 2164 activate(br,br.breakBus(200)) 2165 print simulate(until=4000) 2166
2167 - def testSimEvents():
2168 class Waiter(Process): 2169 def waiting(self,theSignal): 2170 while True: 2171 yield waitevent,self,theSignal 2172 print "%s: process '%s' continued after waiting for %s"%(now(),self.name,theSignal.name) 2173 yield queueevent,self,theSignal 2174 print "%s: process '%s' continued after queueing for %s"%(now(),self.name,theSignal.name)
2175 2176 class ORWaiter(Process): 2177 def waiting(self,signals): 2178 while True: 2179 yield waitevent,self,signals 2180 print now(),"one of %s signals occurred"%[x.name for x in signals] 2181 print "\t%s (fired/param)"%[(x.name,x.signalparam) for x in self.eventsFired] 2182 yield hold,self,1 2183 2184 class Caller(Process): 2185 def calling(self): 2186 while True: 2187 signal1.signal("wake up!") 2188 print "%s: signal 1 has occurred"%now() 2189 yield hold,self,10 2190 signal2.signal("and again") 2191 signal2.signal("sig 2 again") 2192 print "%s: signal1, signal2 have occurred"%now() 2193 yield hold,self,10 2194 print"\n+++testSimEvents output" 2195 initialize() 2196 signal1=SimEvent("signal 1") 2197 signal2=SimEvent("signal 2") 2198 signal1.signal("startup1") 2199 signal2.signal("startup2") 2200 w1=Waiter("waiting for signal 1") 2201 activate(w1,w1.waiting(signal1)) 2202 w2=Waiter("waiting for signal 2") 2203 activate(w2,w2.waiting(signal2)) 2204 w3=Waiter("also waiting for signal 2") 2205 activate(w3,w3.waiting(signal2)) 2206 w4=ORWaiter("waiting for either signal 1 or signal 2") 2207 activate(w4,w4.waiting([signal1,signal2]),prior=True) 2208 c=Caller("Caller") 2209 activate(c,c.calling()) 2210 print simulate(until=100) 2211
2212 - def testwaituntil():
2213 """ 2214 Demo of waitUntil capability. 2215 2216 Scenario: 2217 Three workers require sets of tools to do their jobs. Tools are shared, scarce 2218 resources for which they compete. 2219 """ 2220 class Worker(Process): 2221 def __init__(self,name,heNeeds=[]): 2222 Process.__init__(self,name) 2223 self.heNeeds=heNeeds
2224 def work(self): 2225 2226 def workerNeeds(): 2227 for item in self.heNeeds: 2228 if item.n==0: 2229 return False 2230 return True 2231 2232 while now()<8*60: 2233 yield waituntil,self,workerNeeds 2234 for item in self.heNeeds: 2235 yield request,self,item 2236 print "%s %s has %s and starts job" %(now(),self.name, 2237 [x.name for x in self.heNeeds]) 2238 yield hold,self,random.uniform(10,30) 2239 for item in self.heNeeds: 2240 yield release,self,item 2241 yield hold,self,2 #rest 2242 2243 print "\n+++ nwaituntil demo output" 2244 initialize() 2245 brush=Resource(capacity=1,name="brush") 2246 ladder=Resource(capacity=2,name="ladder") 2247 hammer=Resource(capacity=1,name="hammer") 2248 saw=Resource(capacity=1,name="saw") 2249 painter=Worker("painter",[brush,ladder]) 2250 activate(painter,painter.work()) 2251 roofer=Worker("roofer",[hammer,ladder,ladder]) 2252 activate(roofer,roofer.work()) 2253 treeguy=Worker("treeguy",[saw,ladder]) 2254 activate(treeguy,treeguy.work()) 2255 for who in (painter,roofer,treeguy): 2256 print "%s needs %s for his job" %(who.name,[x.name for x in who.heNeeds]) 2257 print 2258 print simulate(until=9*60) 2259 2260 ## ------------------------------------------------------------- 2261 ## TEST COMPOUND "YIELD REQUEST" COMMANDS 2262 ## ------------------------------------------------------------- 2263 2264 ## ------------------------------------------------------------- 2265 ## TEST "yield (request,self,res),(hold,self,delay)" 2266 ## == timeout renege 2267 ## ------------------------------------------------------------- 2268
2269 - class JobTO(Process):
2270 """ Job class for testing timeout reneging 2271 """
2272 - def __init__(self,server=None,name=""):
2273 Process.__init__(self,name) 2274 self.res=server 2275 self.gotResource=None
2276
2277 - def execute(self,timeout,usetime):
2278 yield (request,self,self.res),(hold,self,timeout) 2279 if self.acquired(self.res): 2280 self.gotResource=True 2281 yield hold,self,usetime 2282 yield release,self,self.res 2283 else: 2284 self.gotResource=False
2285
2286 - def testNoTimeout():
2287 """Test that resource gets acquired without timeout 2288 """ 2289 res=Resource(name="Server",capacity=1) 2290 initialize() 2291 usetime=5 2292 timeout=1000000 2293 j1=JobTO(server=res,name="Job_1") 2294 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2295 j2=JobTO(server=res,name="Job_2") 2296 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2297 simulate(until=2*usetime) 2298 assert now()==2*usetime,"time not ==2*usetime" 2299 assert j1.gotResource and j2.gotResource,\ 2300 "at least one job failed to get resource" 2301 assert not (res.waitQ or res.activeQ),\ 2302 "job waiting or using resource"
2303
2304 - def testTimeout1():
2305 """Test that timeout occurs when resource busy 2306 """ 2307 res=Resource(name="Server",capacity=1,monitored=True) 2308 initialize() 2309 usetime=5 2310 timeout=3 2311 j1=JobTO(server=res,name="Job_1") 2312 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2313 j2=JobTO(server=res,name="Job_2") 2314 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2315 simulate(until=2*usetime) 2316 assert(now()==usetime),"time not ==usetime" 2317 assert(j1.gotResource),"Job_1 did not get resource" 2318 assert(not j2.gotResource),"Job_2 did not renege" 2319 assert not (res.waitQ or res.activeQ),\ 2320 "job waiting or using resource"
2321
2322 - def testTimeout2():
2323 """Test that timeout occurs when resource has no capacity free 2324 """ 2325 res=Resource(name="Server",capacity=0) 2326 initialize() 2327 usetime=5 2328 timeout=3 2329 j1=JobTO(server=res,name="Job_1") 2330 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2331 j2=JobTO(server=res,name="Job_2") 2332 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2333 simulate(until=2*usetime) 2334 assert now()==timeout,"time %s not == timeout"%now() 2335 assert not j1.gotResource,"Job_1 got resource" 2336 assert not j2.gotResource,"Job_2 got resource" 2337 assert not (res.waitQ or res.activeQ),\ 2338 "job waiting or using resource"
2339 2340 ## ------------------------------------------------------------------ 2341 ## TEST "yield (request,self,res),(waitevent,self,event)" 2342 ## == event renege 2343 ## ------------------------------------------------------------------
2344 - class JobEvt(Process):
2345 """ Job class for testing event reneging 2346 """
2347 - def __init__(self,server=None,name=""):
2348 Process.__init__(self,name) 2349 self.res=server 2350 self.gotResource=None
2351
2352 - def execute(self,event,usetime):
2353 yield (request,self,self.res),(waitevent,self,event) 2354 if self.acquired(self.res): 2355 self.gotResource=True 2356 yield hold,self,usetime 2357 yield release,self,self.res 2358 else: 2359 self.gotResource=False
2360
2361 - class JobEvtMulti(Process):
2362 """ Job class for testing event reneging with multi-event lists 2363 """
2364 - def __init__(self,server=None,name=""):
2365 Process.__init__(self,name) 2366 self.res=server 2367 self.gotResource=None
2368
2369 - def execute(self,eventlist,usetime):
2370 yield (request,self,self.res),(waitevent,self,eventlist) 2371 if self.acquired(self.res): 2372 self.gotResource=True 2373 yield hold,self,usetime 2374 yield release,self,self.res 2375 else: 2376 self.gotResource=False
2377
2378 - class FireEvent(Process):
2379 """Fires reneging event 2380 """
2381 - def fire(self,fireDelay,event):
2382 yield hold,self,fireDelay 2383 event.signal()
2384
2385 - def testNoEvent():
2386 """Test that processes acquire resource normally if no event fires 2387 """ 2388 res=Resource(name="Server",capacity=1) 2389 event=SimEvent("Renege_trigger") #never gets fired 2390 initialize() 2391 usetime=5 2392 j1=JobEvt(server=res,name="Job_1") 2393 activate(j1,j1.execute(event=event,usetime=usetime)) 2394 j2=JobEvt(server=res,name="Job_2") 2395 activate(j2,j2.execute(event=event,usetime=usetime)) 2396 simulate(until=2*usetime) 2397 # Both jobs should get server (in sequence) 2398 assert now()==2*usetime,"time not ==2*usetime" 2399 assert j1.gotResource and j2.gotResource,\ 2400 "at least one job failed to get resource" 2401 assert not (res.waitQ or res.activeQ),\ 2402 "job waiting or using resource"
2403
2404 - def testWaitEvent1():
2405 """Test that signalled event leads to renege when resource busy 2406 """ 2407 res=Resource(name="Server",capacity=1) 2408 initialize() 2409 event=SimEvent("Renege_trigger") 2410 usetime=5 2411 eventtime=1 2412 j1=JobEvt(server=res,name="Job_1") 2413 activate(j1,j1.execute(event=event,usetime=usetime)) 2414 j2=JobEvt(server=res,name="Job_2") 2415 activate(j2,j2.execute(event=event,usetime=usetime)) 2416 f=FireEvent(name="FireEvent") 2417 activate(f,f.fire(fireDelay=eventtime,event=event)) 2418 simulate(until=2*usetime) 2419 # Job_1 should get server, Job_2 renege 2420 assert(now()==usetime),"time not ==usetime" 2421 assert(j1.gotResource),"Job_1 did not get resource" 2422 assert(not j2.gotResource),"Job_2 did not renege" 2423 assert not (res.waitQ or res.activeQ),\ 2424 "job waiting or using resource"
2425
2426 - def testWaitEvent2():
2427 """Test that renege-triggering event can be one of an event list 2428 """ 2429 res=Resource(name="Server",capacity=1) 2430 initialize() 2431 event1=SimEvent("Renege_trigger_1") 2432 event2=SimEvent("Renege_trigger_2") 2433 usetime=5 2434 eventtime=1 #for both events 2435 j1=JobEvtMulti(server=res,name="Job_1") 2436 activate(j1,j1.execute(eventlist=[event1,event2],usetime=usetime)) 2437 j2=JobEvtMulti(server=res,name="Job_2") 2438 activate(j2,j2.execute(eventlist=[event1,event2],usetime=usetime)) 2439 f1=FireEvent(name="FireEvent_1") 2440 activate(f1,f1.fire(fireDelay=eventtime,event=event1)) 2441 f2=FireEvent(name="FireEvent_2") 2442 activate(f2,f2.fire(fireDelay=eventtime,event=event2)) 2443 simulate(until=2*usetime) 2444 # Job_1 should get server, Job_2 should renege 2445 assert(now()==usetime),"time not ==usetime" 2446 assert(j1.gotResource),"Job_1 did not get resource" 2447 assert(not j2.gotResource),"Job_2 did not renege" 2448 assert not (res.waitQ or res.activeQ),\ 2449 "job waiting or using resource"
2450 2451 ## Run tests 2452 testNoTimeout() 2453 testTimeout1() 2454 testTimeout2() 2455 testNoEvent() 2456 testWaitEvent1() 2457 testWaitEvent2() 2458 test_demo() 2459 test_interrupt() 2460 testwaituntil() 2461