Package SimPy :: Module SimulationStep
[hide private]
[frames] | no frames]

Source Code for Module SimPy.SimulationStep

   1  #!/usr/bin/env python 
   2  from SimPy.Lister import * 
   3  import bisect 
   4  import types 
   5  import sys 
   6  import new 
   7  import random 
   8  import inspect 
   9   
  10  # $Revision: 1.1.1.25 $ $Date: 2007/01/18 14:44:17 $ kgm 
  11  """SimulationStep 1.8 Supports stepping through SimPy simulation event-by-event. 
  12  Based on generators (Python 2.3 and later) 
  13   
  14  LICENSE: 
  15  Copyright (C) 2002,2005,2006,2007  Klaus G. Muller, Tony Vignaux 
  16  mailto: kgmuller@xs4all.nl and Tony.Vignaux@vuw.ac.nz 
  17   
  18      This library is free software; you can redistribute it and/or 
  19      modify it under the terms of the GNU Lesser General Public 
  20      License as published by the Free Software Foundation; either 
  21      version 2.1 of the License, or (at your option) any later version. 
  22   
  23      This library is distributed in the hope that it will be useful, 
  24      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  25      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  26      Lesser General Public License for more details. 
  27   
  28      You should have received a copy of the GNU Lesser General Public 
  29      License along with this library; if not, write to the Free Software 
  30      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  31  END OF LICENSE 
  32   
  33   
  34   
  35  **Change history:** 
  36   
  37      Started out as SiPy 0.9 
  38       
  39      5/9/2002: SiPy 0.9.1 
  40       
  41          - Addition of '_cancel' method in class Process and supporting '_unpost' method in  
  42            class __Evlist. 
  43           
  44          - Removal of redundant 'Action' method in class Process. 
  45           
  46      12/9/2002: 
  47       
  48          - Addition of resource class 
  49           
  50          - Addition of "_request" and "_release" coroutine calls 
  51           
  52      15/9/2002: moved into SimPy package 
  53       
  54      16/9/2002: 
  55          - Resource attributes fully implemented (resources can now have more 
  56            than 1 shareable resource units) 
  57           
  58      17/9/2002: 
  59       
  60          - corrected removal from waitQ (Vignaux) 
  61           
  62      17/9/2002: 
  63       
  64          - added test for queue discipline in "test_demo()". Must be FIFO 
  65           
  66      26/9/02: Version 0.2.0 
  67       
  68          - cleaned up code; more consistent naming 
  69           
  70          - prefixed all Simulation-private variable names with "_". 
  71           
  72          - prefixed all class-private variable names with "__". 
  73           
  74          - made normal exit quiet (but return message from scheduler() 
  75           
  76      28/9/02: 
  77       
  78          - included stopSimulation() 
  79           
  80      15/10/02: Simulation version 0.3 
  81       
  82          - Version printout now only if __TESTING 
  83           
  84          - "_stop" initialized to True by module load, and set to False in  
  85        initialize() 
  86           
  87          - Introduced 'simulate(until=0)' instead of 'scheduler(till=0)'.  
  88        Left 'scheduler()' in for backward compatibility, but marked 
  89        as deprecated. 
  90           
  91          - Added attribute "name" to class Process; default=="a_process" 
  92           
  93          - Changed Resource constructor to  
  94        'def __init__(self,capacity=1,name="a_resource",unitName="units"'. 
  95           
  96      13/11/02: Simulation version 0.6 
  97       
  98          - Major changes to class Resource: 
  99           
 100              - Added two queue types for resources, FIFO (default) and PriorityQ 
 101               
 102              - Changed constructor to allow selection of queue type. 
 103               
 104              - Introduced preemption of resources (to be used with PriorityQ 
 105                queue type) 
 106               
 107              - Changed constructor of class Resource to allow selection of preemption 
 108               
 109              - Changes to class Process to support preemption of service 
 110               
 111              - Cleaned up 'simulate' by replacing series of if-statements by dispatch table. 
 112   
 113      19/11/02: Simulation version 0.6.1 
 114          - Changed priority schemes so that higher values of Process  
 115            attribute "priority" represent higher priority. 
 116   
 117      20/11/02: Simulation version 0.7 
 118          - Major change of priority approach: 
 119   
 120              - Priority set by "yield request,self,res,priority" 
 121   
 122              - Priority of a Process instance associated with a specific  
 123                resource 
 124   
 125      25/11/02: Simulation version 0.7.1 
 126   
 127          - Code cleanup and optimization 
 128   
 129          - Made process attributes remainService and preempted private  
 130           (_remainService and _preempted) 
 131   
 132      11/12/2002: First process interrupt implementation 
 133   
 134          - Addition of user methods 'interrupt' and 'resume' 
 135   
 136          - Significant code cleanup to maintain process state 
 137   
 138      20/12/2002: Changes to "interrupt"; addition of boolean methods to show 
 139                       process states 
 140   
 141      16/3/2003: Changed hold (allowing posting events past _endtime) 
 142       
 143      18/3/2003: Changed _nextev to prevent _t going past _endtime 
 144   
 145      23/3/2003: Introduced new interrupt construct; deleted 'resume' method 
 146       
 147      25/3/2003: Expanded interrupt construct: 
 148       
 149          - Made 'interrupt' a method  of Process 
 150   
 151          - Added 'interruptCause' as an attribute of an interrupted process 
 152   
 153          - Changed definition of 'active' to  
 154           'self._nextTime <> None and not self._inInterrupt' 
 155   
 156          - Cleaned up test_interrupt function 
 157   
 158      30/3/2003: Modification of 'simulate': 
 159   
 160          - error message if 'initialize' not called (fatal) 
 161   
 162          - error message if no process scheduled (warning) 
 163   
 164          - Ensured that upon exit from 'simulate', now() == _endtime is  
 165            always valid 
 166   
 167      2/04/2003: 
 168   
 169          - Modification of 'simulate': leave _endtime alone (undid change 
 170            of 30 Mar 03) 
 171   
 172          - faster '_unpost' 
 173   
 174      3/04/2003: Made 'priority' private ('_priority') 
 175   
 176      4/04/2003: Catch activation of non-generator error 
 177   
 178      5/04/2003: Added 'interruptReset()' function to Process. 
 179   
 180      7/04/2003: Changed '_unpost' to ensure that process has 
 181                     _nextTime == None (is passive) afterwards. 
 182   
 183      8/04/2003: Changed _hold to allow for 'yield hold,self'  
 184                     (equiv to 'yield hold,self,0') 
 185   
 186      10/04/2003: Changed 'cancel' syntax to 'Process().cancel(victim)' 
 187   
 188      12/5/2003: Changed eventlist handling from dictionary to bisect 
 189       
 190      9/6/2003: - Changed eventlist handling from pure dictionary to bisect- 
 191                  sorted "timestamps" list of keys, resulting in greatly  
 192                  improved performance for models with large 
 193                  numbers of event notices with differing event times. 
 194                  ========================================================= 
 195                  This great change was suggested by Prof. Simon Frost.  
 196                  Thank you, Simon! This version 1.3 is dedicated to you! 
 197                  ========================================================= 
 198                - Added import of Lister which supports well-structured  
 199                  printing of all attributes of Process and Resource instances. 
 200   
 201      Oct 2003: Added monitored Resource instances (Monitors for activeQ and waitQ) 
 202   
 203      13 Dec 2003: Merged in Monitor and Histogram 
 204   
 205      27 Feb 2004: Repaired bug in activeQ monitor of class Resource. Now actMon 
 206                   correctly records departures from activeQ. 
 207                    
 208      19 May 2004: Added erroneously omitted Histogram class. 
 209   
 210      5 Sep 2004: Added SimEvents synchronization constructs 
 211       
 212      17 Sep 2004: Added waituntil synchronization construct 
 213       
 214      01 Dec 2004: SimPy version 1.5 
 215                   Changes in this module: Repaired SimEvents bug re proc.eventsFired 
 216                    
 217      12 Jan 2005: SimPy version 1.5.1 
 218                   Changes in this module: Monitor objects now have a default name 
 219                                           'a_Monitor' 
 220                                            
 221      29 Mar 2005: Start SimPy 1.6: compound "yield request" statements 
 222       
 223      05 Jun 2005: Fixed bug in _request method -- waitMon did not work properly in 
 224                   preemption case 
 225                    
 226      09 Jun 2005: Added test in 'activate' to see whether 'initialize()' was called first. 
 227       
 228      23 Aug 2005: - Added Tally data collection class 
 229                   - Adjusted Resource to work with Tally 
 230                   - Redid function allEventNotices() (returns prettyprinted string with event 
 231                     times and names of process instances 
 232                   - Added function allEventTimes (returns event times of all scheduled events) 
 233                    
 234      16 Mar 2006: - Added Store and Level classes 
 235                   - Added 'yield get' and 'yield put' 
 236                    
 237      10 May 2006: - Repaired bug in Store._get method 
 238                   - Repaired Level to allow initialBuffered have float value 
 239                   - Added type test for Level get parameter 'nrToGet' 
 240                    
 241      06 Jun 2006: - To improve pretty-printed output of 'Level' objects, changed attribute 
 242                     _nrBuffered to nrBuffered (synonym for amount property) 
 243                   - To improve pretty-printed output of 'Store' objects, added attribute 
 244                     buffered (which refers to _theBuffer) 
 245                      
 246      25 Aug 2006: - Start of version 1.8 
 247                   - made 'version' public 
 248                   - corrected condQ initialization bug 
 249                    
 250      30 Sep 2006: - Introduced checks to ensure capacity of a Buffer > 0 
 251                   - Removed from __future__ import (so Python 2.3 or later needed) 
 252                   
 253      15 Oct 2006: - Added code to register all Monitors and all Tallies in variables 
 254                     'allMonitors' and 'allTallies' 
 255                   - Added function 'startCollection' to activate Monitors and Tallies at a 
 256                     specified time (e.g. after warmup period) 
 257                   - Moved all test/demo programs to after 'if __name__=="__main__":'. 
 258                   
 259      17 Oct 2006: - Added compound 'put' and 'get' statements for Level and Store. 
 260       
 261      18 Oct 2006: - Repaired bug: self.eventsFired now gets set after an event fires 
 262                     in a compound yield get/put with a waitevent clause (reneging case). 
 263                      
 264      21 Oct 2006: - Introduced Store 'yield get' with a filter function. 
 265                   
 266      22 Oct 2006: - Repaired bug in prettyprinting of Store objects (the buffer  
 267                     content==._theBuffer was not shown) by changing ._theBuffer  
 268                     to .theBuffer. 
 269                   
 270      04 Dec 2006: - Added printHistogram method to Tally and Monitor (generates 
 271                     table-form histogram) 
 272                       
 273      07 Dec 2006: - Changed the __str__ method of Histogram to print a table (like printHistogram) 
 274  """ 
 275   
 276  __TESTING=False 
 277  version=__version__="1.8 $Revision: 1.1.1.25 $ $Date: 2007/01/18 14:44:17 $" 
 278  if __TESTING:  
 279      print "SimPy.SimulationStep %s" %__version__, 
 280      if __debug__: 
 281          print "__debug__ on" 
 282      else: 
 283          print 
 284   
 285  # yield keywords 
 286  hold=1 
 287  passivate=2 
 288  request=3 
 289  release=4 
 290  waitevent=5 
 291  queueevent=6 
 292  waituntil=7 
 293  get=8 
 294  put=9 
 295   
 296  _endtime=0 
 297  _t=0 
 298  _e=None 
 299  _stop=True 
 300  _wustep=False #controls per event stepping for waituntil construct; not for user API 
 301  True=1 
 302  False=0 
 303  paused=False 
 304  _step=False 
 305  condQ=[] 
 306  allMonitors=[] 
 307  allTallies=[] 
 308   
309 -def initialize():
310 global _e,_t,_stop,condQ,allMonitors,allTallies 311 _e=__Evlist() 312 _t=0 313 _stop=False 314 condQ=[] 315 allMonitors=[] 316 allTallies=[]
317
318 -def now():
319 return _t
320
321 -def stopSimulation():
322 """Application function to stop simulation run""" 323 global _stop 324 _stop=True
325
326 -def _startWUStepping():
327 """Application function to start stepping through simulation for waituntil construct.""" 328 global _wustep 329 _wustep=True
330
331 -def _stopWUStepping():
332 """Application function to stop stepping through simulation.""" 333 global _wustep 334 _wustep=False
335
336 -def startStepping():
337 """Application function to start stepping through simulation.""" 338 global _step 339 _step=True
340
341 -def stopStepping():
342 """Application function to stop stepping through simulation.""" 343 global _step 344 _step=False
345
346 -class Simerror(Exception):
347 - def __init__(self,value):
348 self.value=value
349
350 - def __str__(self):
351 return `self.value`
352
353 -class FatalSimerror(Simerror):
354 - def __init__(self,value):
355 Simerror.__init__(self,value) 356 self.value=value
357
358 -class Process(Lister):
359 """Superclass of classes which may use generator functions"""
360 - def __init__(self,name="a_process"):
361 #the reference to this Process instances single process (==generator) 362 self._nextpoint=None 363 self.name=name 364 self._nextTime=None #next activation time 365 self._remainService=0 366 self._preempted=0 367 self._priority={} 368 self._getpriority={} 369 self._putpriority={} 370 self._terminated= False 371 self._inInterrupt= False 372 self.eventsFired=[] #events waited/queued for occurred
373
374 - def active(self):
375 return self._nextTime <> None and not self._inInterrupt
376
377 - def passive(self):
378 return self._nextTime is None and not self._terminated
379
380 - def terminated(self):
381 return self._terminated
382
383 - def interrupted(self):
384 return self._inInterrupt and not self._terminated
385
386 - def queuing(self,resource):
387 return self in resource.waitQ
388 # TODO: Review whether this should include waiting for a buffer; or throw away?
389 - def cancel(self,victim):
390 """Application function to cancel all event notices for this Process instance;(should be all event 391 notices for the _generator_).""" 392 _e._unpost(whom=victim)
393
394 - def _hold(self,a):
395 if len(a[0]) == 3: 396 delay=abs(a[0][2]) 397 else: 398 delay=0 399 who=a[1] 400 self.interruptLeft=delay 401 self._inInterrupt=False 402 self.interruptCause=None 403 _e._post(what=who,at=_t+delay)
404
405 - def _passivate(self,a):
406 a[0][1]._nextTime=None
407
408 - def interrupt(self,victim):
409 """Application function to interrupt active processes""" 410 # can't interrupt terminated/passive/interrupted process 411 if victim.active(): 412 victim.interruptCause=self # self causes interrupt 413 left=victim._nextTime-_t 414 victim.interruptLeft=left # time left in current 'hold' 415 victim._inInterrupt=True 416 reactivate(victim) 417 return left 418 else: #victim not active -- can't interrupt 419 return None
420
421 - def interruptReset(self):
422 """ 423 Application function for an interrupt victim to get out of 424 'interrupted' state. 425 """ 426 self._inInterrupt= False
427
428 - def acquired(self,res):
429 """Multi-functional test for reneging for 'request' and 'get': 430 (1)If res of type Resource: 431 Tests whether resource res was acquired when proces reactivated. 432 If yes, the parallel wakeup process is killed. 433 If not, process is removed from res.waitQ (reneging). 434 (2)If res of type Store: 435 Tests whether item(s) gotten from Store res. 436 If yes, the parallel wakeup process is killed. 437 If no, process is removed from res.getQ 438 (3)If res of type Level: 439 Tests whether units gotten from Level res. 440 If yes, the parallel wakeup process is killed. 441 If no, process is removed from res.getQ. 442 """ 443 if isinstance(res,Resource): 444 test=self in res.activeQ 445 if test: 446 self.cancel(self._holder) 447 else: 448 res.waitQ.remove(self) 449 if res.monitored: 450 res.waitMon.observe(len(res.waitQ),t=now()) 451 return test 452 elif isinstance(res,Store): 453 test=len(self.got) 454 if test: 455 self.cancel(self._holder) 456 self.cancel(self._holder) 457 else: 458 res.getQ.remove(self) 459 if res.monitored: 460 res.getQMon.observe(len(res.getQ),t=now()) 461 return test 462 elif isinstance(res,Level): 463 test=not (self.got is None) 464 if test: 465 self.cancel(self._holder) 466 else: 467 res.getQ.remove(self) 468 if res.monitored: 469 res.getQMon.observe(len(res.getQ),t=now()) 470 return test
471
472 - def stored(self,buffer):
473 """Test for reneging for 'yield put . . .' compound statement (Level and 474 Store. Returns True if not reneged. 475 If self not in buffer.putQ, kill wakeup process, else take self out of 476 buffer.putQ (reneged)""" 477 test=self in buffer.putQ 478 if test: #reneged 479 buffer.putQ.remove(self) 480 if buffer.monitored: 481 buffer.putQMon.observe(len(buffer.putQ),t=now()) 482 else: 483 self.cancel(self._holder) 484 return not test
485
486 -def allEventNotices():
487 """Returns string with eventlist as 488 t1: [procname,procname2] 489 t2: [procname4,procname5, . . . ] 490 . . . . 491 """ 492 ret="" 493 for t in _e.timestamps: 494 ret+="%s:%s\n"%(t,[ev.who.name for ev in _e.events[t]]) 495 return ret[:-1]
496
497 -def allEventTimes():
498 """Returns list of all times for which events are scheduled. 499 """ 500 return _e.timestamps
501
502 -class __Evlist:
503 """Defines event list and operations on it"""
504 - def __init__(self):
505 #has the structure {<time1>:(ev notice1, ev notice2, . . ), 506 # <time2>:(ev notice 3,...),..} 507 self.events={} 508 #always sorted list of event times (keys for self.events) 509 self.timestamps=[]
510
511 - def _post(self,what,at,prior=False):
512 """Post an event notice for process what for time at""" 513 # event notices are _Action instances 514 if at < _t: 515 raise Simerror("Attempt to schedule event in the past") 516 if at in self.events: 517 if prior: 518 #before all other event notices at this time 519 self.events[at][0:0]= [what] 520 else: 521 self.events[at].append(what) 522 else: # first event notice at this time 523 self.events[at]=[what] 524 bisect.insort(self.timestamps,at) 525 what.who._nextTime=at
526
527 - def _unpost(self,whom):
528 """ 529 Search through all event notices at whom's event time and remove whom's 530 event notice if whom is a suspended process 531 """ 532 thistime=whom._nextTime 533 if thistime is None: #check if whom was actually active 534 return 535 else: 536 thislist=self.events[thistime] 537 for n in thislist: 538 if n.who==whom: 539 self.events[thistime].remove(n) 540 whom._nextTime = None 541 if not self.events[thistime]: 542 # no other ev notices for thistime 543 del(self.events[thistime]) 544 item_delete_point = bisect.bisect(self.timestamps, 545 thistime) 546 del self.timestamps[item_delete_point-1]
547
548 - def _nextev(self):
549 """Retrieve next event from event list""" 550 global _t, _stop 551 if not self.events: raise Simerror("No more events at time %s" %now()) 552 earliest=self.timestamps[0] 553 _t=max(earliest,_t) 554 temp=self.events[earliest] #list of actions for this time _t 555 tempev=temp[0] #first action 556 del(self.events[earliest][0]) 557 if self.events[earliest]==[]: 558 del(self.events[earliest]) #delete empty list of actions 559 del(self.timestamps[0]) 560 if _t > _endtime: 561 _t = _endtime 562 _stop = True 563 return (None,) 564 tempwho=tempev.who 565 try: 566 tt=tempwho._nextpoint.next() 567 except StopIteration: 568 tempwho._nextpoint=None 569 tempwho._terminated=True 570 tempwho._nextTime=None 571 tt=None 572 tempev=tempwho 573 return tt,tempev
574
575 -class _Action:
576 """Structure (who=process owner, generator=process)"""
577 - def __init__(self,who,generator):
578 self.who=who 579 self.process=generator
580
581 - def __str__(self):
582 return "Action %s %s" %((self.who.name,self.process))
583
584 -def activate(object,process,at="undefined",delay="undefined",prior=False):
585 """Application function to activate passive process.""" 586 if _e is None: 587 raise FatalSimerror\ 588 ("Fatal error: simulation is not initialized (call initialize() first)") 589 if not (type(process) == types.GeneratorType): 590 raise FatalSimerror("Activating function which"+ 591 " is not a generator (contains no 'yield')") 592 if not object._terminated and not object._nextTime: 593 #store generator reference in object; needed for reactivation 594 object._nextpoint=process 595 if at=="undefined": 596 at=_t 597 if delay=="undefined": 598 zeit=max(_t,at) 599 else: 600 zeit=max(_t,_t+delay) 601 _e._post(_Action(who=object,generator=process),at=zeit,prior=prior)
602
603 -def reactivate(obj,at="undefined",delay="undefined",prior=False):
604 """Application function to reactivate a process which is active, 605 suspended or passive.""" 606 # Object may be active, suspended or passive 607 if not obj._terminated: 608 a=Process("SimPysystem") 609 a.cancel(obj) 610 # object now passive 611 if at=="undefined": 612 at=_t 613 if delay=="undefined": 614 zeit=max(_t,at) 615 else: 616 zeit=max(_t,_t+delay) 617 _e._post(_Action(who=obj,generator=obj._nextpoint),at=zeit, 618 prior=prior)
619
620 -class Histogram(list):
621 """ A histogram gathering and sampling class""" 622
623 - def __init__(self,name = '',low=0.0,high=100.0,nbins=10):
624 list.__init__(self) 625 self.name = name 626 self.low = float(low) 627 self.high = float(high) 628 self.nbins = nbins 629 self.binsize=(self.high-self.low)/nbins 630 self._nrObs=0 631 self._sum=0 632 self[:] =[[low+(i-1)*self.binsize,0] for i in range(self.nbins+2)]
633
634 - def addIn(self,y):
635 """ add a value into the correct bin""" 636 self._nrObs+=1 637 self._sum+=y 638 b = int((y-self.low+self.binsize)/self.binsize) 639 if b < 0: b = 0 640 if b > self.nbins+1: b = self.nbins+1 641 assert 0 <= b <=self.nbins+1,'Histogram.addIn: b out of range: %s'%b 642 self[b][1]+=1
643
644 - def __str__(self):
645 histo=self 646 ylab="value" 647 nrObs=self._nrObs 648 width=len(str(nrObs)) 649 res=[] 650 res.append("<Histogram %s:"%self.name) 651 res.append("\nNumber of observations: %s"%nrObs) 652 if nrObs: 653 su=self._sum 654 cum=histo[0][1] 655 fmt="%s" 656 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 657 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 658 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 659 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 660 l1width=len(("%s <= "%fmt)%histo[1][0]) 661 res.append(line1\ 662 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 663 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 664 ) 665 for i in range(1,len(histo)-1): 666 cum+=histo[i][1] 667 res.append(line\ 668 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 669 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 670 ) 671 cum+=histo[-1][1] 672 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 673 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 674 lnwidth=len(("<%s"%fmt)%histo[1][0]) 675 res.append(linen\ 676 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 677 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 678 ) 679 res.append("\n>") 680 return " ".join(res)
681
682 -def startCollection(when=0.0,monitors=None,tallies=None):
683 """Starts data collection of all designated Monitor and Tally objects 684 (default=all) at time 'when'. 685 """ 686 class Starter(Process): 687 def collect(self,monitors,tallies): 688 for m in monitors: 689 print m.name 690 m.reset() 691 for t in tallies: 692 t.reset() 693 yield hold,self
694 if monitors is None: 695 monitors=allMonitors 696 if tallies is None: 697 tallies=allTallies 698 s=Starter() 699 activate(s,s.collect(monitors=monitors,tallies=tallies),at=when) 700
701 -class Monitor(list):
702 """ Monitored variables 703 704 A Class for monitored variables, that is, variables that allow one 705 to gather simple statistics. A Monitor is a subclass of list and 706 list operations can be performed on it. An object is established 707 using m= Monitor(name = '..'). It can be given a 708 unique name for use in debugging and in tracing and ylab and tlab 709 strings for labelling graphs. 710 """
711 - def __init__(self,name='a_Monitor',ylab='y',tlab='t'):
712 list.__init__(self) 713 self.startTime = 0.0 714 self.name = name 715 self.ylab = ylab 716 self.tlab = tlab 717 allMonitors.append(self)
718
719 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
720 """Sets histogram parameters. 721 Must be called before call to getHistogram""" 722 if name=='': 723 histname=self.name 724 else: 725 histname=name 726 self.histo=Histogram(name=histname,low=low,high=high,nbins=nbins)
727
728 - def observe(self,y,t=None):
729 """record y and t""" 730 if t is None: t = now() 731 self.append([t,y])
732
733 - def tally(self,y):
734 """ deprecated: tally for backward compatibility""" 735 self.observe(y,0)
736
737 - def accum(self,y,t=None):
738 """ deprecated: accum for backward compatibility""" 739 self.observe(y,t)
740
741 - def reset(self,t=None):
742 """reset the sums and counts for the monitored variable """ 743 self[:]=[] 744 if t is None: t = now() 745 self.startTime = t
746
747 - def tseries(self):
748 """ the series of measured times""" 749 return list(zip(*self)[0])
750
751 - def yseries(self):
752 """ the series of measured values""" 753 return list(zip(*self)[1])
754
755 - def count(self):
756 """ deprecated: the number of observations made """ 757 return self.__len__()
758
759 - def total(self):
760 """ the sum of the y""" 761 if self.__len__()==0: return 0 762 else: 763 sum = 0.0 764 for i in range(self.__len__()): 765 sum += self[i][1] 766 return sum # replace by sum() later
767
768 - def mean(self):
769 """ the simple average of the monitored variable""" 770 try: return 1.0*self.total()/self.__len__() 771 except: print 'SimPy: No observations for mean'
772
773 - def var(self):
774 """ the sample variance of the monitored variable """ 775 n = len(self) 776 tot = self.total() 777 ssq=0.0 778 ##yy = self.yseries() 779 for i in range(self.__len__()): 780 ssq += self[i][1]**2 # replace by sum() eventually 781 try: return (ssq - float(tot*tot)/n)/n 782 except: print 'SimPy: No observations for sample variance'
783
784 - def timeAverage(self,t=None):
785 """ the time-average of the monitored variable. 786 787 If t is used it is assumed to be the current time, 788 otherwise t = now() 789 """ 790 N = self.__len__() 791 if N == 0: 792 print 'SimPy: No observations for timeAverage' 793 return None 794 795 if t is None: t = now() 796 sum = 0.0 797 tlast = self.startTime 798 #print 'DEBUG: timave ',t,tlast 799 ylast = 0.0 800 for i in range(N): 801 ti,yi = self[i] 802 sum += ylast*(ti-tlast) 803 tlast = ti 804 ylast = yi 805 sum += ylast*(t-tlast) 806 T = t - self.startTime 807 if T == 0: 808 print 'SimPy: No elapsed time for timeAverage' 809 return None 810 #print 'DEBUG: timave ',sum,t,T 811 return sum/float(T)
812
813 - def histogram(self,low=0.0,high=100.0,nbins=10):
814 """ A histogram of the monitored y data values. 815 """ 816 h = Histogram(name=self.name,low=low,high=high,nbins=nbins) 817 ys = self.yseries() 818 for y in ys: h.addIn(y) 819 return h
820
821 - def getHistogram(self):
822 """Returns a histogram based on the parameters provided in 823 preceding call to setHistogram. 824 """ 825 ys = self.yseries() 826 h=self.histo 827 for y in ys: h.addIn(y) 828 return h
829
830 - def printHistogram(self,fmt="%s"):
831 """Returns formatted frequency distribution table string from Monitor. 832 Precondition: setHistogram must have been called. 833 fmt==format of bin range values 834 """ 835 try: 836 histo=self.getHistogram() 837 except: 838 raise FatalSimerror("histogramTable: call setHistogram first"\ 839 " for Monitor %s"%self.name) 840 ylab=self.ylab 841 nrObs=self.count() 842 width=len(str(nrObs)) 843 res=[] 844 res.append("\nHistogram for %s:"%histo.name) 845 res.append("\nNumber of observations: %s"%nrObs) 846 su=sum(self.yseries()) 847 cum=histo[0][1] 848 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 849 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 850 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 851 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 852 l1width=len(("%s <= "%fmt)%histo[1][0]) 853 res.append(line1\ 854 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 855 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 856 ) 857 for i in range(1,len(histo)-1): 858 cum+=histo[i][1] 859 res.append(line\ 860 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 861 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 862 ) 863 cum+=histo[-1][1] 864 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 865 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 866 lnwidth=len(("<%s"%fmt)%histo[1][0]) 867 res.append(linen\ 868 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 869 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 870 ) 871 return " ".join(res)
872
873 -class Tally:
874 - def __init__(self, name="a_Tally", ylab="y",tlab="t"):
875 self.name = name 876 self.ylab = ylab 877 self.tlab = tlab 878 self.reset() 879 self.startTime=0.0 880 self.histo=None 881 self._sum_of_squares=0 882 allTallies.append(self)
883
884 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
885 """Sets histogram parameters. 886 Must be called to prior to observations initiate data collection 887 for histogram. 888 """ 889 if name=='': 890 hname=self.name 891 else: 892 hname=name 893 self.histo=Histogram(name=hname,low=low,high=high,nbins=nbins)
894
895 - def observe(self, y, t=None):
896 if t is None: 897 t = now() 898 self._total += y 899 self._count += 1 900 self._integral += (t - self._last_timestamp) * self._last_observation 901 self._last_timestamp = t 902 self._last_observation = y 903 self._sum += y 904 self._sum_of_squares += y * y 905 if self.histo: 906 self.histo.addIn(y)
907
908 - def reset(self, t=None):
909 if t is None: 910 t = now() 911 self.startTime = t 912 self._last_timestamp = t 913 self._last_observation = 0.0 914 self._count = 0 915 self._total = 0.0 916 self._integral = 0.0 917 self._sum = 0.0 918 self._sum_of_squares = 0.0
919
920 - def count(self):
921 return self._count
922
923 - def total(self):
924 return self._total
925
926 - def mean(self):
927 return 1.0 * self._total / self._count
928
929 - def timeAverage(self,t=None):
930 if t is None: 931 t=now() 932 integ=self._integral+(t - self._last_timestamp) * self._last_observation 933 if (t > self.startTime): 934 return 1.0 * integ/(t - self.startTime) 935 else: 936 print 'SimPy: No elapsed time for timeAverage' 937 return None
938
939 - def var(self):
940 return 1.0 * (self._sum_of_squares - (1.0 * (self._sum * self._sum)\ 941 / self._count)) / (self._count)
942
943 - def __len__(self):
944 return self._count
945
946 - def __eq__(self, l):
947 return len(l) == self._count
948
949 - def getHistogram(self):
950 return self.histo
951
952 - def printHistogram(self,fmt="%s"):
953 """Returns formatted frequency distribution table string from Tally. 954 Precondition: setHistogram must have been called. 955 fmt==format of bin range values 956 """ 957 try: 958 histo=self.getHistogram() 959 except: 960 raise FatalSimerror("histogramTable: call setHistogram first"\ 961 " for Tally %s"%self.name) 962 ylab=self.ylab 963 nrObs=self.count() 964 width=len(str(nrObs)) 965 res=[] 966 res.append("\nHistogram for %s:"%histo.name) 967 res.append("\nNumber of observations: %s"%nrObs) 968 su=self.total() 969 cum=histo[0][1] 970 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 971 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 972 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 973 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 974 l1width=len(("%s <= "%fmt)%histo[1][0]) 975 res.append(line1\ 976 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 977 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 978 ) 979 for i in range(1,len(histo)-1): 980 cum+=histo[i][1] 981 res.append(line\ 982 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 983 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 984 ) 985 cum+=histo[-1][1] 986 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 987 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 988 lnwidth=len(("<%s"%fmt)%histo[1][0]) 989 res.append(linen\ 990 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 991 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 992 ) 993 return " ".join(res)
994
995 -class Queue(list):
996 - def __init__(self,res,moni):
997 if not moni is None: #moni==[]: 998 self.monit=True # True if a type of Monitor/Tally attached 999 else: 1000 self.monit=False 1001 self.moni=moni # The Monitor/Tally 1002 self.resource=res # the resource/buffer this queue belongs to
1003
1004 - def enter(self,obj):
1005 pass
1006
1007 - def leave(self):
1008 pass
1009
1010 - def takeout(self,obj):
1011 self.remove(obj) 1012 if self.monit: 1013 self.moni.observe(len(self),t=now())
1014
1015 -class FIFO(Queue):
1016 - def __init__(self,res,moni):
1017 Queue.__init__(self,res,moni)
1018
1019 - def enter(self,obj):
1020 self.append(obj) 1021 if self.monit: 1022 self.moni.observe(len(self),t=now())
1023
1024 - def enterGet(self,obj):
1025 self.enter(obj)
1026
1027 - def enterPut(self,obj):
1028 self.enter(obj)
1029
1030 - def leave(self):
1031 a= self.pop(0) 1032 if self.monit: 1033 self.moni.observe(len(self),t=now()) 1034 return a
1035
1036 -class PriorityQ(FIFO):
1037 """Queue is always ordered according to priority. 1038 Higher value of priority attribute == higher priority. 1039 """
1040 - def __init__(self,res,moni):
1041 FIFO.__init__(self,res,moni)
1042
1043 - def enter(self,obj):
1044 """Handles request queue for Resource""" 1045 if len(self): 1046 ix=self.resource 1047 if self[-1]._priority[ix] >= obj._priority[ix]: 1048 self.append(obj) 1049 else: 1050 z=0 1051 while self[z]._priority[ix] >= obj._priority[ix]: 1052 z += 1 1053 self.insert(z,obj) 1054 else: 1055 self.append(obj) 1056 if self.monit: 1057 self.moni.observe(len(self),t=now())
1058
1059 - def enterGet(self,obj):
1060 """Handles getQ in Buffer""" 1061 if len(self): 1062 ix=self.resource 1063 #print "priority:",[x._priority[ix] for x in self] 1064 if self[-1]._getpriority[ix] >= obj._getpriority[ix]: 1065 self.append(obj) 1066 else: 1067 z=0 1068 while self[z]._getpriority[ix] >= obj._getpriority[ix]: 1069 z += 1 1070 self.insert(z,obj) 1071 else: 1072 self.append(obj) 1073 if self.monit: 1074 self.moni.observe(len(self),t=now())
1075
1076 - def enterPut(self,obj):
1077 """Handles putQ in Buffer""" 1078 if len(self): 1079 ix=self.resource 1080 #print "priority:",[x._priority[ix] for x in self] 1081 if self[-1]._putpriority[ix] >= obj._putpriority[ix]: 1082 self.append(obj) 1083 else: 1084 z=0 1085 while self[z]._putpriority[ix] >= obj._putpriority[ix]: 1086 z += 1 1087 self.insert(z,obj) 1088 else: 1089 self.append(obj) 1090 if self.monit: 1091 self.moni.observe(len(self),t=now())
1092
1093 -class Resource(Lister):
1094 """Models shared, limited capacity resources with queuing; 1095 FIFO is default queuing discipline. 1096 """ 1097
1098 - def __init__(self,capacity=1,name="a_resource",unitName="units", 1099 qType=FIFO,preemptable=0,monitored=False,monitorType=Monitor):
1100 """ 1101 monitorType={Monitor(default)|Tally} 1102 """ 1103 self.name=name # resource name 1104 self.capacity=capacity # resource units in this resource 1105 self.unitName=unitName # type name of resource units 1106 self.n=capacity # uncommitted resource units 1107 self.monitored=monitored 1108 1109 if self.monitored: # Monitor waitQ, activeQ 1110 self.actMon=monitorType(name="Active Queue Monitor %s"%self.name, 1111 ylab="nr in queue",tlab="time") 1112 monact=self.actMon 1113 self.waitMon=monitorType(name="Wait Queue Monitor %s"%self.name, 1114 ylab="nr in queue",tlab="time") 1115 monwait=self.waitMon 1116 else: 1117 monwait=None 1118 monact=None 1119 self.waitQ=qType(self,monwait) 1120 self.preemptable=preemptable 1121 self.activeQ=qType(self,monact) 1122 self.priority_default=0
1123
1124 - def _request(self,arg):
1125 """Process request event for this resource""" 1126 object=arg[1].who 1127 if len(arg[0]) == 4: # yield request,self,resource,priority 1128 object._priority[self]=arg[0][3] 1129 else: # yield request,self,resource 1130 object._priority[self]=self.priority_default 1131 if self.preemptable and self.n == 0: # No free resource 1132 # test for preemption condition 1133 preempt=object._priority[self] > self.activeQ[-1]._priority[self] 1134 # If yes: 1135 if preempt: 1136 z=self.activeQ[-1] 1137 # suspend lowest priority process being served 1138 ##suspended = z 1139 # record remaining service time 1140 z._remainService = z._nextTime - _t 1141 Process().cancel(z) 1142 # remove from activeQ 1143 self.activeQ.remove(z) 1144 # put into front of waitQ 1145 self.waitQ.insert(0,z) 1146 # if self is monitored, update waitQ monitor 1147 if self.monitored: 1148 self.waitMon.observe(len(self.waitQ),now()) 1149 # record that it has been preempted 1150 z._preempted = 1 1151 # passivate re-queued process 1152 z._nextTime=None 1153 # assign resource unit to preemptor 1154 self.activeQ.enter(object) 1155 # post event notice for preempting process 1156 _e._post(_Action(who=object,generator=object._nextpoint), 1157 at=_t,prior=1) 1158 else: 1159 self.waitQ.enter(object) 1160 # passivate queuing process 1161 object._nextTime=None 1162 else: # treat non-preemption case 1163 if self.n == 0: 1164 self.waitQ.enter(object) 1165 # passivate queuing process 1166 object._nextTime=None 1167 else: 1168 self.n -= 1 1169 self.activeQ.enter(object) 1170 _e._post(_Action(who=object,generator=object._nextpoint), 1171 at=_t,prior=1)
1172
1173 - def _release(self,arg):
1174 """Process release request for this resource""" 1175 self.n += 1 1176 self.activeQ.remove(arg[1].who) 1177 if self.monitored: 1178 self.actMon.observe(len(self.activeQ),t=now()) 1179 #reactivate first waiting requestor if any; assign Resource to it 1180 if self.waitQ: 1181 object=self.waitQ.leave() 1182 self.n -= 1 #assign 1 resource unit to object 1183 self.activeQ.enter(object) 1184 # if resource preemptable: 1185 if self.preemptable: 1186 # if object had been preempted: 1187 if object._preempted: 1188 object._preempted = 0 1189 # reactivate object delay= remaining service time 1190 reactivate(object,delay=object._remainService) 1191 # else reactivate right away 1192 else: 1193 reactivate(object,delay=0,prior=1) 1194 # else: 1195 else: 1196 reactivate(object,delay=0,prior=1) 1197 _e._post(_Action(who=arg[1].who,generator=arg[1].who._nextpoint), 1198 at=_t,prior=1)
1199
1200 -class Buffer(Lister):
1201 """Abstract class for buffers 1202 Blocks a process when a put would cause buffer overflow or a get would cause 1203 buffer underflow. 1204 Default queuing discipline for blocked processes is FIFO.""" 1205 1206 priorityDefault=0
1207 - def __init__(self,name=None,capacity="unbounded",unitName="units", 1208 putQType=FIFO,getQType=FIFO, 1209 monitored=False,monitorType=Monitor,initialBuffered=None):
1210 if capacity=="unbounded": capacity=sys.maxint 1211 self.capacity=capacity 1212 self.name=name 1213 self.putQType=putQType 1214 self.getQType=getQType 1215 self.monitored=monitored 1216 self.initialBuffered=initialBuffered 1217 self.unitName=unitName 1218 if self.monitored: 1219 ## monitor for Producer processes' queue 1220 self.putQMon=monitorType(name="Producer Queue Monitor %s"%self.name, 1221 ylab="nr in queue",tlab="time") 1222 ## monitor for Consumer processes' queue 1223 self.getQMon=monitorType(name="Consumer Queue Monitor %s"%self.name, 1224 ylab="nr in queue",tlab="time") 1225 ## monitor for nr items in buffer 1226 self.bufferMon=monitorType(name="Buffer Monitor %s"%self.name, 1227 ylab="nr in buffer",tlab="time") 1228 else: 1229 self.putQMon=None 1230 self.getQMon=None 1231 self.bufferMon=None 1232 self.putQ=self.putQType(res=self,moni=self.putQMon) 1233 self.getQ=self.getQType(res=self,moni=self.getQMon) 1234 if self.monitored: 1235 self.putQMon.observe(y=len(self.putQ),t=now()) 1236 self.getQMon.observe(y=len(self.getQ),t=now()) 1237 self._putpriority={} 1238 self._getpriority={} 1239 1240 def _put(self): 1241 pass
1242 def _get(self): 1243 pass
1244
1245 -class Level(Buffer):
1246 """Models buffers for processes putting/getting un-distinguishable items. 1247 """
1248 - def getamount(self):
1249 return self.nrBuffered
1250
1251 - def gettheBuffer(self):
1252 return self.nrBuffered
1253 1254 theBuffer=property(gettheBuffer) 1255
1256 - def __init__(self,**pars):
1257 Buffer.__init__(self,**pars) 1258 if self.name is None: 1259 self.name="a_level" ## default name 1260 1261 if (type(self.capacity)!=type(1.0) and\ 1262 type(self.capacity)!=type(1)) or\ 1263 self.capacity<=0: 1264 raise FatalSimerror\ 1265 ("Level: capacity parameter not a positive number > 0: %s"\ 1266 %self.initialBuffered) 1267 1268 if type(self.initialBuffered)==type(1.0) or\ 1269 type(self.initialBuffered)==type(1): 1270 if self.initialBuffered>self.capacity: 1271 raise FatalSimerror("initialBuffered exceeds capacity") 1272 if self.initialBuffered>=0: 1273 self.nrBuffered=self.initialBuffered ## nr items initially in buffer 1274 ## buffer is just a counter (int type) 1275 else: 1276 raise FatalSimerror\ 1277 ("initialBuffered param of Level negative: %s"\ 1278 %self.initialBuffered) 1279 elif self.initialBuffered is None: 1280 self.initialBuffered=0 1281 self.nrBuffered=0 1282 else: 1283 raise FatalSimerror\ 1284 ("Level: wrong type of initialBuffered (parameter=%s)"\ 1285 %self.initialBuffered) 1286 if self.monitored: 1287 self.bufferMon.observe(y=self.amount,t=now())
1288 amount=property(getamount) 1289
1290 - def _put(self,arg):
1291 """Handles put requests for Level instances""" 1292 obj=arg[1].who 1293 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1294 obj._putpriority[self]=arg[0][4] 1295 whatToPut=arg[0][3] 1296 elif len(arg[0]) == 4: # yield get,self,buff,whattoput 1297 obj._putpriority[self]=Buffer.priorityDefault #default 1298 whatToPut=arg[0][3] 1299 else: # yield get,self,buff 1300 obj._putpriority[self]=Buffer.priorityDefault #default 1301 whatToPut=1 1302 if type(whatToPut)!=type(1) and type(whatToPut)!=type(1.0): 1303 raise FatalSimerror("Level: put parameter not a number") 1304 if not whatToPut>=0.0: 1305 raise FatalSimerror("Level: put parameter not positive number") 1306 whatToPutNr=whatToPut 1307 if whatToPutNr+self.amount>self.capacity: 1308 obj._nextTime=None #passivate put requestor 1309 obj._whatToPut=whatToPutNr 1310 self.putQ.enterPut(obj) #and queue, with size of put 1311 else: 1312 self.nrBuffered+=whatToPutNr 1313 if self.monitored: 1314 self.bufferMon.observe(y=self.amount,t=now()) 1315 # service any getters waiting 1316 # service in queue-order; do not serve second in queue before first 1317 # has been served 1318 while len(self.getQ) and self.amount>0: 1319 proc=self.getQ[0] 1320 if proc._nrToGet<=self.amount: 1321 proc.got=proc._nrToGet 1322 self.nrBuffered-=proc.got 1323 if self.monitored: 1324 self.bufferMon.observe(y=self.amount,t=now()) 1325 self.getQ.takeout(proc) # get requestor's record out of queue 1326 _e._post(_Action(who=proc,generator=proc._nextpoint), 1327 at=_t) # continue a blocked get requestor 1328 else: 1329 break 1330 _e._post(_Action(who=obj,generator=obj._nextpoint), 1331 at=_t,prior=1) # continue the put requestor
1332
1333 - def _get(self,arg):
1334 """Handles get requests for Level instances""" 1335 obj=arg[1].who 1336 obj.got=None 1337 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1338 obj._getpriority[self]=arg[0][4] 1339 nrToGet=arg[0][3] 1340 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1341 obj._getpriority[self]=Buffer.priorityDefault #default 1342 nrToGet=arg[0][3] 1343 else: # yield get,self,buff 1344 obj._getpriority[self]=Buffer.priorityDefault 1345 nrToGet=1 1346 if type(nrToGet)!=type(1.0) and type(nrToGet)!=type(1): 1347 raise FatalSimerror\ 1348 ("Level: get parameter not a number: %s"%nrToGet) 1349 if nrToGet<0: 1350 raise FatalSimerror\ 1351 ("Level: get parameter not positive number: %s"%nrToGet) 1352 if self.amount < nrToGet: 1353 obj._nrToGet=nrToGet 1354 self.getQ.enterGet(obj) 1355 # passivate queuing process 1356 obj._nextTime=None 1357 else: 1358 obj.got=nrToGet 1359 self.nrBuffered-=nrToGet 1360 if self.monitored: 1361 self.bufferMon.observe(y=self.amount,t=now()) 1362 _e._post(_Action(who=obj,generator=obj._nextpoint), 1363 at=_t,prior=1) 1364 # reactivate any put requestors for which space is now available 1365 # service in queue-order; do not serve second in queue before first 1366 # has been served 1367 while len(self.putQ): #test for queued producers 1368 proc=self.putQ[0] 1369 if proc._whatToPut+self.amount<=self.capacity: 1370 self.nrBuffered+=proc._whatToPut 1371 if self.monitored: 1372 self.bufferMon.observe(y=self.amount,t=now()) 1373 self.putQ.takeout(proc)#requestor's record out of queue 1374 _e._post(_Action(who=proc,generator=proc._nextpoint), 1375 at=_t) # continue a blocked put requestor 1376 else: 1377 break
1378
1379 -class Store(Buffer):
1380 """Models buffers for processes coupled by putting/getting distinguishable 1381 items. 1382 Blocks a process when a put would cause buffer overflow or a get would cause 1383 buffer underflow. 1384 Default queuing discipline for blocked processes is priority FIFO. 1385 """
1386 - def getnrBuffered(self):
1387 return len(self.theBuffer)
1388 nrBuffered=property(getnrBuffered) 1389
1390 - def getbuffered(self):
1391 return self.theBuffer
1392 buffered=property(getbuffered) 1393
1394 - def __init__(self,**pars):
1395 Buffer.__init__(self,**pars) 1396 self.theBuffer=[] 1397 if self.name is None: 1398 self.name="a_store" ## default name 1399 if type(self.capacity)!=type(1) or self.capacity<=0: 1400 raise FatalSimerror\ 1401 ("Store: capacity parameter not a positive integer > 0: %s"\ 1402 %self.initialBuffered) 1403 if type(self.initialBuffered)==type([]): 1404 if len(self.initialBuffered)>self.capacity: 1405 raise FatalSimerror("initialBuffered exceeds capacity") 1406 else: 1407 self.theBuffer[:]=self.initialBuffered##buffer==list of objects 1408 elif self.initialBuffered is None: 1409 self.theBuffer=[] 1410 else: 1411 raise FatalSimerror\ 1412 ("Store: initialBuffered not a list") 1413 if self.monitored: 1414 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1415 self._sort=None
1416 1417 1418
1419 - def addSort(self,sortFunc):
1420 """Adds buffer sorting to this instance of Store. It maintains 1421 theBuffer sorted by the sortAttr attribute of the objects in the 1422 buffer. 1423 The user-provided 'sortFunc' must look like this: 1424 1425 def mySort(self,par): 1426 tmplist=[(x.sortAttr,x) for x in par] 1427 tmplist.sort() 1428 return [x for (key,x) in tmplist] 1429 1430 """ 1431 1432 self._sort=new.instancemethod(sortFunc,self,self.__class__) 1433 self.theBuffer=self._sort(self.theBuffer)
1434
1435 - def _put(self,arg):
1436 """Handles put requests for Store instances""" 1437 obj=arg[1].who 1438 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1439 obj._putpriority[self]=arg[0][4] 1440 whatToPut=arg[0][3] 1441 elif len(arg[0]) == 4: # yield put,self,buff,whattoput 1442 obj._putpriority[self]=Buffer.priorityDefault #default 1443 whatToPut=arg[0][3] 1444 else: # error, whattoput missing 1445 raise FatalSimerror("Item to put missing in yield put stmt") 1446 if type(whatToPut)!=type([]): 1447 raise FatalSimerror("put parameter is not a list") 1448 whatToPutNr=len(whatToPut) 1449 if whatToPutNr+self.nrBuffered>self.capacity: 1450 obj._nextTime=None #passivate put requestor 1451 obj._whatToPut=whatToPut 1452 self.putQ.enterPut(obj) #and queue, with items to put 1453 else: 1454 self.theBuffer.extend(whatToPut) 1455 if not(self._sort is None): 1456 self.theBuffer=self._sort(self.theBuffer) 1457 if self.monitored: 1458 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1459 _e._post(_Action(who=obj,generator=obj._nextpoint), 1460 at=_t,prior=1) # continue the put requestor 1461 1462 # service any waiting getters 1463 # service in queue order: do not serve second in queue before first 1464 # has been served 1465 block=False 1466 buffQ=self.getQ 1467 for getter in buffQ: 1468 if self.nrBuffered>0 and len(self.getQ): 1469 proc=getter 1470 if inspect.isfunction(proc._nrToGet): 1471 movCand=proc._nrToGet(self.theBuffer) #predicate parameter 1472 if movCand: 1473 proc.got=movCand[:] 1474 for i in movCand: 1475 self.theBuffer.remove(i) 1476 self.getQ.takeout(proc) 1477 if self.monitored: 1478 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1479 _e._post(_Action(who=proc,generator=proc._nextpoint), 1480 at=_t) # continue a blocked get requestor 1481 else: #numerical parameter 1482 if not block and proc._nrToGet<=self.nrBuffered: 1483 nrToGet=proc._nrToGet 1484 proc.got=[] 1485 proc.got[:]=self.theBuffer[0:nrToGet] 1486 self.theBuffer[:]=self.theBuffer[nrToGet:] 1487 if self.monitored: 1488 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1489 # take this get requestor's record out of queue: 1490 self.getQ.takeout(proc) 1491 _e._post(_Action(who=proc,generator=proc._nextpoint), 1492 at=_t) # continue a blocked get requestor 1493 else: 1494 # block subsequent numerically specified get's in getQ 1495 # to prevent starvation of larger gets by smaller ones 1496 block=True 1497 else: 1498 break # either out of items in buffer or out of getters in getQ
1499
1500 - def _get(self,arg):
1501 """Handles get requests""" 1502 filtfunc=None 1503 obj=arg[1].who 1504 obj.got=[] # the list of items retrieved by 'get' 1505 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1506 obj._getpriority[self]=arg[0][4] 1507 if inspect.isfunction(arg[0][3]): 1508 filtfunc=arg[0][3] 1509 else: 1510 nrToGet=arg[0][3] 1511 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1512 obj._getpriority[self]=Buffer.priorityDefault #default 1513 if inspect.isfunction(arg[0][3]): 1514 filtfunc=arg[0][3] 1515 else: 1516 nrToGet=arg[0][3] 1517 else: # yield get,self,buff 1518 obj._getpriority[self]=Buffer.priorityDefault 1519 nrToGet=1 1520 if not filtfunc: #number specifies nr items to get 1521 if nrToGet<0: 1522 raise FatalSimerror\ 1523 ("Store: get parameter not positive number: %s"%nrToGet) 1524 if self.nrBuffered < nrToGet: 1525 obj._nrToGet=nrToGet 1526 self.getQ.enterGet(obj) 1527 # passivate/block queuing 'get' process 1528 obj._nextTime=None 1529 else: 1530 for i in range(nrToGet): 1531 obj.got.append(self.theBuffer.pop(0)) # move items from 1532 # buffer to requesting process 1533 if self.monitored: 1534 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1535 _e._post(_Action(who=obj,generator=obj._nextpoint), 1536 at=_t,prior=1) 1537 # reactivate any put requestors for which space is now available 1538 # serve in queue order: do not serve second in queue before first 1539 # has been served 1540 while len(self.putQ): 1541 proc=self.putQ[0] 1542 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1543 for i in proc._whatToPut: 1544 self.theBuffer.append(i) #move items to buffer 1545 if not(self._sort is None): 1546 self.theBuffer=self._sort(self.theBuffer) 1547 if self.monitored: 1548 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1549 self.putQ.takeout(proc) # dequeue requestor's record 1550 _e._post(_Action(who=proc,generator=proc._nextpoint), 1551 at=_t) # continue a blocked put requestor 1552 else: 1553 break 1554 else: # items to get determined by filtfunc 1555 movCand=filtfunc(self.theBuffer) 1556 if movCand: # get succeded 1557 _e._post(_Action(who=obj,generator=obj._nextpoint), 1558 at=_t,prior=1) 1559 obj.got=movCand[:] 1560 for item in movCand: 1561 self.theBuffer.remove(item) 1562 if self.monitored: 1563 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1564 # reactivate any put requestors for which space is now available 1565 # serve in queue order: do not serve second in queue before first 1566 # has been served 1567 while len(self.putQ): 1568 proc=self.putQ[0] 1569 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1570 for i in proc._whatToPut: 1571 self.theBuffer.append(i) #move items to buffer 1572 if not(self._sort is None): 1573 self.theBuffer=self._sort(self.theBuffer) 1574 if self.monitored: 1575 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1576 self.putQ.takeout(proc) # dequeue requestor's record 1577 _e._post(_Action(who=proc,generator=proc._nextpoint), 1578 at=_t) # continue a blocked put requestor 1579 else: 1580 break 1581 else: # get did not succeed, block 1582 obj._nrToGet=filtfunc 1583 self.getQ.enterGet(obj) 1584 # passivate/block queuing 'get' process 1585 obj._nextTime=None
1586 1587 1588 1589
1590 -class SimEvent(Lister):
1591 """Supports one-shot signalling between processes. All processes waiting for an event to occur 1592 get activated when its occurrence is signalled. From the processes queuing for an event, only 1593 the first gets activated. 1594 """
1595 - def __init__(self,name="a_SimEvent"):
1596 self.name=name 1597 self.waits=[] 1598 self.queues=[] 1599 self.occurred=False 1600 self.signalparam=None
1601
1602 - def signal(self,param=None):
1603 """Produces a signal to self; 1604 Fires this event (makes it occur). 1605 Reactivates ALL processes waiting for this event. (Cleanup waits lists 1606 of other events if wait was for an event-group (OR).) 1607 Reactivates the first process for which event(s) it is queuing for 1608 have fired. (Cleanup queues of other events if wait was for an event-group (OR).) 1609 """ 1610 self.signalparam=param 1611 if not self.waits and not self.queues: 1612 self.occurred=True 1613 else: 1614 #reactivate all waiting processes 1615 for p in self.waits: 1616 p[0].eventsFired.append(self) 1617 reactivate(p[0],prior=True) 1618 #delete waits entries for this process in other events 1619 for ev in p[1]: 1620 if ev!=self: 1621 if ev.occurred: 1622 p[0].eventsFired.append(ev) 1623 for iev in ev.waits: 1624 if iev[0]==p[0]: 1625 ev.waits.remove(iev) 1626 break 1627 self.waits=[] 1628 if self.queues: 1629 proc=self.queues.pop(0)[0] 1630 proc.eventsFired.append(self) 1631 reactivate(proc,prior=True)
1632
1633 - def _wait(self,par):
1634 """Consumes a signal if it has occurred, otherwise process 'proc' 1635 waits for this event. 1636 """ 1637 proc=par[0][1] #the process issuing the yield waitevent command 1638 proc.eventsFired=[] 1639 if not self.occurred: 1640 self.waits.append([proc,[self]]) 1641 proc._nextTime=None #passivate calling process 1642 else: 1643 proc.eventsFired.append(self) 1644 self.occurred=False 1645 _e._post(_Action(who=proc,generator=proc._nextpoint), 1646 at=_t,prior=1)
1647
1648 - def _waitOR(self,par):
1649 """Handles waiting for an OR of events in a tuple/list. 1650 """ 1651 proc=par[0][1] 1652 evlist=par[0][2] 1653 proc.eventsFired=[] 1654 anyoccur=False 1655 for ev in evlist: 1656 if ev.occurred: 1657 anyoccur=True 1658 proc.eventsFired.append(ev) 1659 ev.occurred=False 1660 if anyoccur: #at least one event has fired; continue process 1661 _e._post(_Action(who=proc,generator=proc._nextpoint), 1662 at=_t,prior=1) 1663 1664 else: #no event in list has fired, enter process in all 'waits' lists 1665 proc.eventsFired=[] 1666 proc._nextTime=None #passivate calling process 1667 for ev in evlist: 1668 ev.waits.append([proc,evlist])
1669
1670 - def _queue(self,par):
1671 """Consumes a signal if it has occurred, otherwise process 'proc' 1672 queues for this event. 1673 """ 1674 proc=par[0][1] #the process issuing the yield queueevent command 1675 proc.eventsFired=[] 1676 if not self.occurred: 1677 self.queues.append([proc,[self]]) 1678 proc._nextTime=None #passivate calling process 1679 else: 1680 proc.eventsFired.append(self) 1681 self.occurred=False 1682 _e._post(_Action(who=proc,generator=proc._nextpoint), 1683 at=_t,prior=1)
1684
1685 - def _queueOR(self,par):
1686 """Handles queueing for an OR of events in a tuple/list. 1687 """ 1688 proc=par[0][1] 1689 evlist=par[0][2] 1690 proc.eventsFired=[] 1691 anyoccur=False 1692 for ev in evlist: 1693 if ev.occurred: 1694 anyoccur=True 1695 proc.eventsFired.append(ev) 1696 ev.occurred=False 1697 if anyoccur: #at least one event has fired; continue process 1698 _e._post(_Action(who=proc,generator=proc._nextpoint), 1699 at=_t,prior=1) 1700 1701 else: #no event in list has fired, enter process in all 'waits' lists 1702 proc.eventsFired=[] 1703 proc._nextTime=None #passivate calling process 1704 for ev in evlist: 1705 ev.queues.append([proc,evlist])
1706 1707 ## begin waituntil functionality
1708 -def _test():
1709 """ 1710 Gets called by simulate after every event, as long as there are processes 1711 waiting in condQ for a condition to be satisfied. 1712 Tests the conditions for all waiting processes. Where condition satisfied, 1713 reactivates that process immediately and removes it from queue. 1714 """ 1715 global condQ 1716 rList=[] 1717 for el in condQ: 1718 if el.cond(): 1719 rList.append(el) 1720 reactivate(el) 1721 for i in rList: 1722 condQ.remove(i) 1723 1724 if not condQ: 1725 _stopWUStepping()
1726
1727 -def _waitUntilFunc(proc,cond):
1728 global condQ 1729 """ 1730 Puts a process 'proc' waiting for a condition into a waiting queue. 1731 'cond' is a predicate function which returns True if the condition is 1732 satisfied. 1733 """ 1734 if not cond(): 1735 condQ.append(proc) 1736 proc.cond=cond 1737 _startWUStepping() #signal 'simulate' that a process is waiting 1738 # passivate calling process 1739 proc._nextTime=None 1740 else: 1741 #schedule continuation of calling process 1742 _e._post(_Action(who=proc,generator=proc._nextpoint), 1743 at=_t,prior=1)
1744 1745 1746 ##end waituntil functionality 1747
1748 -def scheduler(till=0):
1749 """Schedules Processes/semi-coroutines until time 'till'. 1750 Deprecated since version 0.5. 1751 """ 1752 simulate(until=till) 1753
1754 -def holdfunc(a):
1755 a[0][1]._hold(a)
1756
1757 -def requestfunc(a):
1758 """Handles 'yield request,self,res' and 'yield (request,self,res),(<code>,self,par)'. 1759 <code> can be 'hold' or 'waitevent'. 1760 """ 1761 if type(a[0][0])==tuple: 1762 ## Compound yield request statement 1763 ## first tuple in ((request,self,res),(xx,self,yy)) 1764 b=a[0][0] 1765 ## b[2]==res (the resource requested) 1766 ##process the first part of the compound yield statement 1767 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1768 b[2]._request(arg=(b,a[1])) 1769 ##deal with add-on condition to command 1770 ##Trigger processes for reneging 1771 class _Holder(Process): 1772 """Provides timeout process""" 1773 def trigger(self,delay): 1774 yield hold,self,delay 1775 if not proc in b[2].activeQ: 1776 reactivate(proc)
1777 1778 class _EventWait(Process): 1779 """Provides event waiting process""" 1780 def trigger(self,event): 1781 yield waitevent,self,event 1782 if not proc in b[2].activeQ: 1783 a[1].who.eventsFired=self.eventsFired 1784 reactivate(proc) 1785 1786 #activate it 1787 proc=a[0][0][1] # the process to be woken up 1788 actCode=a[0][1][0] 1789 if actCode==hold: 1790 proc._holder=_Holder(name="RENEGE-hold for %s"%proc.name) 1791 ## the timeout delay 1792 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1793 elif actCode==waituntil: 1794 raise FatalSimerror("Illegal code for reneging: waituntil") 1795 elif actCode==waitevent: 1796 proc._holder=_EventWait(name="RENEGE-waitevent for %s"%proc.name) 1797 ## the event 1798 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1799 elif actCode==queueevent: 1800 raise FatalSimerror("Illegal code for reneging: queueevent") 1801 else: 1802 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1803 else: 1804 ## Simple yield request command 1805 a[0][2]._request(a) 1806
1807 -def releasefunc(a):
1808 a[0][2]._release(a)
1809
1810 -def passivatefunc(a):
1811 a[0][1]._passivate(a)
1812
1813 -def waitevfunc(a):
1814 #if waiting for one event only (not a tuple or list) 1815 evtpar=a[0][2] 1816 if isinstance(evtpar,SimEvent): 1817 a[0][2]._wait(a) 1818 # else, if waiting for an OR of events (list/tuple): 1819 else: #it should be a list/tuple of events 1820 # call _waitOR for first event 1821 evtpar[0]._waitOR(a)
1822
1823 -def queueevfunc(a):
1824 #if queueing for one event only (not a tuple or list) 1825 evtpar=a[0][2] 1826 if isinstance(evtpar,SimEvent): 1827 a[0][2]._queue(a) 1828 #else, if queueing for an OR of events (list/tuple): 1829 else: #it should be a list/tuple of events 1830 # call _queueOR for first event 1831 evtpar[0]._queueOR(a)
1832
1833 -def waituntilfunc(par):
1834 _waitUntilFunc(par[0][1],par[0][2])
1835
1836 -def getfunc(a):
1837 """Handles 'yield get,self,buffer,what,priority' and 1838 'yield (get,self,buffer,what,priority),(<code>,self,par)'. 1839 <code> can be 'hold' or 'waitevent'. 1840 """ 1841 if type(a[0][0])==tuple: 1842 ## Compound yield request statement 1843 ## first tuple in ((request,self,res),(xx,self,yy)) 1844 b=a[0][0] 1845 ## b[2]==res (the resource requested) 1846 ##process the first part of the compound yield statement 1847 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1848 b[2]._get(arg=(b,a[1])) 1849 ##deal with add-on condition to command 1850 ##Trigger processes for reneging 1851 class _Holder(Process): 1852 """Provides timeout process""" 1853 def trigger(self,delay): 1854 yield hold,self,delay 1855 #if not proc in b[2].activeQ: 1856 if proc in b[2].getQ: 1857 reactivate(proc)
1858 1859 class _EventWait(Process): 1860 """Provides event waiting process""" 1861 def trigger(self,event): 1862 yield waitevent,self,event 1863 if proc in b[2].getQ: 1864 a[1].who.eventsFired=self.eventsFired 1865 reactivate(proc) 1866 1867 #activate it 1868 proc=a[0][0][1] # the process to be woken up 1869 actCode=a[0][1][0] 1870 if actCode==hold: 1871 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1872 ## the timeout delay 1873 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1874 elif actCode==waituntil: 1875 raise FatalSimerror("Illegal code for reneging: waituntil") 1876 elif actCode==waitevent: 1877 proc._holder=_EventWait(proc.name) 1878 ## the event 1879 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1880 elif actCode==queueevent: 1881 raise FatalSimerror("Illegal code for reneging: queueevent") 1882 else: 1883 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1884 else: 1885 ## Simple yield request command 1886 a[0][2]._get(a) 1887 1888
1889 -def putfunc(a):
1890 """Handles 'yield put' (simple and compound hold/waitevent) 1891 """ 1892 if type(a[0][0])==tuple: 1893 ## Compound yield request statement 1894 ## first tuple in ((request,self,res),(xx,self,yy)) 1895 b=a[0][0] 1896 ## b[2]==res (the resource requested) 1897 ##process the first part of the compound yield statement 1898 ##a[1] is the _Action instance (a[1].who==process owner, a[1].process==generator) 1899 b[2]._put(arg=(b,a[1])) 1900 ##deal with add-on condition to command 1901 ##Trigger processes for reneging 1902 class _Holder(Process): 1903 """Provides timeout process""" 1904 def trigger(self,delay): 1905 yield hold,self,delay 1906 #if not proc in b[2].activeQ: 1907 if proc in b[2].putQ: 1908 reactivate(proc)
1909 1910 class _EventWait(Process): 1911 """Provides event waiting process""" 1912 def trigger(self,event): 1913 yield waitevent,self,event 1914 if proc in b[2].putQ: 1915 a[1].who.eventsFired=self.eventsFired 1916 reactivate(proc) 1917 1918 #activate it 1919 proc=a[0][0][1] # the process to be woken up 1920 actCode=a[0][1][0] 1921 if actCode==hold: 1922 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1923 ## the timeout delay 1924 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1925 elif actCode==waituntil: 1926 raise FatalSimerror("Illegal code for reneging: waituntil") 1927 elif actCode==waitevent: 1928 proc._holder=_EventWait("RENEGE-waitevent for %s"%proc.name) 1929 ## the event 1930 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1931 elif actCode==queueevent: 1932 raise FatalSimerror("Illegal code for reneging: queueevent") 1933 else: 1934 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1935 else: 1936 ## Simple yield request command 1937 a[0][2]._put(a) 1938
1939 -def simulate(callback=lambda :None,until=0):
1940 """Schedules Processes/semi-coroutines until time 'until'""" 1941 1942 """Gets called once. Afterwards, co-routines (generators) return by 1943 'yield' with a cargo: 1944 yield hold, self, <delay>: schedules the "self" process for activation 1945 after <delay> time units.If <,delay> missing, 1946 same as "yield hold,self,0" 1947 1948 yield passivate,self : makes the "self" process wait to be re-activated 1949 1950 yield request,self,<Resource>[,<priority>]: request 1 unit from <Resource> 1951 with <priority> pos integer (default=0) 1952 1953 yield release,self,<Resource> : release 1 unit to <Resource> 1954 1955 yield waitevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 1956 wait for one or more of several events 1957 1958 1959 yield queueevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 1960 queue for one or more of several events 1961 1962 yield waituntil,self,cond : wait for arbitrary condition 1963 1964 yield get,self,<buffer>[,<WhatToGet>[,<priority>]] 1965 get <WhatToGet> items from buffer (default=1); 1966 <WhatToGet> can be a pos integer or a filter function 1967 (Store only) 1968 1969 yield put,self,<buffer>[,<WhatToPut>[,priority]] 1970 put <WhatToPut> items into buffer (default=1); 1971 <WhatToPut> can be a pos integer (Level) or a list of objects 1972 (Store) 1973 1974 EXTENSIONS: 1975 Request with timeout reneging: 1976 yield (request,self,<Resource>),(hold,self,<patience>) : 1977 requests 1 unit from <Resource>. If unit not acquired in time period 1978 <patience>, self leaves waitQ (reneges). 1979 1980 Request with event-based reneging: 1981 yield (request,self,<Resource>),(waitevent,self,<eventlist>): 1982 requests 1 unit from <Resource>. If one of the events in <eventlist> occurs before unit 1983 acquired, self leaves waitQ (reneges). 1984 1985 Get with timeout reneging (for Store and Level): 1986 yield (get,self,<buffer>,nrToGet etc.),(hold,self,<patience>) 1987 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> in time period 1988 <patience>, self leaves <buffer>.getQ (reneges). 1989 1990 Get with event-based reneging (for Store and Level): 1991 yield (get,self,<buffer>,nrToGet etc.),(waitevent,self,<eventlist>) 1992 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> before one of 1993 the events in <eventlist> occurs, self leaves <buffer>.getQ (reneges). 1994 1995 1996 1997 Event notices get posted in event-list by scheduler after "yield" or by 1998 "activate"/"reactivate" functions. 1999 2000 Nov 9, 2003: Added capability to step through simulation event by event if 2001 step==True. 'callback' gets called after every event. It can 2002 cancel stepping or end run. API and semantics backwards 2003 compatible with previous versions of simulate(). 2004 2005 """ 2006 global _endtime,_e,_stop,_t,_step,_wustep 2007 paused=False 2008 _stop=False 2009 2010 if _e is None: 2011 raise FatalSimerror("Simulation not initialized") 2012 if _e.events == {}: 2013 message="SimPy: No activities scheduled" 2014 return message 2015 2016 _endtime=until 2017 message="SimPy: Normal exit" 2018 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2019 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2020 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2021 commandcodes=dispatch.keys() 2022 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2023 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2024 get:"get",put:"put"} 2025 while not _stop and _t<=_endtime: 2026 try: 2027 a=_e._nextev() 2028 if not a[0] is None: 2029 ## 'a' is tuple "(<yield command>, <action>)" 2030 if type(a[0][0])==tuple: 2031 ##allowing for yield (request,self,res),(waituntil,self,cond) 2032 command=a[0][0][0] 2033 else: 2034 command = a[0][0] 2035 if __debug__: 2036 if not command in commandcodes: 2037 raise FatalSimerror("Illegal command: yield %s"%command) 2038 dispatch[command](a) 2039 except FatalSimerror,error: 2040 print "SimPy: "+error.value 2041 sys.exit(1) 2042 except Simerror,error: 2043 message="SimPy: "+error.value 2044 _stop = True 2045 if _step: 2046 callback() 2047 if _wustep: 2048 _test() 2049 _stopWUStepping() 2050 stopStepping() 2051 _e=None 2052 return message
2053
2054 -def simulateStep(callback=lambda :None,until=0):
2055 """Schedules Processes/semi-coroutines until next event""" 2056 2057 """Can be called repeatedly. 2058 Behaves like "simulate", but does execute only one event per call. 2059 2060 2061 2062 """ 2063 global _endtime,_e,_stop,_t,_step 2064 2065 status="resumable" 2066 2067 if _e == None: 2068 raise Simerror("Fatal SimPy error: Simulation not initialized") 2069 if _e.events == {}: 2070 message="SimPy: No activities scheduled" 2071 status="notResumable" 2072 return message,status 2073 2074 _endtime=until 2075 message="SimPy: Normal exit" 2076 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2077 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2078 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2079 commandcodes=dispatch.keys() 2080 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2081 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2082 get:"get",put:"put"} 2083 if not _stop and _t<=_endtime: 2084 try: 2085 a=_e._nextev() 2086 if not a[0]==None: 2087 ## 'a' is tuple "(<yield command>, <action>)" 2088 if type(a[0][0])==tuple: 2089 ##allowing for yield (request,self,res),(waituntil,self,cond) 2090 command=a[0][0][0] 2091 else: 2092 command = a[0][0] 2093 if __debug__: 2094 if not command in commandcodes: 2095 raise FatalSimerror("Fatal error: illegal command: yield %s"%command) 2096 dispatch[command](a) 2097 except FatalSimerror,error: 2098 print "SimPy: "+error.value 2099 sys.exit(1) 2100 except Simerror, error: 2101 message="SimPy: "+ error.value 2102 _stop = True 2103 status="notResumable" 2104 if _step: 2105 callback() 2106 return message,status
2107 2108 2109 ################### end of Simulation module 2110 2111 if __name__ == "__main__": 2112 print "SimPy.SimulationStep %s" %__version__ 2113 ################### start of test/demo programs 2114
2115 - def askCancel():
2116 a=raw_input("[Time=%s] End run (e), Continue stepping (s), Run to end (r)"%now()) 2117 if a=="e": 2118 stopSimulation() 2119 elif a=="s": 2120 return 2121 else: 2122 stopStepping()
2123
2124 - def test_demo():
2125 class Aa(Process): 2126 sequIn=[] 2127 sequOut=[] 2128 def __init__(self,holdtime,name): 2129 Process.__init__(self,name) 2130 self.holdtime=holdtime
2131 2132 def life(self,priority): 2133 for i in range(1): 2134 Aa.sequIn.append(self.name) 2135 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2136 len(rrr.activeQ) 2137 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2138 print "activeQ: ",[(k.name,k._priority[rrr]) \ 2139 for k in rrr.activeQ] 2140 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2141 "Inconsistent resource unit numbers" 2142 print now(),self.name,"requests 1 ", rrr.unitName 2143 yield request,self,rrr,priority 2144 print now(),self.name,"has 1 ",rrr.unitName 2145 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2146 len(rrr.activeQ) 2147 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2148 len(rrr.activeQ) 2149 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2150 "Inconsistent resource unit numbers" 2151 yield hold,self,self.holdtime 2152 print now(),self.name,"gives up 1",rrr.unitName 2153 yield release,self,rrr 2154 Aa.sequOut.append(self.name) 2155 print now(),self.name,"has released 1 ",rrr.unitName 2156 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2157 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2158 len(rrr.activeQ) 2159 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2160 "Inconsistent resource unit numbers" 2161 2162 class Destroyer(Process): 2163 def __init__(self): 2164 Process.__init__(self) 2165 2166 def destroy(self,whichProcesses): 2167 for i in whichProcesses: 2168 Process().cancel(i) 2169 yield hold,self,0 2170 2171 class Observer(Process): 2172 def __init__(self): 2173 Process.__init__(self) 2174 2175 def observe(self,step,processes,res): 2176 while now()<11: 2177 for i in processes: 2178 print " %s %s: act:%s, pass:%s, term: %s,interr:%s, qu:%s"\ 2179 %(now(),i.name,i.active(),i.passive(),i.terminated()\ 2180 ,i.interrupted(),i.queuing(res)) 2181 print 2182 yield hold,self,step 2183 2184 class UserControl(Process): 2185 def letUserInteract(self,when): 2186 yield hold,self,when 2187 startStepping() 2188 2189 print "****First case == priority queue, resource service not preemptable" 2190 initialize() 2191 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2192 preemptable=0) 2193 procs=[] 2194 for i in range(10): 2195 z=Aa(holdtime=i,name="Car "+str(i)) 2196 procs.append(z) 2197 activate(z,z.life(priority=i)) 2198 o=Observer() 2199 activate(o,o.observe(1,procs,rrr)) 2200 startStepping() 2201 a=simulate(until=10000,callback=askCancel) 2202 print "Input sequence: ",Aa.sequIn 2203 print "Output sequence: ",Aa.sequOut 2204 2205 print "\n****Second case == priority queue, resource service preemptable" 2206 initialize() 2207 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2208 preemptable=1) 2209 procs=[] 2210 for i in range(10): 2211 z=Aa(holdtime=i,name="Car "+str(i)) 2212 procs.append(z) 2213 activate(z,z.life(priority=i)) 2214 o=Observer() 2215 activate(o,o.observe(1,procs,rrr)) 2216 u=UserControl() 2217 activate(u,u.letUserInteract(4)) 2218 Aa.sequIn=[] 2219 Aa.sequOut=[] 2220 a=simulate(askCancel,until=10000) 2221 print a 2222 print "Input sequence: ",Aa.sequIn 2223 print "Output sequence: ",Aa.sequOut 2224
2225 - def test_interrupt():
2226 class Bus(Process): 2227 def __init__(self,name): 2228 Process.__init__(self,name)
2229 2230 def operate(self,repairduration=0): 2231 print now(),">> %s starts" %(self.name) 2232 tripleft = 1000 2233 while tripleft > 0: 2234 yield hold,self,tripleft 2235 if self.interrupted(): 2236 print "interrupted by %s" %self.interruptCause.name 2237 print "%s: %s breaks down " %(now(),self.name) 2238 tripleft=self.interruptLeft 2239 self.interruptReset() 2240 print "tripleft ",tripleft 2241 reactivate(br,delay=repairduration) # breakdowns only during operation 2242 yield hold,self,repairduration 2243 print now()," repaired" 2244 else: 2245 break # no breakdown, ergo bus arrived 2246 print now(),"<< %s done" %(self.name) 2247 2248 class Breakdown(Process): 2249 def __init__(self,myBus): 2250 Process.__init__(self,name="Breakdown "+myBus.name) 2251 self.bus=myBus 2252 2253 def breakBus(self,interval): 2254 2255 while True: 2256 yield hold,self,interval 2257 if self.bus.terminated(): break 2258 self.interrupt(self.bus) 2259 2260 print"\n\n+++test_interrupt" 2261 initialize() 2262 b=Bus("Bus 1") 2263 activate(b,b.operate(repairduration=20)) 2264 br=Breakdown(b) 2265 activate(br,br.breakBus(200)) 2266 startStepping() 2267 simulate(until=4000) 2268 2269
2270 - def testSimEvents():
2271 class Waiter(Process): 2272 def waiting(self,theSignal): 2273 while True: 2274 yield waitevent,self,theSignal 2275 print "%s: process '%s' continued after waiting for %s"%(now(),self.name,theSignal.name) 2276 yield queueevent,self,theSignal 2277 print "%s: process '%s' continued after queueing for %s"%(now(),self.name,theSignal.name)
2278 2279 class ORWaiter(Process): 2280 def waiting(self,signals): 2281 while True: 2282 yield waitevent,self,signals 2283 print now(),"one of %s signals occurred"%[x.name for x in signals] 2284 print "\t%s (fired/param)"%[(x.name,x.signalparam) for x in self.eventsFired] 2285 yield hold,self,1 2286 2287 class Caller(Process): 2288 def calling(self): 2289 while True: 2290 signal1.signal("wake up!") 2291 print "%s: signal 1 has occurred"%now() 2292 yield hold,self,10 2293 signal2.signal("and again") 2294 signal2.signal("sig 2 again") 2295 print "%s: signal1, signal2 have occurred"%now() 2296 yield hold,self,10 2297 print"\n+++testSimEvents output" 2298 initialize() 2299 signal1=SimEvent("signal 1") 2300 signal2=SimEvent("signal 2") 2301 signal1.signal("startup1") 2302 signal2.signal("startup2") 2303 w1=Waiter("waiting for signal 1") 2304 activate(w1,w1.waiting(signal1)) 2305 w2=Waiter("waiting for signal 2") 2306 activate(w2,w2.waiting(signal2)) 2307 w3=Waiter("also waiting for signal 2") 2308 activate(w3,w3.waiting(signal2)) 2309 w4=ORWaiter("waiting for either signal 1 or signal 2") 2310 activate(w4,w4.waiting([signal1,signal2]),prior=True) 2311 c=Caller("Caller") 2312 activate(c,c.calling()) 2313 print simulate(until=100) 2314
2315 - def testwaituntil():
2316 """ 2317 Demo of waitUntil capability. 2318 2319 Scenario: 2320 Three workers require sets of tools to do their jobs. Tools are shared, scarce 2321 resources for which they compete. 2322 """ 2323 2324 2325 class Worker(Process): 2326 def __init__(self,name,heNeeds=[]): 2327 Process.__init__(self,name) 2328 self.heNeeds=heNeeds
2329 def work(self): 2330 2331 def workerNeeds(): 2332 for item in self.heNeeds: 2333 if item.n==0: 2334 return False 2335 return True 2336 2337 while now()<8*60: 2338 yield waituntil,self,workerNeeds 2339 for item in self.heNeeds: 2340 yield request,self,item 2341 print "%s %s has %s and starts job" %(now(),self.name, 2342 [x.name for x in self.heNeeds]) 2343 yield hold,self,random.uniform(10,30) 2344 for item in self.heNeeds: 2345 yield release,self,item 2346 yield hold,self,2 #rest 2347 2348 print "\n+++ nwaituntil demo output" 2349 initialize() 2350 brush=Resource(capacity=1,name="brush") 2351 ladder=Resource(capacity=2,name="ladder") 2352 hammer=Resource(capacity=1,name="hammer") 2353 saw=Resource(capacity=1,name="saw") 2354 painter=Worker("painter",[brush,ladder]) 2355 activate(painter,painter.work()) 2356 roofer=Worker("roofer",[hammer,ladder,ladder]) 2357 activate(roofer,roofer.work()) 2358 treeguy=Worker("treeguy",[saw,ladder]) 2359 activate(treeguy,treeguy.work()) 2360 for who in (painter,roofer,treeguy): 2361 print "%s needs %s for his job" %(who.name,[x.name for x in who.heNeeds]) 2362 print 2363 print simulate(until=9*60) 2364 2365 2366 2367 2368 test_demo() 2369 test_interrupt() 2370 testSimEvents() 2371 testwaituntil() 2372