pythonでシミュレーション

シミュレーションと待ち行列であるが、待ち行列の理解があるといい。
人が窓口に並ぶ、待ち時間や作業時間を計算する。
タスクのスケジュールのシュミレーションも同じで、人をタスクととらえ、窓口をCPUと考えるといいだろう。条件を変えて、どれくらいの時間で処理できるかをシミュレーションできる。

1.環境について

(1)概要
GoogleColabで実施する。
python3.hatenadiary.com

(2)環境の確認とモジュールsimpyのインストール
冒頭に以下を実行する。もしかするとpip3かも
!pip install simpy

#カレンフォルダの確認
!pwd
#ファイルの確認
!ls
#インストールされているモジュールを確認
!pip list
#simpyのモジュールを入れる
!pip install simpy 

2.simpy

(1)simpyについて

Pythonで離散イベントシミュレーションを実施できる。
概要としては、simpyをインポートし、env = simpy.Environment() というオマジナイでシミュレーション環境を構築する。そして、ジェネレータをdefで作成し、yieldで次々と送る。
それを実行する。

(2)シンプルなコード1

qiita.com
上記にあるように、以下が、最も単純な書き方であり、文法でもある。

❶プログラム概要
・今の時刻(秒)を表示する。※おそらく、プログラムを開始してからの秒数
・func1(env)のイテレータで、10秒まで回す。

❷プログラムそのもの
上記のサイトそのままである。サイト管理者さん、勉強させてもらっております。ありがとうございます。

import simpy

def func1(env):
    while True:
        print(env.now)
        yield env.timeout(2)

env = simpy.Environment()
env.process(func1(env))
env.run(until=10)  # 0 2 4 6 8

以下、説明を入れてみる。

#simpyをインポート
import simpy

#イテレータを定義する。この中にシミュレーションする内容
を記載する。
def func1(env):
    while True:
        #今の時刻を表示?
        print(env.now)
        #2秒間待つ
        yield env.timeout(2)

#インスタンス化する。シミュレーション環境を作っている?
env = simpy.Environment()

#上記で作成したシミュレーション環境に、プロセスを追加する?
env.process(func1(env))

#実行する
#untilによって、ある時刻か事象になるまでシミュレーションを実行
env.run(until=10)  # 0 2 4 6 8
(3)シンプルなコード2 :計算式を入れる。時間と距離の計算

・秒速20mで進む場合、何m進んだかを表示する。
・func1(env)のイテレータで、5秒まで回す。

import simpy

def func1(env):
    while True:
        #秒速20なので、今の時刻(経過した秒)と20を掛ける
        print(env.now, "秒後の距離=", env.now*20)  
        yield env.timeout(1)

env = simpy.Environment()
env.process(func1(env))
env.run(until=5) 

実行結果は以下

0 秒後の距離= 0
1 秒後の距離= 20
2 秒後の距離= 40
3 秒後の距離= 60
4 秒後の距離= 80
(3)シンプルなコード3:乱数を加える

・先のプログラムに乱数を加える。秒速20mではなく、乱数を掛ける
・進む場合、何m進んだかを表示する。

import simpy

def func1(env):
    mean=20 #平均
    std=2 #標準偏差
    speed=np.random.normal(mean,std) #平均と標準偏差を使って速度の乱数を作る
    while True:
        #秒速20に乱数を付与。
        print(env.now, "秒後の距離=", env.now*speed)  
        yield env.timeout(1)

env = simpy.Environment()
env.process(func1(env))
env.run(until=5) 

実行結果は以下

0 秒後の距離= 0.0
1 秒後の距離= 23.22936762721028
2 秒後の距離= 46.45873525442056
3 秒後の距離= 69.68810288163084
4 秒後の距離= 92.91747050884112
(4)シンプルなコード4:毎回乱数を使う

・先と同様に乱数を使うが、1秒ごとに乱数の値を変える。
・locationで現在位置を指定し、そこに乱数を加えていく

import simpy

def func1(env):
    mean=20 #平均
    std=2 #標準偏差
    location=0 #位置
    while True:
        step=np.random.normal(mean,std) #乱数により、毎回進む距離を発生させる
        print(env.now, "秒後の距離=", location, "今回進む距離=",step)  
        location = location + step #現在位置(location)に、今回進む距離stepを加える
        yield env.timeout(1)

env = simpy.Environment()
env.process(func1(env))
env.run(until=5) 

実行結果は以下

0 秒後の距離= 0 今回進む距離= 18.23077711331148
1 秒後の距離= 18.23077711331148 今回進む距離= 17.822299668852168
2 秒後の距離= 36.05307678216364 今回進む距離= 17.87092879869479
3 秒後の距離= 53.924005580858434 今回進む距離= 22.18420721884411
4 秒後の距離= 76.10821279970254 今回進む距離= 21.176451647274014
(5)シンプルなコード5:何回も繰り返す

・ほしいのは結果だけなので、4秒後の距離の結果だけを出力する。
・それを繰り返す。今までとはenv.run(until=5) の使い方が違う。これまでは、秒を変えることに使っていたが、今回は繰り返しの回数に使う

import simpy

def func1(env):
    mean=20 #平均
    std=2 #標準偏差
    while True:
        location=0 #位置
        for i in range(5):
          step=np.random.normal(mean,std) #乱数により、毎回進む距離を発生させる
          location = location + step #現在位置(location)に、今回進む距離stepを加える
        print(location)
        yield env.timeout(1)

env = simpy.Environment()
env.process(func1(env))
env.run(until=5) #5回繰り返す

実行結果は以下

102.07861590133939
100.20338184303091
92.8811368947726
105.10484325211485
99.95588434026536
(6)シンプルなコード6:工程ごとの時間を乱数で変えて、何回も繰り返す

・工程の作業時間を与え、合計時間を求める。
・各工程の作業時間にはバラつきがあるので、乱数にてバラつきを付与する。
・このとき、バラつきは工程によって違うので、標準偏差を変える

import simpy

def func1(env):
    #標準偏差
    std_L=3 #バラつきが多い工程
    std_S=1 #バラつきが少ない工程

    #工程の時間
    b_time=20  #工程B
    c_time=15  #工程C
    d_time=150  #工程D
    while True:
        total_time = np.random.normal(b_time,std_L) + np.random.normal(c_time,std_L) + np.random.normal(d_time,std_S)  #総時間を乱数を使って求める
        print(total_time)
        yield env.timeout(1)

env = simpy.Environment()
env.process(func1(env))
env.run(until=5) #5回繰り返す

実行結果は以下

190.39466291002768
187.3850570612345
186.02605962989372
189.05992209146984
190.75738535763048
(7)classにしてみる

・中身はよくわかっていないが、classにしてみた

import simpy
import numpy as np

class Func1:
   
    def __init__(self, env):    #コンストラクタ
        self.env = env

        #標準偏差
        self.std_L=3 #バラつきが多い工程
        self.std_S=1 #バラつきが少ない工程

        #工程の時間
        self.b_time=20  #工程B
        self.c_time=15  #工程C
        self.d_time=150  #工程D

    def start(self):   #startメソッドを作成し、時間を計算する
        while True:
          total_time = np.random.normal(self.b_time,self.std_L) + np.random.normal(self.c_time,self.std_L) + np.random.normal(self.d_time,self.std_S)  #総時間を乱数を使って求める
          print(total_time)
          yield env.timeout(1)

env = simpy.Environment() #simpyを作る環境
instance1 = Func1(env) #インスタンスを作成、同時にコンストラクタが呼ばれる
env.process(instance1.start()) #クラスFunc1のインスタンスinstance1のメソッドであるstartを呼び出す。()は引数を渡していないので、何も入れていない。
env.run(until=5) #runで、simpyを実行する。5回繰り返す
(8)上記の結果をグラフにする・・・ヒストグラム

シミュレーション結果を配列に入れて、グラフにする。

import simpy
import numpy as np
import matplotlib.pyplot as plt

class Func1:
    arr_time = [] #結果を入れる配列を用意
    def __init__(self, env):    #コンストラクタ 最初に読み込まれる

        self.env = env

        #標準偏差
        self.std_L=3 #バラつきが多い工程
        self.std_S=1 #バラつきが少ない工程

        #工程の時間
        self.b_time=20  #工程B
        self.c_time=15  #工程C
        self.d_time=150  #工程D

    def start(self):   #startメソッドを作成し、時間を計算する
        while True:
          total_time = np.random.normal(self.b_time,self.std_L) + np.random.normal(self.c_time,self.std_L) + np.random.normal(self.d_time,self.std_S)  #総時間を乱数を使って求める
          self.arr_time.append(total_time) #配列に結果を入れていく
          yield env.timeout(1)

env = simpy.Environment() #simpyを作る環境
instance1 = Func1(env) #インスタンスを作成、同時にコンストラクタが呼ばれる
env.process(instance1.start()) #クラスFunc1のインスタンスinstance1のメソッドであるstartを呼び出す。()は引数を渡していないので、何も入れていない。
env.run(until=10000) #runで、simpyを実行する。10000回繰り返す

#以下、ヒストグラムの作成
np_arr = np.array(instance1.arr_time) #リストをnp配列に変換した。リストのままでもグラフが描けるのであれば、それでもいいと思う。
fig, ax = plt.subplots() 
ax.hist((np_arr), bins=100)  #100に分割したヒストグラムという意味だと思う。この値を大きくすると、詳細なグラフになる。

f:id:seeeko:20210913224449p:plain
❷ひげ根
以下は、さらにバラつきがある場合のシミュレーションを実施している。勉強していきたい。
https://sinhrks.hatenablog.com/entry/2014/12/14/005604
https://cpp-learning.com/simpy/

3.ガントチャート

(1)やってみよう

以下のplotyを使うと一気に作ってくれる。
plotly.com
環境としては、plotlyのデフォルトのバージョンだと動かないので、以下のバージョンを指定して動かす。

!pip install plotly==4.9.0

以下は、上記のURLの通り。そのまま張り付けるとガントチャートが表示される。

import plotly.express as px
import pandas as pd

df = pd.DataFrame([
    dict(Task="Job A", Start='2009-01-01', Finish='2009-02-28'),
    dict(Task="Job B", Start='2009-03-05', Finish='2009-04-15'),
    dict(Task="Job C", Start='2009-02-20', Finish='2009-05-30')
])

fig = px.timeline(df, x_start="Start", x_end="Finish", y="Task")
fig.update_yaxes(autorange="reversed") # otherwise tasks are listed from the bottom up
fig.show()

f:id:seeeko:20210918121848p:plain

(2)タスクリストをもとにしたガントチャート
"""
プログラムの概要:タスクの前後関係を考慮しながら、最短作業時間を計算する

- タスクリスト(tasks)に、タスクおよび作業時間や前後関係、担当者が割り当てられている
- 1分ごとに、タスクリストの情報から、作業する内容を決めて、while でループさせる。
- 複数のタスクがある場合は上から順に実行する → クリティカルパスを選べない場合がある
- 各担当者が作業済かどうかのフラグを用意する。
- ループごとに、それぞれのタスクが実行可能か判断する。実行不可能な条件は以下。
    - 完了済みである。(残り時間が0)
    - 前工程が残っている。
    - 担当者がすでに作業済み。
- 実行可能なタスクなら、次のふたつの処理を行う。
    - タスクの残り時間を-1。
    - タスクを担当したリソースの「作業したフラグ」を True にする。
- 実行可能なタスクが無ければ終了。
- そしてガントチャートを表示する。
"""
!pip install plotly==4.9.0
!pip list | grep plot

import plotly.express as px
import pandas as pd

# タスクを定義する。ここは人間が作成
# [タスク名, 完了に必要な時間(分), 前工程のタスク名, タスクに必要なリソース(担当者)] 

tasks = [
['A',0,[],['CSIRT']],  
['B',60,['A'],['CSIRT']],
['C',60,['B'],['CSIRT']],
['D',120,['B'],['CSIRT']],
['E',300,['B'],['CSIRT']],
['F',60,['C'],['Sales']],
['G',60,['F'],['Sales']],
['H',360,['D'],['Vender']],
['I',60,['E'],['CSIRT']],
['J',120, ['D', 'H'],['CSIRT']],
['K',60,['G'],['Sales']],
['L',120,['E', 'I', 'J', 'G', 'K'],['CSIRT','Sales']],
['M',120,['L'],['CSIRT']],
['N',60,['M', 'I'],['Exective']],
['O',120,['E', 'J', 'N'],['CSIRT']],
]

#求める総時間(total_time)を初期化
total_time = 0

#リソース(担当者)を,taskから拾う
resources = []
for task in tasks:
    for resource in task[3]:
        resources.append(resource)
resources = list(set(resources))  # 重複を消すために、setにした後、リストに戻す

#前処理:時間が0のものは、他タスクの「前工程」制限を外す
for task in tasks:
    if task[1] == 0:
        for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
            if task[0] in _task[2]:
                _task[2].remove(task[0])

# 最後にガントチャートを表示する。
# ガントチャートの元データとなる情報を格納していく list です。
gantt_source = []


#本処理:
i = 0
while 1: #無限ループ。あとの処理で、10000回でSTOP
    worked_flag = {} #各リソースにおいて、その1分間で仕事をしたかのFLAG。初期値は、まだしていない(=False)
    for resource in resources:
        worked_flag[resource] = False

    # タスクを実行
    for task in tasks:
        # 複数のタスクの中で、実行可能なタスクかを判断。実行可能でない場合はcontinueで抜けて、次のtaskへ
        if task[1] == 0:  # 残り時間が0なら実行不可。
            continue  #このforループにおいて、次のtaskに進む
        if task[2]:  # task[2]=前工程。空だったらFalseで、要素が存在すればTrue。前工程がある場合は、実行不可なので、次のタスクへ。
            continue
        someone_is_busy = False  # タスクには複数の担当者が割り当てられていることがある。その担当者(リソース)が、ひとりでも仕事済み(busy)かどうかを判断する。→その場合はタスクを実行不可
        for resource in resources:
            if resource in task[3] and worked_flag[resource] is True:
                # たとえば、CSIRT の担当タスクだけれど、 CSIRT はもう作業済み -> 実行不可。
                # この if に入るということは、誰かひとりが busy ということなので True にする。
                someone_is_busy = True
                break
        if someone_is_busy:
            # 担当者(リソース)が誰かひとりでも忙しい -> 実行不可。
            continue

        # ここに来た(つまり、continueされなかった)ということは、タスクが実行できるということ。このタスクを実行(=残り時間を1分減らす)。
        task[1] -= 1

        # タスクが実行される = ガントチャートに1分の内容を追加。
        gantt_source.append(
            dict(
                Task=task[0],
                Start=total_time,
                Finish=total_time + 1,
                Resource=' & '.join(task[3])),  # 'CSIRT' とか ['CSIRT', 'Sales']を こうする --> 'CSIRT & Sales'
        )

        # タスクが実行されたので、worked_flagを True にする。
        for resource in resources:
            if resource in task[3]:
                worked_flag[resource] = True

    # 完了(残り時間0)したタスクがあれば、他のタスクの「前工程」制限を外す。
    for task in tasks:
        if task[1] == 0:
            for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
                if task[0] in _task[2]:
                    _task[2].remove(task[0])

#    print(tasks, total_time)

    # タスクが実行されたのなら、総時間を1分経過させる。
    for resource in resources:
        if worked_flag[resource]:
            # 総経過時間を+1します。
            total_time += 1
            break

    # タスクが実行されなかったのなら、実行できるタスクが無いということで、終了。
    else:
        break

    # 無限ループ防止の保険。
    if i == 10000:
        break
    i += 1

#print(dict(total_time=total_time))

#gantt_sourceを DataFrame にして、 gantt_source にする。
#from pprint import pprint
#pprint(dict(gantt_source=gantt_source))

# グラフを描くためにDataFrame に変換。
df = pd.DataFrame(gantt_source)

# Start と Finish を datetime ではなく int にしたことに対応。
df['delta'] = df['Finish'] - df['Start']
fig = px.timeline(df, x_start='Start', x_end='Finish', y='Task')
# デフォルトでは 'date'
fig.layout.xaxis.type = 'linear'
fig.data[0].x = df.delta.tolist()
fig.update_yaxes(autorange='reversed') #グラフの項目の表示を逆向きにする。この方が見やすいと思う
fig.show()
(3)生産性なども考慮したガントチャート
!pip install plotly==4.9.0
!pip list | grep plot


#タスクをインポートする場合
from google.colab import files
uploaded = files.upload()

import csv
csvfile='tasklist.csv'   #ファイル名とパスを指定
with open(csvfile, encoding='shift_jis') as file1:  #ファイルを開く
    tasks = list(csv.reader(file1))              #csvモジュールでリストに入れる
print(tasks) 

for _task in range(len(tasks)):
  tasks[_task][1] = int(tasks[_task][1])
  tasks[_task][2] = tasks[_task][2].split(',')
  tasks[_task][3] = tasks[_task][3].split(',')
print(tasks) 


"""
プログラムの概要:タスクの前後関係を考慮しながら、最短作業時間を計算する

- タスクリスト(tasks)に、タスクおよび作業時間や前後関係、担当者が割り当てられている
- 1分ごとに、タスクリストの情報から、作業する内容を決めて、while でループさせる。
- 複数のタスクがある場合は上から順に実行する → クリティカルパスを選べない場合がある
- 各担当者が作業済かどうかのフラグを用意する。
- ループごとに、それぞれのタスクが実行可能か判断する。実行不可能な条件は以下。
    - 完了済みである。(残り時間が0)
    - 前工程が残っている。
    - 担当者がすでに作業済み。
- 実行可能なタスクなら、次のふたつの処理を行う。
    - タスクの残り時間を-1。
    - タスクを担当したリソースの「作業したフラグ」を True にする。
- 実行可能なタスクが無ければ終了。
- そしてガントチャートを表示する。
"""

import datetime

import pandas as pd
import plotly.express as px

# タスクをインポートではなく、人間が定義する場合。
# [タスク名, 完了に必要な時間(分), 前工程のタスク名, タスクに必要なリソース(担当者)] 

tasks = [
['A',0,[],['CSIRT']],  
['B',60,['A'],['CSIRT']],
['C',60,['B'],['CSIRT']],
['D',120,['B'],['CSIRT']],
['E',300,['B'],['CSIRT']],
['F',60,['C'],['Sales']],
['G',60,['F'],['Sales']],
['H',360,['D'],['Vender']],
['I',60,['E'],['CSIRT']],
['J',120, ['D', 'H'],['CSIRT']],
['K',60,['G'],['Sales']],
['L',120,['E', 'I', 'J', 'G', 'K'],['CSIRT','Sales']],
['M',120,['L'],['CSIRT']],
['N',60,['M', 'I'],['Exective']],
['O',120,['E', 'J', 'N'],['CSIRT']],
]

"""
tasks = [
    ['A', 10,         [],                        ['CSIRT']],  # noqa: E241 <- flake8 を無視する設定です。
    ['B',  5,         [],                        ['Sales']],  # noqa: E241
     ['C', 15, ['A', 'B'], ['CSIRT', 'Sales', 'Management']],
     ['D', 10,         [],                        ['Sales']],  # noqa: E241
     ['E', 20,      ['D'],                       ['Vender']],  # noqa: E241
]
"""
# print(tasks)

#求める総時間(total_time)を初期化
total_time = 0

#リソース(担当者)を,taskから拾う
resources = []
for task in tasks:
    for resource in task[3]:
        resources.append(resource)
resources = list(set(resources))  # 重複を消すために、setにした後、リストに戻す

# 新しい概念、生産性です。
productivity = {
    'CSIRT': 2,
}
# productivity={'CSIRT';2,'Sales':1,'Vender':1, 'Excective':1}
# 他のリソースをいちいち :1 で書くのは面倒だと思うので
# これ↓で補完します。
for resource in resources:
    productivity.setdefault(resource, 1)

#前処理:時間が0のものは、他タスクの「前工程」制限を外す
for task in tasks:
    if task[1] == 0:
        for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
            if task[0] in _task[2]:
                _task[2].remove(task[0])

# 最後にガントチャートを表示する。
# ガントチャートの元データとなる情報を格納していく list。
gantt_source = []

#本処理:
i = 0
while 1: #無限ループ。あとの処理で、10000回でSTOP
    worked_flag = {} #各リソースにおいて、その1分間で仕事をしたかのFLAG。初期値は、まだしていない(=False)
    for resource in resources:
        # CSIRT:2 だとしたら worked_flag[CSIRT] = [False, False] こう書いているのと同じです。
        worked_flag[resource] = [False] * productivity[resource]

    # タスクを実行
    for task in tasks:
        # 複数のタスクの中で、実行可能なタスクかを判断。実行可能でない場合はcontinueで抜けて、次のtaskへ
        if task[1] == 0:  # 残り時間が0なら実行不可。
            continue  #このforループにおいて、次のtaskに進む
        if task[2]:  # task[2]=前工程。空だったらFalseで、要素が存在すればTrue。前工程がある場合は、実行不可なので、次のタスクへ。
            continue
        someone_is_busy = False  # タスクには複数の担当者が割り当てられていることがある。その担当者(リソース)が、ひとりでも仕事済み(busy)かどうかを判断する。→その場合はタスクの実行不可
        for resource in resources:
            # 生産性の高いリソースについては、 [True, True] のようにすべてのフラグが
            # True になっている(all)とき、 is_busy であると判断します。
            if resource in task[3] and all(worked_flag[resource]):  # allは、全ての要素がTrueかどうかを判断 
                # たとえば、CSIRT の担当タスクだけれど、 CSIRT はもう作業済み -> 実行不可。
                # この if に入るということは、誰かひとりが busy ということなので True にする。
                someone_is_busy = True
                break
        if someone_is_busy:
            # 担当者(リソース)が誰かひとりでも忙しい -> 実行不可。
            continue

        # ここに来た(つまり、continueされなかった)ということは、タスクが実行できるということ。このタスクを実行(=残り時間を1分減らす)。

        # この変数は、担当者(リソース)の余力を示します。
        # 余力というのは、 worked_flag の中にある False の数です。
        remaining_power = 1
        # この if は「タスクの担当者がひとり」という意味。
        # NOTE: 仕様のためです。「複数の担当者を持つタスクについては生産性を考慮しなくていい」
        #       複数の担当者を持つなら、これまでどおり担当者の余力は1です。
        if len(task[3]) == 1:
            resources_for_this_task = task[3]  # これは list
            resource_name_for_this_task = resources_for_this_task[0]  # これは str
            # このリソースのフラグの中に、いくつ False があるか、数えています。
            # 上のコメントに書いたとおり、 False の数が余力の数です。
            remaining_power = worked_flag[resource_name_for_this_task].count(False)
        # 余力の数だけ残り時間を減らします。
        for _ in range(remaining_power):
            # いくら余力があっても残り時間が0だったら、
            # これ以上タスクはできません。(人あまりの状態)
            if task[1] > 0:
                task[1] -= 1
            else:
                break

            # タスクが実行されたので、worked_flagを True にする。
            for resource in task[3]:
                # ちなみに、ここまで処理が進んでいる時点で、
                # worked_flag に False があることは保障されています。indexは、最初にFalseの場所を探す
                worked_flag[resource][worked_flag[resource].index(False)] = True

        # タスクが実行される = ガントチャートに1分の内容を追加。
        gantt_source.append(
            dict(
                Task=task[0],
                Start=total_time,
                Finish=total_time + 1,
                Resource=' & '.join(task[3])),  # 'CSIRT' とか ['CSIRT', 'Sales']を こうする --> 'CSIRT & Sales'
        )

        # ここで for 終わり。
        # これはタスクリストの中のひとつのタスクを見終わったということ。
        # HACK: 関数化したりしてネストを短くしたり減らしたりしないと
        #       そろそろバグの温床になると思う。

    # 完了(残り時間0)したタスクがあれば、他のタスクの「前工程」制限を外す。
    for task in tasks:
        if task[1] == 0:
            for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
                if task[0] in _task[2]:
                    _task[2].remove(task[0])

#    print(tasks, total_time)

    # タスクが実行されたのなら、総時間を1分経過させる。
    for resource in resources:
        if any(worked_flag[resource]):
            # 総経過時間を+1します。
            total_time += 1
            break

    # タスクが実行されなかったのなら、実行できるタスクが無いということで、終了。
    else:
        break

    # 無限ループ防止の保険。
    if i == 10000:
        break
    i += 1

# 1分刻みをまとめる
gantt_source_2 = []
for achievement in gantt_source:
    # この実績を前の実績と接続したら True になる変数です。
    connected = False
    # 接続可能な実績があるかどうか探します。
    for _achievement in gantt_source_2:
        if _achievement['Task'] != achievement['Task']:
            # このタスクじゃない。
            continue
        if _achievement['Finish'] == achievement['Start']:
            # 前の実績の終わりと、この実績の始まりが同じ -> 接続できる。
            _achievement['Finish'] = achievement['Finish']
            # さきほど定義した「接続したら True になる変数」を True にします。
            connected = True
            break

    # 接続できなかったならば、これは新しく始めたタスクの実績です。
    # dict を追加します。
    if not connected:
        gantt_source_2.append(achievement)
print(gantt_source_2)

# 所要時間(Finish - Start)の列を作っておきます。これはグラフ上の表記用です。
for row in gantt_source_2:
    row['Delta'] = row['Finish'] - row['Start']

# base_timeとして、実行した日の0時を基準にする。
base_date = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
for row in gantt_source_2:
    # plotlyの仕様で、StartとFinishは時刻表記にする。よって、ベース時刻に、StartとFinishのを足す。グラフの表記がどれが見やすいかという観点から、分ではなく、秒に足すことにした
    row['Start'] = base_date + datetime.timedelta(minutes=row['Start'])
    row['Finish'] = base_date + datetime.timedelta(minutes=row['Finish'])
    # datetime 型ではなくて、 str に直します。
    row['Start'] = row['Start'].strftime('%Y-%m-%d %H:%M')
    row['Finish'] = row['Finish'].strftime('%Y-%m-%d %H:%M')

# print(gantt_source_2)
# gantt_source_2.sort()

# # list -> DataFrame にします。
df = pd.DataFrame(gantt_source_2)

print(df)

# # timeline 作ります。
fig = px.timeline(
    df,
    x_start='Start',
    x_end='Finish',
    y='Task',
    color='Resource',
    # ガントチャートのバーのまんなかに DataFrame の列を表示させるときは
    # こう書きます。
    text='Delta',
)
# # 表の並び順を逆にします。
fig.update_yaxes(autorange='reversed')

fig.show()
print(total_time)

4.バラつきを考慮

(1)各種の分布と乱数

いろいろな乱数がある。
https://note.nkmk.me/python-random-randrange-randint/

シミュレーションをするには、論文をしっかり読む必要があるが、たとえば、以下でどうだろうか。

・単純な作業時間
エスカレーションは、在籍するかしないかという単純な2択に左右されたりもするので、正規分布(または2項分布)
import random
random.normalvariate(平均, 標準偏差)

・ベンダの調査時間
解決しない場合には、どれだけでも時間がかかる工程なので、
基本時間6時間の正規分布+指数分布(平均を8時間とした)

total_time = random.normalvariate(6,1)+random.expovariate(1/8)

参考だが、指数分布は、以下
random.expovariate(1/平均値)

これに、各工程において、バラつきの標準偏差などを変えていけばいいのではないか。

(2)バラつきの結果をヒストグラム化して追加
"""
プログラムの概要:タスクの前後関係を考慮しながら、最短作業時間を計算する

- タスクリスト(tasks)に、タスクおよび作業時間や前後関係、担当者が割り当てられている
- 1分ごとに、タスクリストの情報から、作業する内容を決めて、while でループさせる。
- 複数のタスクがある場合は上から順に実行する → クリティカルパスを選べない場合がある
- 各担当者が作業済かどうかのフラグを用意する。
- ループごとに、それぞれのタスクが実行可能か判断する。実行不可能な条件は以下。
    - 完了済みである。(残り時間が0)
    - 前工程が残っている。
    - 担当者がすでに作業済み。
- 実行可能なタスクなら、次のふたつの処理を行う。
    - タスクの残り時間を-1。
    - タスクを担当したリソースの「作業したフラグ」を True にする。
- 実行可能なタスクが無ければ終了。
- 生産性を考慮する。複数人で作業すれば、作業時間は人数で割った時間になる。
 ただし、複数の担当者(CSIRT:2、Sales:1)で実施するタスクは、Salseは1人のままなので、短縮されない。
 このとき、このタスクはCSIRTが一人で実行できるので、もう一人のCSIRTが他のタスクを実行できる
- そしてガントチャートを表示する。
"""

import datetime
import pandas as pd
import plotly.express as px
import numpy as np
import matplotlib.pyplot as plt
import copy

# タスクをインポートではなく、人間が定義する場合。
# [タスク名, 完了に必要な時間(分), 前工程のタスク名, タスクに必要なリソース(担当者),バラつきの指標] 

tasks = [
['A',0,[],['CSIRT'],1],
['B',60,['A'],['CSIRT'],1],
['C',60,['B'],['CSIRT'],1],
['D',120,['B'],['CSIRT'],1],
['E',300,['B'],['CSIRT'],1],
['F',60,['C'],['Sales'],1],
['G',60,['F'],['Sales'],1],
['H',360,['D'],['Vender'],2],
['I',60,['E'],['CSIRT'],1],
['J',120, ['D', 'H'],['CSIRT'],1],
['K',60,['G'],['Sales'],1],
['L',120,['E', 'I', 'J', 'G', 'K'],['CSIRT','Sales'],2],
['M',120,['L'],['CSIRT'],1],
['N',60,['M', 'I'],['Exective'],1],
['O',120,['E', 'J', 'N'],['CSIRT'],1],
]

def time_calc(tasks):
    #求める総時間(total_time)を初期化
    total_time = 0

    #リソース(担当者)を,taskから拾う
    resources = []
    for task in tasks:
        for resource in task[3]:
            resources.append(resource)
    resources = list(set(resources))  # 重複を消すために、setにした後、リストに戻す

    # 生産性
    productivity = {
        'CSIRT': 2,
    }
    # productivity={'CSIRT';2,'Sales':1,'Vender':1, 'Excective':1}などと書くのは、インポートするデータにどんな担当者が入っているか不明なので、以下で一括処理。
    for resource in resources:
        productivity.setdefault(resource, 1)

    #前処理:時間が0のものは、他タスクの「前工程」制限を外す
    for task in tasks:
        if task[1] == 0:
            for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
                if task[0] in _task[2]:
                    _task[2].remove(task[0])

    # ガントチャートの元データを格納するlist
    gantt_source = []

    #本処理:
    i = 0
    while 1: #無限ループ。あとの処理で、10000回でSTOP
        worked_flag = {} #各リソースにおいて、その1分間で仕事をしたかのFLAG。初期値は、まだしていない(=False)
        for resource in resources:
            # CSIRT:2 だとしたら worked_flag[CSIRT] = [False, False] こう書いているのと同じ。
            worked_flag[resource] = [False] * productivity[resource]

        # タスクを実行
        for task in tasks:
            # 複数のタスクの中で、実行可能なタスクかを判断。実行可能でない場合はcontinueで抜けて、次のtaskへ
            if task[1] == 0:  # 残り時間が0なら実行不可。
                continue  #このforループにおいて、次のtaskに進む
            if task[2]:  # task[2]=前工程。空だったらFalseで、要素が存在すればTrue。前工程がある場合は、実行不可なので、次のタスクへ。
                continue
            someone_is_busy = False  # タスクには複数の担当者が割り当てられていることがある。その担当者(リソース)が、ひとりでも仕事済み(busy)かどうかを判断する。→その場合はタスクの実行不可
            for resource in resources:
                # 生産性の高いリソースについては、 [True, True] のようにすべてのフラグが
                # True になっている(all)とき、 is_busy であると判断
                if resource in task[3] and all(worked_flag[resource]):  # allは、全ての要素がTrueかどうかを判断するもの。 
                    # たとえば、CSIRT の担当タスクだけれど、 CSIRT はもう作業済み -> 実行不可。
                    # この if に入るということは、誰かひとりが busy ということなので True にする。
                    someone_is_busy = True
                    break  # 誰か一人でもbusyならば、実行不可なので、抜けてその下へ
            if someone_is_busy:
                # 担当者(リソース)が誰かひとりでも忙しい -> 実行不可。
                continue

            # ここに来た(つまり、continueされなかった)ということは、タスクが実行できるということ。このタスクを実行(=残り時間を1分減らす)。

            # この変数は、担当者(リソース)の余力を示す。
            # 余力というのは、 worked_flag の中にある False の数
            remaining_power = 1
            # この if は「タスクの担当者がひとり」という意味。
            # NOTE: 仕様のためです。「複数の担当者を持つタスクについては生産性を考慮しなくていい」
            #       複数の担当者を持つなら、これまでどおり担当者の余力は1です。
            if len(task[3]) == 1:
                resources_for_this_task = task[3]  # これは list
                resource_name_for_this_task = resources_for_this_task[0]  # これは str
                # このリソースのフラグの中に、いくつ False があるか、数えています。
                # 上のコメントに書いたとおり、 False の数が余力の数です。
                remaining_power = worked_flag[resource_name_for_this_task].count(False)
            # 余力の数だけ残り時間を減らします。
            for _ in range(remaining_power):
                # いくら余力があっても残り時間が0だったら、
                # これ以上タスクはできません。(人あまりの状態)
                if task[1] > 0:
                    task[1] -= 1
                else:
                    break

                # タスクが実行されたので、worked_flagを True にする。
                for resource in task[3]:
                    # ちなみに、ここまで処理が進んでいる時点で、
                    # worked_flag に False があることは保障されています。indexは、最初にFalseの場所を探す
                    worked_flag[resource][worked_flag[resource].index(False)] = True

            # タスクが実行される = ガントチャートに1分の内容を追加。
            gantt_source.append(
                dict(
                    Task=task[0],
                    Start=total_time,
                    Finish=total_time + 1,
                    Resource=' & '.join(task[3])),  # 'CSIRT' とか ['CSIRT', 'Sales']を こうする --> 'CSIRT & Sales'
            )

            # ここで for 終わり。
            # これはタスクリストの中のひとつのタスクを見終わったということ。
            # HACK: 関数化したりしてネストを短くしたり減らしたりしないと
            #       そろそろバグの温床になると思う。

        # 完了(残り時間0)したタスクがあれば、他のタスクの「前工程」制限を外す。
        for task in tasks:
            if task[1] == 0:
                for _task in tasks:  # _を付けているのは、_無しのtaskと区別したいから
                    if task[0] in _task[2]:
                        _task[2].remove(task[0])

        # タスクが実行されたのなら、総時間を1分経過させる。
        for resource in resources:
            if any(worked_flag[resource]):
                # 総経過時間を+1します。
                total_time += 1
                break

        # タスクが実行されなかったのなら、実行できるタスクが無いということで、終了。
        else:
            break

        # 無限ループ防止の保険。
        if i == 10000:
            break
    return gantt_source,total_time

arr_time = [] 
for i in range(1000):
    # tasks をそのまま渡すと、 time_calc 関数は tasks データ自体を書き換えるので、コピーを作る。
    tasks_copy = copy.deepcopy(tasks)

    #乱数を加える。乱数のパラメータによって、処理を変える。
    for j in range(len(tasks_copy)):
        if tasks_copy[j][1] != 0:  # 時間が0の場合は何もしない
            if tasks_copy[j][4] == 1:
               tasks_copy[j][1] = round(random.normalvariate(tasks_copy[j][1],5))
            if tasks_copy[j][4] == 2:
                # tasks_copy[j][1] = round(random.normalvariate(tasks_copy[j][1]*0.8,1)+random.expovariate(1/tasks_copy[j][1]/4))  # 時間の0.8倍を正規分布+時間の1/4を平均値とした指数分布
                tasks_copy[j][1] = round(random.normalvariate(tasks_copy[j][1]*0.8,1)+random.expovariate(1/2)*60)  # 時間の0.8倍を正規分布+時間の1/4を平均値とした指数分布

    gantt_source, total_time = time_calc(tasks_copy)
    arr_time.append(total_time) #配列に結果を入れていく

#以下、Total_timeの結果のヒストグラムを作成
np_arr = np.array(arr_time) #リストをnp配列に変換した。リストのままでもグラフが描けるのであれば、それでもいいと思う。
fig, ax = plt.subplots() 
ax.hist((np_arr), bins=100)  #100に分割したヒストグラムという意味だと思う。この値を大きくすると、詳細なグラフになる。

#ここからは、基準となる(乱数無し)のデータのガントチャートを作る
# 1分刻みをまとめる
tasks_copy = copy.deepcopy(tasks)  # 大元のtasksを変えずに、コピーする 
gantt_source, total_time = time_calc(tasks_copy)

gantt_source_2 = []
for achievement in gantt_source:
    # この実績を前の実績と接続したら True になる変数。
    connected = False
    # 接続可能な実績があるかどうか探す。
    for _achievement in gantt_source_2:
        if _achievement['Task'] != achievement['Task']:
            # このタスクじゃない。
            continue
        if _achievement['Finish'] == achievement['Start']:
            # 前の実績の終わりと、この実績の始まりが同じ -> 接続できる。
            _achievement['Finish'] = achievement['Finish']
            # さきほど定義した「接続したら True になる変数」を True にする。
            connected = True
            break

    # 接続できなかったならば、これは新しく始めたタスクの実績。
    # dict を追加します。
    if not connected:
        gantt_source_2.append(achievement)
print(gantt_source_2)

# 所要時間(Finish - Start)の列を作っておく。これはグラフ上の表記用。
for row in gantt_source_2:
    row['Delta'] = row['Finish'] - row['Start']

print(pd.DataFrame(gantt_source_2))

# base_timeとして、実行した日の0時を基準にする。
base_date = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
for row in gantt_source_2:
    # plotlyの仕様で、StartとFinishは時刻表記にする。よって、ベース時刻に、StartとFinishのを足す。グラフの表記がどれが見やすいかという観点から、分ではなく、秒に足すことにした
    row['Start'] = base_date + datetime.timedelta(minutes=row['Start'])
    row['Finish'] = base_date + datetime.timedelta(minutes=row['Finish'])
    # datetime 型ではなくて、 str に直します。
    row['Start'] = row['Start'].strftime('%Y-%m-%d %H:%M')
    row['Finish'] = row['Finish'].strftime('%Y-%m-%d %H:%M')

# # list -> DataFrame にします。
df = pd.DataFrame(gantt_source_2)

print(df)

# # timeline 作る
fig = px.timeline(
    df,
    x_start='Start',
    x_end='Finish',
    y='Task',
    color='Resource',
    # ガントチャートのバーのまんなかに DataFrame の列を表示させる
    text='Delta',
)
# # 表の並び順を逆に。
fig.update_yaxes(autorange='reversed')

fig.show()
print(total_time)

f:id:seeeko:20210925194253p:plain