Streamlit: Using cache not for performance but for repeatability

I came up with using random module for random selection from initial item of multi selectbox.

But my first implementation like below did not work well.

import random
import streamlit as st


def genRandIntWithoutCache():
    return random.randint(0, 3)

if __name__ == '__main__':
    
    color_list = ['yellow', 'red', 'blue', 'green']

    # Does not work well
    rand_index_01 = genRandIntWithoutCache()
    st.write(rand_index_01)
    options = st.multiselect(
        'What are your favorite colors?',
        color_list,
        color_list[rand_index_01],
        key="01"
    )
    st.write(options)

If you reloaded this page. you might see strange scene. You selected any items but every your action does not keep its state. This was caused by the feature of streamlit like being re-excecuted all of your code.

And then. our next step is to use "Cache" with decorator of it. The code below works well because of its caching. This makes its generating random number work only one time.

import random
import streamlit as st
from streamlit import caching


@st.cache
def genRandInt():
    return random.randint(0, 3)

def genRandIntWithoutCache():
    return random.randint(0, 3)

if __name__ == '__main__':
    
    color_list = ['yellow', 'red', 'blue', 'green']

    # Does not work well
    rand_index_01 = genRandIntWithoutCache()
    st.write(rand_index_01)
    options = st.multiselect(
        'What are your favorite colors?',
        color_list,
        color_list[rand_index_01],
        key="01"
    )
    st.write(options)

    # Works well
    rand_index_02 = genRandInt()
    st.write(rand_index_02)
    options = st.multiselect(
        'What are your favorite colors?',
        color_list,
        color_list[rand_index_02],
        key="02"
    )
    st.write(options)

Yes!! It works well. But is it really good?? In this time. user is only me. But in production. user will be many. If that random generator works only one time,this cause only one number generating. So next step,we add one step which make us possible to control the timing of doing cache and we do not need to use caching when generating random number.

import random
import streamlit as st
from streamlit import caching


@st.cache
def genRandInt():
    return random.randint(0, 3)

def genRandIntWithoutCache():
    return random.randint(0, 3)

if __name__ == '__main__':
    
    color_list = ['yellow', 'red', 'blue', 'green']

    # Does not work well
    rand_index_01 = genRandIntWithoutCache()
    st.write(rand_index_01)
    options = st.multiselect(
        'What are your favorite colors?',
        color_list,
        color_list[rand_index_01],
        key="01"
    )
    st.write(options)

    # Works well
    form = st.form(key="form")
    rand_index_02 = genRandIntWithoutCache()
    st.write(rand_index_02)
    options = form.multiselect(
        'What are your favorite colors?',
        color_list,
        color_list[rand_index_02],
        key="02"
    )
    st.write(options)

    # To control cache
    submitted = form.form_submit_button("Submit")
    if submitted:
        caching.clear_cache()

Testing numpy array

I tried to test below. But 4th assertion could not be passed.

import numpy as np

def test__sigmoid():

    assert round(sigmoid(-5), 7) == (0.0066929)
    assert round(sigmoid(0), 5) == (0.5000000)
    assert round(sigmoid(5), 5) == (0.99331)

    assert (sigmoid([4, 5, 6])).round(5) == np.array([0.98201, 0.99331, 0.99753])

The reason of its failure assertion line returns array with boolean like below.

sigmoid([4, 5, 6]).round(5) == np.array([0.98201, 0.99331, 0.99753])
# array([ True,  True,  True])

Then I found a testing module in numpy.

numpy.org

Then I revised test code like below and it worked well.

def test__sigmoid():

    assert round(sigmoid(-5), 7) == (0.0066929)
    assert round(sigmoid(0), 5) == (0.5000000)
    assert round(sigmoid(5), 5) == (0.99331)

    np.testing.assert_almost_equal(sigmoid([4, 5, 6]).round(5), np.array([0.98201, 0.99331, 0.99753]))

But if you unnoticed about decimal. this test will be failed.

Sort multi edge nodes with networkx in python

import

import networkx as nx
import numpy as np

Create multi graph

G = nx.MultiGraph()
G.add_nodes_from(["A", "B", "C", "D", "E", "F"])
G.add_edges_from([("A", "B"), ("B", "C"), ("B", "D"), ("C", "D"), ("A", "E"), ("C", "E"), ("C", "F"), ("C", "F"), ("F", "F")])
  • this graph has multi edge in between node C and F
  • in this case, we want to visualize its multi edges in matplotlib

 

Approach1. Convert data structure G to Adjacency matrix

 

  • "Adjacency matrix" is the way to describe network data structure by using matrix.
  • It is described as below

 

A = (nx.adjacency_matrix(G)).todense()

// output of A
matrix([[0., 1., 0., 0., 1., 0.],
[1., 0., 1., 1., 0., 0.],
[0., 1., 0., 1., 1., 2.],
[0., 1., 1., 0., 0., 0.],
[1., 0., 1., 0., 0., 0.],
[0., 0., 2., 0., 0., 1.]])

 

  • next, we want to sort its matrix with the filter.
  • The filter serves some nodes which have multi edges.
  • How to generate the filter
  • Flatten A to one dimension array or list
  • get index number from it by sorting
  • get node value from origin node list by using above index number

 

node_list = list(G)
A_flatten = np.ravel(A)
idx_sorted_list = np.where(A_flatten >= 2)[0] # return value is tuple
[node_list[(I % len(G.nodes))] for I in idx_sorted_list]

//
['F', 'C']

 

Then after, let's try to apply this way to existed dataset. In this time, we use zachary's karate club graph.

 

source code  

import networkx as nx
import numpy as np
import matplotlib.pyplot as plt

def get_multi_edge_nodes(G, threshold):
    node_list = list(G)
    A = nx.adjacency_matrix(G)
    A_flatten = np.ravel(A.todense())
    idx_sorted_list = np.where(A_flatten >= threshold)[0] # return value is tuple
    node_multi_edge_list = [node_list[(I % len(G.nodes))] for I in idx_sorted_list]
    return set(node_multi_edge_list)


def rand_edges(node_num, edge_num):
    def randint(low, high, size):
        return np.random.randint(low, high, size)
    return [(s, t) for s, t in zip(randint(0, node_num, edge_num), randint(0, node_num, edge_num))]

 

G_01 = nx.karate_club_graph()
G_02 = nx.MultiGraph()
G_02.add_edges_from(G_01.edges)
G_02.add_edges_from(rand_edges(len(G_02), 20))

 

ret1 = get_multi_edge_nodes(G_01, 2)
ret2 = get_multi_edge_nodes(G_02, 2)

 

G_02_sub = G_02.subgraph(ret2)

 

fig, (ax1, ax2) = plt.subplots(1, 2)
nx.draw_networkx(G_01, ax=ax1)
nx.draw_networkx(G_02_sub, ax=ax2)

plt.savefig('test.png')

 f:id:kichinosukey:20210330152449p:plain

Streamlitでインタラクティブな地図にデータを可視化する

この記事は何か

  • Streamlitを使ってインタラクティブなデータ可視化を地図上で試みたい人のためのハウツーを記載
  • 本記事で使用するコードのレポジトリは↓

github.com

完成イメージ

f:id:kichinosukey:20200628232023p:plain

どんなコードか

まずダッシュボードのタイトルを設定

st.title("Garbage Distribution in Tochigi")

データを生成(デモなのでランダムで)

data= generate_data()

地図上にデータをプロット

st.write(pdk.Deck(
        map_style="mapbox://styles/mapbox/light-v9",
        initial_view_state={
            "latitude": PARK[0][0],
            "longitude": PARK[0][1],
            "zoom": 11,
            "pitch": 50,
        },
        layers=[
            pdk.Layer(
                "HexagonLayer",
                data=data,
                get_position=["lon", "lat"],
                radius=100,
                elevation_scale=4,
                elevation_range=[0, 1000],
                pickable=True,
                extruded=True,
            ),
        ],
    ))

棒グラフも表示

st.altair_chart(alt.Chart(data)
        .mark_bar().encode(
            x=alt.X("day:Q", scale=alt.Scale(nice=False)),
            y=alt.Y("q:Q"),
            tooltip=['day', 'q']
        ), use_container_width=True)

チェックボックスで生データ表示を切り替え

if st.checkbox("Show raw data", False):
        st.write(data)

所感

  • Streamlitを使った部分の実装については30分もかからなかったのでめちゃお手軽
  • クラウド上へのデプロイ手法を身につけとけば企画〜実装のサイクルがめちゃ早まる
  • pydeckについての理解はほとんど必要なかった
    • APIについての理解が深まればもっと柔軟な設定ができそう
  • AltairのAPIは多少理解しておかないとStreamlitのAPIだけだと使いづらいかな・・・


Python&Rの時系列予測ライブラリ Prophetとは

始めに

  • 今回はペーパーの冒頭を斜め読み+チュートリアルの触りをこなしてみた
  • 超導入としてのメモとして記しておく

Prophet とは

  • 非線形のトレンドを持つ時系列データの予測にまつわるタスクの多くを自動化
  • Facebookが作った
  • Python と Rで提供されている

facebook.github.io

何を解決したいのか

  • ドメイン知識を持つアナリストによる予測をより簡単にしたい
    • 直感的かつ解釈可能なモデルで
    • 調整をより柔軟に
    • 一般的にデータアナリストはドメイン知識を有するが、時系列解析のトレーニングは受けていないため

どんなモデルか

 y(t) = g(t) + s(t) + h(t) + \epsilon_{t \cdot}

最後に

  • モデルの誤差項は正規分布とのことで、今後他の分布を想定したモデルに拡張していくのか?が感じた疑問。
    • ペーパーの詳細見ればわかるのかな?

「ネットワーク思考のすすめーネットセントリック時代の組織戦略」を読んだ

www.amazon.co.jp

本書の問い

  • 環境変化に役立つ普遍的な原理とは
    • 解法としてのネットワーク理論、社会システム理論

他研究との違い

  • シミュレーションによる機械的な結論等の器物的なネットワーク分析ではない
    • 既存の成果を批判的に摂取しながら道の領域を開拓
    • 最新のシステム論の知見も織り込む

大事なこと

初感

  • 後半の社会システム論は難解でかなり飛ばし気味に読んだ
  • 中盤までは納得感もあり、うまく言語化されている

python streamlitで超お手軽データ可視化ダッシュボード作成してみた

streamlitヤバイ

  • FlaskとJsでやってたことが一瞬でできる
  • なんならそれ以上のことができる(サイドバーや進捗バー表示、チェックボックスとか)
    • html/cssの設定要らず
  • 学習コストが非常に低い

とりあえずやってみたこと

docs.streamlit.io

  • 写経 + α

github.com