Normalize JSON data into structured DataFrames

def on_event_handler(json_data: str):
    """Parse a JSON payload and flatten its ``results`` entry into a dataframe.

    The resulting frame has the ``var5`` column removed and every row that
    contains a null dropped.

    Args:
        json_data: JSON text whose top-level object has a ``results`` entry
            holding the records to normalize.

    Raises:
        json.JSONDecodeError: If ``json_data`` is not valid JSON.
        KeyError: If the decoded payload has no ``results`` key.
    """
    # The latest data read is in the json_data JSON text.
    data = json.loads(json_data)
    df = pd.json_normalize(data['results'])  # Normalized in a dataframe

    # We don't need the column var5. errors='ignore' keeps a batch with an
    # empty ``results`` list (a frame with no columns) or a batch in which no
    # record carries var5 from raising KeyError.
    df = df.drop(columns='var5', errors='ignore')
    df = df.dropna()  # Drop rows containing nulls

    # Publish/write df data into our import stream


Engineer new features not available in source data

def on_data_handler(df: DataFrame):
    """Engineer features on ``df`` in place.

    Shifts the ``temp`` column by 273.15 (Celsius to Kelvin, assuming the
    source values are in Celsius) and standardizes ``var1`` with the external
    ``avg`` and ``std`` values.

    Args:
        df: The latest dataframe read from json_import_stream. Its ``temp``
            and ``var1`` columns are overwritten.
    """
    # Modify the temperature column from Celsius to Kelvin
    df['temp'] = 273.15 + df['temp']

    # Standardize the numerical var1 with the historical average (avg) and
    # standard deviation (std). Both names are expected to be defined outside
    # this function; they are only read here, so no scope declaration is
    # needed (``nonlocal`` inside a top-level function is a SyntaxError).
    df['var1'] = (df['var1'] - avg) / std

    # Publish/write df data into output stream

# Subscribe the on_data_handler defined above to json_import_stream's
# on_read_pandas event (presumably fired for each dataframe read from the
# stream — confirm against the streaming library's documentation).
json_import_stream.parameters.on_read_pandas += on_data_handler

Perform ML model predictions

# pickle.load() reads from an open binary file object, not from a path string,
# so the artifact is opened explicitly and closed again once it is loaded.
# NOTE: unpickling can execute arbitrary code; only load a trusted artifact.
with open("ml-model.pkl", "rb") as model_file:
    ml_model = pickle.load(model_file)  # ML artifact loaded in memory as ml_model
def on_data_handler(df: DataFrame):
    """Score ``df`` with ``ml_model`` and store the results in place.

    Adds two columns to ``df``: ``pred_score`` holds the output of
    ``ml_model.predict`` and ``pred_class`` holds that output cast to int.

    Args:
        df: Dataframe containing the feature columns ``temp`` and
            ``var1`` .. ``var4``.
    """
    # Features the model was trained with, in the order passed to predict()
    feature_columns = ['temp', 'var1', 'var2', 'var3', 'var4']
    model_input = df[feature_columns]

    # Generate the predictions and keep them in a new column
    df['pred_score'] = ml_model.predict(model_input)

    # astype(int) truncates toward zero, so this is 0s & 1s only if the
    # model's predict() output is already 0/1-valued — TODO confirm.
    df['pred_class'] = df['pred_score'].astype(int)

    # Publish/write df data into predicted_stream

# Subscribe the on_data_handler defined above to prepared_stream's
# on_read_pandas event (presumably fired for each dataframe read from the
# stream — confirm against the streaming library's documentation).
prepared_stream.parameters.on_read_pandas += on_data_handler

Act on the ML model results

def on_data_handler(df: DataFrame):
    """Send a Slack alert with the ids of rows whose ``pred_class`` is 1.

    Nothing is sent unless the sum of the ``pred_class`` column is positive.

    Args:
        df: Dataframe with a ``pred_class`` column and an ``id`` column.
    """
    # Let's say 1s are anomalies: a positive total means at least one exists
    # (assuming pred_class only holds 0s and 1s).
    flagged_total = df['pred_class'].sum()
    if flagged_total > 0:
        # Send anomaly ids to a channel in Slack, for example
        anomaly_ids = df.loc[df['pred_class'] == 1, 'id']
        slack.send("Alert", anomaly_ids)

# Subscribe the on_data_handler defined above to predicted_stream's
# on_read_pandas event (presumably fired for each dataframe read from the
# stream — confirm against the streaming library's documentation).
predicted_stream.parameters.on_read_pandas += on_data_handler


