tools.sorter のソースコード
import sys
sys.path.append("../")
from utils import DfManager, single_to_double
from config import constants
[ドキュメント]
class Sorter:
"""Scratch作品を命令処理順にソートするためのクラス"""
__IF_BLOCKS = constants.IF_BLOCKS
__R_BLOCKS = constants.REPEAT_BLOCKS
__E_BLOCKS = constants.EVENT_BLOCKS
__VAR_BLOCKS = constants.VARIABLE_BLOCKS
__PRO_CALL_BLOCKS = constants.PROCEDURES_CALL
def __init__(self, project):
"""Sorterの初期化
Args:
project(dictionary): 作品のJSON
"""
self.__dfM = DfManager(
["BlockName", "Key", "Field", "node_id", "parent_id", "hash"]
)
self.__blocks = project["targets"][1]["blocks"]
self.__sprite = project["targets"][1]
self.__variables = self.__get_variables(project["monitors"])
self.__node_id = 0
[ドキュメント]
def to_csv(self, dir_path):
"""ソートされたブロックをCSVに保存
Args:
dir_path(str): CSVを保存するパス(ファイル名含む)
"""
self.__dfM.to_csv(dir_path)
[ドキュメント]
def sort_blocks(self):
"""ブロックを命令処理順にソートして取得
Returns:
Dataframe: ブロックを命令処理順にソートしたDataframeを返す
"""
self.__dfM.add_row(
[
self.__sprite["direction"],
self.__sprite["x"],
self.__sprite["y"],
None,
None,
None,
]
)
for block_hash, block in self.__blocks.items():
if "event" in block["opcode"]:
self.__write_blocks(block_hash)
return self.__dfM.get_df()
def __get_variables(self, monitors):
variables = {}
for monitor in monitors:
if monitor["opcode"] == "data_variable":
variables[monitor["id"]] = "0"
return variables
def __get_parent_index(self, block):
if block["parent"]:
df = self.__dfM.get_df()
return df[df["hash"] == block["parent"]].index.to_list()[0]
else:
return 0
def __categorize_blocks(self, block_name):
try:
if block_name in self.__E_BLOCKS:
return "EVENT"
elif block_name in self.__IF_BLOCKS:
return "IF"
elif block_name in self.__R_BLOCKS:
return "REPEAT"
elif block_name in self.__VAR_BLOCKS:
return "VARIABLE"
elif block_name in self.__PRO_CALL_BLOCKS:
return "CALL"
else:
return "NORMAL"
except Exception as e:
print(e)
def __has_forever_block(self, block_hash):
while True:
if self.__blocks[block_hash]["opcode"] == constants.FOREVER_BLOCK:
return True
if self.__blocks[block_hash]["next"]:
block_hash = self.__blocks[block_hash]["next"]
continue
else:
return False
def __get_variable_value(self, input):
if len(input) == 3:
if input[1][2] in self.__variables:
return self.__variables[input[1][2]]
else:
return "0"
else:
return input[1][1]
def __write_blocks(self, block_hash):
try:
block = self.__blocks[block_hash]
block_name = block["opcode"]
category = self.__categorize_blocks(block_name)
self.__node_id += 1
match category:
case "EVENT":
self.__node_id = 0
self.__dfM.add_row(
[
"SCRIPT",
None,
None,
self.__node_id,
self.__get_parent_index(block),
None,
]
)
self.__node_id += 1
if block_name == constants.EVENT_KEY_BLOCK:
key_name = self.__blocks[block_hash]["fields"]["KEY_OPTION"][0]
self.__dfM.add_row(
[
block_name,
key_name,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
else:
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
if block["next"]:
self.__write_blocks(block["next"])
case "REPEAT":
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
if block_name == constants.REPEAT_BLOCK:
times = int(self.__get_variable_value(block["inputs"]["TIMES"]))
elif block_name == constants.FOREVER_BLOCK:
times = constants.REPEAT_TIMES
if block["inputs"]["SUBSTACK"]:
for key in range(times):
self.__write_blocks(block["inputs"]["SUBSTACK"][1])
if block["next"]:
self.__write_blocks(block["next"])
case "IF":
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
if block["inputs"]["SUBSTACK"]:
self.__write_blocks(block["inputs"]["SUBSTACK"][1])
case "VARIABLE":
if block_name == constants.SET_VARIABLE:
self.__variables[
block["fields"]["VARIABLE"][1]
] = self.__get_variable_value(block["inputs"]["VALUE"])
elif block_name == constants.CHANGE_VARIABLE:
self.__variables[block["fields"]["VARIABLE"][1]] = str(
float(self.__variables[block["fields"]["VARIABLE"][1]])
+ float(self.__get_variable_value(block["inputs"]["VALUE"]))
)
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
if block["next"]:
self.__write_blocks(block["next"])
case "CALL":
procedure_name = block["mutation"]["proccode"]
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
for block_hash, block2 in self.__blocks.items():
if block2["opcode"] == constants.PROCEDURES_DEFINE:
if (
self.__blocks[block2["inputs"]["custom_block"][1]][
"mutation"
]["proccode"]
== procedure_name
):
if block2["next"]:
self.__dfM.add_row(
[
block2["opcode"],
None,
None,
self.__node_id,
self.__get_parent_index(block2),
block_hash,
]
)
self.__write_blocks(block2["next"])
break
if not self.__has_forever_block(block_hash):
self.__write_blocks(block["next"])
case _:
if block["inputs"] != None:
for key, input in block["inputs"].items():
if isinstance(block["inputs"][key][1], list):
block["inputs"][key][1][1] = self.__get_variable_value(
input
)
self.__dfM.add_row(
[
block_name,
None,
single_to_double(str(block["inputs"])),
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
elif block["fields"] != None:
self.__dfM.add_row(
[
block_name,
single_to_double(str(block["fields"])),
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
else:
self.__dfM.add_row(
[
block_name,
None,
None,
self.__node_id,
self.__get_parent_index(block),
block_hash,
]
)
if block["next"]:
self.__write_blocks(block["next"])
except Exception as e:
print(e)