diff --git a/README.md b/README.md
index 2ff4709..5c86caf 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,8 @@ This document will describe the following:
 - What is an Algorithm Development Kit
 - Changes to Algorithm development
 - Example workflows you can use to create your own Algorithms.
+- The Model Manifest System
+- DataRobot MLOps integration support
 
 ## What is an Algorithm Development Kit
 
@@ -209,6 +211,142 @@ algorithm.init({"data": "https://i.imgur.com/bXdORXl.jpeg"})
 ```
 
+## The Model Manifest System
+Model Manifests are optional files that you can provide with your algorithm to easily
+define important model files, their locations, and metadata; this file is called `model_manifest.json`.
+
+```json
{
    "required_files" : [
        {
            "name": "squeezenet",
            "source_uri": "data://AlgorithmiaSE/image_cassification_demo/squeezenet1_1-f364aa15.pth",
            "fail_on_tamper": true,
            "metadata": {
                "dataset_md5_checksum": "46a44d32d2c5c07f7f66324bef4c7266"
            }
        },
        {
            "name": "labels",
            "source_uri": "data://AlgorithmiaSE/image_cassification_demo/imagenet_class_index.json",
            "fail_on_tamper": true,
            "metadata": {
                "dataset_md5_checksum": "46a44d32d2c5c07f7f66324bef4c7266"
            }
        }
    ],
    "optional_files": [
        {
            "name": "mobilenet",
            "source_uri": "data://AlgorithmiaSE/image_cassification_demo/mobilenet_v2-b0353104.pth",
            "fail_on_tamper": false,
            "metadata": {
                "dataset_md5_checksum": "46a44d32d2c5c07f7f66324bef4c7266"
            }
        }
    ]
}
+```
+With the Model Manifest system, you're also able to "freeze" your `model_manifest.json`, creating a `model_manifest.json.freeze`.
+This file records the hash of each model file, preventing tampering once frozen and forever locking a version of your algorithm code to your model files.
+
+```json
{
    "required_files":[
        {
            "name":"squeezenet",
            "source_uri":"data://AlgorithmiaSE/image_cassification_demo/squeezenet1_1-f364aa15.pth",
            "fail_on_tamper":true,
            "metadata":{
                "dataset_md5_checksum":"46a44d32d2c5c07f7f66324bef4c7266"
            },
            "md5_checksum":"46a44d32d2c5c07f7f66324bef4c7266"
        },
        {
            "name":"labels",
            "source_uri":"data://AlgorithmiaSE/image_cassification_demo/imagenet_class_index.json",
            "fail_on_tamper":true,
            "metadata":{
                "dataset_md5_checksum":"46a44d32d2c5c07f7f66324bef4c7266"
            },
            "md5_checksum":"c2c37ea517e94d9795004a39431a14cb"
        }
    ],
    "optional_files":[
        {
            "name":"mobilenet",
            "source_uri":"data://AlgorithmiaSE/image_cassification_demo/mobilenet_v2-b0353104.pth",
            "fail_on_tamper":false,
            "metadata":{
                "dataset_md5_checksum":"46a44d32d2c5c07f7f66324bef4c7266"
            }
        }
    ],
    "timestamp":"1633450866.985464",
    "lock_checksum":"24f5eca888d87661ca6fc08042e40cb7"
}
+```
+
+Because you can reference both hosted data collections and AWS/GCP/Azure block storage, you can pair your algorithm code with your model files wherever they live today.
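For illustration, here is a minimal sketch (not part of this diff) of how an algorithm can consume the manifest above. When a `model_manifest.json` is present, the ADK passes its `ModelData` object into `load()`, and `get_model("name")` returns a checksum-verified local path for the file registered under that name; the `"labels"` lookup below assumes the ImageNet class-index structure of the example manifest's `imagenet_class_index.json`.

```python
import json

from Algorithmia import ADK


def load(state):
    # 'state' here is the ADK's ModelData object. get_model() downloads the
    # file declared under that name in model_manifest.json, verifies its md5
    # when a freeze file is present, and returns a local file path.
    labels_path = state.get_model("labels")
    with open(labels_path) as f:
        state["labels"] = json.load(f)
    return state


def apply(input, state):
    # Map an ImageNet class index (e.g. 0) to its human-readable label.
    return state["labels"][str(input)][1]


algorithm = ADK(apply, load)
algorithm.init(0)
```

Running this locally still requires Algorithmia credentials, since the manifest entries point at `data://` URIs that are downloaded at load time.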
+
+
+## DataRobot MLOps Integration
+As part of the integration with DataRobot, we've built support for the [DataRobot MLOps Agent](https://docs.datarobot.com/en/docs/mlops/deployment/mlops-agent/index.html).
+By selecting `mlops=True` in the ADK `init()` call, the ADK will configure and set up the MLOps Agent so it can write content directly back to DataRobot.
+
+
+For this, you'll need to select an MLOps-enabled environment and set up a DataRobot External Deployment.
+Once that's set up, define your `mlops.json` file, including your deployment and model IDs.
+
+
+```json
{
    "model_id": "YOUR_MODEL_ID",
    "deployment_id": "YOUR_DEPLOYMENT_ID",
    "datarobot_mlops_service_url": "https://app.datarobot.com"
}
+```
+
+Along with defining your `DATAROBOT_MLOPS_API_TOKEN` as a secret for your Algorithm, you're ready to start sending MLOps data back to DataRobot!
+
+
+```python
from Algorithmia import ADK
from time import time

# API calls will begin at the apply() method, with the request body passed as 'input'
# For more details, see algorithmia.com/developers/algorithm-development/languages

def load(state):
    # Let's initialize the final components of the MLOps plugin and prepare it for sending info back to DataRobot.
    state['mlops'] = MLOps().init()
    return state

def apply(input, state):
    t1 = time()
    df = pd.DataFrame(columns=['id', 'values'])
    df.loc[0] = ["abcd", 0.25]
    df.loc[0][1] += input
    association_ids = df.iloc[:, 0].tolist()
    reporting_predictions = df.loc[0][1]
    t2 = time()
    # As we're only making 1 prediction, our reporting tool should show only 1 prediction being made
    state['mlops'].report_deployment_stats(1, t2 - t1)

    # Report the predictions data: features, predictions, association_ids
    state['mlops'].report_predictions_data(features_df=df,
                                           predictions=reporting_predictions,
                                           association_ids=association_ids)
    return reporting_predictions


algorithm = ADK(apply, load)
algorithm.init(0.25, mlops=True)


+```
+
+
 
 ## Readme publishing
 
 To compile the template readme, please check out [embedme](https://github.com/zakhenry/embedme) utility and run the following:
 
diff --git a/README_template.md b/README_template.md
index a2e756d..3c47c6a 100644
--- a/README_template.md
+++ b/README_template.md
@@ -8,6 +8,8 @@ This document will describe the following:
 - What is an Algorithm Development Kit
 - Changes to Algorithm development
 - Example workflows you can use to create your own Algorithms.
+- The Model Manifest System
+- DataRobot MLOps integration support
 
 ## What is an Algorithm Development Kit
 
@@ -55,6 +57,41 @@ Check out these examples to help you get started:
 
 ```python
 ```
 
+## The Model Manifest System
+Model Manifests are optional files that you can provide with your algorithm to easily
+define important model files, their locations, and metadata; this file is called `model_manifest.json`.
+
+```json
+```
+With the Model Manifest system, you're also able to "freeze" your `model_manifest.json`, creating a `model_manifest.json.freeze`.
+This file records the hash of each model file, preventing tampering once frozen and forever locking a version of your algorithm code to your model files.
+
+```json
+```
+
+Because you can reference both hosted data collections and AWS/GCP/Azure block storage, you can pair your algorithm code with your model files wherever they live today.
+
+
+## DataRobot MLOps Integration
+As part of the integration with DataRobot, we've built support for the [DataRobot MLOps Agent](https://docs.datarobot.com/en/docs/mlops/deployment/mlops-agent/index.html).
+By selecting `mlops=True` in the ADK `init()` call, the ADK will configure and set up the MLOps Agent so it can write content directly back to DataRobot.
+
+
+For this, you'll need to select an MLOps-enabled environment and set up a DataRobot External Deployment.
+Once that's set up, define your `mlops.json` file, including your deployment and model IDs.
+ + +```json +``` + +Along with defining your `DATAROBOT_MLOPS_API_TOKEN` as a secret to your Algorithm, you're ready to start sending MLOps data back to DataRobot! + + +```python +``` + + + ## Readme publishing To compile the template readme, please check out [embedme](https://github.com/zakhenry/embedme) utility and run the following: diff --git a/adk/ADK.py b/adk/ADK.py index 9107bff..718ef98 100644 --- a/adk/ADK.py +++ b/adk/ADK.py @@ -3,8 +3,12 @@ import os import sys import Algorithmia +import os +import subprocess + from adk.io import create_exception, format_data, format_response from adk.modeldata import ModelData +from adk.mlops import MLOps class ADK(object): @@ -17,6 +21,7 @@ def __init__(self, apply_func, load_func=None, client=None): :param client: A Algorithmia Client instance that might be user defined, and is used for interacting with a model manifest file; if defined. """ + self.mlops = None self.FIFO_PATH = "/tmp/algoout" if client: @@ -29,8 +34,8 @@ def __init__(self, apply_func, load_func=None, client=None): if load_func: load_args, _, _, _, _, _, _ = inspect.getfullargspec(load_func) self.load_arity = len(load_args) - if self.load_arity != 1: - raise Exception("load function expects 1 parameter to be used to store algorithm state") + if self.load_arity not in (0, 1): + raise Exception("load function expects 0 parameters or 1 parameter to be used to store algorithm state") self.load_func = load_func else: self.load_func = None @@ -38,18 +43,18 @@ def __init__(self, apply_func, load_func=None, client=None): self.is_local = not os.path.exists(self.FIFO_PATH) self.load_result = None self.loading_exception = None - self.manifest_path = "model_manifest.json.freeze" - self.model_data = self.init_manifest(self.manifest_path) - - def init_manifest(self, path): - return ModelData(self.client, path) + self.manifest_path = "model_manifest.json" + self.mlops_path = "mlops.json" + self.model_data = ModelData(self.client, self.manifest_path) def load(self): try: if self.model_data.available(): self.model_data.initialize() - if self.load_func: + if self.load_func and self.load_arity == 1: self.load_result = self.load_func(self.model_data) + elif self.load_func: + self.load_result = self.load_func() except Exception as e: self.loading_exception = e finally: @@ -89,10 +94,20 @@ def write_to_pipe(self, payload, pprint=print): def process_local(self, local_payload, pprint): result = self.apply(local_payload) self.write_to_pipe(result, pprint=pprint) + + def mlops_init(self): + mlops_token = os.environ.get("DATAROBOT_MLOPS_API_TOKEN", None) + if mlops_token: + self.mlops = MLOps(mlops_token, self.mlops_path) + self.mlops.init() + else: + raise Exception("'DATAROBOT_MLOPS_API_TOKEN' was not found, please set to use mlops.") - def init(self, local_payload=None, pprint=print): + def init(self, local_payload=None, pprint=print, mlops=False): + if mlops and not self.is_local: + self.mlops_init() self.load() - if self.is_local and local_payload: + if self.is_local and local_payload is not None: if self.loading_exception: load_error = create_exception(self.loading_exception, loading_exception=True) self.write_to_pipe(load_error, pprint=pprint) diff --git a/adk/io.py b/adk/io.py index be2045c..ccaabfc 100644 --- a/adk/io.py +++ b/adk/io.py @@ -57,7 +57,7 @@ def create_exception(exception, loading_exception=False): response = json.dumps({ "error": { "message": str(exception), - "stacktrace": traceback.format_exc(), + "stacktrace": " ".join(traceback.format_exception(etype=type(exception), 
value=exception, tb=exception.__traceback__)), "error_type": error_type, } }) diff --git a/adk/mlops.py b/adk/mlops.py new file mode 100644 index 0000000..cd335a7 --- /dev/null +++ b/adk/mlops.py @@ -0,0 +1,61 @@ +import yaml +import json +import os +import subprocess + + +class MLOps(object): + spool_dir = "/tmp/ta" + agent_dir = "/opt/mlops-agent" + mlops_dir_name = "datarobot_mlops_package-8.1.2" + total_dir_path = agent_dir + "/" + mlops_dir_name + + def __init__(self, api_token, path): + self.token = api_token + if os.path.exists(path): + with open(path) as f: + mlops_config = json.load(f) + self.endpoint = mlops_config['datarobot_mlops_service_url'] + self.model_id = mlops_config['model_id'] + self.deployment_id = mlops_config['deployment_id'] + self.mlops_name = mlops_config.get('mlops_dir_name', 'datarobot_mlops_package-8.1.2') + if "MLOPS_SERVICE_URL" in os.environ: + self.endpoint = os.environ['MLOPS_SERVICE_URL'] + if "MODEL_ID" in os.environ: + self.model_id = os.environ['MODEL_ID'] + if "DEPLOYMENT_ID" in os.environ: + self.deployment_id = os.environ['DEPLOYMENT_ID'] + if not os.path.exists(self.agent_dir): + raise Exception("environment is not configured for mlops.\nPlease select a valid mlops enabled environment.") + + if self.endpoint is None: + raise Exception("'no endpoint found, please add 'MLOPS_SERVICE_URL' environment variable, or create an " + "mlops.json file") + if self.model_id is None: + raise Exception("no model_id found, please add 'MODEL_ID' environment variable, or create an mlops.json " + "file") + if self.deployment_id is None: + raise Exception("no deployment_id found, please add 'DEPLOYMENT_ID' environment variable, or create an " + "mlops.json file") + + def init(self): + os.environ['MLOPS_DEPLOYMENT_ID'] = self.deployment_id + os.environ['MLOPS_MODEL_ID'] = self.model_id + os.environ['MLOPS_SPOOLER_TYPE'] = "FILESYSTEM" + os.environ['MLOPS_FILESYSTEM_DIRECTORY'] = self.spool_dir + + with open(self.total_dir_path + '/conf/mlops.agent.conf.yaml') as f: + documents = yaml.load(f, Loader=yaml.FullLoader) + documents['mlopsUrl'] = self.endpoint + documents['apiToken'] = self.token + with open(self.total_dir_path + '/conf/mlops.agent.conf.yaml', 'w') as f: + yaml.dump(documents, f) + + subprocess.call(self.total_dir_path + '/bin/start-agent.sh') + check = subprocess.Popen([self.total_dir_path + '/bin/status-agent.sh'], stdout=subprocess.PIPE) + output = check.stdout.readlines()[0] + check.terminate() + if b"DataRobot MLOps-Agent is running as a service." 
in output: + return True + else: + raise Exception(output) diff --git a/adk/modeldata.py b/adk/modeldata.py index 39dadf9..893d28b 100644 --- a/adk/modeldata.py +++ b/adk/modeldata.py @@ -6,11 +6,13 @@ class ModelData(object): def __init__(self, client, model_manifest_path): - self.manifest_freeze_path = model_manifest_path - self.manifest_data = get_manifest(self.manifest_freeze_path) + self.manifest_reg_path = model_manifest_path + self.manifest_frozen_path = "{}.freeze".format(self.manifest_reg_path) + self.manifest_data = self.get_manifest() self.client = client self.models = {} self.usr_key = "__user__" + self.using_frozen = True def __getitem__(self, key): return getattr(self, self.usr_key + key) @@ -27,7 +29,6 @@ def data(self): output[without_usr_key] = __dict[key] return output - def available(self): if self.manifest_data: return True @@ -39,61 +40,86 @@ def initialize(self): raise Exception("Client was not defined, please define a Client when using Model Manifests.") for required_file in self.manifest_data['required_files']: name = required_file['name'] + source_uri = required_file['source_uri'] + fail_on_tamper = required_file.get('fail_on_tamper', False) + expected_hash = required_file.get('md5_checksum', None) if name in self.models: raise Exception("Duplicate 'name' detected. \n" + name + " was found to be used by more than one data file, please rename.") - expected_hash = required_file['md5_checksum'] - with self.client.file(required_file['source_uri']).getFile() as f: + with self.client.file(source_uri).getFile() as f: local_data_path = f.name real_hash = md5_for_file(local_data_path) - if real_hash != expected_hash and required_file['fail_on_tamper']: + if real_hash != expected_hash and fail_on_tamper: raise Exception("Model File Mismatch for " + name + "\nexpected hash: " + expected_hash + "\nreal hash: " + real_hash) else: self.models[name] = FileData(real_hash, local_data_path) def get_model(self, model_name): - if model_name in self.models: - return self.models[model_name].file_path - elif len([optional for optional in self.manifest_data['optional_files'] if - optional['name'] == model_name]) > 0: - self.find_optional_model(model_name) - return self.models[model_name].file_path + if self.available(): + if model_name in self.models: + return self.models[model_name].file_path + elif len([optional for optional in self.manifest_data['optional_files'] if + optional['name'] == model_name]) > 0: + self.find_optional_model(model_name) + return self.models[model_name].file_path + else: + raise Exception("model name " + model_name + " not found in manifest") else: - raise Exception("model name " + model_name + " not found in manifest") + raise Exception("unable to get model {}, model_manifest.json not found.".format(model_name)) def find_optional_model(self, file_name): - - found_models = [optional for optional in self.manifest_data['optional_files'] if - optional['name'] == file_name] - if len(found_models) == 0: - raise Exception("file with name '" + file_name + "' not found in model manifest.") - model_info = found_models[0] - self.models[file_name] = {} - expected_hash = model_info['md5_checksum'] - with self.client.file(model_info['source_uri']).getFile() as f: - local_data_path = f.name - real_hash = md5_for_file(local_data_path) - if real_hash != expected_hash and model_info['fail_on_tamper']: - raise Exception("Model File Mismatch for " + file_name + - "\nexpected hash: " + expected_hash + "\nreal hash: " + real_hash) + if self.available(): + found_models = [optional for 
optional in self.manifest_data['optional_files'] if + optional['name'] == file_name] + if len(found_models) == 0: + raise Exception("file with name '" + file_name + "' not found in model manifest.") + model_info = found_models[0] + self.models[file_name] = {} + source_uri = model_info['source_uri'] + fail_on_tamper = model_info.get("fail_on_tamper", False) + expected_hash = model_info.get('md5_checksum', None) + with self.client.file(source_uri).getFile() as f: + local_data_path = f.name + real_hash = md5_for_file(local_data_path) + if self.using_frozen: + if real_hash != expected_hash and fail_on_tamper: + raise Exception("Model File Mismatch for " + file_name + + "\nexpected hash: " + expected_hash + "\nreal hash: " + real_hash) + else: + self.models[file_name] = FileData(real_hash, local_data_path) + else: + self.models[file_name] = FileData(real_hash, local_data_path) + else: + raise Exception("unable to get model {}, model_manifest.json not found.".format(model_name)) + + def get_manifest(self): + if os.path.exists(self.manifest_frozen_path): + with open(self.manifest_frozen_path) as f: + manifest_data = json.load(f) + if check_lock(manifest_data): + return manifest_data + else: + raise Exception( + "Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your " + "algorithm's freeze file.") + elif os.path.exists(self.manifest_reg_path): + with open(self.manifest_reg_path) as f: + manifest_data = json.load(f) + self.using_frozen = False + return manifest_data else: - self.models[file_name] = FileData(real_hash, local_data_path) - - -def get_manifest(manifest_path): - if os.path.exists(manifest_path): - with open(manifest_path) as f: - manifest_data = json.load(f) - expected_lock_checksum = manifest_data.get('lock_checksum') - del manifest_data['lock_checksum'] - detected_lock_checksum = md5_for_str(str(manifest_data)) - if expected_lock_checksum != detected_lock_checksum: - raise Exception("Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your " - "algorithm's freeze file.") - return manifest_data + return None + + +def check_lock(manifest_data): + expected_lock_checksum = manifest_data.get('lock_checksum') + del manifest_data['lock_checksum'] + detected_lock_checksum = md5_for_str(str(manifest_data)) + if expected_lock_checksum != detected_lock_checksum: + return False else: - return None + return True def md5_for_file(fname): diff --git a/examples/mlops_hello_world/mlops.json b/examples/mlops_hello_world/mlops.json new file mode 100644 index 0000000..c2fbfae --- /dev/null +++ b/examples/mlops_hello_world/mlops.json @@ -0,0 +1,5 @@ +{ + "model_id": "YOUR_MODEL_ID", + "deployment_id": "YOUR_DEPLOYMENT_ID", + "datarobot_mlops_service_url": "https://app.datarobot.com" +} \ No newline at end of file diff --git a/examples/mlops_hello_world/requirements.txt b/examples/mlops_hello_world/requirements.txt new file mode 100644 index 0000000..47f13bc --- /dev/null +++ b/examples/mlops_hello_world/requirements.txt @@ -0,0 +1,4 @@ +algorithmia>=1.0.0,<2.0 +datarobot-mlops==8.0.7 +pyaml==21.10.1 +pillow<9.0 \ No newline at end of file diff --git a/examples/mlops_hello_world/src/Algorithm.py b/examples/mlops_hello_world/src/Algorithm.py new file mode 100644 index 0000000..1eeb104 --- /dev/null +++ b/examples/mlops_hello_world/src/Algorithm.py @@ -0,0 +1,32 @@ +from Algorithmia import ADK +from time import time + +# API calls will begin at the apply() method, with the request body passed as 'input' +# For more details, see 
algorithmia.com/developers/algorithm-development/languages + +def load(state): + # Lets initialize the final components of the MLOps plugin and prepare it for sending info back to DataRobot. + state['mlops'] = MLOps().init() + return state + +def apply(input, state): + t1 = time() + df = pd.DataFrame(columns=['id', 'values']) + df.loc[0] = ["abcd", 0.25] + df.loc[0][1] += input + association_ids = df.iloc[:, 0].tolist() + reporting_predictions = df.loc[0][1] + t2 = time() + # As we're only making 1 prediction, our reporting tool should show only 1 prediction being made + state['mlops'].report_deployment_stats(1, t2 - t1) + + # Report the predictions data: features, predictions, class_names + state['mlops'].report_predictions_data(features_df=df, + predictions=reporting_predictions, + association_ids=association_ids) + return reporting_predictions + + +algorithm = ADK(apply, load) +algorithm.init(0.25, mlops=True) + diff --git a/examples/mlops_hello_world/src/__init__.py b/examples/mlops_hello_world/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index ccb528b..eda150c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ algorithmia>=1.7,<2 -six \ No newline at end of file +six +pyaml>=21.10,<21.11 \ No newline at end of file diff --git a/setup.py b/setup.py index f470b4a..8d34878 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ author_email='support@algorithmia.com', packages=['adk'], install_requires=[ + 'pyaml>=21.10,<21.11', 'six', ], include_package_data=True, diff --git a/tests/AdkTest.py b/tests/AdkTest.py index e6b4672..941d848 100644 --- a/tests/AdkTest.py +++ b/tests/AdkTest.py @@ -1,7 +1,7 @@ from adk import ADK - +from adk.modeldata import ModelData class ADKTest(ADK): def __init__(self, apply_func, load_func=None, client=None, manifest_path="model_manifest.json.freeze"): super(ADKTest, self).__init__(apply_func, load_func, client) - self.model_data = self.init_manifest(manifest_path) + self.model_data = ModelData(self.client, manifest_path) diff --git a/tests/test_adk_local.py b/tests/test_adk_local.py index 5592bfc..d4ed78a 100644 --- a/tests/test_adk_local.py +++ b/tests/test_adk_local.py @@ -23,7 +23,7 @@ def execute_example(self, input, apply, load=None): algo.init(input, pprint=lambda x: output.append(x)) return output[0] - def execute_manifest_example(self, input, apply, load, manifest_path="manifests/good_model_manifest.json.freeze"): + def execute_manifest_example(self, input, apply, load, manifest_path): client = Algorithmia.client() algo = ADKTest(apply, load, manifest_path=manifest_path, client=client) output = [] @@ -131,21 +131,22 @@ def test_manifest_file_success(self): actual_output = json.loads(self.execute_manifest_example(input, apply_successful_manifest_parsing, loading_with_manifest, manifest_path="tests/manifests/good_model_manifest" - ".json.freeze")) + ".json")) self.assertEqual(expected_output, actual_output) def test_manifest_file_tampered(self): - input = "Algorithmia" + input = 'Algorithmia' expected_output = {"error": {"error_type": "LoadingError", "message": "Model File Mismatch for squeezenet\n" "expected hash: f20b50b44fdef367a225d41f747a0963\n" "real hash: 46a44d32d2c5c07f7f66324bef4c7266", - "stacktrace": "NoneType: None\n"}} + "stacktrace": ''}} actual_output = json.loads(self.execute_manifest_example(input, apply_successful_manifest_parsing, loading_with_manifest, manifest_path="tests/manifests/bad_model_manifest" - ".json.freeze")) + ".json")) + 
actual_output['error']['stacktrace'] = '' self.assertEqual(expected_output, actual_output) diff --git a/tests/test_adk_remote.py b/tests/test_adk_remote.py index f0d69ba..5108dd7 100644 --- a/tests/test_adk_remote.py +++ b/tests/test_adk_remote.py @@ -173,7 +173,7 @@ def test_manifest_file_success(self): actual_output = self.execute_manifest_example(input, apply_successful_manifest_parsing, loading_with_manifest, manifest_path="tests/manifests/good_model_manifest" - ".json.freeze") + ".json") self.assertEqual(expected_output, actual_output)
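For reference, a self-contained version of the `examples/mlops_hello_world` algorithm added above might look like the sketch below. The example in the diff uses `pandas` and an `MLOps` class without importing them; the imports here assume the `datarobot-mlops` package pinned in the example's `requirements.txt` exposes the reporting client as `datarobot.mlops.mlops.MLOps`, and the prediction row is updated via `.loc[row, column]` to avoid pandas' chained-assignment pitfall.

```python
from time import time

import pandas as pd
from Algorithmia import ADK
# Assumed import path for the reporting client shipped with datarobot-mlops.
from datarobot.mlops.mlops import MLOps


def load(state):
    # Initialize the MLOps reporting client. When algorithm.init(..., mlops=True)
    # runs on an MLOps-enabled environment, the ADK has already exported the
    # spooler settings and started the monitoring agent.
    state['mlops'] = MLOps().init()
    return state


def apply(input, state):
    t1 = time()
    df = pd.DataFrame(columns=['id', 'values'])
    df.loc[0] = ["abcd", 0.25]
    df.loc[0, 'values'] += input
    association_ids = df['id'].tolist()
    reporting_predictions = df['values'].tolist()
    t2 = time()

    # A single prediction was made, so report a count of 1 and the elapsed time.
    state['mlops'].report_deployment_stats(1, t2 - t1)

    # Report the prediction data: features, predictions, and association ids.
    state['mlops'].report_predictions_data(features_df=df,
                                           predictions=reporting_predictions,
                                           association_ids=association_ids)
    return reporting_predictions[0]


algorithm = ADK(apply, load)
algorithm.init(0.25, mlops=True)
```

As in the diff's ADK changes, the agent is only configured and started when `mlops=True` is passed and the algorithm is running on Algorithmia rather than locally.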