Alex Ott's blog - dedicated to Software Development, Unixes, Content Filtering, Emacs, Lisp, and other things.

<h2>Traditional New Year post, 2023rd edition (2023-12-31)</h2>
<p>
Today is the last day of the year, and it's time for a traditional blog post with a review of the year.
</p>
<p>
From the professional side, it was another busy but very interesting year with many activities across multiple areas. For me it was primarily cloud infrastructure, security, all things automation, disaster recovery, migrations, and related topics. I tried to reflect on this in my <a href="https://www.linkedin.com/pulse/3-years-databricks-alex-ott/">post on three years at Databricks</a> published on LinkedIn, and it's also visible from the range of topics in the <a href="https://www.databricks.com/blog/author/alex-ott">blog posts published this year</a>.
</p>
<p>
From my point of view, automation (cloud infra, security, DevOps & CI/CD, …) is a critical part of a project's success, and it was one of the most significant parts of my work. Terraform is a robust tool for automation, and I spent a considerable amount of time on related work:
</p>
<ul class="org-ul">
<li>More than 150 pull requests were merged into the <a href="https://github.com/databricks/terraform-provider-databricks">Databricks Terraform provider</a> - not only new functionality and bug fixes, but also quite a lot of work on the <a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/guides/experimental-exporter">Terraform exporter</a> that is heavily used in environment migration and disaster recovery projects.</li>
<li>In May <a href="https://www.databricks.com/blog/announcing-terraform-databricks-modules">we announced Terraform modules for Databricks</a> - reusable code that helps customers build their Databricks infrastructure faster. We're working on including more modules so customers will be able to simply combine the necessary pieces to get their infrastructure ready to use.</li>
<li>A lot of internal work on enablement around Terraform adoption - some parts of it will be presented in the <a href="https://pages.databricks.com/databricks-specialist-sessions.html">upcoming webinar</a>.</li>
</ul>
<p>
Besides Terraform, quite a lot of work (PRs, GitHub issues, …) was done with the engineering team responsible for the developer ecosystem - the new Databricks SDKs for Go and Python and the new Databricks CLI. With these new tools, it's much easier to develop additional tools for Databricks (like <a href="https://github.com/databrickslabs/sandbox/tree/main/ip_access_list_analyzer">this</a>) or automate some boring tasks.
</p>
<p>
This year, a few projects related to cybersecurity kicked off, and hopefully we'll get more work in this area, where I have significant experience and where Databricks and Apache Spark are a natural fit. Modern cybersecurity is a big data domain with challenges around large-scale real-time data processing, data normalization, threat detection, and reporting. Technologies like Delta Live Tables not only simplify the development and deployment of scalable data processing pipelines, but also include features like <a href="https://www.databricks.com/blog/2022/12/08/build-reliable-and-cost-effective-streaming-data-pipelines.html">enhanced autoscaling</a> that automatically scale pipelines up and down, providing a cost-efficient way of handling the spiky workloads that are natural for cybersecurity (we had those challenges back at McAfee).
</p>
<p>
In February, Databricks celebrated its tenth anniversary, and it was interesting to attend the company kick-off event in Las Vegas. For me, it was a chance to finally meet people in person after working with many of them for 2.5 years. It was also the first long-distance business trip since the pandemic began almost three years ago. Although, frankly speaking, I can't say that I miss these trips - it's interesting to meet people, but travel takes too much time, so I need to wait for teleportation :-)
</p>
<p>
With all this, I'm looking forward to what the new year will bring. And I wish a happy New Year to all!
</p>
<h2>Delta Live Tables recipes: Consuming from Azure Event Hubs using OAuth 2.0/OIDC authentication (2023-10-28)</h2>
<p>Last year, <a href="https://alexott.blogspot.com/2022/06/delta-live-tables-recipes-consuming.html">I blogged</a> about consuming data from Azure Event Hubs with Delta Live Tables (DLT). That blog post showed how to do it using the Apache Kafka client that is bundled with the Databricks Runtime used by DLT.</p>
<p>That example used Shared Access Signatures (SAS) generated for a specific Event Hubs namespace or topic. However, in many organizations the use of SAS is prohibited because they are long-lived tokens that are potentially risky to use. Instead, it's recommended to use short-lived service principal tokens that are <a href="https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-client-creds-grant-flow">generated according to the OIDC/OAuth 2.0 specification</a>. These tokens need to be refreshed periodically, which should be done automatically by the consumer.</p>
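<p>For illustration, this is roughly what the client credentials flow looks like when done by hand - a minimal sketch with placeholder values (the Kafka client configuration shown later performs an equivalent request, plus the periodic refresh, for you):</p>
<pre>
import requests

tenant_id = "<tenant-id>"
# request a token for the service principal using the OAuth 2.0 client credentials grant
resp = requests.post(
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
    data={
        "grant_type": "client_credentials",
        "client_id": "<sp-client-id>",          # placeholder
        "client_secret": "<sp-client-secret>",  # placeholder
        # the scope is the Event Hubs namespace FQDN with /.default appended
        "scope": "https://<eh-namespace>.servicebus.windows.net/.default",
    },
)
access_token = resp.json()["access_token"]  # short-lived bearer token, must be refreshed before expiry
</pre>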
<p>Before Databricks Runtime 12.2 was released earlier this year, DBR versions were using 2.x versions of the Apache Kafka client that didn't support OAuth/OIDC authentication, so I even created a <a href="https://github.com/alexott/databricks-playground/tree/main/kafka-eventhubs-aad-auth">simple library</a> that could be used with Databricks clusters to generate and refresh OAuth tokens. But we still had a problem using it with DLT, as we can't attach JAR libraries to a DLT pipeline.</p>
<p>Things changed in DBR 12.2, which upgraded the Apache Kafka client library, so it now has built-in support for OAuth 2.0/OIDC authentication flows (see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575">KIP-768</a> for more details). It's now just a matter of correct configuration to start consuming from an Azure Event Hubs topic using an Azure service principal. To make it work, we need the service principal ID, its secret, and the Azure tenant ID - using this data, we can construct the correct SASL configuration string. We also need to grant the service principal a corresponding role on Azure Event Hubs (“Azure Event Hubs Data Receiver” for reading data or “Azure Event Hubs Data Sender” for writing data).</p>
<p>The complete example of a DLT pipeline that consumes from an Event Hubs topic looks as follows:</p>
<pre>
import pyspark.sql.functions as F
import dlt

topic = "<topic>"
eh_namespace_name = "<eh_namespace_name>"
eh_server = f"{eh_namespace_name}.servicebus.windows.net"

# Data for the service principal are stored in a secret scope
tenant_id = dbutils.secrets.get("scope", "tenant_id")
client_id = dbutils.secrets.get("scope", "sp-id")
client_secret = dbutils.secrets.get("scope", "sp-secret")

# Generate the SASL configuration string (it's split over several lines to fit the screen)
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule' + \
    f' required clientId="{client_id}" clientSecret="{client_secret}"' + \
    f' scope="https://{eh_server}/.default" ssl.protocol="SSL";'

# Create the Kafka options dictionary
callback_class = "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
# Microsoft Entra ID (Azure AD) v2.0 token endpoint
oauth_endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
kafka_options = {
    "kafka.bootstrap.servers": f"{eh_server}:9093",
    "subscribe": topic,
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": oauth_endpoint,
    "kafka.sasl.login.callback.handler.class": callback_class,
}

@dlt.table
def bronze():
    df = spark.readStream.format("kafka").options(**kafka_options).load()
    return df.withColumn("value", F.col("value").cast("string"))
</pre>
<p>The only change necessary to make this work on Databricks is to prepend <code>kafkashaded.</code> to the Kafka class names, because the Apache Kafka client in the Databricks Runtime is shaded.</p>

<h2>Looking back to 2022nd (2022-12-31)</h2>
<p>
It's the last day of the year, and it's time to write a traditional "year in review" blog post.
</p>
<p>
On the professional side, it was a very intensive and interesting year. I'm still working with customers, although my role has changed a bit - now I belong to a group of specialist solution architects, working with customers on advanced use cases in specific areas. For me it's an interesting mix of data engineering, platform, security, data governance, devops, cybersecurity, …, plus the ability to work with big enterprise customers. Work with customers was tightly connected with other activities - blogging, internal & external knowledge sharing, contributing to internal & open source projects, working with product teams on releasing new functionality, etc.
</p>
<p>
A significant amount of work went into the <a href="https://github.com/databricks/terraform-provider-databricks">Databricks Terraform provider</a>. The most significant event was that the <a href="https://www.databricks.com/blog/2022/06/22/databricks-terraform-provider-is-now-generally-available.html">Databricks Terraform provider reached version 1.0 and became a fully supported part of the Databricks portfolio</a>, and it continues to be a <a href="https://www.linkedin.com/feed/update/urn:li:activity:7009602440190676994/">very popular tool among Databricks customers</a>. Although the provider is now part of the product, the field team continues to actively contribute to its functionality - knowing how people are using it is a very important aspect of developing tools for end users. From my side, during the year there were more than 80 merged pull requests, with quite a bit of work in the last months on the <a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/guides/experimental-exporter">exporter functionality</a> that allows users to quickly start maintaining existing Databricks resources with Terraform.
</p>
<p>
The Databricks Terraform provider wasn't the only open source contribution this year. In the first half of the year I had the opportunity to continue contributing to Apache Airflow, not only fixing bugs and improving existing Airflow operators, but also adding new functionality, like <a href="https://www.databricks.com/blog/2022/04/29/build-data-and-ml-pipelines-more-easily-with-databricks-and-apache-airflow.html">support for Databricks SQL</a> that simplifies data ingestion from different data sources into Delta Lake tables. Plus there were many contributions to projects under the <a href="https://github.com/databrickslabs/">Databricks Labs</a> & <a href="https://github.com/orgs/databricks/repositories">Databricks</a> umbrellas, and quite a lot of work (code samples/demos/…) in <a href="https://github.com/alexott?tab=repositories">personal repositories</a>…
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjyVAT7NFRXgW7Lz6ZfEHVZmLkbdCCj4UIgcE3qx_bkB9YoeB-EdSNvV4Qo3StVF_Cx57-Wr8kYM2v40nfDIdbsk-VbBMyFGodXuZkLkMCQO1Dh4reOAdxWuKc-Sk5OHCncueWEdp7YIT1cuzTDY-gU2_0z8X9xJpZLYuMnVRGwpyxutN-tVB4/s824/%D0%A1%D0%BD%D0%B8%D0%BC%D0%BE%D0%BA%20%D1%8D%D0%BA%D1%80%D0%B0%D0%BD%D0%B0%202022-12-31%20%D0%B2%2015.03.54.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="209" data-original-width="824" height="162" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjyVAT7NFRXgW7Lz6ZfEHVZmLkbdCCj4UIgcE3qx_bkB9YoeB-EdSNvV4Qo3StVF_Cx57-Wr8kYM2v40nfDIdbsk-VbBMyFGodXuZkLkMCQO1Dh4reOAdxWuKc-Sk5OHCncueWEdp7YIT1cuzTDY-gU2_0z8X9xJpZLYuMnVRGwpyxutN-tVB4/w640-h162/%D0%A1%D0%BD%D0%B8%D0%BC%D0%BE%D0%BA%20%D1%8D%D0%BA%D1%80%D0%B0%D0%BD%D0%B0%202022-12-31%20%D0%B2%2015.03.54.png" width="640" /></a></div><p></p>
<p>
This year I tried to return to blogging. Besides <a href="https://alexott.blogspot.com/2022/">publishing in my personal blog</a>, I managed to co-author <a href="https://www.databricks.com/blog/author/alex-ott">five blog posts in the company blog</a> on different topics. I'm planning to continue writing in both blogs, already having a few drafts in progress.
</p>
<p>
Continuing to <a href="https://stackoverflow.com/users/18627/alex-ott">answer on StackOverflow</a> was another form of external knowledge sharing about all things Databricks, Delta Lake, Apache Spark, etc., and sometimes I hear from customers that they know me because of those answers. This year I managed to get a gold badge (score of 1000) for the <a href="https://stackoverflow.com/questions/tagged/databricks">databricks</a> tag.
</p>
<p>
Another thing that I managed to do this year is to get back to more cybersecurity-related work - an area where I have good practical experience. It took different forms - two blog posts (<a href="https://www.databricks.com/blog/2022/07/19/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon-events-part-ii.html">1</a>, <a href="https://www.databricks.com/blog/2022/12/16/building-cybersecurity-lakehouse-crowdstrike-falcon-events-part-iii.html">2</a>) about working with CrowdStrike data in the company blog, <a href="https://alexott.blogspot.com/2022/10/ingesting-indicators-of-compromise-with.html">one post</a> in my personal blog, writing a lot of code for ingestion & enrichment of different data sources (not open yet), helping customers to build cybersecurity lakehouses, … Cybersecurity is a big data area where Apache Spark/Databricks are a natural fit.
</p>
<p>
Many other things happened during this interesting year - it's a pleasure to work surrounded by many talented colleagues, and I'm looking forward with hope to the next year.
</p>
<p>
Happy New Year!
</p>
<h2>Delta Live Tables recipes: implementing unit & integration tests, and doing CI/CD (2022-12-22)</h2>
<p>The extended & updated version of this blog post is <a href="https://www.databricks.com/blog/applying-software-development-devops-best-practices-delta-live-table-pipelines">published on the Databricks blog</a>.
</p>
<h2>Cloud-agnostic resources deployment with Databricks Terraform Provider (2022-11-25)</h2>
<p>
<a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs">Databricks Terraform Provider</a> includes a number of the data sources that greatly simplify creation of portable Terraform templates. There are few classes of data sources related to compute, user & group management, and other topics. In practice, the most often used data sources are:
</p>
<ul class="org-ul">
<li><a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/node_type">databricks_node_type</a> together with <a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/spark_version">databricks_spark_version</a> allow to define jobs, clusters, instance pools & DLT pipelines that are cloud agnostic.</li>
<li><a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/current_user">databricks_current_user</a> allows to avoid hard coding of paths to notebooks in jobs & DLT pipelines, so it's easy to move resources between environments, or avoid names conflicts - for example, when developing a job or a DLT pipeline could be created for each of the developers, and should point to a notebook for a given user, but in production environment, this job or DLT pipeline will be owned by service principal.</li>
<li><a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/group">databricks_group</a> is heavily used to refer to predefined user groups, such as, <code>admins</code> or <code>users</code>, for example, when setting permissions to specific resources, or when adding users as workspace administrators (you can find examples in the documentation).</li>
</ul>
<p>
Let's look at how <code>databricks_node_type</code>, <code>databricks_spark_version</code>, and <code>databricks_current_user</code> can be used to create cloud-agnostic Terraform templates. When you work with multiple clouds and define jobs or clusters, you need to specify a node type - the name of the instance type that will be used to run your code. The problem is that these names are cloud-specific, and in some cases people resort to ugly code like <code>node_type_id = (var.cloud == "aws") ? "c5d.2xlarge" : (var.cloud == "azure" ? "Standard_F8s" : "c2-standard-8")</code> that is hard to read & support (and it will break if Databricks adds support for another cloud). You also need to specify the Databricks Runtime (DBR) version that you want to use (the <code>spark_version</code> parameter in the cluster definition), which consists of several pieces: the version itself, whether it's an ML runtime or not, whether the ML runtime is for GPU or CPU, whether it's Photon-optimized, whether it's a long-term support (LTS) version, etc. - for example, <code>11.3.x-cpu-ml-scala2.12</code> or <code>11.3.x-photon-scala2.12</code>. Also, new versions are released regularly, and if you want clusters/jobs to run on the latest version, you may need to update your Terraform code after each release of new runtimes.
</p>
<p>
The <code>databricks_node_type</code> and <code>databricks_spark_version</code> data sources solve these problems:
</p>
<ul class="org-ul">
<li>you parameterize <code>databricks_node_type</code> by specifying the minimum number of cores required per node, how much memory there should be per core, whether it should have a GPU, the category (compute or memory optimized, …), and many other parameters described in the <a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/node_type">documentation</a>. When executed, the Databricks Terraform provider fetches the list of available node types via the REST API and finds a node type matching your parameters that you can use in the cluster/job definition (warning: it may fail to find one if your requirements are incompatible).</li>
<li>similarly, you tell <code>databricks_spark_version</code> to search for a DBR version matching your requirements: ML or not, with Photon or not, etc. - see the <a href="https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/spark_version">documentation</a> for the full list. When invoked, the Terraform provider calls the corresponding REST API and finds a specific version matching your requirements (or fails if you specify an invalid combination, such as Photon + ML).</li>
</ul>
<p>
Let's look at a specific example - deployment of a Databricks job that will execute a notebook on a job cluster. The full source code is <a href="https://github.com/alexott/terraform-playground/tree/main/cloud-agnostic">available on GitHub</a>. It also demonstrates the use of the <code>databricks_current_user</code> data source to create a user-specific name for the job and deploy the notebook into the user's directory.
</p>
<p>
First, let's select the corresponding node type for our job - here I want a node that has a local disk, at least 8 cores, and is compute-optimized:
</p>
<div class="org-src-container">
<pre class="src src-terraform"><code><span style="color: darkslateblue;">data</span> <span style="color: darkcyan;">"databricks_node_type"</span> <span style="color: #cd661d;">"this"</span> {</code>
<code> <span style="color: sienna;">local_disk</span> =<span style="color: darkcyan;"> true</span></code>
<code> <span style="color: sienna;">min_cores</span> = 8</code>
<code> <span style="color: sienna;">category</span> = <span style="color: #008b00;">"Compute Optimized"</span></code>
<code>}</code>
</pre>
</div>
<p>
I also want to use the latest Databricks ML Runtime with long-term support:
</p>
<div class="org-src-container">
<pre class="src src-terraform"><code><span style="color: darkslateblue;">data</span> <span style="color: darkcyan;">"databricks_spark_version"</span> <span style="color: #cd661d;">"latest_lts"</span> {</code>
<code> <span style="color: sienna;">long_term_support</span> =<span style="color: darkcyan;"> true</span></code>
<code> <span style="color: sienna;">ml</span> =<span style="color: darkcyan;"> true</span></code>
<code>}</code>
</pre>
</div>
<p>
Then I just refer to these data sources in my job definition:
</p>
<div class="org-src-container">
<pre class="src src-terraform"><code><span style="color: darkslateblue;">resource</span> <span style="color: darkcyan;">"databricks_job"</span> <span style="color: #cd661d;">"this"</span> {</code>
<code> ...</code>
<code> <span style="color: #0000ee;">new_cluster</span> {</code>
<code> <span style="color: sienna;">num_workers</span> = 1</code>
<code> <span style="color: sienna;">spark_version</span> = data.databricks_spark_version.latest_lts.id</code>
<code> <span style="color: sienna;">node_type_id</span> = data.databricks_node_type.this.id</code>
<code> }</code>
<code> ...</code>
<code>}</code>
<code></code>
</pre>
</div>
<p>
That's all!
</p>
<p>
Let's see what happens if I execute that code on Azure, and then compare the results with AWS & GCP. After the job is created, let's look into the job cluster definition:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjMU6zowBhui3CgKAq08WZWQSgxXE6Ni1TPtQUdg89y3GEbQfEP2r9YeSXqhVY7vobYwO2HpCwngXPrwb0Yv65d4kbK6YI-qC_jmq9frPhD14DfmBZgy2M4EakbL1rkUFwXc4mA3kbe_LrPeHusevD26jyn4wSVTnGi-wcaVglyIh8qJfFxjAk/s807/Screenshot%202022-11-25%20at%2016.19.30.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="793" data-original-width="807" height="629" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjMU6zowBhui3CgKAq08WZWQSgxXE6Ni1TPtQUdg89y3GEbQfEP2r9YeSXqhVY7vobYwO2HpCwngXPrwb0Yv65d4kbK6YI-qC_jmq9frPhD14DfmBZgy2M4EakbL1rkUFwXc4mA3kbe_LrPeHusevD26jyn4wSVTnGi-wcaVglyIh8qJfFxjAk/w640-h629/Screenshot%202022-11-25%20at%2016.19.30.png" width="640" /></a></div><p></p>
<p>
As we can see, the Terraform provider has selected the <code>Standard_F8s</code> instance type (compute optimized, with 8 cores), and selected <code>11.3.x-cpu-ml-scala2.12</code> as the runtime version (the latest LTS version with ML support for execution on nodes without GPU).
</p>
<p>
If you execute the same code on AWS, the runtime version won't change, but we'll get <code>c5d.2xlarge</code> as the node type:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEixIUkXnuyCThS_AOZ7az7z13llYz_Uxg_zPcbtrTSgKoRpRTKDQLgrfrghY1fwXU0jqLK5nRg9hVduV8smspe5hYdq_Dpm1nfI2L4FgiVF5Q9BRjBvj6qDr9mIzqv2aUP9kpeL_ZHNgLPju3BfJt4cf-XPNkMrvBag47iMVATfwK9ReLs0vaw/s636/Screenshot%202022-11-25%20at%2016.21.49.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="264" data-original-width="636" height="266" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEixIUkXnuyCThS_AOZ7az7z13llYz_Uxg_zPcbtrTSgKoRpRTKDQLgrfrghY1fwXU0jqLK5nRg9hVduV8smspe5hYdq_Dpm1nfI2L4FgiVF5Q9BRjBvj6qDr9mIzqv2aUP9kpeL_ZHNgLPju3BfJt4cf-XPNkMrvBag47iMVATfwK9ReLs0vaw/w640-h266/Screenshot%202022-11-25%20at%2016.21.49.png" width="640" /></a></div><p></p>
<p>
And if we do the same on GCP, the node type will change to <code>c2-standard-8</code> (have you noticed that this node has 32GB of RAM instead of 16GB on Azure & AWS? This happens because there was no other matching node type with a smaller amount of memory):</p>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgZleU9ktuhXMm-A3GIgaF_13QHUFpBspzvEydyt6M_bO6XKsj56yKHKXIyzUR7sFIJQH8aJ4X_BP2C8myzSD5Idsi48c8FSA8ZM8mProj6ElDJXSggAccnFWvNTLHRee20rRpL2zgrIUj7FaqDRfSrGkDD1V41i28_F3Yse5pEvLkLQs4HTW0/s717/Screenshot%202022-11-25%20at%2016.23.12.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="220" data-original-width="717" height="196" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgZleU9ktuhXMm-A3GIgaF_13QHUFpBspzvEydyt6M_bO6XKsj56yKHKXIyzUR7sFIJQH8aJ4X_BP2C8myzSD5Idsi48c8FSA8ZM8mProj6ElDJXSggAccnFWvNTLHRee20rRpL2zgrIUj7FaqDRfSrGkDD1V41i28_F3Yse5pEvLkLQs4HTW0/w640-h196/Screenshot%202022-11-25%20at%2016.23.12.png" width="640" /></a></div><p></p>
<p>
This blog post demonstrated that it's really easy to create Terraform code for Databricks that works across different clouds, and that doesn't require updating your code when new runtime versions are released.
</p>
<h2>Ingesting indicators of compromise with Filebeat, Azure Event Hubs & Delta Lake on Databricks (2022-10-21)</h2>
<p>
In cybersecurity, an <a href="https://en.wikipedia.org/wiki/Indicator_of_compromise">Indicator of Compromise (IoC)</a> is a very important piece of information observed on a network or in an operating system that usually indicates a computer intrusion. Typical IoCs are things like file hashes, URLs/domains/IPs of botnet command & control servers, etc. Having this information, we can use it to perform real-time matching of logs & other data against known IoCs, or to perform investigations against historical data. There are multiple data formats used to exchange information about IoCs between different parties - there are open threat exchange platforms, but there are also a few security vendors that provide high-quality, curated threat feeds.
</p>
<p>
As mentioned above, when it comes to the use of IoC data, we typically have two distinct use cases:
</p>
<ol class="org-ol">
<li>matching IoCs against new data - this usually happens in a real-time or near-real-time fashion against streaming data, and the generated alerts kick off the investigation process. To minimize the time between generation of events/logs and generation of alerts, our tool should support efficient lookups in the IoC data.</li>
<li>matching IoCs against historical data - typically this happens as part of the incident response process, when analysts are looking into previous activity in light of new data. In this case the tool should be able to efficiently process huge amounts of historical data joined with IoC data.</li>
</ol>
<p>
<a href="https://spark.apache.org/">Apache Spark</a> in combination with <a href="https://delta.io/">Delta Lake</a> as underlying file storage format is a perfect combination that is able to handle both of these use cases very efficiently - Spark & Delta support both streaming & batch workloads using the same code, so you don't need to duplicate the IoC data, or write different code for each of the use cases. Additional efficiency when working with historical data could come from use of <a href="https://www.databricks.com/product/databricks-sql">Databricks SQL</a> that allows to process big amounts of data faster due use of the <a href="https://www.databricks.com/product/photon">Photon engine</a>.
</p>
<p>
To make IoC data available for use we need to perform two tasks:
</p>
<ol class="org-ol">
<li><b>Collect IoC data</b>. When you need to receive IoC data from threat feeds, you usually need to scrape some REST API or something like that - this task often requires custom code. But for popular threat exchange platforms there is an easier way to do that - you can simply use the <a href="https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-threatintel.html" target="_blank">Threat Intel</a> module of <a href="https://www.elastic.co/beats/filebeat" target="_blank">Elastic Filebeat</a> - a very popular, lightweight tool for shipping log data to Elasticsearch and other destinations, such as Apache Kafka (or <a href="https://azure.microsoft.com/en-us/products/event-hubs/#overview" target="_blank">Azure Event Hubs</a>, which also supports the Apache Kafka protocol).</li>
<li><b>Make collected data available for consumption</b>. Data from different threat exchange platforms usually comes in slightly different formats, also depending on what kind of IoCs are reported (domain names vs. IPs vs. file hashes, etc.). To access IoC data efficiently we need to transform it into a unified format.</li>
</ol>
<p>
The rest of the article describes these two steps in more detail.
</p>
<h3 id="org38d7f57">Setting up & running the Filebeat to ingest IoC data</h3>
<ol class="org-ol">
<li>Setting up Filebeat to output to <a href="https://azure.microsoft.com/en-us/products/event-hubs/#overview" target="_blank">Azure Event Hubs</a> - it's easy to configure Filebeat to ingest data into Event Hubs (a full example of the config file can be found <a href="https://gist.github.com/alexott/3367e6757f8094ba4398b4f59fcb7887" target="_blank">here</a>):
<ul class="org-ul">
<li>we need to make sure that we disable the <code>output.elasticsearch</code> and <code>output.logstash</code> blocks in the <code>filebeat.yaml</code></li>
<li>and we need to modify the <code>output.kafka</code> block as shown below, replacing values in the <code><></code> with actual values:
<ul class="org-ul">
<li><code>eh-namespace</code> in the <code>hosts</code> is the name of your EventHubs namespace</li>
<li>for authentication we're using a <a href="https://learn.microsoft.com/en-us/azure/event-hubs/authorize-access-shared-access-signature" target="_blank">Shared Access Signature</a> that you need to copy from the Azure Portal (or get via the command line/Terraform) - you need to put it into the <code>password</code> field. The value of <code>username</code> is fixed and equal to <code>$ConnectionString</code></li>
<li>set the value of the <code>topic</code> field to the name of the Event Hubs topic into which we'll ingest the data</li>
<li>the rest of the fields should have the fixed values as specified below.</li>
</ul></li>
</ul></li>
<pre class="src src-yaml"><span style="color: sienna;">output.kafka</span>:
<span style="color: sienna;">hosts</span>: [<span style="color: #8b2252;">"<eh-namespace>.servicebus.windows.net:9093"</span>]
<span style="color: sienna;">sasl.mechanism</span>: <span style="color: #8b2252;">"PLAIN"</span>
<span style="color: sienna;">username</span>: <span style="color: #8b2252;">"$ConnectionString"</span>
<span style="color: sienna;">password</span>: <span style="color: #8b2252;">"Endpoint=sb://<eh-namespace>.servicebus.windows.net/..."</span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">message topic selection + partitioning</span>
<span style="color: sienna;">topic</span>: <span style="color: #8b2252;">'<topic-name>'</span>
<span style="color: sienna;">partition.round_robin</span>:
<span style="color: sienna;">reachable_only</span>: <span style="color: darkcyan;">false</span>
<span style="color: sienna;">required_acks</span>: 1
<span style="color: sienna;">compression</span>: none
<span style="color: sienna;">ssl.enabled</span>: <span style="color: darkcyan;">true</span>
<span style="color: sienna;">max_message_bytes</span>: 1000000
</pre>
<li>Enable the Threat Intel module - that's also a very easy task:
<ul class="org-ul">
<li>in the <code>filebeat.yaml</code> make sure that all subsections inside the <code>filebeat.inputs</code> are commented out.</li>
<li>we need to enable <code>threatintel</code> module by renaming the <code>modules.d/threatintel.yml.disabled</code> to <code>modules.d/threatintel.yml</code></li>
<li>edit <code>modules.d/threatintel.yml</code> to enable specific integrations. In the current article we're using the following feeds: <code>abuseurl</code>, <code>abusemalware</code> & <code>malwarebazaar</code> from <a href="https://abuse.ch" target="_blank">Abuse.ch</a>, and <code>otx</code> from <a href="https://otx.alienvault.com/" target="_blank">AlienVault OTX</a>.</li>
</ul></li>
<li><a href="https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-starting.html" target="_blank">Start Filebeat</a> - of course we can run <code>filebeat</code> on a personal machine, but because it need to run all the time, it could be easier to run it in the cloud, where we can use something like <code>Standard B1ls</code> (on Azure) that has enough memory to run the Filebeat process, and it will cost you less than $4/month.</li>
</ol>
<h3 id="orgf295280">Processing collected IoC data</h3>
<p>
The previous section described how we can get IoC data published, but now we need to read it and make it available for direct use. To do that we need to take several things into consideration when implementing the data processing:
</p>
<ul class="org-ul">
<li>Filebeat's Threat Intel module periodically loads data from the specified REST API endpoints, but it doesn't perform de-duplication of the data - if there are no changes in the API output, it still writes collected data into a configured sink. The solution for this is to generate a hash of the actual payload & discard all duplicate events that have the same hash.</li>
<li>Different threat feeds use different data formats, and we need to perform normalization - use the same field names, expand different hashes of the same file into individual rows for easier matching, etc.</li>
<li>The same IoC may come via different threat feeds. There are different ways of handling this - ignore duplicates, merge data from multiple providers, etc. For simplicity I selected the first method - ignore duplicate submissions.</li>
</ul>
<p>
The implementation itself is quite straightforward and follows the standard <a href="https://www.databricks.com/glossary/medallion-architecture" target="_blank">medallion architecture</a> (full source code is on <a href="https://github.com/alexott/databricks-cybersecurity-playground/tree/main/iocs-ingest" target="_blank">GitHub</a>):
</p>
<ul class="org-ul">
<li>Raw data is ingested from Event Hubs into a bronze layer without much modification - we add a hash of the actual payload that is used to detect duplicates, extract the threat feed name (the <code>dataset</code> column), and also add a <code>date</code> column that is used for data partitioning (a minimal sketch of this step is shown right after this list). By keeping the raw data intact we'll be able to reprocess it if necessary, or add handling of new threat feeds later.</li>
<li>The actual data transformation happens when we ingest data into a silver layer. The code consists of a few functions that perform decoding and normalization of the data for specific threat feeds (datasets) - this data is then written into a single Delta Lake table that is used for both streaming & batch processing.</li>
</ul>
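<p>
A minimal sketch of the bronze step could look like the code below. It assumes a <code>kafka_options</code> dictionary similar to the one from my earlier DLT + Event Hubs post; the <code>event.dataset</code> field is an assumption based on the Filebeat/ECS output format, and the actual code in the GitHub repository differs in details:
</p>
<div class="org-src-container">
<pre class="src src-python">import pyspark.sql.functions as F

raw = spark.readStream.format("kafka").options(**kafka_options).load()

bronze = (raw
  .withColumn("value", F.col("value").cast("string"))
  # hash of the payload - used downstream to discard duplicate submissions
  .withColumn("payload_hash", F.sha2(F.col("value"), 256))
  # threat feed name reported by the Filebeat Threat Intel module
  .withColumn("dataset", F.get_json_object(F.col("value"), "$.event.dataset"))
  # date column used for partitioning of the bronze table
  .withColumn("date", F.to_date(F.col("timestamp"))))
</pre>
</div>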
<p>
The current implementation uses <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Spark Structured Streaming</a>, but right now it runs as a batch-like job using Trigger.Once, triggered several times per day by <a href="https://www.databricks.com/blog/2022/05/10/introducing-databricks-workflows.html" target="_blank">Databricks Workflows</a>, and looks as follows:
</p>
<p></p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEiAsTV-6x50qlffUcpEDLiOMkJU5n_BDTlzCyxI36jLA7plSw3nK44VRQnjjpkwt26gPt6IQXSPQ4NEHG5fAV4wwiuC3i0pKWhyvzcjwypFCYBtet1tMHBWuKGBPDJQhmYfqHy5ZbBpqYF1iKA2mAH93A2JbPxISkc5ZSlFgNgXoNl0AIVSG0E" style="margin-left: 1em; margin-right: 1em;"><img alt="" data-original-height="573" data-original-width="413" height="400" src="https://blogger.googleusercontent.com/img/a/AVvXsEiAsTV-6x50qlffUcpEDLiOMkJU5n_BDTlzCyxI36jLA7plSw3nK44VRQnjjpkwt26gPt6IQXSPQ4NEHG5fAV4wwiuC3i0pKWhyvzcjwypFCYBtet1tMHBWuKGBPDJQhmYfqHy5ZbBpqYF1iKA2mAH93A2JbPxISkc5ZSlFgNgXoNl0AIVSG0E=w289-h400" width="289" /></a></div><br /><br /><p></p>
<p>
To reach the best performance when working with the collected IoC data we need to have the correct data layout. In the current implementation, the silver table has the following structure (only the main columns are listed):
</p>
<ul class="org-ul">
<li><code>dataset</code> (string) - from which threat feed we got this IoC.</li>
<li><code>ioc_type</code> (string) - IoC type (possible values are <code>URL</code>, <code>domain</code>, <code>hostname</code>, <code>IPv4</code>, and different file hashes in the form of <code>FileHash-<hash-type></code>).</li>
<li><code>ioc</code> (string) - actual IoC value, depending on the IoC type (hash/IP/…).</li>
<li><code>first_seen</code> (timestamp) - when a given IoC was first reported.</li>
<li><code>last_seen</code> (timestamp) - when a given entry was last seen (please note that not all threat feeds report it).</li>
</ul>
<p>
Based on the target schema of the silver table, we can use the following techniques to get the best performance when working with IoC data (a sketch combining them follows the list):
</p>
<ul class="org-ul">
<li>partition the table by the <code>ioc_type</code> column, so we'll read only specific data when matching specific IoC types.</li>
<li>index the <code>first_seen</code> & <code>last_seen</code> columns so we can take advantage of <a href="https://docs.databricks.com/delta/file-mgmt.html#data-skipping-1" target="_blank">data skipping</a>.</li>
<li><a href="https://docs.databricks.com/delta/file-mgmt.html#z-ordering-multi-dimensional-clustering" target="_blank">Z-Order</a> data by <code>first_seen</code> column to make data skipping even more efficient. This is done by a maintenance task.</li>
<li>create a <a href="https://docs.databricks.com/optimizations/bloom-filters.html" target="_blank">bloom filter</a> (currently Databricks-only) for the <code>ioc</code> column to make joins & point lookups more efficient.</li>
</ul>
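<p>
A minimal sketch of how these techniques could be combined is shown below - the table name <code>iocs_silver</code> and the bloom filter options are hypothetical, so the exact statements would need to be adapted to your environment:
</p>
<div class="org-src-container">
<pre class="src src-python"># partition the silver table by IoC type when creating it
spark.sql("""
CREATE TABLE IF NOT EXISTS iocs_silver (
  dataset STRING, ioc_type STRING, ioc STRING,
  first_seen TIMESTAMP, last_seen TIMESTAMP)
USING delta
PARTITIONED BY (ioc_type)""")

# bloom filter index on the ioc column for faster joins & point lookups (Databricks-specific)
spark.sql("""
CREATE BLOOMFILTER INDEX ON TABLE iocs_silver
FOR COLUMNS (ioc OPTIONS (fpp=0.1, numItems=1000000))""")

# periodic maintenance task: compact files and Z-Order by first_seen for better data skipping
spark.sql("OPTIMIZE iocs_silver ZORDER BY (first_seen)")
</pre>
</div>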
<h3 id="org043aad8"><span class="section-number-4">1.1.3.</span> Use the collected IoC data</h3>
<p>
After we have prepared our IoC data, it's really easy to use it - we just need to perform a join between a dataframe with the data to check (from a stream or batch read) and our IoC table - we only need to make sure that the input data is in the correct format (<code>ioc_type</code> should specify the type of entry (IP/file hash/…), <code>ioc</code> - the value to check, and <code>timestamp</code> - when the event happened):
</p>
<pre class="src src-python"><span style="color: sienna;">data</span> = <span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">input dataframe</span>
<span style="color: sienna;">iocs</span> = <span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">dataframe with IoC data</span>
<span style="color: sienna;">joined</span> = data.join(iocs, (
(data.ioc_type == iocs.ioc_type) & (data.ioc == iocs.ioc) &
(data.timestamp >= iocs.first_seen) &
(data.timestamp <= F.coalesce(iocs.last_seen, F.current_timestamp())))) \
.drop(data.ioc_type).drop(data.ioc)
</pre>
<p>
And that's all. It took less than 200 lines of Python code to implement ingestion & normalization of the data from four threat feeds, and then use this data to detect potential security incidents.
</p>
<h2>Reflecting on two years at Databricks... (2022-08-13)</h2>
<p>This Wednesday, the 10th of August, was my second anniversary of working at Databricks. Initially I planned to write this blog post on that day, but as usual I started to dig into customer work and remembered about it only in the evening, after I had stepped away from the keyboard.</p>
<p>I joined the Databricks professional services team in the middle of the pandemic year. All interviews were done remotely (it had become normal by that time), and I was really impressed by the people who did the interviews - there were deep technical and non-technical questions, but it wasn't about demonstrating superiority (I've seen such things before). People were really excited to talk about the position, working at Databricks, etc. There were multiple reasons to join Databricks:</p>
<ul style="text-align: left;">
<li>I have always liked Apache Spark since I started to use it in early 2015, and the possibility of working at the company behind Spark was really exciting (before that, Spark was one of the decision points when I thought about joining DataStax...)</li>
<li>culture - a few of my colleagues from DataStax were already working at Databricks, and I had heard many stories about the company's culture</li>
<li>the company was (and still is) growing at a fast pace, and besides Spark, there were many other interesting products in the portfolio. And I especially wanted to get deeper into machine learning.</li>
<li>remote position - with a SaaS product, the amount of (potential) travel is much lower compared to products that aren't cloud-based. As of right now, I haven't had any work-related trips, although I have had the possibility of working with many customers across the whole of Europe - all without leaving the comfort of my home office setup.</li>
</ul>
<p>During the first weeks, as I was going through the onboarding trainings, I started to get to know the wider team - not only direct colleagues, but people from other geo locations and different departments - product & engineering, pre-sales solutions architects, ... And these interactions confirmed the stories that I had heard previously about the company culture - there are a lot of very smart but humble people, they are ready to help when you have questions or problems (especially during onboarding), they are open to suggestions, you can reach people across org boundaries, discuss something with senior management, ... And it has stayed the same after two years, even though the company grew very significantly (when I joined we had fewer than 1,500 people across the globe, and now we're close to 4,000).</p>
<p>The pace of product development inside the company is very high - looking back, I can see how many things were added or heavily changed even since last year, not to mention two years ago. Databricks SQL, Delta Live Tables, Databricks Repos, Unity Catalog, just to name a few - these things make our customers' lives easier, allowing them to concentrate on solving their business problems instead of trying to reinvent the wheel of running Spark & other things themselves. This makes the work very interesting, although sometimes you can feel a kind of information overload when you're trying to cover all areas of your interest.</p>
<p>Often, when I'm talking with people outside of Databricks, they have the impression that my work is primarily around Spark (data engineering) and machine learning. But the reality is quite different - reliable & scalable data engineering and machine learning aren't possible until you have a solid foundation of automation (cloud infra/data/ml/dev ops), security/compliance, and related things. As a result, a big chunk of our work is spent on deployment planning (for Databricks and other cloud infrastructure), security, building CI/CD pipelines, and related topics. These things are the base on which customers' teams can build their data and machine learning products. And, almost from the beginning, I've been contributing to the <a href="https://github.com/databricks/terraform-provider-databricks" target="_blank">Databricks Terraform provider</a> that is used by a significant number of Databricks customers to automate their deployments. And I want to specifically mention <a href="https://github.com/nfx" target="_blank">Serge Smertin</a>, who leads the development effort of the Terraform provider (and many other projects) - I learned many new things from him, and was always amazed by his relentless push for making things powerful but easy to use. With a similar goal of helping customers to automate, I've started to contribute to Apache Airflow, so now it's not only possible to run Databricks jobs, but you can also query data, import new data sets, and do many other things using Databricks SQL. And besides this, many other things were done that simplify work with Databricks, for example, <a href="https://github.com/alexott/databricks-nutter-repos-demo" target="_blank">testing of code in Databricks notebooks with Nutter</a>, a lot of code snippets demonstrating different aspects of the platform (check the <a href="https://github.com/alexott/spark-playground" target="_blank">spark-playground</a> & <a href="https://github.com/alexott/databricks-playground" target="_blank">databricks-playground</a> repositories on GitHub), etc.</p>
<p>Working in a quickly growing company gives you a lot of possibilities to contribute to its success. These contributions can come in different forms, like sharing knowledge internally (in the form of SME groups, presentations, creating new workshops, ...) & externally (e.g., I published <a href="https://www.databricks.com/blog/author/alex-ott" target="_blank">three blog posts</a> in the company's blog, and answer on Databricks Community & StackOverflow), working closely with product & engineering on new functionality, simplifying internal processes, contributing to open source, ... But most important is that these contributions are recognized, allowing career growth, switching to a new role if you want to try a new area, etc.</p>
<p>All these things could be summarised as follows - the decision to join Databricks was one of the best decisions so far, and I'm looking forward to more things happening there...</p>

<h2>Delta Live Tables recipes: Consuming from Azure Event Hubs (2022-06-19)</h2>
<p>
<a href="https://docs.databricks.com/data-engineering/delta-live-tables/index.html">Databricks Delta Live Tables</a> (DLT) is a new framework from Databricks aimed on simplifying building reliable & maintainable data processing pipelines. With this framework developers are concentrating on writing data transformations themselves, linking them together, and Delta Live Tables handles task orchestration, cluster management, error handling, monitoring, and data quality. Delta Live Tables supports both batch & streaming workloads, supporting all data formats & input sources included into a Databricks Runtime (DBR).
</p>
<p>
On Azure, <a href="https://docs.microsoft.com/en-us/azure/event-hubs/">Event Hubs</a> (often spelled as EventHubs) is a popular solution for events transportation, similar to Apache Kafka, so when it comes to building solutions on Azure, the Event Hubs is a natural choice. There is a <a href="https://github.com/Azure/azure-event-hubs-spark">Spark connector for Event Hubs</a>, but right now it's not included into Databricks Runtime, and DLT doesn't allow (yet) to attach 3rd party Java libraries to a DLT pipeline.
</p>
<p>
But there is a workaround for that problem - Azure Event Hubs <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview">provides an endpoint compatible with the Apache Kafka protocol</a>, so we can work with Event Hubs topics using the Apache Kafka connector that is included in the Databricks Runtime. We just need to follow the instructions in the <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-kafka-spark-tutorial">official documentation</a> with small changes specific to DBR:
</p>
<ul class="org-ul">
<li>we need to get a <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string">Shared Access Signature (SAS)</a> to authenticate to the Event Hubs topic - it should look like <code>Endpoint=sb://<....>.windows.net/;?...</code> and will be used as the password. For security reasons it's recommended to put it into a Databricks secret scope (update the <code>secret_scope</code> and <code>secret_name</code> variables with your actual values).</li>
<li>we need to form the correct string (the <code>eh_sasl</code> variable) for SASL (<a href="https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer">Simple Authentication and Security Layer</a>) authentication - as the user name we're using the static value <code>$ConnectionString</code>, and the Event Hubs SAS is used as the password. The SASL string looks a bit different on Databricks - instead of <code>org.apache.kafka.common.security.plain.PlainLoginModule...</code> it should be prefixed with <code>kafkashaded.</code> because the original Java package is shaded to avoid conflicts with other packages.</li>
<li>you need to provide the name of the Event Hubs namespace & the topic from which to read data in the <code>eh_namespace_name</code> and <code>topic_name</code> variables.</li>
</ul>
<p>
The final solution looks as follows:
</p>
<div class="org-src-container">
<pre class="src src-python"><span style="color: forestgreen;">@dlt.table</span>
<span style="color: #a020f0;">def</span> <span style="color: blue;">eventhubs_topic1</span>():
<span style="color: sienna;">secret_scope</span> = <span style="color: #8b2252;">"scope"</span>
<span style="color: sienna;">secret_name</span> = <span style="color: #8b2252;">"eventhub_sas"</span>
<span style="color: sienna;">topic_name</span> = <span style="color: #8b2252;">"topic1"</span>
<span style="color: sienna;">eh_namespace_name</span> = <span style="color: #8b2252;">"<eh-ns-name>"</span>
<span style="color: sienna;">readConnectionString</span> = dbutils.secrets.get(secret_scope, secret_name)
<span style="color: sienna;">eh_sasl</span> = <span style="color: #8b2252;">'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule'</span> \
+ f<span style="color: #8b2252;">' </span><span style="color: #8b2252;"><span style="color: #8b2252;">required </span>username="$ConnectionString" password="{readConnectionString}";'</span>
<span style="color: sienna;">bootstrap_servers</span> = f<span style="color: #8b2252;">"{eh_namespace_name}.servicebus.windows.net:9093"</span>
<span style="color: sienna;">kafka_options</span> = {
<span style="color: #8b2252;">"kafka.bootstrap.servers"</span>: bootstrap_servers,
<span style="color: #8b2252;">"kafka.sasl.mechanism"</span>: <span style="color: #8b2252;">"PLAIN"</span>,
<span style="color: #8b2252;">"kafka.security.protocol"</span>: <span style="color: #8b2252;">"SASL_SSL"</span>,
<span style="color: #8b2252;">"kafka.request.timeout.ms"</span>: <span style="color: #8b2252;">"60000"</span>,
<span style="color: #8b2252;">"kafka.session.timeout.ms"</span>: <span style="color: #8b2252;">"30000"</span>,
<span style="color: #8b2252;">"startingOffsets"</span>: <span style="color: #8b2252;">"earliest"</span>,
<span style="color: #8b2252;">"kafka.sasl.jaas.config"</span>: eh_sasl,
<span style="color: #8b2252;">"subscribe"</span>: topic_name,
}
<span style="color: #a020f0;">return</span> spark.readStream.<span style="color: darkslateblue;">format</span>(<span style="color: #8b2252;">"kafka"</span>) \
.options(**kafka_options).load()
</pre>
</div>
<p>
With it you can refer to your DLT table by name <code>eventhubs_topic1</code> in the <code>dlt.read</code> or <code>dlt.read_stream</code> functions. An example of using similar code can be seen in the image of a real DLT pipeline that I'm using for processing of threat feeds (there will be a separate post on that topic) - the <span style="font-family: courier;"><code>threatintel_bronze</code></span> consumes data from the Event Hubs. </p><p> </p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjyLA1BAkoGjl3nEA5zkg0fJVEWkZLi79f02w4gZJ99Gq2N5mtNdCKINJ11XxIWQdcE-aZMTWd4qgFH5ZOTdu_ZAN-039Agf0ARGaenNBKHfkyCR48vbsz5FQg-hCkDJ35ZgN6KifcwElOZrBifwqhAjw6xpJ7YyEOt5pvPzssPzJoRZKvNsOY/s2434/Screenshot%202022-06-19%20at%2016.22.58.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="496" data-original-width="2434" height="130" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjyLA1BAkoGjl3nEA5zkg0fJVEWkZLi79f02w4gZJ99Gq2N5mtNdCKINJ11XxIWQdcE-aZMTWd4qgFH5ZOTdu_ZAN-039Agf0ARGaenNBKHfkyCR48vbsz5FQg-hCkDJ35ZgN6KifcwElOZrBifwqhAjw6xpJ7YyEOt5pvPzssPzJoRZKvNsOY/w640-h130/Screenshot%202022-06-19%20at%2016.22.58.png" width="640" /></a></div><br /><p></p>
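<p>
For illustration, a downstream DLT table could consume from it like this (a minimal sketch - the table name and the parsing step are just placeholders):
</p>
<div class="org-src-container">
<pre class="src src-python">import dlt
import pyspark.sql.functions as F

@dlt.table
def eventhubs_topic1_parsed():
    # read the table defined above as a stream and decode the Kafka payload
    return (dlt.read_stream("eventhubs_topic1")
            .withColumn("value", F.col("value").cast("string")))
</pre>
</div>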
<p>
There are also additional benefits to using the Apache Kafka connector. The biggest one is that the original Event Hubs connector requires a 1-to-1 mapping between partitions in the Event Hubs topic and Spark partitions. This means that if you have more CPU cores than partitions in the Event Hubs topic, your cluster resources will be only partially used, so you will spend money doing nothing. In the Apache Kafka connector, the <code>minPartitions</code> option allows you to specify the desired number of Spark partitions, and the connector will split existing Kafka/Event Hubs partitions into subranges, allowing the creation of Spark partitions without a 1-to-1 mapping. This greatly improves cluster utilization. Stay tuned for a separate blog post on optimizing the Spark + Event Hubs combination.
</p>
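<p>
Assuming the <code>kafka_options</code> dictionary from the example above, enabling this is a single extra option (the value 32 is just an illustration):
</p>
<div class="org-src-container">
<pre class="src src-python">kafka_options = {
    # ... the same options as in the example above ...
    # ask Spark to split the Event Hubs partitions into at least 32 Spark partitions
    "minPartitions": "32",
}
</pre>
</div>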
<h2>Goodbye 2021st... (2021-12-31)</h2>
<p>As usual, the 31st of December is a good time to look back on the year behind. This year flew by very fast, filled with many things both professional & personal.<br /><br />On the professional side there was a lot of activity - many different clients from small to huge, and interesting projects around very different things - architecture & implementation, security, automation/infrastructure, data quality, scaling data processing (from an organisational perspective), improving development processes, etc. I'll try to find time to write about lessons from some of these areas. It's interesting that <a href="https://github.com/alexott/databricks-nutter-projects-demo">one demo</a> that shows how to do automated testing of Databricks notebooks (I developed it for my CI/CD workshop) is the 2nd most popular of my GitHub repositories.<br /><br />As time allowed, I tried to continue contributing to OSS. The most significant OSS contribution was to the <a href="https://github.com/databrickslabs/terraform-provider-databricks">Databricks Terraform Provider</a> - around 10k lines of Go code. Another big part of the activity went into <a href="https://github.com/databrickslabs/overwatch">project Overwatch</a> that simplifies the collection & analysis of data from Databricks workspaces to find problematic usage of resources, analyse costs, etc. And on top of that, quite a few small activities (PRs, issues, etc.) to fix bugs in documentation, port some components to Spark 3, etc. Hopefully, I'll continue to work on OSS stuff at the same scale.<br /><br />On the personal side, the second pandemic year didn't allow a return to "normal life". But we still managed to travel twice (it was quite a relief after almost 1.5 years since a "normal" vacation). This year, I finally managed to complete my <a href="https://www.goodreads.com/user_challenges/25194547">reading challenge</a> - primarily because of travelling just with a Kindle, without the distraction of an iPad/laptop.<br /><br />I wish a Happy New Year to everyone! And be safe!</p>

<h2>Looking back to 2020th (2020-12-31)</h2>
<p>
The last day of the year is a good opportunity to look back on what happened during the current year.
</p>
<p>
The pandemic changed our lives significantly, and I'm no exception, although maybe not as drastically as for others - I've been working mostly remotely for the last three years, with periodic trips to customers. And this was the most significant change for me this year - everything became virtual in a very short period of time, without onsite visits to customers. Still, in the first two months of the year I traveled a lot - almost half of my 2019 mileage. The biggest effect of this switch to virtual was on the trainings that I did for customers - while you can collaborate with people remotely when investigating problems, discussing implementations, etc., with trainings it's different - it's harder to see if people understand what you're teaching, as you don't see their reactions - this required changing the approach to teaching, including the materials that are presented…
</p>
<p>
The biggest change that happened this year is the new job - after interesting years at DataStax, I went to Databricks, to a similar position - helping customers build solutions on top of the Databricks platform. Databricks is well known as the company behind Spark, but it's not only Spark - MLflow & Delta Lake are very popular & powerful technologies for building data processing & machine learning solutions. And inside Databricks, all of them get new functionality faster, before it's released to open source. And being a cloud platform, Databricks made it easier to work with customers during the pandemic - you aren't required to be onsite to help people. Overall, it's very interesting to be in a fast-growing company, with a lot of really smart people around, so you can learn a lot. Plus, I got much more exposure to Azure & AWS services that I hadn't touched much before. One of the interesting things to observe is that Spark is traditionally associated with Scala, but in practice I'm writing much more code in Python/PySpark :-)
</p>
<p>
This year I again didn't make my reading challenge - I set it to 55 books, like last year, but read only 40 (it was 46 in 2019) - this was also a side effect of not traveling as much, plus the job change (but I read a lot of documentation :-). One of the book-related activities was technical proofreading of several books for O'Reilly & Manning (I had worked with Manning before on several books):
</p>
<ul class="org-ul">
<li>Cassandra. The Definitive Guide, 3rd edition</li>
<li>The Practitioner's Guide to Graph Data</li>
<li>Graph Databases in Action</li>
<li>Graph-Powered Machine Learning (not released yet)</li>
</ul>
<p>
Programming-related activity was spread between open source & internal projects. On the open source front, I became a committer for <a href="https://zeppelin.apache.org/">Apache Zeppelin</a>, primarily improving support for Cassandra (more details are <a href="https://alexott.blogspot.com/2020/07/new-functionality-of-cassandra.html">in this blog post</a>), but with the job change that was put on hold. At the new job I suddenly started to write Go again, contributing to the <a href="https://github.com/databrickslabs/terraform-provider-databricks">Terraform provider for Databricks</a>. Besides that, there were a lot of small contributions to multiple OSS projects, including several <a href="https://github.com/DataStax-Toolkit/">open sourced projects</a> at DataStax that make administrators' lives easier.
</p>
<p>
This year was also more productive than the previous one when it comes to writing. I wrote (with co-authors) two blog posts for DataStax's blog (<a href="https://www.datastax.com/blog/advanced-apache-cassandra-analytics-now-open-all">1</a>, <a href="https://www.datastax.com/blog/migrate-cassandra-apps-cloud-20-lines-code">2</a>), and seven blog posts for my <a href="https://alexott.blogspot.com/">own blog</a> around <a href="https://alexott.blogspot.com/search/label/cassandra">Cassandra</a>, <a href="https://alexott.blogspot.com/search/label/spark">Spark</a>, <a href="https://alexott.blogspot.com/search/label/zeppelin">Zeppelin</a>, <a href="https://alexott.blogspot.com/search/label/datastax">DataStax</a>, and <a href="https://alexott.blogspot.com/search/label/databricks">Databricks</a>. And I have drafts for several more blog posts that I'm planning to publish early next year.
</p>
<p>
And many other things happened during the year, but I don't want to list everything here :-)
</p>
<p>
I wish everyone a happy & prosperous New Year! And stay healthy - this is the main thing right now.
</p>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-30944070756551529982020-07-31T12:50:00.005+02:002020-07-31T12:51:18.243+02:00Running Apache Zeppelin on DSE Analytics<p>
DataStax Enterprise (DSE) includes a modified version of Apache Spark branded as DSE Analytics. This version has increased fault tolerance, doesn't rely on Zookeeper, and has many additional optimizations & enhancements for working with Cassandra. It also includes a Hadoop-compatible distributed file system - DSEFS. I already wrote about <a href="https://alexott.blogspot.com/2020/07/using-zeppelin-to-work-with-data-in-dse.html">using Zeppelin with another component of DSE Analytics - AlwaysOn SQL Service</a>.
</p>
<p>
In this post I want to discuss how we can run Apache Zeppelin on DSE Analytics, allowing us to use the Spark resources that we already have in the DSE cluster. If we just need to access data in DSE from Spark, without running Spark code on DSE Analytics, then we only need to configure Zeppelin to use the <a href="https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all">recently released Spark Cassandra Connector 2.5</a>, as it has better compatibility with DSE. For this post I used Zeppelin 0.9-preview2 with DSE 6.8.1, which includes Spark 2.4.
</p>
<p>
To run Zeppelin on DSE Analytics we have two options, described in the corresponding sections below:
</p>
<ol class="org-ol">
<li>Execute Zeppelin directly on a node of the DSE cluster - this is the easiest way, but it's not great from a security standpoint, adds extra load to the DSE node, etc.</li>
<li>Execute Zeppelin on some other node that has access to the DSE cluster - this solves security and other problems, but requires more work to set up</li>
</ol>
<p>
In both cases we're relying on the code shipped in DSE, and we don't need to explicitly install Spark Cassandra Connector.
</p>
<div class="outline-4" id="outline-container-org60055c6">
<h4 id="org60055c6">Running Zeppelin on DSE node(s)</h4>
<div class="outline-text-4" id="text-1-5-1">
<p>
This is the most straightforward way to run Zeppelin & get access to DSE Analytics, DSEFS, etc. The procedure is simple:
</p>
<ul class="org-ul">
<li>Start Zeppelin as <code>dse exec path_to_zeppelin.sh</code> on one of the nodes inside the DSE Analytics data center. <a href="https://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/spark/thirdPartyToolsIntegrationSpark.html#Zeppelinintegration">dse exec</a> will set up all necessary parameters - <code>CLASSPATH</code>, etc., so Zeppelin will pick up all the jars necessary to submit jobs to DSE Analytics</li>
<li>In the Zeppelin UI, change the Spark interpreter settings. Change the <code>spark.master</code> (<code>master</code> in Zeppelin 0.8) parameter to <code>dse://?</code> instead of the default <code>local[*]</code> - this will force Zeppelin to execute jobs on DSE Analytics, with all its advantages, like automatic registration of DSE tables in Spark SQL, access to DSEFS, etc.</li>
</ul>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgi1rXv-CW9ho-Q0E1m8ArPbngvS6ZadRwmKnv7RoVQ_ro6tiz3rxrZX1v-boopH1O_f8-P3mxaW7JYIJgeGEL6GwQtaGAQK83Wk6aeK4JRTPCZ1RBNA7aDD7ILi5CxQkr3fDf0gg/s1132/zeppelin-dse-analytics-1-configure.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="351" data-original-width="1132" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgi1rXv-CW9ho-Q0E1m8ArPbngvS6ZadRwmKnv7RoVQ_ro6tiz3rxrZX1v-boopH1O_f8-P3mxaW7JYIJgeGEL6GwQtaGAQK83Wk6aeK4JRTPCZ1RBNA7aDD7ILi5CxQkr3fDf0gg/d/zeppelin-dse-analytics-1-configure.png" /></a></div><p></p>
<p>
After the configuration is changed, we can execute Spark code to read data from DSE, write to DSEFS, execute Spark SQL queries (and we don't need to explicitly register Cassandra tables!), etc.:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiTgqGI335OhhL1wIEaJhYEQ8wORmIH9IImh4hyphenhyphenSIrPjsbKZKF1lFsAQAJjYimbB_LXnLVWNN0LyrWv10Ba4CLCCx1s3hCKmC4cYGRRmjL4thvY_sL1mnIPJMX6Zo80tWU9DgPMBQ/s1115/zeppelin-dse-analytics-1-notebook.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="833" data-original-width="1115" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiTgqGI335OhhL1wIEaJhYEQ8wORmIH9IImh4hyphenhyphenSIrPjsbKZKF1lFsAQAJjYimbB_LXnLVWNN0LyrWv10Ba4CLCCx1s3hCKmC4cYGRRmjL4thvY_sL1mnIPJMX6Zo80tWU9DgPMBQ/d/zeppelin-dse-analytics-1-notebook.png" /></a></div><p></p>
<p>
And we can see in the Spark Master UI of DSE Analytics that Zeppelin is really executing there:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjXdyUTAzuamsnaXtxVDClaOPXya-h3vusAZA6Hhjg02aRnzEX_dnaWHA6pOTgjDntgyERKZuZBEqM75hUQM3GAw696XlpWp35BBF1m874AWINmmxiIlfjRv3A5DS8AfDO5OouzQg/s1116/zeppelin-dse-analytics-1-spark-master.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="756" data-original-width="1116" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjXdyUTAzuamsnaXtxVDClaOPXya-h3vusAZA6Hhjg02aRnzEX_dnaWHA6pOTgjDntgyERKZuZBEqM75hUQM3GAw696XlpWp35BBF1m874AWINmmxiIlfjRv3A5DS8AfDO5OouzQg/d/zeppelin-dse-analytics-1-spark-master.png" /></a></div><p></p>
</div>
</div>
<div class="outline-4" id="outline-container-orga9caecc">
<h4 id="orga9caecc"><span class="section-number-4"></span>Running Zeppelin with DSE Analytics outside of DSE Cluster</h4>
<div class="outline-text-4" id="text-1-5-2">
<p>
Sometimes it's undesirable to run Zeppelin on a DSE node directly, for many reasons - resource consumption, security concerns (for example, people may get access to files via the shell interpreter or other means), etc. In this case we can still get the benefits of running Zeppelin via <code>dse exec</code> - we just need to do the following:
</p>
<ul class="org-ul">
<li>download the DSE distribution and unpack it on the machine where we want to run Zeppelin - you don't need to configure or start anything; we just need the DSE-specific jar files</li>
<li>start Zeppelin via <code>dse exec</code> as before</li>
<li>configure it to run on DSE Analytics - but we'll need a few more steps to achieve this:
<ul class="org-ul">
<li>we need to obtain the IP address of the Spark master - this could be done either by looking for the Spark master IP in the output of <code>dsetool status</code>, or by using <a href="https://docs.datastax.com/en/dse/6.8/dse-dev/datastax_enterprise/tools/dseClientTool/dseClientToolcommands/dseClient-toolSpark.html">dse client-tool spark master-address</a> - the latter option is even easier for automatic configuration of Zeppelin, because it prints the complete URI of the Spark master</li>
<li>change the <code>spark.master</code> parameter to the value obtained via <code>dse client-tool spark master-address</code> - it should be at least <code>dse://<Master-IP>?</code>, or include more parameters like <code>connection.host</code> and <code>local_dc</code>. For example: <code>dse://10.121.34.176:9042?connection.local_dc=SearchAnalytics;connection.host=10.121.34.94,10.121.33.133</code>; <br /></li></ul></li></ul><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjmf8P67F3qh4A2PvR8hYzR5tI01Aat6JQroCJGBKw0FKuXrAKAaxwIrkHCSEjskmCpoFVQIBtmD5YNc1rIHeJUrH3oziDOjvbmKvgHAq25KOLWtal95NbWgNkVcQXXoOtkQs_KNw/s1094/zeppelin-dse-analytics-2-configure-1.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="381" data-original-width="1094" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjmf8P67F3qh4A2PvR8hYzR5tI01Aat6JQroCJGBKw0FKuXrAKAaxwIrkHCSEjskmCpoFVQIBtmD5YNc1rIHeJUrH3oziDOjvbmKvgHAq25KOLWtal95NbWgNkVcQXXoOtkQs_KNw/d/zeppelin-dse-analytics-2-configure-1.png" /></a></div><p></p>
<ul class="org-ul">
<li>if there is no <code>connection.host</code> in the Spark master URI, then you need to add the <code>spark.cassandra.connection.host</code> property and set it to a comma-separated list of DSE nodes</li>
<li>if necessary, add other properties specific to the Spark Cassandra Connector and DSE Analytics. We can obtain them by executing the <a href="https://docs.datastax.com/en/dse/6.8/dse-dev/datastax_enterprise/tools/dseClientTool/dseClientToolcommands/dseClient-toolConfigurationByos-export.html">dse client-tool configuration byos-export</a> command. Usually these are properties related to security, but we can specify any additional property specific to the Spark Cassandra Connector, like usernames and passwords, or performance tuning options</li>
<li>to work with DSEFS as the default file system we can specify the Hadoop option <code>spark.hadoop.fs.defaultFS</code> with the value <code>dsefs://<DSE_NODE_IP></code>. This is not strictly required - we can still use DSEFS, but we'll need to specify the node address in the path, like <code>dsefs://192.168.0.10/file.csv</code> (see the screenshot below)</li></ul><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEin_T5WCVsAr4EbBQypa18QvmTDCkD6i74i1C3SYHs9cjzLEBAA2CQarEThtK1JEuYC-yvogdd1LsQr23vg0MEFX8GfKZI24lA-TDVNsrnknKwjpjncEnCLf1AiB6rYPhdKZCeX4g/s1082/zeppelin-dse-analytics-2-configure-2.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="91" data-original-width="1082" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEin_T5WCVsAr4EbBQypa18QvmTDCkD6i74i1C3SYHs9cjzLEBAA2CQarEThtK1JEuYC-yvogdd1LsQr23vg0MEFX8GfKZI24lA-TDVNsrnknKwjpjncEnCLf1AiB6rYPhdKZCeX4g/d/zeppelin-dse-analytics-2-configure-2.png" /></a></div>
<p>
After everything is configured, we can execute our code. The result will be the same - we'll get Zeppelin jobs running on DSE Analytics, and we'll have full access to the data. And we can use DSEFS as well - we can write data to DSEFS using either an explicit or an implicit filesystem:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjZjdDFdBYj93H3qwVDMUjUBCBrv3hPjTTgC7RijYl8WZ-faj5RexR4-zq8jPb-0dm1AX4ud6va19UdEJJMb-63hcLdiTJfVzIZTlK2liJuk86NYyqt3r6Rxt5LLbopU7iaw5aVBQ/s1095/zeppelin-dse-analytics-2-dsefs-results.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="173" data-original-width="1095" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjZjdDFdBYj93H3qwVDMUjUBCBrv3hPjTTgC7RijYl8WZ-faj5RexR4-zq8jPb-0dm1AX4ud6va19UdEJJMb-63hcLdiTJfVzIZTlK2liJuk86NYyqt3r6Rxt5LLbopU7iaw5aVBQ/d/zeppelin-dse-analytics-2-dsefs-results.png" /></a></div><p></p>
<p>
and see that data on DSEFS:
</p>
<p><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEik7JAe3oBxD3Vu0LyGh6AzIpCnQxmhtQdb0_denF5CJfkW662G4SSFhftnEvsGQx5cWtCSyl1pu2zhDPQ1q9r2JTtznPXlktg2Z5N8nYTWMS3hopu9rH-F7705S0wUpzTOObhqBQ/s1009/zeppelin-dse-analytics-2-dsefs-write.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="573" data-original-width="1009" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEik7JAe3oBxD3Vu0LyGh6AzIpCnQxmhtQdb0_denF5CJfkW662G4SSFhftnEvsGQx5cWtCSyl1pu2zhDPQ1q9r2JTtznPXlktg2Z5N8nYTWMS3hopu9rH-F7705S0wUpzTOObhqBQ/s640/zeppelin-dse-analytics-2-dsefs-write.png" width="640" /></a></p>
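<p>
In code, the two variants look roughly like this (a sketch - the paths and the node address are just examples):
</p>
<div class="org-src-container">
<pre class="src src-scala">val df = spark.range(1, 100).toDF("id")  // some example data

// explicit filesystem - the node address is part of the URI
df.write.json("dsefs://192.168.0.10/tmp/test-explicit")

// implicit filesystem - relies on spark.hadoop.fs.defaultFS
// being set to dsefs://<DSE_NODE_IP>
df.write.json("/tmp/test-implicit")
</pre>
</div>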
</div>
</div>
<div class="outline-4" id="outline-container-orgc7ebf51">
<h4 id="orgc7ebf51">Conclusion</h4>
<div class="outline-text-4" id="text-1-5-3">
<p>
This post shows that it's quite easy to run Apache Zeppelin on DSE Analytics, either directly on the cluster's nodes, or outside of the DSE cluster. For the second option, the setup process could be simplified by packing both DSE & Zeppelin into a Docker image (<a href="https://gist.github.com/alexott/246e9ab5e50416d83c080f53529cecf6">example</a>), and configuring Zeppelin using its <a href="https://zeppelin.apache.org/docs/0.9.0-preview2/usage/rest_api/configuration.html">configuration REST API</a>.
</p>
</div>
</div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-13125063693346642652020-07-30T11:59:00.004+02:002020-07-30T12:10:20.088+02:00What's new in Apache Zeppelin's Cassandra interpreter<p>
The upcoming Zeppelin 0.9 is a very big release for Apache Zeppelin (the 0.9.0-preview2 was just released). A lot has happened since the release of the 0.8.x series - better support for Spark & Flink, new interpreters (Influxdb, KSQL, MongoDB, SPARQL, …), a lot of bug fixes and improvements in the existing interpreters. In this blog post I want to specifically discuss improvements in the Cassandra interpreter, which has existed since Zeppelin 0.5.5, released almost 5 years ago.
</p>
<p>
The two most notable changes in the new release (already available in the <code>0.9.0-preview2</code>) are:
</p>
<ul class="org-ul">
<li>Upgrade of the driver to DataStax Java driver 4.x (<a href="https://issues.apache.org/jira/browse/ZEPPELIN-4378">ZEPPELIN-4378</a>)</li>
<li>Control of formatting for results of SELECT queries (<a href="https://issues.apache.org/jira/browse/ZEPPELIN-4796">ZEPPELIN-4796</a>)</li>
</ul>
<div class="outline-4" id="outline-container-org1198230">
<h4 id="org1198230">Upgrade to the DataStax Java driver 4.x</h4>
<div class="outline-text-4" id="text-1-3-1">
<p>
Prior releases of the Cassandra interpreter were based on the open source <a href="https://github.com/datastax/java-driver/tree/3.x">DataStax Java Driver for Apache Cassandra 3.x</a>. It worked fine with Apache Cassandra, but wasn't always usable with DataStax Enterprise (DSE) - for example, you couldn't use it with DSE-specific data types like <code>Point</code>, where you'd get data back as <code>ByteBuffer</code> instead of <code>Point</code>:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiVgF_4na0zQkMMAUwIhYc36s4eVJvkcA0IVYhUQ83aygcEm8F7vyVEdN04CR5hHIAQVM1P2KUCmLb36_kl3qZdxxYZ_3SdwakGTu6ZUIbgwcY1Vz5xeLFT8xhXozD3kvO85xvM0w/s1082/zeppelin-driver3-geo-points.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="222" data-original-width="1082" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiVgF_4na0zQkMMAUwIhYc36s4eVJvkcA0IVYhUQ83aygcEm8F7vyVEdN04CR5hHIAQVM1P2KUCmLb36_kl3qZdxxYZ_3SdwakGTu6ZUIbgwcY1Vz5xeLFT8xhXozD3kvO85xvM0w/d/zeppelin-driver3-geo-points.png" /></a></div><p></p>
<p>
DataStax Java driver 4.0, released in March 2019, was a complete rewrite of the Cassandra driver to make it more scalable and fault-tolerant. To achieve these goals, the architecture of the driver changed significantly, making it binary incompatible with previous versions. Also, since Java driver 4.4.0, released in January 2020, all DSE-specific functionality is <a href="https://www.datastax.com/blog/2020/01/better-drivers-for-cassandra">available in a single (unified) driver</a>, instead of the traditional separation into OSS & DSE drivers. With the release of the unified driver 4, the 3.x series of the driver was put into maintenance mode, receiving only critical bug fixes, but no new features.
</p>
<p>
To get access to the new features of the driver, the internals of the Cassandra interpreter were rewritten. Because of the architectural changes in the new driver, the changes in the interpreter were quite significant. But as a result we get more functionality:
</p>
<ul class="org-ul">
<li>Access to all improvements and new functions provided by the driver itself - better load balancing policy, fault tolerance, performance, etc.</li>
<li>The ability to configure all parameters of the Java driver. In previous versions of the interpreter, every configuration option of the driver had to be explicitly exposed in the interpreter's configuration, and adding a new option required a change in the interpreter's code and a release of a new version together with a Zeppelin release. In the new version of the interpreter, we can set any driver configuration option, even if it's not explicitly exposed by the interpreter. This is possible because of the <a href="https://docs.datastax.com/en/developer/java-driver/4.7/manual/core/configuration/">way the new Java driver is configured</a> - configuration could be specified in a config file, set programmatically, or even via Java system properties (see the small sketch after the screenshot below). This flexibility was already demonstrated in the <a href="https://alexott.blogspot.com/2020/06/working-with-datastax-astra-from-apache.html">blog post on connecting Zeppelin to DataStax's Astra</a> (Cassandra as a Service)</li>
<li>Support for DSE-specific features - for example, it's now possible to execute DSE Search commands, or work with geospatial data types:</li>
</ul>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgCNt01nHY2Rj0VZWV0azUpY54OH3DpIVTJ1GJREXPfpEA7idGS0dG15vt_-EEUWSx1QRJfl7pmchZ5UDDkcGf30cZxAjmHlf67BoALxl3wz03kK_LdVFPWRhWnSQRO3bTdgUiVlA/s1090/zeppelin-driver4-geo-points.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="201" data-original-width="1090" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgCNt01nHY2Rj0VZWV0azUpY54OH3DpIVTJ1GJREXPfpEA7idGS0dG15vt_-EEUWSx1QRJfl7pmchZ5UDDkcGf30cZxAjmHlf67BoALxl3wz03kK_LdVFPWRhWnSQRO3bTdgUiVlA/d/zeppelin-driver4-geo-points.png" /></a></div><p></p>
<p>
Because of the changes in the driver itself, there are some breaking changes in the interpreter:
</p>
<ul class="org-ul">
<li>the new driver <a href="https://docs.datastax.com/en/developer/java-driver/4.7/manual/core/native_protocol/#compatibility-matrix">supports only Cassandra versions</a> that implement native protocol V3 and higher (Cassandra 2.1+, and DSE 4.7+). As a result, support for Cassandra 1.2 and 2.0 is dropped (but you shouldn't be using them in 2020 anyway)</li>
<li>there is only <a href="https://docs.datastax.com/en/developer/java-driver/4.7/manual/core/retries/">one retry policy provided by the new driver</a>, and support for other retry policies (<code>LoggingRetryPolicy</code>, <code>FallthroughRetryPolicy</code>, etc.) is removed. As a result, support for the query parameter <code>@retryPolicy</code> was dropped, so existing notebooks that use this parameter need to be modified</li>
</ul>
</div>
</div>
<div class="outline-4" id="outline-container-orgef5a268">
<h4 id="orgef5a268">Control of the results' formatting</h4>
<div class="outline-text-4" id="text-1-3-2">
<p>
The previous version of the interpreter always used predefined formatting for numbers and date/time-related data types. Also, the content of collections (maps, sets & lists), tuples, and user-defined types was always formatted using CQL syntax. This wasn't always flexible, especially for building graphs, or for exporting data into a file for import into an external system that may expect data in some specific format.
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgmjOwoWfoH96kMx4zxIKy0j928fIkGozdhk6LVP9cSblFP7AKsdugCMCPwjyFsaUavDbnVBfDrA7vrj-u22OcIqvD2i7E6mC3aonni0scAlqBkAJ0uc3HiZOpvQVQDHctGBz2wgg/s1218/zeppelin-formatting-old.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="217" data-original-width="1218" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgmjOwoWfoH96kMx4zxIKy0j928fIkGozdhk6LVP9cSblFP7AKsdugCMCPwjyFsaUavDbnVBfDrA7vrj-u22OcIqvD2i7E6mC3aonni0scAlqBkAJ0uc3HiZOpvQVQDHctGBz2wgg/d/zeppelin-formatting-old.png" /></a></div><p></p>
<p>
In the new interpreter, users can control the formatting of results - you can configure this on the interpreter level and even on the cell level. This includes:
</p>
<ul class="org-ul">
<li>selection between output in a human-readable or strict CQL format. In the human-readable format, users have more control over the formatting, like specifying precision, formatting of date/time results, etc.</li>
<li>control of precision for the <code>float</code>, <code>double</code>, and <code>decimal</code> types</li>
<li>specification of the locale that will be used for formatting - this affects date/time & numeric types</li>
<li>specification of the format for date/time types - for each of the <code>date</code>, <code>time</code>, and <code>timestamp</code> types. You can use any pattern of the <a href="https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html" target="_blank">DateTimeFormatter</a> class</li>
<li>specification of the timezone for the <code>timestamp</code> type</li>
</ul>
<p>
All of this is applied to all data, including the content of collections, tuples, and user-defined types.
</p>
<p>
Formatting options could be set on the interpreter level by changing the new configuration options (see the <a href="https://zeppelin.apache.org/docs/0.9.0-preview2/interpreter/cassandra.html" target="_blank">documentation for details</a>) - if you change them, this will affect all notebooks:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhxNitlBvLqM2VISkX8qOF4FATWylU0l_NVbZwRjAjtZyluQ6DY_ZaZBJjoTmTYEO3Aaa1KJ6rSQrdca2I2lCfhJFYKpG1oxzRXqYDaLGDRDNj2oQ0xhzuZFqsD9NVm17NWnhs6SA/s1335/zeppelin-formatting-config-options.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="305" data-original-width="1335" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhxNitlBvLqM2VISkX8qOF4FATWylU0l_NVbZwRjAjtZyluQ6DY_ZaZBJjoTmTYEO3Aaa1KJ6rSQrdca2I2lCfhJFYKpG1oxzRXqYDaLGDRDNj2oQ0xhzuZFqsD9NVm17NWnhs6SA/d/zeppelin-formatting-config-options.png" /></a></div>
<p>
With the default options, users will get data in a human-readable format, like this:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh4-XpNA5RgTfbTG4M81eqQT2z8IcTyPCQSTw7P7jn4P0wuussGD9yMxFBle89tQgPwWcGu3LAJaI_PbHN7KzKBFJZi_gnDp29-BQafAjo42gfKan0OaM3BEO9DV0cujzLMakkCVA/s1226/zeppelin-formatting-new-default.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="216" data-original-width="1226" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh4-XpNA5RgTfbTG4M81eqQT2z8IcTyPCQSTw7P7jn4P0wuussGD9yMxFBle89tQgPwWcGu3LAJaI_PbHN7KzKBFJZi_gnDp29-BQafAjo42gfKan0OaM3BEO9DV0cujzLMakkCVA/d/zeppelin-formatting-new-default.png" /></a></div><p></p>
<p>
But sometimes it's useful to change the formatting only in specific cells. This is now possible by specifying options in a list after the interpreter name, like <code>%cassandra(option=value, ...)</code> (please note that if an option value includes <code>=</code> or <code>,</code> characters, it should be put into double quotes, or escaped with <code>\</code>). There are multiple options available; they are described in the <a href="https://zeppelin.apache.org/docs/0.9.0-preview2/interpreter/cassandra.html" target="_blank">documentation</a> and the built-in help. For example, we can change the formatting to CQL:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhMiHS0LLtVMEs7tgqn47NgTyCr0SW7QTLT91g3Jyp5mCovKemnFElzXJAWyHhWkuIfhVCcshwQfBsMOos9oWrk8RVDArYw-uWfKP-zmcMBnJKtMaMw_CvWsTVmz4qaSZz4BrbLBw/s1227/zeppelin-formatting-new-cql.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="197" data-original-width="1227" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhMiHS0LLtVMEs7tgqn47NgTyCr0SW7QTLT91g3Jyp5mCovKemnFElzXJAWyHhWkuIfhVCcshwQfBsMOos9oWrk8RVDArYw-uWfKP-zmcMBnJKtMaMw_CvWsTVmz4qaSZz4BrbLBw/d/zeppelin-formatting-new-cql.png" /></a></div><p></p>
<p>
Or we can set multiple options at the same time - locale (note that it affects the formatting of numbers and date/time), timezone, the format of timestamp and date, etc.:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhrwgNaA0wPMSvtRnHT_X9hk4FFO3GERshUV637iE64zzmmd7alPcNUHExVzhnzXiRMOBRzqF74OHLG9YzllYsrhGRjTVyqpCNu1ZpJ5hO26HwxdqFglVKWQi1J5_08rWdwAiIHVQ/s1345/zeppelin-formatting-new-with-options.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="250" data-original-width="1345" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhrwgNaA0wPMSvtRnHT_X9hk4FFO3GERshUV637iE64zzmmd7alPcNUHExVzhnzXiRMOBRzqF74OHLG9YzllYsrhGRjTVyqpCNu1ZpJ5hO26HwxdqFglVKWQi1J5_08rWdwAiIHVQ/d/zeppelin-formatting-new-with-options.png" /></a></div><p></p>
</div>
</div>
<div class="outline-4" id="outline-container-org4a27d51">
<h4 id="org4a27d51">Other changes</h4>
<div class="outline-text-4" id="text-1-3-3">
<p>
There are also smaller changes in the new release - they make the interpreter more stable or add new functionality. These include:
</p>
<ul class="org-ul">
<li>(<a href="https://issues.apache.org/jira/browse/ZEPPELIN-4444">ZEPPELIN-4444</a>) explicitly check for schema disagreement when executing the DDL statements (<code>CREATE/ALTER/DROP</code>). This is very important for stability of the Cassandra cluster, especially when executing many of them from the same cell. Because Cassandra is a distributed system, they could be executed on the different nodes in almost the same time, and such uncoordinated execution may lead to a state of the cluster called "schema disagreement" when different nodes have different versions of the database schema. Fixing this state usually <a href="https://support.datastax.com/hc/en-us/articles/360001375763-Handling-schema-disagreements-and-Schema-version-mismatch-detected-on-node-restart">requires manual intervention of database administrators</a>, and restarting of the affected nodes</li>
<li>(<a href="https://issues.apache.org/jira/browse/ZEPPELIN-4393">ZEPPELIN-4393</a>) added support for <code>--</code> comment style, in addition to already supported <span style="font-family: courier;"><code>//</code></span> and <span style="font-family: courier;"><code>/* .. */</code></span> styles</li>
<li>(<a href="https://issues.apache.org/jira/browse/ZEPPELIN-4756">ZEPPELIN-4756</a>) make "No results" messages foldable & folded by default. In previous versions, when we didn't get any results from Cassandra, for example, by executing <code>INSERT/DELETE/UPDATE</code>, or DDL queries, interpreter output a table with statement itself, and information about execution (what hosts were used for execution, etc.). This table occupied quite significant space on the screen, but usually didn't bring much useful information for a user. In the new version, this information is still produced, but it's folded, so it doesn't occupy screen space, and still available if necessary.</li>
</ul>
</div>
</div>
<div class="outline-4" id="outline-container-org3fad7a0">
<h4 id="org3fad7a0">Conclusion</h4>
<div class="outline-text-4" id="text-1-3-4">
<p>
I hope that all the described changes will make using Cassandra from Zeppelin easier. If you have ideas for new functionality in the Cassandra interpreter, or have found a bug, feel free to create an issue in <a href="https://issues.apache.org/jira/browse/ZEPPELIN">Apache Zeppelin's Jira</a>, or drop an email to the <a href="https://zeppelin.apache.org/community.html">Zeppelin user mailing list</a>.
</p>
</div>
</div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-2346082119156469862020-07-28T18:00:00.005+02:002020-10-16T10:26:04.273+02:00Working with DataStax Astra from Databricks platform<p>
One of the notable changes in the <a href="https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all">Spark Cassandra Connector (SCC) 2.5.0</a> is the support for <a href="https://astra.datastax.com/">Astra</a> - DataStax's Cassandra-as-a-Service offering. Having a managed Cassandra makes it very easy to start developing applications - you can create a new database in a couple of minutes. <a href="https://databricks.com/">Databricks</a> is also well known for its Spark-based unified cloud data processing platform. Both Databricks & DataStax offer a free tier, and this combination is an ideal ground for prototypes. This short blog post describes how to work with Astra from Databricks, using the free tiers of both.
</p>
<p>
To get access to Astra from Databricks, we need the following:
</p>
<ul class="org-ul">
<li>a running instance of the Astra database - if you don't have one, it's easy to create - just <a href="https://astra.datastax.com/" target="_blank">log in to Astra</a> and press "Create New Database"<br /></li>
<li>the credentials (username & password) specified when creating the database</li>
<li>the secure connect bundle that will be used to establish the connection to Astra - this bundle could be downloaded from the database's main page, and needs to be uploaded to DBFS (Databricks File System), so it will be available to Spark</li>
<li>a Spark cluster configured to use the secure connect bundle, together with other parameters - credentials, etc.</li>
</ul>
<p>
First we need to upload the secure connect bundle to DBFS. The easiest way to do it is to go to "Data", click the "Add Data" button, and use the "Upload File" form. After the file is uploaded, remember the path to it (like <code>/FileStore/tables/secure_connect_aott.zip</code>) - we'll need it in the next steps.
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhvhwvEXkWFIjT5Fy13O7Jy9Na6ZFkrAKqs-yOO4qDFIvqTpVixu158hYoQZrrEeIl2Hn8CVIq3Uq8cbErfBnkaWSgR_17I7PU0jWfdLnQBmvJYFlkpRJXNisX2QXhiaXeFBTwTKA/s721/astra-databricks-upload-bundle.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="497" data-original-width="721" height="431" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhvhwvEXkWFIjT5Fy13O7Jy9Na6ZFkrAKqs-yOO4qDFIvqTpVixu158hYoQZrrEeIl2Hn8CVIq3Uq8cbErfBnkaWSgR_17I7PU0jWfdLnQBmvJYFlkpRJXNisX2QXhiaXeFBTwTKA/w625-h431/astra-databricks-upload-bundle.png" width="625" /></a></div><p></p>
<p>
Then we need to create a Spark cluster. Go to "Clusters" and click "Create Cluster". Select the runtime - either Spark 2.4 or Spark 3.0; depending on the version selected, we'll need to use a different version of the Spark Cassandra Connector. Click the "Spark" link and enter the configuration parameters there:
</p>
<ul class="org-ul">
<li><code>spark.cassandra.auth.username</code> - user name to connect to database (<code>test</code> in my case)</li>
<li><code>spark.cassandra.auth.password</code> - password for user (<code>123456</code> in my case)</li>
<li><code>spark.cassandra.connection.config.cloud.path</code> - path to the uploaded file with secure connect bundle (<code>dbfs:/FileStore/tables/secure_connect_aott.zip</code>)</li>
<li><code>spark.dse.continuousPagingEnabled</code> with value <code>false</code> - this is a workaround for <a href="https://datastax-oss.atlassian.net/browse/SPARKC-602">SPARKC-602</a> that we need to apply right now to avoid errors when reading data from Astra</li>
</ul>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiDjWTi_JrJTfe16M0wKdfsxP0-j03yH_Agwx5zpAIRkXizG5pL7PE3hx-rHr4DBlYIYtARZ6znXQayeNUutOd8Rrah5kb65R3S7kK8iZzQ4WNe6oSIFKb1QjZy88HSwcoQlLv0_A/s847/astra-databricks-create-cluster.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="607" data-original-width="847" height="458" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiDjWTi_JrJTfe16M0wKdfsxP0-j03yH_Agwx5zpAIRkXizG5pL7PE3hx-rHr4DBlYIYtARZ6znXQayeNUutOd8Rrah5kb65R3S7kK8iZzQ4WNe6oSIFKb1QjZy88HSwcoQlLv0_A/w640-h458/astra-databricks-create-cluster.png" width="640" /></a></div><p></p>
<p>
After entering all this data, press "Create cluster" - this will take you to the list of your running clusters. If you point the mouse at the instance that is being created right now, you'll see several links, like "Libraries / Spark UI / Logs" - we need to select "Libraries" to add the Spark Cassandra Connector to the cluster. On the opened page, click "Install New" - this will open the dialog for adding a library. Select the "Maven" tab. Because of a dependency conflict between SCC and the Databricks runtime, we must not use the <code>spark-cassandra-connector</code> artifact, but the assembly version of it: <code>spark-cassandra-connector-assembly</code> (see <a href="https://datastax-oss.atlassian.net/browse/SPARKC-601">SPARKC-601</a> for details). For runtime version 6.x we need to use <code>com.datastax.spark:spark-cassandra-connector-assembly_2.11:2.5.1</code>, and for runtime 7.0 we need to take <code>com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0-beta</code> (or the released version when it's done). Click "Install" to add the library to the cluster. (Please note that to use <code>SparkCassandraExtensions</code>, for DirectJoin for example, you need to have a <a href="https://docs.databricks.com/clusters/init-scripts.html">cluster init script</a> in place that copies the assembly before the driver & executors start...)<br /></p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjOZ7H7FwkBos4LoXJ6tOAXOWiDbuR_FKNL6-bAbQbXkY82smEH1cQpjRCnaIKsKLvxtJOzhWVixA4dBSeSNV_IO9mi5sUhlrpq_I-k22gDJoawUW0eUSTeDeDiPoA3r5MHFPnISg/s1122/astra-databricks-add-library.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="515" data-original-width="1122" height="294" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjOZ7H7FwkBos4LoXJ6tOAXOWiDbuR_FKNL6-bAbQbXkY82smEH1cQpjRCnaIKsKLvxtJOzhWVixA4dBSeSNV_IO9mi5sUhlrpq_I-k22gDJoawUW0eUSTeDeDiPoA3r5MHFPnISg/w640-h294/astra-databricks-add-library.png" width="640" /></a></div><p></p>
<p>
To make this blog post self-contained and not dependent on previously created tables & loaded data, let's generate test data, create a table using SCC, and write the data into that table:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">import</span> org.apache.spark.sql.cassandra.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">import</span> com.datastax.spark.connector.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">import</span> com.datastax.spark.connector.cql.<span style="color: darkcyan;">ClusteringColumn</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">newData</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">1000</span>)
.select($<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"pk"</span>), $<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"c1"</span>),
$<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"c2"</span>), $<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"string"</span>).as(<span style="color: #8b2252;">"str"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">ksName</span> <span style="color: #a020f0;">=</span> <span style="color: #8b2252;">"test"</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">tableName</span> <span style="color: #a020f0;">=</span> <span style="color: #8b2252;">"newdata"</span>
newData.createCassandraTableEx(ksName, tableName,
partitionKeyColumns <span style="color: #a020f0;">=</span> <span style="color: darkcyan;">Seq</span>(<span style="color: #8b2252;">"pk"</span>),
clusteringKeyColumns <span style="color: #a020f0;">=</span> <span style="color: darkcyan;">Seq</span>(
(<span style="color: #8b2252;">"c1"</span>, <span style="color: darkcyan;">ClusteringColumn</span>.<span style="color: darkcyan;">Ascending</span>),
(<span style="color: #8b2252;">"c2"</span>, <span style="color: darkcyan;">ClusteringColumn</span>.<span style="color: darkcyan;">Descending</span>)),
ifNotExists <span style="color: #a020f0;">=</span> <span style="color: darkcyan;">true</span>)
newData.write.cassandraFormat(tableName, ksName).mode(<span style="color: #8b2252;">"append"</span>).save
</pre>
</div>
<p>
This code will generate a dataframe with 999 rows, create a table with the following structure, and write the data into it:
</p>
<div class="org-src-container">
<pre class="src src-cql"><span style="color: #a020f0;">CREATE TABLE</span> <span style="color: blue;">test.newdata</span> (
pk <span style="color: forestgreen;">bigint</span>,
c1 <span style="color: forestgreen;">int</span>,
c2 <span style="color: forestgreen;">int</span>,
str <span style="color: forestgreen;">text</span>,
<span style="color: #a020f0;">PRIMARY KEY</span> (pk, c1, c2)
) <span style="color: #a020f0;">WITH CLUSTERING ORDER BY</span> (c1 <span style="color: #a020f0;">ASC</span>, c2 <span style="color: #a020f0;">DESC</span>);
</pre>
</div>
<p>
To check that the data was written correctly, let's read it into another dataframe, print its schema, and count the number of rows:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">data</span> <span style="color: #a020f0;">=</span> spark.read.cassandraFormat(tableName, ksName).load
data.printSchema
data.count
</pre>
</div>
<p>
As expected, this will print the schema as:
</p>
<pre class="example">root
|-- pk: long (nullable = false)
|-- c1: integer (nullable = false)
|-- c2: integer (nullable = false)
|-- str: string (nullable = true)
</pre>
<p>
and output the number of rows, as expected it's 999.
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgQ6QwpIUF3TswLfAeAnr5ABPa6r0nMAnD1LJJag3duro0vV4ikZ5vuc4KegcOFsR13WNjakqY7Nd2KOcQAiy4jKivQIRX1pDZ0kwA8cnvOpFanomuIsAZMRNFIsNHctn5VZTBEUQ/s969/astra-databricks-notebook.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="938" data-original-width="969" height="620" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgQ6QwpIUF3TswLfAeAnr5ABPa6r0nMAnD1LJJag3duro0vV4ikZ5vuc4KegcOFsR13WNjakqY7Nd2KOcQAiy4jKivQIRX1pDZ0kwA8cnvOpFanomuIsAZMRNFIsNHctn5VZTBEUQ/w640-h620/astra-databricks-notebook.png" width="640" /></a></div><p></p>
<p>
That's all! You can continue to use the Spark Cassandra Connector to work with data in Astra using either the Dataframe or RDD APIs - all functionality is the same, including <a href="https://alexott.blogspot.com/2020/07/spark-effective-joins-with-cassandra.html">joins with Cassandra</a>, writing streaming data into it, etc. See the <a href="https://github.com/datastax/spark-cassandra-connector/tree/b2.5/doc">Spark Cassandra Connector documentation</a> for more information.
</p>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com1tag:blogger.com,1999:blog-6862508.post-297325427452041822020-07-27T15:17:00.005+02:002020-08-24T22:48:44.554+02:00Spark & efficient joins with Cassandra<p>
In modern data processing, especially when handling streaming data, quite often there is a need to enrich incoming data with information from external sources. A high-level diagram of such data processing usually looks like the following:
</p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjY6cF3M0n8ZteH4EextITAHEN8kGvn6rfEkRvoL3zy-4oguOIQh-o3DzZZJnKtu1eLMcBOI2JqGzSJOY9ZGB8DTVC7ygHNdxyzaT04KDVC5CY3f9Pq6L7au7ibs_4DQfj40H6O8g/s579/data-enrichment.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="176" data-original-width="579" height="189" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjY6cF3M0n8ZteH4EextITAHEN8kGvn6rfEkRvoL3zy-4oguOIQh-o3DzZZJnKtu1eLMcBOI2JqGzSJOY9ZGB8DTVC7ygHNdxyzaT04KDVC5CY3f9Pq6L7au7ibs_4DQfj40H6O8g/w625-h189/data-enrichment.png" width="625" /></a></div><p></p>
<p>
For effective enrichment of the data, the database that holds the additional information should provide low-latency, high-throughput access to the data. And Apache Cassandra, with its very fast reads by primary key, is an ideal candidate for storing the data that will be used for enrichment (maybe in addition to being the storage for processed data).
</p>
<p>
Apache Spark is often the base for implementing data processing pipelines, for both batch & streaming data. And it has very good support for Cassandra, provided by the <a href="https://github.com/datastax/spark-cassandra-connector">Spark Cassandra Connector (SCC)</a>. The connector provides access to Cassandra via both the RDD & Dataframe APIs, and the recently released SCC 2.5 <a href="https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all">added a lot of new functionality</a> that was earlier available only as a part of <a href="https://www.datastax.com/products/datastax-enterprise">DataStax Enterprise</a>, including support for effective joins with Cassandra for dataframes.
</p>
<p>
The Spark Cassandra Connector has optimizations for joining a dataframe or RDD with data in Cassandra - the data used for the join is converted into requests to individual partitions or rows that are executed in parallel, avoiding a full scan of the data in Cassandra (there are settings that define the thresholds at which SCC decides to do a full scan vs individual requests). Russell Spitzer has a <a href="http://www.russellspitzer.com/2018/05/23/DSEDirectJoin/">very good blog post</a> about joins with dataframes, including information about their performance. SCC allows performing either an inner join or a left join between an RDD/dataframe and Cassandra. One of the useful things about performing joins this way is that they reflect changes done in Cassandra, so you always have access to the latest data. You can also cache the data from Cassandra to avoid hitting Cassandra on every join, and periodically refresh the cached version to pull in the latest changes - a minimal sketch of this approach follows below.
</p>
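<p>
A minimal sketch of that caching approach, using the <code>test.jtest1</code> table defined below:
</p>
<div class="org-src-container">
<pre class="src src-scala">import org.apache.spark.sql.cassandra._

// take a snapshot of the Cassandra table (one full scan) and keep it cached
var cached = spark.read.cassandraFormat("jtest1", "test").load.cache()

// ... use `cached` in joins instead of hitting Cassandra on every join ...

// periodically drop the snapshot and re-read it to pull in the latest changes
cached.unpersist()
cached = spark.read.cassandraFormat("jtest1", "test").load.cache()
</pre>
</div>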
<p>
We'll use the following table definition and data in most of the examples shown below:
</p>
<div class="org-src-container">
<pre class="src src-cql"><span style="color: #a020f0;">create table</span> <span style="color: blue;">test.jtest1</span> (
pk <span style="color: forestgreen;">int</span>,
c1 <span style="color: forestgreen;">int</span>,
c2 <span style="color: forestgreen;">int</span>,
v <span style="color: forestgreen;">text</span>,
<span style="color: #a020f0;">primary key</span>(pk, c1, c2));
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (1, 1, 1, <span style="color: #8b2252;">'t1-1-1'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (1, 1, 2, <span style="color: #8b2252;">'t1-1-2'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (1, 2, 1, <span style="color: #8b2252;">'t1-2-1'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (1, 2, 2, <span style="color: #8b2252;">'t1-2-2'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (2, 1, 1, <span style="color: #8b2252;">'t2-1-1'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (2, 1, 2, <span style="color: #8b2252;">'t2-1-2'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (2, 2, 1, <span style="color: #8b2252;">'t2-2-1'</span>);
<span style="color: #a020f0;">insert into</span> test.jtest1(pk, c1, c2, v) <span style="color: #a020f0;">values</span> (2, 2, 2, <span style="color: #8b2252;">'t2-2-2'</span>);
</pre>
</div>
<p>
The join condition could be on:
</p>
<ul class="org-ul">
<li>full partition key (<code>pk</code> column) - in this case, SCC will pull all rows from that partition and create N rows for each input row</li>
<li>partial primary key - all preceding clustering columns must be specified, for example <code>pk</code> + <code>c1</code> - SCC will pull all rows matching the given range query, and create that many rows for each input row</li>
<li>full primary key (<code>pk</code> + <code>c1</code> + <code>c2</code>) - in this case SCC will pull only one row, if it exists, and use that data for joining</li>
</ul>
<p>
The join isn't supported on the following:
</p>
<ul class="org-ul">
<li>partial partition key in case of composite partition key</li>
<li>on the clustering columns that are not preceded by previous clustering columns, for example, <code>pk</code> + <code>c2</code> without <code>c1</code></li>
<li>other join types, like, <code>right</code>, or <code>full</code></li>
</ul>
<p>
In such cases, depending on the API, SCC either throws an error or falls back to performing a full scan of the Cassandra table and executing the join on the Spark side.
</p>
<div class="outline-4" id="outline-container-orgcb57400">
<h4 id="orgcb57400"><span class="section-number-4"></span>Joins in Dataframe API</h4>
<div class="outline-text-4" id="text-1-3-1">
<p>
Let's start with the <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/14_data_frames.md">Dataframe API</a> that is recommended for use in modern Spark. For a long time, support for effective joins of dataframes with data in Cassandra was only available in DSE Analytics - the commercial distribution of Cassandra and Spark developed by DataStax - while the open source version of SCC supported joins only in the RDD API. But with the release of <a href="https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all">SCC 2.5</a>, joins of dataframes also became available to all users of the open source version of SCC.
</p>
<p>
Please note that this functionality is not enabled by default (together with some other functionality, like support for the <code>ttl</code> and <code>writetime</code> functions). You need to <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/14_data_frames.md#special-cassandra-catalyst-rules-since-scc-25">enable special Catalyst rules</a> by setting the configuration parameter <code>spark.sql.extensions</code> to the value <code>com.datastax.spark.connector.CassandraSparkExtensions</code> when starting the Spark shell or submitting a job. Something like this:
</p>
<div class="org-src-container">
<pre class="src src-shell">bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 <span style="color: #8b2252;">\</span>
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
</pre>
</div>
<p>
If you forget to do this, SCC won't optimize your join, and it will be performed as usual - by reading all data from Cassandra and executing the join in Spark (with a shuffle!). You can always check that this optimization is applied by running <code>dataframe.explain</code> and looking for the string "Cassandra Direct Join" in the physical plan - we'll see that in the examples below (if you're running the code on DSE Analytics, it will be "DSE Direct Join").
</p>
<p>
Let's look at specific examples of performing joins of a dataframe with data in Cassandra. We're using the <code>test.jtest1</code> table defined above to demonstrate the behaviour when using only the partition key, and the complete or partial primary key. All dataframe examples have the following code in common:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">import</span> spark.implicits.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">import</span> org.apache.spark.sql.cassandra.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">cassdata</span> <span style="color: #a020f0;">=</span> spark.read.cassandraFormat(<span style="color: #8b2252;">"jtest1"</span>, <span style="color: #8b2252;">"test"</span>).load
</pre>
</div>
<p>
We start with the partition key only. For that, we generate a dataframe with one column named <code>id</code> and values from one to four:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>))
</pre>
</div>
<p>
After its execution, we can check that SCC optimized this join by executing <code>explain</code>:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#0L as int) AS id#2]
+- *(1) Range (1, 5, step=1, splits=8)
</pre>
<p>
and we can check that the correct data was pulled from Cassandra. We can see that SCC pulled all rows from partitions <code>1</code> and <code>2</code> and converted them into individual rows in the resulting dataframe:
</p>
<pre class="example">scala> joined.count
res1: Long = 8
scala> joined.show
+---+---+---+---+------+
| id| pk| c1| c2| v|
+---+---+---+---+------+
| 1| 1| 1| 1|t1-1-1|
| 1| 1| 1| 2|t1-1-2|
| 1| 1| 2| 1|t1-2-1|
| 1| 1| 2| 2|t1-2-2|
| 2| 2| 1| 1|t2-1-1|
| 2| 2| 1| 2|t2-1-2|
| 2| 2| 2| 1|t2-2-1|
| 2| 2| 2| 2|t2-2-2|
+---+---+---+---+------+
</pre>
<p>
Handling of a partial primary key is similar - we generate a similar dataframe, but with 2 columns, and join it:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
.select($<span style="color: #8b2252;">"id"</span>, $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc1"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c1"</span>) === toJoin(<span style="color: #8b2252;">"cc1"</span>))
</pre>
</div>
<p>
Again we see that SCC optimized that query:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
+- *(1) Range (1, 5, step=1, splits=8)
scala> joined.count
res8: Long = 4
scala> joined.show
+---+---+---+---+---+------+
| id|cc1| pk| c1| c2| v|
+---+---+---+---+---+------+
| 1| 1| 1| 1| 1|t1-1-1|
| 1| 1| 1| 1| 2|t1-1-2|
| 2| 2| 2| 2| 1|t2-2-1|
| 2| 2| 2| 2| 2|t2-2-2|
+---+---+---+---+---+------+
</pre>
<p>
And with the full primary key, the behaviour is the same:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
.select($<span style="color: #8b2252;">"id"</span>, $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc1"</span>), $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc2"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c1"</span>) === toJoin(<span style="color: #8b2252;">"cc1"</span>)
&& cassdata(<span style="color: #8b2252;">"c2"</span>) === toJoin(<span style="color: #8b2252;">"cc2"</span>))
</pre>
</div>
<p>
We get a one-to-one correspondence between rows of the generated dataframe and rows in Cassandra:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#318, c1 = cc1#320, c2 = cc2#321] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#316L as int) AS id#318, cast(id#316L as int) AS cc1#320, cast(id#316L as int) AS cc2#321]
+- *(1) Range (1, 5, step=1, splits=8)
scala> joined.count
res13: Long = 2
scala> joined.show
+---+---+---+---+---+---+------+
| id|cc1|cc2| pk| c1| c2| v|
+---+---+---+---+---+---+------+
| 1| 1| 1| 1| 1| 1|t1-1-1|
| 2| 2| 2| 2| 2| 2|t2-2-2|
+---+---+---+---+---+---+------+
</pre>
<p>
A left join isn't much different - we only need to specify it explicitly with the "left" or "leftouter" argument:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
.select($<span style="color: #8b2252;">"id"</span>, $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc1"</span>), $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc2"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c1"</span>) === toJoin(<span style="color: #8b2252;">"cc1"</span>)
&& cassdata(<span style="color: #8b2252;">"c2"</span>) === toJoin(<span style="color: #8b2252;">"cc2"</span>),
<span style="color: #8b2252;">"left"</span>)
</pre>
</div>
<p>
and again, we see that SCC optimized the query. The only difference is that the result retains the rows for which no matching rows were found in Cassandra:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#349, c1 = cc1#351, c2 = cc2#352] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#347L as int) AS id#349, cast(id#347L as int) AS cc1#351, cast(id#347L as int) AS cc2#352]
+- *(1) Range (1, 5, step=1, splits=8)
scala> joined.count
res5: Long = 4
scala> joined.show
+---+---+---+----+----+----+------+
| id|cc1|cc2| pk| c1| c2| v|
+---+---+---+----+----+----+------+
| 1| 1| 1| 1| 1| 1|t1-1-1|
| 2| 2| 2| 2| 2| 2|t2-2-2|
| 3| 3| 3|null|null|null| null|
| 4| 4| 4|null|null|null| null|
+---+---+---+----+----+----+------+
</pre>
<p>
But if we try to perform a right or full join:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c1"</span>) === toJoin(<span style="color: #8b2252;">"cc1"</span>)
&& cassdata(<span style="color: #8b2252;">"c2"</span>) === toJoin(<span style="color: #8b2252;">"cc2"</span>),
<span style="color: #8b2252;">"right"</span>)
</pre>
</div>
<p>
then we'll see that it's executed by reading the whole table and performing the join on the Spark level (this is the plan for the "right" join; the plan for the "full" join looks slightly different):
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#56, cc1#58, cc2#59], [pk#4, c1#5, c2#6], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, int, false], input[1, int, false], input[2, int, false]))
: +- *(1) Project [cast(id#54L as int) AS id#56, cast(id#54L as int) AS cc1#58, cast(id#54L as int) AS cc2#59]
: +- *(1) Range (1, 5, step=1, splits=8)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk#4,c1#5,c2#6,v#7] PushedFilters: [], ReadSchema: struct<pk:int,c1:int,c2:int,v:string>
scala> joined.show
+----+----+----+---+---+---+------+
| id| cc1| cc2| pk| c1| c2| v|
+----+----+----+---+---+---+------+
| 1| 1| 1| 1| 1| 1|t1-1-1|
|null|null|null| 1| 1| 2|t1-1-2|
|null|null|null| 1| 2| 1|t1-2-1|
|null|null|null| 1| 2| 2|t1-2-2|
|null|null|null| 2| 1| 1|t2-1-1|
|null|null|null| 2| 1| 2|t2-1-2|
|null|null|null| 2| 2| 1|t2-2-1|
| 2| 2| 2| 2| 2| 2|t2-2-2|
+----+----+----+---+---+---+------+
</pre>
<p>
As mentioned above, in the case of a partial primary key, all preceding clustering columns need to be specified in the join condition as well. If we don't do that, as in this example that joins on the partition key & the second clustering column:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
.select($<span style="color: #8b2252;">"id"</span>, $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc2"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c2"</span>) === toJoin(<span style="color: #8b2252;">"cc2"</span>))
</pre>
</div>
<p>
then we'll get an error saying that we can't do that (please note that the error is thrown only when the data is actually read, not when the join is declared):
</p>
<pre class="example">scala> joined.show
java.lang.IllegalArgumentException: Can't pushdown join on column ColumnDef(c2,ClusteringColumn(1,ASC),IntType) without also specifying [ Set(ColumnDef(c1,ClusteringColumn(0,ASC),IntType)) ]
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.checkValidJoin(AbstractCassandraJoin.scala:114)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkValidJoin(CassandraJoinRDD.scala:26)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.getPartitions(AbstractCassandraJoin.scala:210)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.getPartitions(CassandraJoinRDD.scala:26)
</pre>
<p>
If you still want to perform such a join, you need to set <code>directJoinSetting</code> to <code>off</code> when reading the data, like this:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">cassdata</span> <span style="color: #a020f0;">=</span> spark.read.cassandraFormat(<span style="color: #8b2252;">"jtest1"</span>, <span style="color: #8b2252;">"test"</span>)
.option(<span style="color: #8b2252;">"directJoinSetting"</span>, <span style="color: #8b2252;">"off"</span>).load
<span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
.select($<span style="color: #8b2252;">"id"</span>, $<span style="color: #8b2252;">"id"</span>.as(<span style="color: #8b2252;">"cc2"</span>))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.join(cassdata, cassdata(<span style="color: #8b2252;">"pk"</span>) === toJoin(<span style="color: #8b2252;">"id"</span>)
&& cassdata(<span style="color: #8b2252;">"c2"</span>) === toJoin(<span style="color: #8b2252;">"cc2"</span>))
</pre>
</div>
<p>
and this will force SCC to perform a full table scan:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#195, cc2#197], [pk#185, c2#187], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))))
: +- *(1) Project [cast(id#193L as int) AS id#195, cast(id#193L as int) AS cc2#197]
: +- *(1) Range (1, 5, step=1, splits=8)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk#185,c1#186,c2#187,v#188] PushedFilters: [], ReadSchema: struct<pk:int,c1:int,c2:int,v:string>
scala> joined.show
+---+---+---+---+---+------+
| id|cc2| pk| c1| c2| v|
+---+---+---+---+---+------+
| 1| 1| 1| 1| 1|t1-1-1|
| 1| 1| 1| 2| 1|t1-2-1|
| 2| 2| 2| 1| 2|t2-1-2|
| 2| 2| 2| 2| 2|t2-2-2|
+---+---+---+---+---+------+
</pre>
<p>
Theoretically, the direct join should also work for Spark SQL, like this:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> spark.range(<span style="color: darkcyan;">1</span>, <span style="color: darkcyan;">5</span>).select($<span style="color: #8b2252;">"id"</span>.cast(<span style="color: #8b2252;">"int"</span>).as(<span style="color: #8b2252;">"id"</span>))
toJoin.createOrReplaceTempView(<span style="color: #8b2252;">"tojoin"</span>)
spark.sql(<span style="color: #8b2252;">"""CREATE OR REPLACE TEMPORARY VIEW cassdata</span>
<span style="color: #8b2252;"> USING org.apache.spark.sql.cassandra</span>
<span style="color: #8b2252;"> OPTIONS (table "jtest1", keyspace "test", pushdown "true", directJoinSetting "auto")"""</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> spark.sql(<span style="color: #8b2252;">"select * from tojoin tj inner join cassdata cd on tj.id = cd.pk"</span>)
</pre>
</div>
<p>
but if we look into the execution plan, we can see that it doesn't happen:
</p>
<pre class="example">scala> joined.explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#552], [pk#554], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- *(1) Project [cast(id#550L as int) AS id#552]
: +- *(1) Range (1, 5, step=1, splits=8)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk#554,c1#555,c2#556,v#557] PushedFilters: [], ReadSchema: struct<pk:int,c1:int,c2:int,v:string>
</pre>
<p>
This is being investigated as <a href="https://datastax-oss.atlassian.net/browse/SPARKC-613">SPARKC-613</a>, and hopefully will be fixed.
</p>
</div>
</div>
<div class="outline-4" id="outline-container-orgc69cb7b">
<h4 id="orgc69cb7b"><span class="section-number-4"></span>Joins in RDD API</h4>
<div class="outline-text-4" id="text-1-3-2">
<p>
For a long time, the RDD API was the only way to perform efficient joins with data in Cassandra. Spark Cassandra Connector <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/2_loading.md#using-joinwithcassandratable">provides two functions for performing joins</a>: <code>joinWithCassandraTable</code> and <code>leftJoinWithCassandraTable</code> - they have existed in SCC for a long time (since version 1.2, released more than 5 years ago). When executed, both functions return an instance of a special RDD type, <code>CassandraJoinRDD</code> - it has all the functions of the <code>CassandraRDD</code> API, plus one additional function (<code>.on</code>) that specifies the list of columns on which the join should be performed.
</p>
<p>
Let's re-implement the same examples as above, but with the RDD API. We start with the partition key only:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">import</span> com.datastax.spark.connector.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> <span style="color: darkcyan;">Tuple1</span>(x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.joinWithCassandraTable(<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>))
</pre>
</div>
<p>
Please note that we need to explicitly create <code>Tuple1</code> objects, as <code>joinWithCassandraTable</code> expects an RDD of tuples. We can check that we got the correct RDD type as the result of execution:
</p>
<pre class="example">scala> joined.toDebugString
res21: String =
(8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
| ParallelCollectionRDD[147] at parallelize at <console>:33 []
</pre>
<p>
The type of <code>joined</code> is <code>CassandraJoinRDD[(Int,),CassandraRow]</code>, and we can examine the data with <code>collect</code>:
</p>
<pre class="example">scala> joined.collect
res22: Array[((Int,), com.datastax.spark.connector.CassandraRow)] = Array(
((1,),CassandraRow{pk: 1, c1: 1, c2: 1, v: t1-1-1}),
((1,),CassandraRow{pk: 1, c1: 1, c2: 2, v: t1-1-2}),
((1,),CassandraRow{pk: 1, c1: 2, c2: 1, v: t1-2-1}),
((1,),CassandraRow{pk: 1, c1: 2, c2: 2, v: t1-2-2}),
((2,),CassandraRow{pk: 2, c1: 1, c2: 1, v: t2-1-1}),
((2,),CassandraRow{pk: 2, c1: 1, c2: 2, v: t2-1-2}),
((2,),CassandraRow{pk: 2, c1: 2, c2: 1, v: t2-2-1}),
((2,),CassandraRow{pk: 2, c1: 2, c2: 2, v: t2-2-2})
)
</pre>
<p>
We can access data in <code>CassandraRow</code> using the standard functions, like <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/2_loading.md#reading-primitive-column-values">getInt, getString, get, …</a>, but it's not always convenient. To simplify work with results, both functions <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/4_mapper.md">support mapping of the rows into tuples or into instances of specific (case) classes</a>, which are much easier to use when processing the obtained data:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">CassData</span>(pk<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>, c1<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>, c2<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>, v<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.joinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>))
</pre>
</div>
<p>
and we'll get the data from Cassandra mapped into our case class:
</p>
<pre class="example">scala> joined.collect
res23: Array[((Int,), CassData)] = Array(
((1,),CassData(1,1,1,t1-1-1)),
((1,),CassData(1,1,2,t1-1-2)),
((1,),CassData(1,2,1,t1-2-1)),
((1,),CassData(1,2,2,t1-2-2)),
((2,),CassData(2,1,1,t2-1-1)),
((2,),CassData(2,1,2,t2-1-2)),
((2,),CassData(2,2,1,t2-2-1)),
((2,),CassData(2,2,2,t2-2-2))
)
</pre>
<p>
With a partial primary key, the behaviour is similar - create a two-element tuple, and call <code>joinWithCassandraTable</code>:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> (x.toInt, x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.joinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>, <span style="color: #8b2252;">"c1"</span>))
</pre>
</div>
<p>
and as expected, we're getting back four rows:
</p>
<pre class="example">scala> joined.collect
res28: Array[((Int, Int), CassData)] = Array(
((1,1),CassData(1,1,1,t1-1-1)),
((1,1),CassData(1,1,2,t1-1-2)),
((2,2),CassData(2,2,1,t2-2-1)),
((2,2),CassData(2,2,2,t2-2-2))
)
</pre>
<p>
Similarly, for full primary key:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> (x.toInt, x.toInt, x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.joinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>, <span style="color: #8b2252;">"c1"</span>, <span style="color: #8b2252;">"c2"</span>))
</pre>
</div>
<p>
that gives us two rows:
</p>
<pre class="example">scala> joined.collect
res29: Array[((Int, Int, Int), CassData)] = Array(
((1,1,1),CassData(1,1,1,t1-1-1)),
((2,2,2),CassData(2,2,2,t2-2-2))
)
</pre>
<p>
A left join is done the same way as an inner join, only a different function is used:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> (x.toInt, x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.leftJoinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>, <span style="color: #8b2252;">"c1"</span>))
</pre>
</div>
<p>
But the type of <code>joined</code> is <code>CassandraLeftJoinRDD</code> instead of <code>CassandraJoinRDD</code>, and instead of a <code>CassandraRow</code> or an instance of our class, we get an <code>Option</code>, as the data for a given key may be absent in Cassandra:
</p>
<pre class="example">scala> joined.collect
res38: Array[((Int, Int), Option[CassData])] = Array(
((1,1),Some(CassData(1,1,1,t1-1-1))),
((1,1),Some(CassData(1,1,2,t1-1-2))),
((2,2),Some(CassData(2,2,1,t2-2-1))),
((2,2),Some(CassData(2,2,2,t2-2-2))),
((3,3),None),
((4,4),None)
)
</pre>
<p>
All the examples above used tuples to represent the data to join with. But we can also use case classes here - we only need the field names to match the columns in the table, like here (please note that we need to specify the list of columns explicitly, otherwise the functions will use just the partition key - the same as for tuples):
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">ToJoin</span>(pk<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>, c1<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> <span style="color: darkcyan;">ToJoin</span>(x.toInt, x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.leftJoinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>, <span style="color: #8b2252;">"c1"</span>))
</pre>
</div>
<p>
In this case, it's easier to work with instances of the case class instead of tuples:
</p>
<pre class="example">scala> joined.collect
res48: Array[(ToJoin, Option[CassData])] = Array(
(ToJoin(1,1),Some(CassData(1,1,1,t1-1-1))),
(ToJoin(1,1),Some(CassData(1,1,2,t1-1-2))),
(ToJoin(2,2),Some(CassData(2,2,1,t2-2-1))),
(ToJoin(2,2),Some(CassData(2,2,2,t2-2-2))),
(ToJoin(3,3),None),
(ToJoin(4,4),None))
</pre>
<p>
Besides the simple usage shown above, SCC provides more capabilities in the RDD API. For example, we can <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12">repartition the RDD that we join with data in Cassandra</a> to match the partitioning of the data in Cassandra, so we can avoid non-local reads from Cassandra.
</p>
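<p>
As an illustration, a minimal sketch of such a repartitioning could look like this (assuming the same <code>test.jtest1</code> table and the import from the earlier example; the last argument of <code>repartitionByCassandraReplica</code> - the number of partitions per host - is an arbitrary value here):
</p>
<div class="org-src-container">
<pre class="src src-scala">// assumes: import com.datastax.spark.connector._ (as in the earlier examples)
// repartition the data so that Spark partitions are co-located with the
// Cassandra replicas owning the corresponding partition keys
val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
val repartitioned = toJoin.repartitionByCassandraReplica("test", "jtest1", 10)
// the subsequent join should then mostly perform node-local reads
val joined = repartitioned.joinWithCassandraTable("test", "jtest1")
  .on(SomeColumns("pk"))
</pre>
</div>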
<p>
Also, we can use <a href="https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/2_loading.md#cassandra-operations-on-a-cassandrajoinrdd">any function of the <code>CassandraRDD</code> API</a>, such as <code>select</code>, <code>where</code>, <code>limit</code>, etc. For example, we can limit the number of returned rows by putting a condition on the <code>c2</code> column that will be applied to every Cassandra partition (please note that it must be a valid CQL expression!):
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">ToJoin</span>(pk<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>, c1<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Int</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">toJoin</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">1</span> until <span style="color: darkcyan;">5</span>).map(x <span style="color: #a020f0;">=></span> <span style="color: darkcyan;">ToJoin</span>(x.toInt, x.toInt))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> toJoin.leftJoinWithCassandraTable[<span style="color: darkcyan;">CassData</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"jtest1"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"pk"</span>, <span style="color: #8b2252;">"c1"</span>)).where(<span style="color: #8b2252;">"c2 > 1"</span>)
</pre>
</div>
<p>
so we get fewer rows than in the previous example:
</p>
<pre class="example">scala> joined.collect
res55: Array[(ToJoin, Option[CassData])] = Array(
(ToJoin(1,1),Some(CassData(1,1,2,t1-1-2))),
(ToJoin(2,2),Some(CassData(2,2,2,t2-2-2))),
(ToJoin(3,3),None),
(ToJoin(4,4),None)
)
</pre>
<p>
The <code>limit(N)</code> call returns at most N rows per Spark partition, while <code>perPartitionLimit(N)</code> returns at most N rows per Cassandra partition. The latter is quite useful, for example, when we're joining with "historical" data, where we have multiple rows per partition but usually need only the latest entry for a given partition. For example, we may have a table that contains historical stock prices (sorted by timestamp in descending order):
</p>
<div class="org-src-container">
<pre class="src src-cql"><span style="color: #a020f0;">create table</span> <span style="color: blue;">test.stock_price</span> (
ticker <span style="color: forestgreen;">text</span>,
tm <span style="color: forestgreen;">timestamp</span>,
price <span style="color: forestgreen;">double</span>,
<span style="color: #a020f0;">primary key</span>(ticker, tm)
) <span style="color: #a020f0;">with clustering order by</span> (tm <span style="color: #a020f0;">desc</span>);
<span style="color: #a020f0;">insert into</span> test.stock_price (ticker, tm, price) <span style="color: #a020f0;"><br /> values</span> (<span style="color: #8b2252;">'MSFT'</span>, <span style="color: #8b2252;">'2020-07-25T10:00:00Z'</span>, 100.0);
<span style="color: #a020f0;">insert into</span> test.stock_price (ticker, tm, price) <span style="color: #a020f0;"><br /> values</span> (<span style="color: #8b2252;">'MSFT'</span>, <span style="color: #8b2252;">'2020-07-25T11:00:00Z'</span>, 101.0);
<span style="color: #a020f0;">insert into</span> test.stock_price (ticker, tm, price) <span style="color: #a020f0;"><br /> values</span> (<span style="color: #8b2252;">'MSFT'</span>, <span style="color: #8b2252;">'2020-07-25T12:00:00Z'</span>, 99.0);
<span style="color: #a020f0;">insert into</span> test.stock_price (ticker, tm, price) <span style="color: #a020f0;"><br /> values</span> (<span style="color: #8b2252;">'MSFT'</span>, <span style="color: #8b2252;">'2020-07-25T13:00:00Z'</span>, 97.0);
</pre>
</div>
<p>
Now suppose we have data about stocks coming from some source. In this case, we can join the incoming data with the latest prices for the given tickers, and perform some calculations:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">StockData</span>(ticker<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, currentPrice<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>)
<span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">StockPrice</span>(ticker<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, tm<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">java</span>.time.<span style="color: darkcyan;">Instant</span>, price<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">stocks</span> <span style="color: #a020f0;">=</span> sc.parallelize(<span style="color: darkcyan;">Seq</span>(<span style="color: darkcyan;">StockData</span>(<span style="color: #8b2252;">"MSFT"</span>, <span style="color: darkcyan;">100</span>), <span style="color: darkcyan;">StockData</span>(<span style="color: #8b2252;">"INTC"</span>, <span style="color: darkcyan;">200</span>)))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> stocks.leftJoinWithCassandraTable[<span style="color: darkcyan;">StockPrice</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"stock_price"</span>)
.on(<span style="color: darkcyan;">SomeColumns</span>(<span style="color: #8b2252;">"ticker"</span>)).perPartitionLimit(<span style="color: darkcyan;">1</span>)
</pre>
</div>
<p>
After execution, we can see that we pulled the latest price for Microsoft shares:
</p>
<pre class="example">scala> joined.collect
res37: Array[(StockData, Option[StockPrice])] = Array(
(StockData(MSFT,100.0),Some(StockPrice(MSFT,2020-07-25T13:00:00Z,97.0))),
(StockData(INTC,200.0),None)
)
</pre>
</div>
</div>
<div class="outline-4" id="outline-container-orgf0c36f0">
<h4 id="orgf0c36f0">Configuration options, optimizations, etc.</h4>
<div class="outline-text-4" id="text-1-3-3">
<p>
Spark Cassandra Connector has a number of configuration parameters that may affect the execution of joins. Some of them can be specified globally, via an instance of the <code>ReadConf</code> class or via <code>option</code>, while others can be specified only as table options.
</p>
<p>
With the <code>spark.cassandra.concurrent.reads</code> parameter we can control how many parallel requests are sent per core when executing a join (default: 512). For example, we can set it to a lower value to decrease the load that joins put on the cluster, although this will increase processing time.
</p>
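<p>
For example, one way to set such a global parameter is via the Spark configuration when constructing the session (just a sketch; the value <code>64</code> is arbitrary):
</p>
<div class="org-src-container">
<pre class="src src-scala">import org.apache.spark.sql.SparkSession

// lower the per-core read parallelism for joins to reduce the load on the cluster
val spark = SparkSession.builder
  .config("spark.cassandra.concurrent.reads", "64")
  .getOrCreate()
</pre>
</div>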
<p>
Table-only options (available only in the Dataframe API!) include the following (see also the sketch after this list):
</p>
<ul class="org-ul">
<li><code>directJoinSetting</code> with possible values <code>on</code> (always perform the direct join), <code>off</code> (disable the direct join), and <code>auto</code>, where SCC decides whether to use the direct join based on the ratio between the size of the data to join and the size of the data in Cassandra (default: <code>auto</code>)</li>
<li><code>directJoinSizeRatio</code> defines the threshold used in <code>auto</code> mode for switching to a full table scan (default: <code>0.9</code>)</li>
</ul>
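<p>
As a sketch, these options can be passed when creating the dataframe (the ratio value <code>0.5</code> is just an illustration):
</p>
<div class="org-src-container">
<pre class="src src-scala">// assumes: import org.apache.spark.sql.cassandra._ (as in the earlier examples)
// table-level options that control the direct join behaviour
val cassdata = spark.read.cassandraFormat("jtest1", "test")
  .option("directJoinSetting", "auto")   // on / off / auto
  .option("directJoinSizeRatio", "0.5")  // threshold used in the auto mode
  .load
</pre>
</div>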
</div>
</div>
<div class="outline-4" id="outline-container-org7ff1a77">
<h4 id="org7ff1a77"><span class="section-number-4"></span>More practical example</h4>
<div class="outline-text-4" id="text-1-3-4">
<p>
The previous sections showed the basic usage of joins with data in Cassandra. This section shows how to perform joins when processing streaming data. The following project (<a href="https://github.com/alexott/cassandra-dse-playground/tree/master/cassandra-join-spark">full source code</a>) demonstrates the use of joins with the Dataframe & RDD APIs to enrich data coming from Kafka with data from Cassandra. In our case, we receive information about stocks from Kafka (stock ticker, timestamp, and price), and enrich it with more information about the specific stock, such as the full company name, industry, stock exchange, etc. After enrichment, we just output the data to the console, but the code could be adjusted to do something more useful with the enriched data. To run the code, just follow the instructions in the <a href="https://github.com/alexott/cassandra-dse-playground/blob/master/cassandra-join-spark/README.md">README</a>.
</p>
<p>
The <a href="https://github.com/alexott/cassandra-dse-playground/blob/master/cassandra-join-spark/src/main/scala/com/datastax/alexott/demos/streaming/StockTickersJoinDataFrames.scala">implementation that uses Spark Structured Streaming</a> is very straightforward:
</p>
<ol class="org-ol">
<li>get data from Kafka</li>
<li>decode JSON payload</li>
<li>create a dataframe for the data in Cassandra (if we have a "static" dataset in Cassandra, we can cache the data so it will be read only once)</li>
<li>perform the join (we use <code>joined.explain</code> to check that we got "Cassandra Direct Join")</li>
<li>output the data to the console</li>
</ol>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">1.</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">streamingInputDF</span> <span style="color: #a020f0;">=</span> spark.readStream
.format(<span style="color: #8b2252;">"kafka"</span>)
.option(<span style="color: #8b2252;">"kafka.bootstrap.servers"</span>, kafkaServes)
.option(<span style="color: #8b2252;">"subscribe"</span>, topicName)
.load()
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">2.</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">parsed</span> <span style="color: #a020f0;">=</span> streamingInputDF.selectExpr(<span style="color: #8b2252;">"CAST(value AS STRING)"</span>)
.select(from_json($<span style="color: #8b2252;">"value"</span>, schema).as(<span style="color: #8b2252;">"stock"</span>))
.select(<span style="color: #8b2252;">"stock.*"</span>)
.withColumnRenamed(<span style="color: #8b2252;">"symbol"</span>, <span style="color: #8b2252;">"ticker"</span>)
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">3.</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">cassandra</span> <span style="color: #a020f0;">=</span> spark.read
.format(<span style="color: #8b2252;">"org.apache.spark.sql.cassandra"</span>)
.options(<span style="color: darkcyan;">Map</span>(<span style="color: #8b2252;">"table"</span> -> <span style="color: #8b2252;">"stock_info"</span>, <span style="color: #8b2252;">"keyspace"</span> -> <span style="color: #8b2252;">"test"</span>))
.load
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">4.</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> parsed.join(cassandra, cassandra(<span style="color: #8b2252;">"symbol"</span>) === parsed(<span style="color: #8b2252;">"ticker"</span>), <span style="color: #8b2252;"><br /> "left"</span>).drop(<span style="color: #8b2252;">"ticker"</span>)
joined.explain
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">5.</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">query</span> <span style="color: #a020f0;">=</span> joined.writeStream
.outputMode(<span style="color: #8b2252;">"update"</span>)
.format(<span style="color: #8b2252;">"console"</span>)
.start()
</pre>
</div>
<p>
And when we execute it, we can see the data printed to the console, like this:
</p>
<pre class="example">+------------------+--------------------+------+----------+--------+--------------+--------------------+
| value| datetime|symbol|base_price|exchange| industry| name|
+------------------+--------------------+------+----------+--------+--------------+--------------------+
| 254.5442902345344|2020-07-14 14:03:...| ADBE| 253.0| NASDAQ| TECH| ADOBE SYSTEMS|
| 66.13761365408801|2020-07-14 14:03:...| LNC| 66.0| NYSE| FINANCIALS| LINCOLN NATIONAL|
| 37.18736354960266|2020-07-14 14:04:...| AAL| 37.0| NASDAQ|TRANSPORTATION|AMERICAN TRANSPOR...|
+------------------+--------------------+------+----------+--------+--------------+--------------------+
</pre>
<p>
The <a href="https://github.com/alexott/cassandra-dse-playground/blob/master/cassandra-join-spark/src/main/scala/com/datastax/alexott/demos/streaming/StockTickersJoinRDD.scala">implementation that uses RDD-based Spark Streaming</a> follows the same steps as previous example, although it is slightly more complicated, because it's doing more than dataframe-based implementation - it filters out entries for which we didn't find data in Cassandra, and prints only entries for which we have data in Cassandra:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">StockData</span>(symbol<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, timestamp<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Instant</span>, price<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>) <span style="color: #a020f0;"><br /> extends</span> <span style="color: forestgreen;">Serializable</span>
<span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">StockInfo</span>(symbol<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, exchange<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, name<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, <br /> industry<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, base_price<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>) <span style="color: #a020f0;">extends</span> <span style="color: forestgreen;">Serializable</span>
<span style="color: #a020f0;">case</span> <span style="color: #a020f0;">class</span> <span style="color: forestgreen;">JoinedData</span>(symbol<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, exchange<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, name<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, <br /> industry<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">String</span>, base_price<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>, timestamp<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Instant</span>, price<span style="color: #a020f0;">:</span> <span style="color: forestgreen;">Double</span>) <br /> <span style="color: #a020f0;">extends</span> <span style="color: forestgreen;">Serializable</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">ssc</span> <span style="color: #a020f0;">=</span> <span style="color: #a020f0;">new</span> <span style="color: forestgreen;">StreamingContext</span>(sc, <span style="color: darkcyan;">Seconds</span>(<span style="color: darkcyan;">10</span>))
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">....</span>
<span style="color: #a020f0;">val</span> <span style="color: sienna;">stream</span> <span style="color: #a020f0;">=</span> <span style="color: darkcyan;">KafkaUtils</span>.createDirectStream[<span style="color: darkcyan;">String</span>, <span style="color: darkcyan;">String</span>](
ssc, <span style="color: darkcyan;">PreferConsistent</span>, <span style="color: darkcyan;">Subscribe</span>[<span style="color: darkcyan;">String</span>, <span style="color: darkcyan;">String</span>](topics, kafkaParams)
)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">parsedData</span> <span style="color: #a020f0;">=</span> stream.flatMap(x <span style="color: #a020f0;">=></span> parseJson(x.value()))
<span style="color: #a020f0;">val</span> <span style="color: sienna;">transformedData</span> <span style="color: #a020f0;">=</span> parsedData.transform(rdd <span style="color: #a020f0;">=></span> {
<span style="color: #a020f0;">val</span> <span style="color: sienna;">joined</span> <span style="color: #a020f0;">=</span> rdd.leftJoinWithCassandraTable[<span style="color: darkcyan;">StockInfo</span>](<span style="color: #8b2252;">"test"</span>, <span style="color: #8b2252;">"stock_info"</span>)
joined.persist()
<span style="color: #a020f0;">val</span> <span style="color: sienna;">missingInfoCount</span> <span style="color: #a020f0;">=</span> joined.filter(x <span style="color: #a020f0;">=></span> x._2.isEmpty).count()
<span style="color: #a020f0;">val</span> <span style="color: sienna;">stocksWithInfo</span> <span style="color: #a020f0;">=</span> joined.filter(x <span style="color: #a020f0;">=></span> x._2.isDefined)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">existingInfoCount</span> <span style="color: #a020f0;">=</span> stocksWithInfo.count()
println(s<span style="color: #8b2252;">"There are </span><span style="color: sienna;">$missingInfoCount</span><span style="color: #8b2252;"> stock tickers without information in Cassandra"</span>)
println(s<span style="color: #8b2252;">"There are </span><span style="color: sienna;">$existingInfoCount</span><span style="color: #8b2252;"> stock tickers with information in Cassandra"</span>)
<span style="color: #a020f0;">val</span> <span style="color: sienna;">combined</span> <span style="color: #a020f0;">=</span> stocksWithInfo.map(x <span style="color: #a020f0;">=></span> {
<span style="color: #a020f0;">val</span> <span style="color: sienna;">i</span> <span style="color: #a020f0;">=</span> x._2.get
<span style="color: #a020f0;">val</span> <span style="color: sienna;">d</span> <span style="color: #a020f0;">=</span> x._1
<span style="color: darkcyan;">JoinedData</span>(i.symbol, i.exchange, i.name, i.industry, i.base_price, <br /> d.timestamp, d.price)
})
joined.unpersist()
combined
})
transformedData.foreachRDD(rdd <span style="color: #a020f0;">=></span> rdd.foreach(println))
ssc.start()
</pre>
</div>
<p>
and while it's running, we'll see the following messages on the console:
</p>
<pre class="example">There are 0 stock tickers without information in Cassandra
There are 20 stock tickers with information in Cassandra
...
JoinedData(ESND,NASDAQ,ESSENDANT,WHOLESALERS,13.0,2020-07-14T16:19:19.588Z,13.483634952551117)
JoinedData(SWK,NYSE,STANLEY BLACK & DECKER,HOUSEHOLD PRODUCTS,128.0,2020-07-14T16:19:23.588Z,121.58327281753643)
JoinedData(BLK,NYSE,BLACKROCK,FINANCIALS,424.0,2020-07-14T16:19:24.588Z,394.7030616365362)
</pre>
</div>
</div>
<div class="outline-4" id="outline-container-org2c657c2">
<h4 id="org2c657c2"><span class="section-number-4"></span>Conclusion</h4>
<div class="outline-text-4" id="text-1-3-5">
<p>
Joining with data in Cassandra is a very convenient and fast method of data enrichment - with a small amount of code we can quickly pull the necessary data from the database, and perform further processing based on the retrieved data.
</p>
</div>
</div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-59363572496279899082020-07-23T17:08:00.002+02:002020-07-23T17:11:28.237+02:00Using Apache Zeppelin to work with data in DSE via AlwaysOn SQL ServiceSince release 6.0
DataStax Enterprise (DSE) includes <a href="https://www.datastax.com/blog/2018/05/introducing-alwayson-sql-dse-analytics">AlwaysOn SQL Service (AOSS)</a> that allows connecting to DSE Analytics via JDBC or ODBC drivers and executing Spark SQL queries against data in DSE, or external sources, such as data on DSEFS. AOSS is built on top of the Spark Thrift Server, but has a number of improvements, such as improved fault tolerance, support for advanced security features of DSE (for example, row-level access control), better caching of data to improve response time on restarts, etc. With AOSS, people can use their favorite BI tools to access data stored in DSE, and this greatly simplifies work with that data.
<br />
<a href="https://zeppelin.apache.org/">Apache Zeppelin</a> has a <a href="https://zeppelin.apache.org/docs/0.9.0-preview1/interpreter/jdbc.html">dedicated interpreter</a> for accessing databases via JDBC and documentation contains all information on how to configure and use this interpreter, with examples for many popular databases, such as, PostgreSQL, MySQL, etc. JDBC interpreter also supports dynamic forms, and interpolation of variables to simplify creation of interactive & dynamic queries. So we can also use Apache Zeppelin to work with data in DSE via JDBC interpreter.
<br />
<div class="outline-4" id="outline-container-org407dbc9">
<h3 id="org407dbc9" style="text-align: left;">
Configuring Zeppelin to work with AOSS</h3>
<div class="outline-text-4" id="text-1-2-1">
To access data via AOSS, we need a special version of the JDBC driver that supports AOSS enhancements, such as auto-discovery of the AOSS instance, or reconnection to another server if AOSS fails. We need to get the "Simba JDBC Driver for Apache Spark" from the <a href="https://downloads.datastax.com/#odbc-jdbc-drivers">corresponding section of the DataStax download site</a> (besides the driver, it makes sense to download the driver manual as well, as it describes all the driver options). After the driver is downloaded, we need to unpack the archive to a place accessible by Zeppelin. The archive should contain a file with the name <code>SparkJDBC41.jar</code>.
<br />
Now we can configure Zeppelin to connect to AOSS by going to the "Interpreters" section in the top right drop-down that shows the user name. We could configure the existing instance of the JDBC interpreter, but it's usually recommended to create a new interpreter based on the JDBC interpreter template for each type of database used. Click the "+Create" button, enter the interpreter name, like <code>aoss</code> (it will be used to select the interpreter on the cell level, like <code>%aoss</code>), and select <code>jdbc</code> in the "Interpreter group" drop-down - this will load all existing properties of the JDBC interpreter, which we can fill with information specific to AOSS:
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg-XnS1NJWzpXQ9cueysxp5uJH0z3I46GVDEODJ4QVwR1mSgkjKLbKmsf6fK4Emj_L1MyQT__ni-7cjZV_RBbMU6KCUqOQpP67YNwo5wxo-IZvYsGRgyJQzUI40jtETbKR5QdMZDg/s1149/zeppelin-aoss-create-interpreter.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="378" data-original-width="1149" height="211" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg-XnS1NJWzpXQ9cueysxp5uJH0z3I46GVDEODJ4QVwR1mSgkjKLbKmsf6fK4Emj_L1MyQT__ni-7cjZV_RBbMU6KCUqOQpP67YNwo5wxo-IZvYsGRgyJQzUI40jtETbKR5QdMZDg/w640-h211/zeppelin-aoss-create-interpreter.png" width="640" /></a></div>
We need to configure several things that are common for all AOSS installations:
<br />
<ul class="org-ul">
<li>we need to put the full path to the Simba JDBC driver into the "artifact" field of the "Dependencies" section (like <code>/Users/ott/work/zeppelin/SparkJDBC41.jar</code>)</li>
<li>we need to put the driver's class name (<code>com.simba.spark.jdbc41.Driver</code>) into the configuration parameter <code>default.driver</code></li>
</ul>
We also must specify a value for the configuration parameter <code>default.url</code>. For AOSS, there are two ways to do it:
<br />
<ol class="org-ol">
<li>Explicitly specify the host name or IP address with the port configured for AOSS (<code>10000</code> by default, configured by the <code>alwayson_sql_options:thrift_port</code> setting in <code>dse.yaml</code>), like <code>jdbc:spark://server:10000</code> - although this method works, it's not optimal, as it requires knowing which of the servers is running AOSS right now, and no reconnection to another server will happen in case of failover</li>
<li>Use the auto-discovery functionality of the driver that relies on the meta-information published by every node of DSE Analytics (by default on port <code>9077</code>, configured by the <code>alwayson_sql_options:web_ui_port</code> setting in <code>dse.yaml</code>). In this case, the driver will automatically discover where the instance of AOSS is running, and also reconnect to the new node if the current node fails. For this case, the URL looks as follows: <code>jdbc:spark://AOSSStatusEndpoints=server1:9077,server2:9077;</code> (we can specify any number of nodes as a parameter)</li>
</ol>
We can pass additional driver options by adding them to the URL; refer to the driver documentation for a list of the available options. We can also configure other Zeppelin parameters, but we can leave them at their default values. After everything is configured, press "Save" to save the changes (I removed unnecessary parameters to make the screenshot smaller):
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiAynb9RptHNGj5_20K4tHjwuyfTjFDnLm3aObpu8JZU6Ik-LkKCiPVm42-ANHVWFTtMXmCUB_tPH6enlrzNM4hbOrqmYONEXuygaU_3Yr-Q4yNSTjsGBiWPyQiUhMNgF4fVjH4xA/s1135/zeppelin-aoss-configure.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="411" data-original-width="1135" height="232" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiAynb9RptHNGj5_20K4tHjwuyfTjFDnLm3aObpu8JZU6Ik-LkKCiPVm42-ANHVWFTtMXmCUB_tPH6enlrzNM4hbOrqmYONEXuygaU_3Yr-Q4yNSTjsGBiWPyQiUhMNgF4fVjH4xA/w640-h232/zeppelin-aoss-configure.png" width="640" /></a></div>
</div>
</div>
<div class="outline-4" id="outline-container-org0d15d39">
<h3 id="org0d15d39" style="text-align: left;">
Usage</h3>
<div class="outline-text-4" id="text-1-2-2">
After we create the interpreter, we can start to use it either in new notebooks or in existing ones. We can configure the interpreter on the notebook level when creating the notebook, or we can put <code>%interpreter_name</code> at the beginning of a cell to indicate that we're using a specific interpreter.
<br />
And all that we need to do now is issue Spark SQL queries and wait for the results, like this:
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh6O8ShXlLn6Vw4dqtWSYGIhib8gLBSgzVubCnomJQ6x2NULyqw_feBZBvVgJlmobHetIs0Rb1SAclUXJCLOXrG3reCsRSqGOXlvj0nMGmXOCPfxvbiV14I8nBqDgyNuoWPjmgETg/s1153/zeppelin-aoss-query.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="546" data-original-width="1153" height="304" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh6O8ShXlLn6Vw4dqtWSYGIhib8gLBSgzVubCnomJQ6x2NULyqw_feBZBvVgJlmobHetIs0Rb1SAclUXJCLOXrG3reCsRSqGOXlvj0nMGmXOCPfxvbiV14I8nBqDgyNuoWPjmgETg/w640-h304/zeppelin-aoss-query.png" width="640" /></a></div>
We can check that the same data is available via CQL (don't be surprised by the syntax - this table has a DSE Search index):
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjcX3GByGHTcIgvZNZ2wvtzkOTqASo2D0vQpg-ca4l6-J2x9qKI5nhd1Pe6ua4rke31n5ptlpyAYBcxKaegdcKe_jKq-d9H887KxqP0eupRJlZx8zF4IMKhFls0cRh1mcm4K4afBQ/s1108/zeppelin-aoss-query-cql.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="297" data-original-width="1108" height="172" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjcX3GByGHTcIgvZNZ2wvtzkOTqASo2D0vQpg-ca4l6-J2x9qKI5nhd1Pe6ua4rke31n5ptlpyAYBcxKaegdcKe_jKq-d9H887KxqP0eupRJlZx8zF4IMKhFls0cRh1mcm4K4afBQ/w640-h172/zeppelin-aoss-query-cql.png" width="640" /></a></div>
We can also use all the available visualizations, including additional ones, like <i>geospark-zeppelin</i>, that are installed from the Helium registry:
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgy_bX-TTrOA2iWaA8M-qDE_I1t77or8NMqDUOq7sKfXvVrMOYu42EldBG57h8Iy-dzgUsw5nX5YWiLB8AhPB0FTv_4s-EM7JAUUXX-lGJhGiPHNa6kB3fLLE1dUC6Ffc59fHDVtA/s1111/zeppelin-aoss-query-geomap.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="449" data-original-width="1111" height="258" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgy_bX-TTrOA2iWaA8M-qDE_I1t77or8NMqDUOq7sKfXvVrMOYu42EldBG57h8Iy-dzgUsw5nX5YWiLB8AhPB0FTv_4s-EM7JAUUXX-lGJhGiPHNa6kB3fLLE1dUC6Ffc59fHDVtA/w640-h258/zeppelin-aoss-query-geomap.png" width="640" /></a></div>
<h3 style="text-align: left;">
Conclusion</h3>
This post demonstrates the flexibility and ease of use of Apache Zeppelin when working with different technologies, such as databases, etc.<br />
P.S. this post was written using Zeppelin 0.9.0-preview1</div>
</div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-69668086137913044682020-06-23T12:01:00.001+02:002020-06-23T13:02:06.897+02:00Working with DataStax Astra from Apache Zeppelin<a href="https://zeppelin.apache.org/">Apache Zeppelin</a> is a very powerful web-based environment for collaborative work, with very good support for big data technologies and databases, such as Spark, Flink, Cassandra, and many others. Apache Zeppelin 0.9.0 will include a lot of changes for the Cassandra interpreter:<br />
<ul>
<li>migration to the <a href="https://docs.datastax.com/en/developer/java-driver/latest/">new, unified DataStax Java driver</a> that brings more performance & stability, and also support for DSE-specific functionality, such as geospatial types</li>
<li>flexible formatting of results - we can output data in CQL or human-readable formats, format time/date-related columns using custom patterns, control the formatting of floating point numbers, etc. All of this can be configured on the interpreter and/or cell level</li>
<li>ability to change any configuration parameter of the Java driver</li>
</ul>
The last item is the most important one for connecting to <a href="https://astra.datastax.com/">DataStax Astra</a> (Cassandra as a Service from DataStax) - we can specify the path to the secure connect bundle and get access to our Astra instance.<br />
<br />
Right now there is no precompiled version of Zeppelin with these changes available, so you will need to compile Zeppelin from sources. After compilation is done, start Zeppelin and open the default Zeppelin address in a web browser: http://localhost:8080/.<br /><br />
We can configure the Cassandra interpreter directly to work with Astra, but often it's better to create a separate interpreter (as shown in the picture below):<br />
<ul>
<li>go to the "Interpreter" menu (in drop down in the top right corner), and there click "Create"</li>
<li>enter the name of the interpreter (<i>astra</i>), and select the <i>cassandra</i> in the interpreter group dropdown</li>
<li>enter the username/password in the <tt>cassandra.credentials.username</tt> & <tt>cassandra.credentials.password properties</tt></li>
<li>clear the value of the <tt>cassandra.hosts</tt> property (this is temporary workaround until it's fixed on the driver level)</li>
<li>change the value of <tt>cassandra.query.default.consistency</tt> to <tt>LOCAL_QUORUM</tt>, and <tt>cassandra.query.default.serial.consistency</tt> to <tt>LOCAL_SERIAL</tt> as <a href="https://docs.datastax.com/en/astra/aws/doc/dscloud/astra/dscloudDatabaseConditions.html">Astra requires this to perform write operations</a></li>
<li>optionally change the value of <tt>cassandra.keyspace</tt> to the name of keyspace that was created in Astra</li>
<li>add a property with name <tt>datastax-java-driver.basic.cloud.secure-connect-bundle</tt> and value of the path to the secure bundle
save the interpreter - this enables connection to Astra</li>
</ul>
<div class="separator" style="clear: both; text-align: center;">
</div>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhl-DNGbzzbfZ08w0JpgkW-80xaglBILdXMuCUmnN1tu_hSSGrE9fjmpKASE7vFdPXgaJbQkSwUmeSiIcEovkB9-c3IhY8AyliLXbFGclCrzfGbqowfruxyCXewqBHpN8-U65mFbQ/s1600/Screen+Shot+2020-06-23+at+09.06.19.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="888" data-original-width="1233" height="576" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhl-DNGbzzbfZ08w0JpgkW-80xaglBILdXMuCUmnN1tu_hSSGrE9fjmpKASE7vFdPXgaJbQkSwUmeSiIcEovkB9-c3IhY8AyliLXbFGclCrzfGbqowfruxyCXewqBHpN8-U65mFbQ/s640/Screen+Shot+2020-06-23+at+09.06.19.png" width="800" /></a></div>
Using the new interpreter is easy:
<ul>
<li>Click "Create new note", enter the note name, and select <i>astra</i> as the default interpreter.</li>
</ul>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEibNjhyphenhyphentBOeOTlKCH5xBGk7XZc8-EeesMD8hKP7bKE_nbuLYJEQrbp6K9GS_C88zhkA4AhjalOMrJC8HYLzTtC_QmS3FkBaRrh4oiMGhX2hy4Qc_ullcGqw6j1Boi_BT8hJxQiicg/s1600/Screen+Shot+2020-06-23+at+09.08.39.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="458" data-original-width="746" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEibNjhyphenhyphentBOeOTlKCH5xBGk7XZc8-EeesMD8hKP7bKE_nbuLYJEQrbp6K9GS_C88zhkA4AhjalOMrJC8HYLzTtC_QmS3FkBaRrh4oiMGhX2hy4Qc_ullcGqw6j1Boi_BT8hJxQiicg/s400/Screen+Shot+2020-06-23+at+09.08.39.png" /></a></div>
<ul>
<li>Start executing commands; for example, execute <tt>describe cluster;</tt>, which should show something like this:</li>
</ul>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhdMi7IDnUCBBpZLD7Dl_X_11i1Ct1OeKf67Oi3G_kjwAdghI54aeB0VyLn9lPdGxXv8iA3zE2v5sIkMgt4OUjZA6rGlhdYTsONDPNrJyp1jaNFAO2ZSMIPq0etFkjKegr9T-ARng/s1600/Screen+Shot+2020-06-23+at+09.14.54.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="508" data-original-width="1390" height="292" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhdMi7IDnUCBBpZLD7Dl_X_11i1Ct1OeKf67Oi3G_kjwAdghI54aeB0VyLn9lPdGxXv8iA3zE2v5sIkMgt4OUjZA6rGlhdYTsONDPNrJyp1jaNFAO2ZSMIPq0etFkjKegr9T-ARng/s640/Screen+Shot+2020-06-23+at+09.14.54.png" width="800" /></a></div>
<ul>
<li>Create a table, insert data, and select it back - for example, with a paragraph like the one shown below:</li>
</ul>
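Such a paragraph could look, for example, like this minimal sketch (the keyspace & table names here are just for illustration):<br />
<pre class="example">
%astra
CREATE TABLE IF NOT EXISTS my_keyspace.test_table (id int PRIMARY KEY, value text);
INSERT INTO my_keyspace.test_table (id, value) VALUES (1, 'first value');
SELECT * FROM my_keyspace.test_table;
</pre>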
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjaK_JBvAnImhTLqnOmIlHfRm6TAx_Oi-Opc8appgyyXhyphenhyphenqZfFtT49X-OF2yUYB0FPAWnf13uZykS91z4BlPFk5g3JpJcINSZzOScR3ylDpjbFTTKR53w-neenmQ-GkBs_igdxtyg/s1600/Screen+Shot+2020-06-23+at+11.50.28.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="699" data-original-width="1138" height="491" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjaK_JBvAnImhTLqnOmIlHfRm6TAx_Oi-Opc8appgyyXhyphenhyphenqZfFtT49X-OF2yUYB0FPAWnf13uZykS91z4BlPFk5g3JpJcINSZzOScR3ylDpjbFTTKR53w-neenmQ-GkBs_igdxtyg/s640/Screen+Shot+2020-06-23+at+11.50.28.png" width="800" /></a></div>
Please note that the new interpreter inherits all functionality of the base interpreter - for example, it's possible to specify formatting options, like this (formatting using the German locale, for the German timezone):<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh1YTPsXBENQ3Ly1TM2HB3tq0PQSiqvG5mPcGuQx8VTWQt8A0VF7xnaH7pFxAN6xJE6krn-rnxjdUu_dVdqFa6EsZgmRFsRZ6Ltcndf-XRUssh_mnbXgDsfZu6hAGw4NfNiYd_7Tg/s1600/Screen+Shot+2020-06-23+at+10.41.28.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="448" data-original-width="1070" height="335" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh1YTPsXBENQ3Ly1TM2HB3tq0PQSiqvG5mPcGuQx8VTWQt8A0VF7xnaH7pFxAN6xJE6krn-rnxjdUu_dVdqFa6EsZgmRFsRZ6Ltcndf-XRUssh_mnbXgDsfZu6hAGw4NfNiYd_7Tg/s640/Screen+Shot+2020-06-23+at+10.41.28.png" width="800" /></a></div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-31025402818386060242020-06-05T17:39:00.001+02:002020-12-31T14:47:55.629+01:00Using Delta Lake with DSE Analytics & DSEFS <p>
<a href="https://delta.io/">Delta Lake</a> is an interesting technology from the Databricks. It provides ACID transactions, data versioning, and many other features. It also could be used as both sink and source for streaming jobs, making it's possible to build complex data processing pipelines. I was planning to play with it long time ago, but I've found time only recently.
</p>
<p>
Because I'm working at DataStax, I tried to use Delta Lake with DataStax Enterprise (DSE), which includes DSE Analytics (based on Spark) and an implementation of a distributed file system (DSEFS) that is compatible with the Hadoop file system APIs. Until the DSE 6.8 release, it wasn't possible to use Delta Lake directly with DSE Analytics, because DSE Analytics was based on Spark 2.2, while Delta Lake requires Spark 2.4, which became the base for DSE Analytics only in the 6.8.0 release.
</p>
<p>
Delta Lake works out of the box with DSE 6.8 - we just need to include it into the application, for example, when starting the Spark shell:
</p>
<div class="org-src-container">
<pre class="src src-shell">dse spark --packages io.delta:delta-core_2.11:0.6.1
</pre>
</div>
<p>
and then we can use it just like any other input or output format in the DataFrame API:
</p>
<div class="org-src-container">
<pre class="src src-scala"><span style="color: #a020f0;">import</span> org.apache.spark.sql.cassandra.<span style="color: #a020f0;">_</span>
<span style="color: #a020f0;">val</span> <span style="color: #a0522d;">data</span> <span style="color: #a020f0;">=</span> spark.read.cassandraFormat(<span style="color: #008b00;">"vehicle"</span>, <span style="color: #008b00;">"datastax"</span>).load()
data.write.format(<span style="color: #008b00;">"delta"</span>).mode(<span style="color: #008b00;">"append"</span>).save(<span style="color: #008b00;">"data.delta"</span>)
<span style="color: #7f7f7f;">// </span><span style="color: #7f7f7f;">read data back & count</span>
<span style="color: #a020f0;">val</span> <span style="color: #a0522d;">data2</span> <span style="color: #a020f0;">=</span> spark.read.format(<span style="color: #008b00;">"delta"</span>).load(<span style="color: #008b00;">"data.delta"</span>)
data2.count
</pre>
</div>
<p>
and we can see the files created on DSEFS:
</p>
<pre class="example">
dsefs dsefs://10.101.35.23:5598/ > ls
data.delta tmp
dsefs dsefs://10.101.35.23:5598/ > ls data.delta/
_delta_log
part-00000-4d550fdd-e61c-489c-933e-640fb774f70e-c000.snappy.parquet
part-00001-b33bd4fd-1746-49e7-aee3-08f9fc90608d-c000.snappy.parquet
part-00002-df596490-79b4-4211-82c0-74f7019e9f2a-c000.snappy.parquet
part-00003-d39476fa-1424-4383-ac93-de71cfbc72d9-c000.snappy.parquet
part-00004-a22e4d79-755f-4107-92ae-2c8472401b04-c000.snappy.parquet
part-00005-b3c66412-b2ca-43f4-a650-251569176a00-c000.snappy.parquet
</pre>
<p>
The streaming example also just works - I can push the data from an external source in one job, and subscribe to the changes in another job, as in the sketch below.
</p>
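<p>
A minimal sketch of such a pair of jobs could look like the following (the built-in <code>rate</code> source and the table path are placeholders for a real streaming source & a real DSEFS location):
</p>
<div class="org-src-container">
<pre class="src src-scala">// job 1: continuously append data to a Delta table
// (the built-in "rate" source stands in here for a real streaming source)
val input = spark.readStream.format("rate").load()
input.writeStream
  .format("delta")
  .option("checkpointLocation", "stream.delta.checkpoint")
  .outputMode("append")
  .start("stream.delta")

// job 2: subscribe to the changes in the same Delta table
val changes = spark.readStream.format("delta").load("stream.delta")
changes.writeStream.format("console").start()
</pre>
</div>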
<p>
More interesting for me was to try to use Delta Lake with older versions of DSE, like 6.0 or 6.7, that are used by many customers, as 6.8.0 was released just 2 months ago. I couldn't use Delta Lake directly with DSE Analytics in these versions, because it's not binary compatible with the Spark 2.2 they use. But I should be able to use an external Spark 2.4.x and store data on DSEFS.
</p>
<p>
To access DSEFS from external Spark, I need to use a special jar (the so-called <a href="https://docs.datastax.com/en/dse/6.7/dse-admin/datastax_enterprise/spark/byosOverview.html">Bring-Your-Own-Spark, BYOS</a>) that contains DataStax's version of the Spark Cassandra Connector, plus the DSEFS client library. I also need to specify properties to connect to DSE - it's better to use a <a href="https://docs.datastax.com/en/dse/6.7/dse-admin/datastax_enterprise/spark/byosGeneratingConfigFile.html">separate tool to generate the properties file</a> (in my case it's called <code>byos.properties</code>).
</p>
<p>
Starting Spark 2.4 with the following command gives me access to both the Delta Lake & DSE BYOS packages (DSE BYOS is already in my local Maven repository):
</p>
<div class="org-src-container">
<pre class="src src-shell">bin/spark-shell --properties-file ../byos.properties <span style="color: #008b00;">\</span>
--packages io.delta:delta-core_2.11:0.6.0,com.datastax.dse:dse-byos_2.11:6.8.0 <span style="color: #008b00;">\</span>
--master <span style="color: #008b00;">'local[*]'</span>
</pre>
</div>
<p>
But if I try to execute the commands from above in the Spark shell, I get the following error when trying to save data to Delta Lake:
</p>
<pre class="example">
20/06/05 17:05:00 ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html " for details.
org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.dsefs.impl=null: No AbstractFileSystem configured for scheme: dsefs
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
</pre>
<p>
Looking at the error message, it's clear that Delta Lake is trying to use an implementation of the <code>AbstractFileSystem</code> interface, and it's not available. Checking the <code>byos.properties</code> file, we can see that it has only the following property:
</p>
<pre class="example">
spark.hadoop.fs.dsefs.impl=com.datastax.bdp.fs.hadoop.DseFileSystem
</pre>
<p>
and this class implements only the <code>FileSystem</code> interface, but not <code>AbstractFileSystem</code>. Only by talking with developers did I find out that there is an implementation of <code>AbstractFileSystem</code> as well, but its declaration wasn't exported by the tool that generates the configuration file for BYOS (this bug should be fixed in the next versions of DSE). So to fix the problem, it's enough to add the corresponding line to the <code>byos.properties</code> file:
</p>
<pre class="example">
spark.hadoop.fs.AbstractFileSystem.dsefs.impl=com.datastax.bdp.fs.hadoop.DseFs
</pre>
<p>
and check that everything works now, including streaming operations.
</p>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-40852594750767581152019-12-31T17:04:00.000+01:002019-12-31T17:04:15.420+01:002019th in review<p>
From a professional perspective, this year was a continuation of the last one - I'm still working at DataStax, helping our customers build solutions on top of our products based on Apache Cassandra. I managed to make a big jump from solutions architect to senior solutions architect, and then to principal architect. The new role brought new responsibilities & challenges besides direct work with customers (I did them anyway ;-) - improving our internal processes to make them more effective, mentoring new team members, etc. Also, I was trying to keep myself close to the keyboard, providing small (and not so small) patches to our products, developing different tools for use by our customers & internally, creating examples, etc.
</p>
<p>
Keeping myself close to the keyboard continued on the open source front - I tried to contribute more to different projects, in the form of patches, issues, and code reviews. Most of the contributions were to Apache Zeppelin - improving the Cassandra interpreter, fixing bugs, etc.
</p>
<p>
From a travel perspective, I travelled (for work and vacation) almost 52 thousand kilometers and visited 9 countries (some of them several times). But most of the travel was in Germany, and if I complained about Deutsche Bahn last year, this year it was much worse - for example, a train was cancelled half an hour before the trip 3 weeks ago, so instead of a 4-hour trip, I spent almost 8 hours trying to reach home from the south of Germany on my last work day before vacation.
</p>
<p>
This year I set myself an ambitious goal of reading 55 books, but <a href="https://www.goodreads.com/user/year_in_books/2019/635213">managed to read only 46</a> (similar to 2018's 45 books), not counting numerous papers and a lot of documentation… I think that for next year the goal will be the same - 55 books - and let's see if I can manage it.
</p>
<p>
Although I wanted to spend more time on blogging, publicly I wrote only a <a href="http://alexott.blogspot.com/2019/09/some-travel-tips.html">post with some travel tips on my own blog</a>, and a <a href="https://www.datastax.com/blog/2019/07/using-object-mapper-scala">post on the use of Object Mapper with Scala</a> for the company's blog. But there are many more internal writings, and some of them could be published publicly as well - they just need to be polished.
</p>
<p><b>I wish all my readers a Happy New Year!</b></p>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-69056901249585187462019-09-23T18:27:00.000+02:002019-09-23T21:36:32.675+02:00Some travel tipsAfter I changed jobs almost two years ago, I started to travel much more than before (although I travelled a lot in 2015-2017 as well). As a result, I've started to build some habits to make travel more comfortable and easier. This list of tips was sitting in my notes for a long time, waiting until I wrote this blog post.
<br />
Some of the tips are applicable only to specific airlines, or only to long-range flights, so they are listed in arbitrary order:
<br />
<ul class="org-ul">
<li>Carry a bottle for water - you can help eliminate unnecessary plastic waste by using your own bottle. US airports usually have drinking fountains and special machines for filling bottles. In Germany it's also not a problem to get tap water, but in the UK it usually is, because the water is usually warm and not all airports have drinking fountains (Heathrow Terminal 5 is the worst - they have 4 fountains on the plan, but only one is working). But sometimes it's solvable - for example, in London City you can ask any restaurant to fill your bottle;</li>
<li>Get a window seat, especially for long flights - you won't be disturbed when your neighbors decide to go to the toilet. Plus, on most planes you can lean to the side to sleep (I hate the A380 for having too much space between the window and the seat);</li>
<li>A good pillow is also important for long flights. I'm using <a href="https://amzn.to/2mBcZxa">this one</a> because it allows me to sleep even without leaning to the side;</li>
<li>Compression socks are also important for long-range flights (or train rides), when you sit for many hours without much movement;</li>
<li>Order a special meal when it's available - on long-range flights, the special meals are usually delivered separately, long before the standard meals, so you have a chance to eat & fall asleep much earlier;</li>
<li>When radically changing time zones, like going to the US west coast, use melatonin to adjust better (although it doesn't help everyone);</li>
<li>Use a small bag to store all connectors, adapters, chargers, etc. I'm using a small toiletry bag that fits perfectly into my backpack.</li>
</ul>
I would be happy to hear tips from experienced travelers.<br />
<br />
Update:<br />
<ul>
<li><a href="https://twitter.com/spyced/status/1176183607829905408?s=20">Hint from Jonathan Ellis</a>: buy double set of adapters for everything - I personally use 2 universal adapters with multiple USB outputs...</li>
<li>From <a href="https://twitter.com/ifesdjeen/status/1176215016518889472?s=20">Alex Petrov</a>: Bum bug for mobile phone and passport </li>
</ul>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-8819357183256223952018-12-31T17:28:00.002+01:002019-01-02T09:21:46.386+01:00Looking back to 2018th<div class="outline-text-3" id="text-1-1">
<p>
2018 was quite an intensive year for me for multiple reasons, so I decided to write a blog post after a long time of not posting anything.
</p>
</div>
<div id="outline-container-org840d544" class="outline-4">
<h3 id="org840d544">New job</h3>
<div class="outline-text-4" id="text-1-1-1">
<p>
I started the new year at a new company. After 11 years at McAfee, working primarily in development (developer/architect), I switched to the customer-facing role of solutions architect (post-sales) at <a href="https://www.datastax.com/">DataStax</a> (the company behind Apache Cassandra and related projects). Working with customers isn't new for me - I worked a lot with them when I lived in Moscow, but it was a different environment. My new job requires quite broad knowledge in different areas, as different customers have different tasks - some need advice on project architecture, data modelling, and code reviews; some need help with deployment & preparing a project for production, troubleshooting of problems in production, etc. Customers also differ in how the services are provided - for some I need to produce report(s) with my findings/recommendations, and then their admins & developers make the changes, while for some customers I'm more like a part of the scrum team, with the ability to make changes in the code & configurations directly, although it still involves a lot of writing.
</p>
<p>
Besides being an adviser to customers, my role is also to be a bridge between customers & our product management/development teams - I'm helping translate customers' problems and requirements for new functionality into JIRAs (in case of errors), items in the products' roadmaps, improvements in documentation, etc. And I even managed to make (although it's not my direct role) a few small pull requests against our code that were included into official releases ;-)
</p>
<p>
Technology-wise, I'm working not only with Cassandra, which is the base for <a href="https://www.datastax.com/products/datastax-enterprise">DataStax Enterprise (DSE)</a>, but also with Apache Solr, which is the base for DSE Search, and Apache Spark, which is the base for DSE Analytics - although all components were quite heavily modified to make them faster & more reliable. <a href="https://www.datastax.com/products/datastax-enterprise-graph">DSE Graph</a> is a very interesting part of our stack, but unfortunately I didn't have a chance to work much with it. From a programming language perspective, I worked with Java, Scala, and Python, plus some prototypes in Clojure. Also, because projects don't exist in a vacuum, I need to help people integrate with various other components, so I had a chance to work with Kafka & Kafka Streams, DC/OS, different monitoring systems (such as DataDog and Prometheus), Ansible, and many others.
</p>
<p>
DataStax is a great company for many reasons:
</p>
<ul class="org-ul">
<li>amazing people - very knowledgeable, highly-qualified, energetic, collaborative and responsive. I'm learning new stuff every day, getting help from different teams, and also trying to give back;</li>
<li>it's a truly remote company - people are located all over the world (although we have bigger sites, like Santa Clara, London, etc.), and the company fosters its remote culture. It's visible in all aspects, starting with regular all-hands meetings where presenters sit alone, "talking to you", and people adjusting their schedules to include people from other regions, etc. (You can read <a href="https://hbr.org/sponsored/2018/06/5-practical-ways-to-engage-a-geographically-distributed-workforce">an article from our CEO about building a remote company</a>);</li>
<li>although it's already quite a big company (> 500 people), the pace of change is still high - new features are added very quickly;</li>
<li>people are recognized - there are company- & team-level recognition awards for individuals that are publicly announced. It's also rewarding to see your personal impact on the product(s) and the success of the company.</li>
</ul>
<p>
And <a href="http://myjob.io/g6os2">we're hiring for different positions</a> - development, field team, etc. (reach me via email if you're interested - alexott at gmail).
</p>
</div>
</div>
<div id="outline-container-org0db48fa" class="outline-4">
<h3 id="org0db48fa">Open source/side projects</h3>
<div class="outline-text-4" id="text-1-1-2">
<p>
This year I had more opportunities to work on side projects, including open source, although my attention shifted more to the Cassandra/DSE-related stuff - extending/fixing documentation for the DataStax Cassandra drivers, adding new functionality, building prototypes, etc. - most of this is available in <a href="http://github.com/alexott/">my GitHub repository</a>. Also, my interest returned to Apache Zeppelin, so I made some extensions to it, although not everything has gone upstream (yet):
</p>
<ul class="org-ul">
<li>extended the Cassandra interpreter with DSE-specific functionality, such as support for DSE-specific types, DSE Search commands, and authentication;</li>
<li>started development of an <a href="https://github.com/alexott/zeppelin/tree/ZEPPELIN-3548-KSQL">interpreter for Confluent's KSQL</a> - basic functionality works, although I need to spend more time polishing it - using a template engine instead of hard-coded HTML, adding support for HTTPS, etc.</li>
</ul>
<p>
I also found time to extend <a href="http://planet.clojure.in/">Planet Clojure</a> with a feature that was planned a long time ago - direct integration with Twitter instead of relying on 3rd-party services that are often not flexible. As a result of relatively small changes, it's now possible to cross-post to Twitter with attribution to the real authors of the blog posts.
</p>
<p>
During the year, I also worked with 2 of my former McAfee/Intel colleagues, <a href="https://twitter.com/EdwardDixon3">Edward Dixon</a> & <a href="https://twitter.com/damilarefagbemi">Damilare D. Fagbemi</a>, writing an <a href="https://medium.com/@damilare/pirating-ai-800a8da6431b">article on pirating machine learning models from cloud services</a>. That was interesting joint work that didn't require too much coding time, but we were able to get quite good results.
</p>
<p>
I've also spent quite a significant amount of time answering DSE/Cassandra-related questions on StackOverflow & other StackExchange sites. This allows me to better understand what problems people have using Cassandra or DSE (and related technologies, like Spark), and what we can do to improve the developer experience - quite often this leads to filing new JIRAs, adding clarifications to documentation, etc. As a result, this year I've added around 8k points to <a href="https://stackoverflow.com/users/18627/alex-ott">my SO reputation</a>, and made it into the top 0.32% of SO users for this year, with ~370 answers about Cassandra:
</p>
<p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWppkPQ7UgDHgPcmpi3eg-2gcUddPQE6liY8D0POtf_B6zOfkifOK_05qf80wn7cqB5GOHJZqTo_MDKsmt3iPGsspF8CM38TLNBK2OHymHwZtJZM4DMPOO8wBvSSocmiwa5fDRGg/s1600/%25D0%25A1%25D0%25BD%25D0%25B8%25D0%25BC%25D0%25BE%25D0%25BA+%25D1%258D%25D0%25BA%25D1%2580%25D0%25B0%25D0%25BD%25D0%25B0+2018-12-31+%25D0%25B2+11.52.22.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWppkPQ7UgDHgPcmpi3eg-2gcUddPQE6liY8D0POtf_B6zOfkifOK_05qf80wn7cqB5GOHJZqTo_MDKsmt3iPGsspF8CM38TLNBK2OHymHwZtJZM4DMPOO8wBvSSocmiwa5fDRGg/s320/%25D0%25A1%25D0%25BD%25D0%25B8%25D0%25BC%25D0%25BE%25D0%25BA+%25D1%258D%25D0%25BA%25D1%2580%25D0%25B0%25D0%25BD%25D0%25B0+2018-12-31+%25D0%25B2+11.52.22.png" /></a></div>
</p>
<p>
Similarly, I was quite active on the <a href="https://academy.datastax.com/slack">DataStax Academy Slack</a> channels, together with other people from DataStax helping users find answers to their questions - that also helped me understand the typical problems our users have, and how we can improve our stuff.
</p>
</div>
</div>
<div id="outline-container-org68c4670" class="outline-4">
<h3 id="org68c4670">Books read</h3>
<div class="outline-text-4" id="text-1-1-3">
<p>
This year I read a lot, and not only books - documentation on our products and on the products we integrate with, different papers, etc. But this year I managed to complete my <a href="https://www.goodreads.com/user_challenges/10755766">reading challenge</a> for the first time in the last 4 years, although the last book was finished only just now :-)
</p>
<p>
A big part of my reading was science fiction, just to free the brain from technical stuff, but I can recommend several great technical books that I read this year:
</p>
<ul class="org-ul">
<li><a href="https://www.goodreads.com/book/show/34681140-spark">Spark. The definitive guide</a> - very good book on Spark with focus on the Spark SQL stuff, in contrast to most of other books that were concentrated on RDDs;</li>
<li><a href="https://www.goodreads.com/book/show/34921247-kafka-streams-in-action">Kafka Streams in Action</a> - good introduction into Kafka Streams, very accessible, but providing a good foundation for its use;</li>
<li><a href="https://www.goodreads.com/book/show/34013922-kubernetes-in-action">Kubernetes in Action</a> - very detailed introduction into Kubernetes that not only allows quickly start use it, but also covers complex topics.</li>
</ul>
<p>
Next year I will try to decrease the pile of books in the "To Read" queue, but it's growing too fast :-)
</p>
</div>
</div>
<div id="outline-container-org451ce2d" class="outline-4">
<h3 id="org451ce2d">Travels</h3>
<div class="outline-text-4" id="text-1-1-4">
<p>
Because I'm now in a customer-facing role, I'm traveling much more than before. This year I covered the equivalent of 2 trips around the Earth (> 80,000 km):
</p>
<p>
<div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEifGVRinPWjaBGDnlhP2huswnzusMe4cLt75F7iMewpNHSv7vVfd-N4-6GEcvmxtj0Th8QU1M8_UoEYW4CORfiN9ybo0clL2l3eYduUvwujUWkbS5NkSj8HLAsR0SL_2HKVotHIOA/s1600/travel-stats.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEifGVRinPWjaBGDnlhP2huswnzusMe4cLt75F7iMewpNHSv7vVfd-N4-6GEcvmxtj0Th8QU1M8_UoEYW4CORfiN9ybo0clL2l3eYduUvwujUWkbS5NkSj8HLAsR0SL_2HKVotHIOA/s320/travel-stats.png" /></a></div>
</p>
<p>
I was 3 times in the US (Austin & Santa Clara), 4 times in the UK, 2 times in Spain (vacation), once each in Italy, Austria, and even Moscow (for the first time in 12 years), plus numerous trips across Germany by train (Deutsche Bahn isn't reliable, especially the ICEs) and airplane. As a result of these many trips, I developed a set of habits & tricks to make life easier.
</p>
</div>
</div>
<div id="outline-container-org5135ae4" class="outline-4">
<h3 id="org5135ae4">Plans for the next year</h3>
<div class="outline-text-4" id="text-1-1-5">
<ul class="org-ul">
<li>Spend more time on learning new stuff & refreshing old knowledge - from programming languages (Go, Erlang) to machine learning (text & image classification, reinforcement learning) and graph-related stuff;</li>
<li>Spend more time on sport - return to regular running & cycling, although it's not always doable on trips - hotels in Germany often don't have fitness rooms, in contrast to hotels in the USA;</li>
<li>Read more - I will try to increase my reading challenge;</li>
<li>Return to regular blogging - maybe I'll try to use some other platform, like Medium.</li>
</ul>
<br />
<br />
<p>
<b>And I want to wish all my readers a happy & prosperous New Year!</b>
</p>
</div>
</div>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com2tag:blogger.com,1999:blog-6862508.post-4871254963445512152017-10-05T18:39:00.000+02:002017-11-30T13:05:45.877+01:00Evaluating fastText's models for language detection<p><b>Updated on 30th Nov 2017 with results for OpenNLP</b></p>
<p>
Three days ago, the fastText development team at Facebook Research <a href="https://fasttext.cc/blog/2017/10/02/blog-post.html">announced the availability</a> of new fastText models designed for language detection, providing better results than the <a href="https://github.com/saffsd/langid.py">langid.py library</a>. Because one of my projects relies heavily on language detection (via a JNI wrapper to Google's CLD - see below), I decided to conduct my own experiment to assess the quality of detection using fastText.
</p>
<h2 id="org52bab71">Setup</h2>
<p>
Software:
</p>
<ul class="org-ul">
<li>CLD was used from Java via a JNI wrapper developed by our team. CLD was called in extended mode with the following flags enabled: <code>kCLDFlagBestEffort</code> & <code>kCLDFlagScoreAsQuads</code>. We also added some heuristics for selecting the language in the case of text in multiple languages;</li>
<li>langid.py was installed via <code>pip</code>;</li>
<li>fastText was cloned directly from Git, and 2 models were used - full & compressed;</li>
<li>OpenNLP 1.8.3 with language model for 103 languages (file <tt>langdetect-183.bin</tt>).</li>
</ul>
<p>
The test data set consists of 11,062 entries in 58 languages that I collected, including some entries from a "debug data set" that contains entries where CLD isn't able to make a prediction at all, or makes incorrect predictions. Initially I tried to have a more or less balanced data set - with a similar number of entries for all languages - but after the entries were manually re-checked, some languages got more samples than others (especially Russian). The full list of languages & the number of entries for each of them can be found <a href="https://gist.github.com/alexott/dd43fa8d1db4b8202d55c6325b2c69c2#file-cld-stat-txt">here</a>. Every line of the input data file represents text extracted from different sites; the length of the text ranged from 15 characters to 10-20Kb. The dataset is available in <a href="https://www.dropbox.com/s/j908sdjt0uc3ed8/lang-detection-data.tar.gz?dl=0">my Dropbox</a>.
</p>
<p>
Tests were made on a quite old (2013) laptop with an i7-3720QM/2.6GHz CPU & 16Gb of RAM.
</p>
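<p>
For reference, the fastText predictions were obtained via its command-line interface, roughly like the sketch below - the input & output file names are illustrative; <tt>lid.176.bin</tt> & <tt>lid.176.ftz</tt> are the full & compressed models from the announcement:
</p>
<pre class="example">
# one predicted label (e.g. __label__en) is printed per input line
./fasttext predict lid.176.bin test-data.txt > predictions-full.txt
./fasttext predict lid.176.ftz test-data.txt > predictions-compressed.txt
</pre>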
<h2 id="orgbcf5f7c">Results</h2>
<p>
Full results for every language that I tested are in the table at the end of this blog post & on <a href="https://gist.github.com/alexott/dd43fa8d1db4b8202d55c6325b2c69c2">Github</a>. From them I can draw the following conclusions:
</p>
<ul class="org-ul">
<li>all detectors are equally good on some languages, such as Japanese, Chinese, Vietnamese, Greek, Arabic, Farsi, Georgian, etc. - for them, the accuracy of detection is between 98 & 100%;</li>
<li>CLD is much better at detecting "rare" languages, especially languages that are similar to more frequently used ones - Afrikaans vs. Dutch, Azerbaijani vs. Turkish, Malay vs. Indonesian, Nepali vs. Hindi, Russian vs. Bulgarian, etc. (this could be a result of imbalance in the training data - I need to check the source dataset);</li>
<li>for "major" languages not mentioned above (English, French, German, Spanish, Portuguese, Dutch), the fastText results are much better than CLD's, and in many cases better than langid.py's & OpenNLP's;</li>
<li>for many languages, the results for the "compressed" fastText model are slightly worse than the results from the "full" model (mostly only by 1-2%, but the gap could be higher, like for Kazakh, where the difference is 33%), but there are languages where the situation is the opposite - the results for compressed are slightly better than for full (for example, for German or Dutch);</li>
<li>OpenNLP has many misclassifications for Cyrillic languages - Russian/Ukrainian, ...</li>
</ul>
<p>Rafael Oliveira <a href="https://www.facebook.com/groups/1174547215919768/permalink/1702123316495486/?comment_id=1704414996266318&reply_comment_id=1705159672858517&notif_id=1507280476710677&notif_t=group_comment">posted on FB</a> a simple diagram that shows which languages are detected better by CLD & which are better handled by fastText.</p>
<p>
Here are some additional notes about differences in the behavior of the detectors that I observed while analyzing the results:
</p>
<ul class="org-ul">
<li>fastText is more reliable than CLD on short texts;</li>
<li>fastText models & langid.py report Hebrew as 'he', while CLD uses the obsolete 'iw' code. Similarly, CLD uses 'in' for the Indonesian language instead of the standard 'id' used by fastText & langid.py;</li>
<li>fastText distinguishes between the Cyrillic- & Latin-based versions of Serbian;</li>
<li>CLD tends to incorporate geographical & personal names into detection results - for example, a blog post in German about travel to Iceland is detected as Icelandic, while fastText detects it as German;</li>
<li>In extended detection mode, CLD tends to select the rarer language - like Galician or Catalan over Spanish, Serbian instead of Russian, etc.;</li>
<li>OpenNLP isn't very good at detecting the language of short texts.</li>
</ul>
<p>
Performance-wise, langid.py is much slower than both CLD & fastText. On average, CLD requires 0.5-1 ms to perform language detection. For fastText & langid.py I don't have precise numbers <span class="underline">yet</span>, only approximations based on the execution speed of the corresponding programs.
</p>
<h2 id="org1401abb">Conclusion</h2>
<p>
The models released by the fastText development team provide a very good alternative to existing language detection tools, like Google's CLD & langid.py - for most "popular" languages, these models provide higher detection accuracy compared to other tools, combined with a high speed of detection (a drawback of langid.py). Even using the "compressed" model it's possible to reach good detection accuracy, although for some less frequently used languages, CLD & langid.py may show better results.
</p>
<h2 id="org79fd113">Detailed results</h2>
<p>
This table contains the accuracy of detection for every language tested, using CLD, fastText with the full model, fastText with the compressed model, langid.py, and OpenNLP:
</p>
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-right" />
<col class="org-right" />
<col class="org-right" />
<col class="org-right" />
<col class="org-right" />
<col class="org-right" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Lang</th>
<th scope="col" class="org-right">N of</th>
<th scope="col" class="org-right">CLD</th>
<th scope="col" class="org-right">fastText</th>
<th scope="col" class="org-right">fastText</th>
<th scope="col" class="org-right">langid.py</th>
<th scope="col" class="org-right">OpenNLP</th>
</tr>
<tr>
<th scope="col" class="org-left"> </th>
<th scope="col" class="org-right">samples</th>
<th scope="col" class="org-right"> </th>
<th scope="col" class="org-right">full</th>
<th scope="col" class="org-right">compressed</th>
<th scope="col" class="org-right"> </th>
<th scope="col" class="org-right"> </th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">af</td>
<td class="org-right">34</td>
<td class="org-right">0.94</td>
<td class="org-right">0.61</td>
<td class="org-right">0.76</td>
<td class="org-right">0.76</td>
<td class="org-right">0.82</td>
</tr>
<tr>
<td class="org-left">ar</td>
<td class="org-right">201</td>
<td class="org-right">0.99</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.92</td>
</tr>
<tr>
<td class="org-left">az</td>
<td class="org-right">161</td>
<td class="org-right">1.00</td>
<td class="org-right">0.93</td>
<td class="org-right">0.91</td>
<td class="org-right">0.88</td>
<td class="org-right">0.88</td>
</tr>
<tr>
<td class="org-left">be</td>
<td class="org-right">157</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.91</td>
<td class="org-right">0.85</td>
<td class="org-right">0.85</td>
</tr>
<tr>
<td class="org-left">bg</td>
<td class="org-right">203</td>
<td class="org-right">0.96</td>
<td class="org-right">0.95</td>
<td class="org-right">0.94</td>
<td class="org-right">0.92</td>
<td class="org-right">0.94</td>
</tr>
<tr>
<td class="org-left">bn</td>
<td class="org-right">198</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.70</td>
</tr>
<tr>
<td class="org-left">ca</td>
<td class="org-right">193</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.93</td>
<td class="org-right">0.97</td>
</tr>
<tr>
<td class="org-left">cs</td>
<td class="org-right">196</td>
<td class="org-right">0.99</td>
<td class="org-right">0.93</td>
<td class="org-right">0.91</td>
<td class="org-right">0.87</td>
<td class="org-right">0.91</td>
</tr>
<tr>
<td class="org-left">da</td>
<td class="org-right">200</td>
<td class="org-right">0.99</td>
<td class="org-right">0.93</td>
<td class="org-right">0.96</td>
<td class="org-right">0.85</td>
<td class="org-right">0.98</td>
</tr>
<tr>
<td class="org-left">de</td>
<td class="org-right">236</td>
<td class="org-right">0.80</td>
<td class="org-right">0.94</td>
<td class="org-right">0.95</td>
<td class="org-right">0.94</td>
<td class="org-right">0.89</td>
</tr>
<tr>
<td class="org-left">el</td>
<td class="org-right">199</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">en</td>
<td class="org-right">249</td>
<td class="org-right">0.80</td>
<td class="org-right">0.98</td>
<td class="org-right">0.95</td>
<td class="org-right">0.86</td>
<td class="org-right">0.93</td>
</tr>
<tr>
<td class="org-left">es</td>
<td class="org-right">255</td>
<td class="org-right">0.78</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.96</td>
<td class="org-right">0.94</td>
</tr>
<tr>
<td class="org-left">et</td>
<td class="org-right">198</td>
<td class="org-right">1.00</td>
<td class="org-right">0.97</td>
<td class="org-right">0.97</td>
<td class="org-right">0.91</td>
<td class="org-right">0.57</td>
</tr>
<tr>
<td class="org-left">fa</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.99</td>
</tr>
<tr>
<td class="org-left">fi</td>
<td class="org-right">199</td>
<td class="org-right">0.98</td>
<td class="org-right">0.99</td>
<td class="org-right">1.00</td>
<td class="org-right">0.91</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">fr</td>
<td class="org-right">205</td>
<td class="org-right">0.96</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">0.98</td>
<td class="org-right">0.97</td>
</tr>
<tr>
<td class="org-left">hi</td>
<td class="org-right">180</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">1.00</td>
<td class="org-right">0.96</td>
<td class="org-right">0.73</td>
</tr>
<tr>
<td class="org-left">hr</td>
<td class="org-right">175</td>
<td class="org-right">0.81</td>
<td class="org-right">0.80</td>
<td class="org-right">0.80</td>
<td class="org-right">0.81</td>
<td class="org-right">0.73</td>
</tr>
<tr>
<td class="org-left">hu</td>
<td class="org-right">197</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.97</td>
<td class="org-right">0.96</td>
</tr>
<tr>
<td class="org-left">hy</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.94</td>
<td class="org-right">0.82</td>
</tr>
<tr>
<td class="org-left">id</td>
<td class="org-right">200</td>
<td class="org-right">0.98</td>
<td class="org-right">0.95</td>
<td class="org-right">0.92</td>
<td class="org-right">0.93</td>
<td class="org-right">0.32</td>
</tr>
<tr>
<td class="org-left">is</td>
<td class="org-right">97</td>
<td class="org-right">1.00</td>
<td class="org-right">0.73</td>
<td class="org-right">0.77</td>
<td class="org-right">0.89</td>
<td class="org-right">0.75</td>
</tr>
<tr>
<td class="org-left">it</td>
<td class="org-right">206</td>
<td class="org-right">0.95</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">0.98</td>
<td class="org-right">0.97</td>
</tr>
<tr>
<td class="org-left">iw</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">ja</td>
<td class="org-right">199</td>
<td class="org-right">1.00</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
</tr>
<tr>
<td class="org-left">ka</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.99</td>
<td class="org-right">0.91</td>
</tr>
<tr>
<td class="org-left">kk</td>
<td class="org-right">120</td>
<td class="org-right">1.00</td>
<td class="org-right">0.95</td>
<td class="org-right">0.62</td>
<td class="org-right">0.53</td>
<td class="org-right">0.60</td>
</tr>
<tr>
<td class="org-left">ko</td>
<td class="org-right">199</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.61</td>
</tr>
<tr>
<td class="org-left">lt</td>
<td class="org-right">198</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.87</td>
<td class="org-right">0.96</td>
<td class="org-right">1.00</td>
</tr>
<tr>
<td class="org-left">lv</td>
<td class="org-right">192</td>
<td class="org-right">1.00</td>
<td class="org-right">0.99</td>
<td class="org-right">0.93</td>
<td class="org-right">0.92</td>
<td class="org-right">0.90</td>
</tr>
<tr>
<td class="org-left">mk</td>
<td class="org-right">199</td>
<td class="org-right">0.99</td>
<td class="org-right">0.98</td>
<td class="org-right">0.97</td>
<td class="org-right">0.87</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">mn</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">0.97</td>
<td class="org-right">0.96</td>
<td class="org-right">1.00</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">mr</td>
<td class="org-right">196</td>
<td class="org-right">1.00</td>
<td class="org-right">0.94</td>
<td class="org-right">0.94</td>
<td class="org-right">0.97</td>
<td class="org-right">0.80</td>
</tr>
<tr>
<td class="org-left">ms</td>
<td class="org-right">148</td>
<td class="org-right">1.00</td>
<td class="org-right">0.10</td>
<td class="org-right">0.09</td>
<td class="org-right">0.40</td>
<td class="org-right">0.31</td>
</tr>
<tr>
<td class="org-left">ne</td>
<td class="org-right">198</td>
<td class="org-right">1.00</td>
<td class="org-right">0.82</td>
<td class="org-right">0.85</td>
<td class="org-right">0.59</td>
<td class="org-right">0.73</td>
</tr>
<tr>
<td class="org-left">nl</td>
<td class="org-right">301</td>
<td class="org-right">0.66</td>
<td class="org-right">0.97</td>
<td class="org-right">0.98</td>
<td class="org-right">0.91</td>
<td class="org-right">0.94</td>
</tr>
<tr>
<td class="org-left">no</td>
<td class="org-right">183</td>
<td class="org-right">0.98</td>
<td class="org-right">0.97</td>
<td class="org-right">0.96</td>
<td class="org-right">0.66</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">pl</td>
<td class="org-right">196</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.95</td>
<td class="org-right">0.94</td>
</tr>
<tr>
<td class="org-left">pt</td>
<td class="org-right">206</td>
<td class="org-right">0.95</td>
<td class="org-right">0.97</td>
<td class="org-right">0.95</td>
<td class="org-right">0.95</td>
<td class="org-right">0.95</td>
</tr>
<tr>
<td class="org-left">ro</td>
<td class="org-right">193</td>
<td class="org-right">1.00</td>
<td class="org-right">0.89</td>
<td class="org-right">0.90</td>
<td class="org-right">0.90</td>
<td class="org-right">0.98</td>
</tr>
<tr>
<td class="org-left">ru</td>
<td class="org-right">446</td>
<td class="org-right">0.44</td>
<td class="org-right">0.99</td>
<td class="org-right">0.98</td>
<td class="org-right">0.94</td>
<td class="org-right">0.85</td>
</tr>
<tr>
<td class="org-left">si</td>
<td class="org-right">200</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">0.73</td>
</tr>
<tr>
<td class="org-left">sk</td>
<td class="org-right">195</td>
<td class="org-right">1.00</td>
<td class="org-right">0.94</td>
<td class="org-right">0.93</td>
<td class="org-right">0.97</td>
<td class="org-right">0.97</td>
</tr>
<tr>
<td class="org-left">sl</td>
<td class="org-right">162</td>
<td class="org-right">1.00</td>
<td class="org-right">0.96</td>
<td class="org-right">0.96</td>
<td class="org-right">0.92</td>
<td class="org-right">0.99</td>
</tr>
<tr>
<td class="org-left">sq</td>
<td class="org-right">151</td>
<td class="org-right">1.00</td>
<td class="org-right">0.92</td>
<td class="org-right">0.94</td>
<td class="org-right">0.90</td>
<td class="org-right">0.79</td>
</tr>
<tr>
<td class="org-left">sr</td>
<td class="org-right">91</td>
<td class="org-right">0.82</td>
<td class="org-right">0.65</td>
<td class="org-right">0.68</td>
<td class="org-right">0.20</td>
<td class="org-right">0.17</td>
</tr>
<tr>
<td class="org-left">sv</td>
<td class="org-right">202</td>
<td class="org-right">0.99</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
</tr>
<tr>
<td class="org-left">ta</td>
<td class="org-right">199</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.83</td>
</tr>
<tr>
<td class="org-left">tg</td>
<td class="org-right">82</td>
<td class="org-right">1.00</td>
<td class="org-right">0.90</td>
<td class="org-right">0.82</td>
<td class="org-right">0</td>
<td class="org-right">0.90</td>
</tr>
<tr>
<td class="org-left">th</td>
<td class="org-right">186</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">1.00</td>
<td class="org-right">0.90</td>
<td class="org-right">0.74</td>
</tr>
<tr>
<td class="org-left">tr</td>
<td class="org-right">195</td>
<td class="org-right">0.95</td>
<td class="org-right">0.97</td>
<td class="org-right">0.97</td>
<td class="org-right">0.94</td>
<td class="org-right">0.97</td>
</tr>
<tr>
<td class="org-left">ug</td>
<td class="org-right">193</td>
<td class="org-right">1.00</td>
<td class="org-right">0.99</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0</td>
</tr>
<tr>
<td class="org-left">uk</td>
<td class="org-right">197</td>
<td class="org-right">0.97</td>
<td class="org-right">0.97</td>
<td class="org-right">0.97</td>
<td class="org-right">0.92</td>
<td class="org-right">0.90</td>
</tr>
<tr>
<td class="org-left">ur</td>
<td class="org-right">196</td>
<td class="org-right">1.00</td>
<td class="org-right">0.96</td>
<td class="org-right">0.95</td>
<td class="org-right">0.96</td>
<td class="org-right">0.86</td>
</tr>
<tr>
<td class="org-left">uz</td>
<td class="org-right">97</td>
<td class="org-right">1.00</td>
<td class="org-right">0.35</td>
<td class="org-right">0.23</td>
<td class="org-right">0</td>
<td class="org-right">0.84</td>
</tr>
<tr>
<td class="org-left">vi</td>
<td class="org-right">198</td>
<td class="org-right">1.00</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.98</td>
<td class="org-right">0.87</td>
</tr>
<tr>
<td class="org-left">zh</td>
<td class="org-right">205</td>
<td class="org-right">0.97</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">0.99</td>
<td class="org-right">0.97</td>
</tr>
</tbody>
</table>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com2tag:blogger.com,1999:blog-6862508.post-31297979498119850232017-09-25T20:45:00.001+02:002017-09-25T21:00:13.116+02:00Confluent Platform: Installing C/C++ packages on Ubuntu
<p>This is mostly a note for myself, just not to forget, but I hope that it could be useful for somebody else as well...</p>
<p>
Confluent Platform provides a convenient way to <a href="https://docs.confluent.io/current/installation.html#installation-apt">install/update the provided packages via apt</a>. This method works fine for Java code because it's platform independent, but if you try to install the binary packages for librdkafka & other packages written in C/C++ on Ubuntu, you get into trouble - it looks like these packages are compiled for Debian, so most of the binary dependencies are wrong.</p>
<p>To solve this problem, I wrote a small script that builds all these packages from source and prevents them from being upgraded automatically.<br />
A pre-requisite for this script is the addition of a deb-src configuration to apt with the following command:</p>
<pre>sudo add-apt-repository "deb-src http://packages.confluent.io/deb/&lt;version&gt; stable main"</pre>
<p>The script for building all C/C++-based packages looks as follows (also available as a <a href="https://gist.github.com/alexott/76847779adf8adb2a488d835f0eabb35">Gist</a>):</p>
<pre class="src src-shell"><span style="color: #7f7f7f;">#</span><span style="color: #7f7f7f;">!/bin/</span><span style="color: #a020f0;">bash</span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">By default, Confluent provides Debian versions of their packages via APT.</span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">as result, they are linked to other versions of the libraries than in Ubuntu, ...</span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">This small script rebuilds them on Ubuntu, or Debian versions different </span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">from provided by default.</span>
<span style="color: #a0522d;">MY_BUILD_DIR</span>=<span style="color: #8b2252;">"build-$$"</span>
mkdir $<span style="color: #a0522d;">MY_BUILD_DIR</span>
<span style="color: #483d8b;">cd</span> $<span style="color: #a0522d;">MY_BUILD_DIR</span>
apt-get source librdkafka libavro-cpp-dev libavro-c-dev confluent-libserdes-dev
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">install necessary build dependencies</span>
apt-get -y install debhelper libsasl2-dev liblz4-dev libsnappy-dev liblzma-dev <span style="color: #8b2252;">\</span>
libjansson-dev libboost-dev libboost-filesystem-dev libboost-system-dev <span style="color: #8b2252;">\</span>
libboost-program-options-dev libboost-iostreams-dev <span style="color: #8b2252;">\</span>
libcurl4-openssl-dev cmake zlib1g-dev libssl-dev
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">build librdkafka</span>
<span style="color: #483d8b;">cd</span> <span style="color: #ff00ff;">`find . -maxdepth 1 -type d -a -name librdkafka\*`</span>
dpkg-buildpackage -b -uc
<span style="color: #483d8b;">cd</span> ..
dpkg -i librdkafka*.deb
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">build avro-c</span>
<span style="color: #483d8b;">cd</span> <span style="color: #ff00ff;">`find . -maxdepth 1 -type d -a -name avro-c-\*`</span>
dpkg-buildpackage -b -uc
<span style="color: #483d8b;">cd</span> ..
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">build avro-cpp</span>
<span style="color: #483d8b;">cd</span> <span style="color: #ff00ff;">`find . -maxdepth 1 -type d -a -name avro-cpp-\*`</span>
dpkg-buildpackage -b -uc
<span style="color: #483d8b;">cd</span> ..
dpkg -i libavro-c*.deb
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">build libserdes</span>
<span style="color: #483d8b;">cd</span> <span style="color: #ff00ff;">`find . -maxdepth 1 -type d -a -name confluent-libserdes-\*`</span>
dpkg-buildpackage -b -uc
<span style="color: #483d8b;">cd</span> ..
dpkg -i confluent-libserdes*.deb
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">cleanup after build</span>
<span style="color: #483d8b;">cd</span> ..
rm -rf $<span style="color: #a0522d;">MY_BUILD_DIR</span>
<span style="color: #7f7f7f;"># </span><span style="color: #7f7f7f;">Mark packages as not upgradable automatically</span>
<span style="color: #a020f0;">for</span> i<span style="color: #a020f0;"> in</span> confluent-libserdes++1 confluent-libserdes-dev <span style="color: #8b2252;">\</span>
confluent-libserdes1 librdkafka1 librdkafka1-dbg libavro-c-dev <span style="color: #8b2252;">\</span>
libavro-cpp-dev librdkafka++1 librdkafka-dev ; <span style="color: #a020f0;">do</span>
apt-mark hold $<span style="color: #a0522d;">i</span>
<span style="color: #a020f0;">done</span>
</pre>
Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0tag:blogger.com,1999:blog-6862508.post-45578328404361134812016-06-02T22:28:00.001+02:002016-06-03T09:20:57.764+02:00Notes on practical machine learningThis post describes my personal observations collected while working on projects that apply machine learning to practical problems. This topic has always been interesting for me, together with other topics such as natural language processing and data mining, and related technologies such as big data. Although I have some of the theoretical background necessary for understanding algorithms' applicability, I primarily look at all of these topics from a practical point of view.<br />
<br />
I got my first experience with practical machine learning in 2002-2003, when Paul Graham published his famous article <a href="http://www.paulgraham.com/spam.html">A Plan for Spam</a>,
which described a very simple Naive Bayes algorithm for classifying spam emails. Because of the algorithm's simplicity, we decided to build a spam filtering module for our email filtering product "<a href="http://www.jet.msk.su/services_and_solutions/information_security/products_vendor/dozor_dzhet/">Dozor Jet</a>". During the implementation, I realized for the first time that most of the time is spent not on the algorithm itself, but on the surrounding work - collection & cleanup of training data, analysis of results, mitigation of false positives, etc.<br />
<br />
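<p>As an illustration of how little code the classification algorithm itself needs (everything around it is the hard part), here is a minimal sketch of such a Naive Bayes spam classifier using the modern scikit-learn library - the training texts here are made up, and our implementation back then was of course written from scratch:</p>
<pre class="src src-python"># Minimal Naive Bayes spam classifier sketch (hypothetical training data).
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

train_texts = ["cheap meds buy now", "meeting agenda attached",
               "you won the lottery", "quarterly report draft"]
train_labels = ["spam", "ham", "spam", "ham"]

vectorizer = CountVectorizer()              # bag-of-words features
X = vectorizer.fit_transform(train_texts)
clf = MultinomialNB().fit(X, train_labels)  # train the model

print(clf.predict(vectorizer.transform(["buy cheap lottery meds"])))
# expected: ['spam']
</pre>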
Later, when I started working at McAfee/Intel Security, I continued to experiment with different algorithms & libraries. I also got a better theoretical background thanks to online courses: <a href="http://ml-class.org/">ML Class</a> (by Andrew Ng) & <a href="http://ai-class.org/">AI Class</a>
(by Peter Norvig & Sebastian Thrun), plus some others on Coursera. Two years ago I moved to a new group that worked on applying machine learning to information security tasks - that allowed me to get more practical experience in this area. This post is based on that experience.<br />
<h3 id="orgc3ace22">
How to get good results when using machine learning</h3>
When applying machine learning-based methods to practical problems, the result depends on many factors, including:<br />
<ul class="org-ul">
<li><b>Clear understanding of what problem you want to solve</b>. This is quite an important item - when you know what problem you want to solve, you know the restrictions that are put on the algorithms
(resource consumption, performance, etc.), you understand how critical false positives and false negatives could be, what the business gains if the problem is solved, etc. Quite often, a "good enough, but cheap" solution is better than "the best, but very expensive" one in terms of development cost, performance, and so on (a good example of this is the story of the
Netflix Prize, when <a href="https://www.techdirt.com/blog/innovation/articles/20120409/03412518422/why-netflix-never-implemented-algorithm-that-won-netflix-1-million-challenge.shtml">the algorithm that won the competition wasn't implemented, and the company decided in favor of a slightly worse, but much cheaper solution that was put into production</a>);</li>
<li><b>Very good knowledge of the problem domain</b>. From my point of view, this is the most important factor for the success of the project. Without this knowledge it's very hard (if possible at all) to build a very good model. Good knowledge of the problem domain allows you to concentrate on the key features (or their combinations) that will be included in the model; ignore unnecessary features that won't improve the model (or may even make it worse); select correct methods for data collection, extraction & encoding; evaluate the applicability of particular algorithms, etc.;</li>
<li><b>Existence of a good taxonomy for classification tasks</b>. In such tasks you need to classify input data (text, or something else) into one or more predefined classes. In some cases, the taxonomy already exists, and the goal of the project is to build a solution that uses this taxonomy. But sometimes it's not so easy to use these taxonomies with machine learning-based methods: if there are several "similar" classes, the model may produce false positives by classifying the data as one of the "similar" classes. For example, if you have the class "Sport" and the class "Gambling", it's very hard to distinguish texts on sites that discuss football results from sites that give advice on football betting;</li>
<li><b>Understanding the applicability of particular algorithms to different classes of problems</b>. Nowadays you don't need a PhD in machine learning to apply it to practical problems. But you do need to know what classes of algorithms exist, which problems they can be applied to, what their resource requirements are, what the requirements for data quality are, etc. (There is a lot of information on these topics - online courses, books, documentation for libraries, and much more; you only need to find the time to absorb it.) Quite often, the best result is obtained not from a single algorithm, but from ensembles that combine many different models, each of only "average" quality individually, but working much better together - for example, Random Forest, different implementations of boosted trees, etc. (see the sketch after this list);</li>
<li><b>What methods are used for collection, extraction & encoding of data; cleanliness & balance of the training data set</b>.
Different tasks call for different methods of data collection, but the main goal is to get a high-quality set of training data, without any noise if possible (although this can be quite expensive for big datasets). In some cases, you also need to balance the amount of data between classes, so that one class doesn't have many more samples than the others (although there are also methods for compensating for this when building the models);</li>
<li><b>Feature extraction</b>.
This is also a very important factor that affects the quality of the model - including unnecessary features in the model can impair its quality, or greatly increase resource consumption when building it. Quite often, good results are obtained not from individual features, but from their combinations;</li>
<li><b>No separation into "developers" & "scientists"</b>. This organizational factor is also very important. Sometimes the "scientists" & "developers" exist as separate groups without a strong connection between them. In such cases there are situations when the "scientists" choose a particular algorithm that gives very good results in a lab environment, but is very hard to apply in real life - for example, it consumes a significant amount of resources, is hard to implement, is very slow, etc. It's better to avoid such separation and let all groups work on the project together from the beginning.</li>
</ul>
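<p>To make the points about ensembles and balanced training data more concrete, here is a minimal sketch using scikit-learn that compares a single decision tree against a Random Forest on a synthetic, imbalanced dataset - all names and numbers here are illustrative only, not taken from any real project:</p>
<pre class="src src-python"># Sketch: single tree vs. Random Forest ensemble on imbalanced synthetic data.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

# synthetic dataset with 90% of the samples in one class
X, y = make_classification(n_samples=2000, n_features=20,
                           weights=[0.9, 0.1], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, random_state=42)

tree = DecisionTreeClassifier(random_state=42).fit(X_tr, y_tr)
# an ensemble of "average" trees; class_weight is one built-in way
# to compensate for the imbalanced training set
forest = RandomForestClassifier(n_estimators=200, class_weight="balanced",
                                random_state=42).fit(X_tr, y_tr)

print("single tree accuracy:", tree.score(X_te, y_te))
print("random forest accuracy:", forest.score(X_te, y_te))
</pre>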
<h3 id="orgdcc8ede">
Machine learning: theory & practice…</h3>
Some people who aren't familiar with machine learning, when they hear this term, start to imagine sheets of paper covered with equations, very complex code, etc. In reality, most of the time is spent not on implementing machine learning algorithms (there are many existing libraries), but on analytical & engineering tasks:<br />
<ul class="org-ul">
<li><b>data collection & cleanup</b>. This is one of the most labor-intensive parts.
Usually there are some existing libraries & tools, but quite often you need to write something special anyway. After you have collected the data, you need to check that you really collected the correct data. For example, if you work on the classification of web sites, a site's content may no longer match the classification assigned to it some time ago - the site's software can return errors, the site can change owners, or it could be hacked and start serving ads for Viagra, porn sites, etc. To check the correctness of the data you often need to write special tools, perform cross-validation of the data, etc.;</li>
<li><b>feature analysis & selection</b>. Another important part of the workflow - you need to understand which features are important for the model, which requires a very good understanding of the problem domain. For some tasks, such as text classification (where the features are the words of the text), you also need to perform feature selection, otherwise the model will be too big, and training will be very slow & consume a lot of resources;</li>
<li><b>data extraction</b> - how the selected features are extracted from the data. For example, if you work with texts, you may need to convert everything to the same encoding, or extract only part of the data - when looking for spam emails, you can ignore some email headers;</li>
<li><b>analysis of the results</b> - another labor-intensive part. There are different methods for evaluating a model's quality, and they differ for classification, clustering, and other tasks. But most of the time is spent on analyzing the false positives, figuring out why recall is too low, etc. As a result of this analysis you may need to tune the model's parameters, make changes in the training data, or repeat the feature analysis;</li>
<li><b>tuning of the model's parameters</b>. Many algorithms have a set of parameters that influence the quality of the model, and there is no universal set of parameters that fits all tasks. So usually we build many models with different parameters and then select the parameters that give the best model (a small end-to-end sketch of this follows the list). The other models can also be useful - for example, all the built models could be combined into a meta-model that has better quality than the individual models;</li>
<li><b>deployment to production</b>. This includes making sure that the system works, checking the results on live data, etc.</li>
</ul>
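<p>A minimal end-to-end sketch of several of these steps - data extraction & encoding, feature selection, and parameter tuning with cross-validated evaluation - using scikit-learn on a public text dataset; the parameter values are illustrative only, not a production recipe:</p>
<pre class="src src-python"># Sketch: extraction, feature selection, tuning & evaluation in one pipeline.
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

data = fetch_20newsgroups(subset="train",
                          categories=["rec.sport.hockey", "sci.space"])

pipeline = Pipeline([
    ("tfidf", TfidfVectorizer()),            # data extraction & encoding
    ("select", SelectKBest(chi2)),           # feature selection
    ("clf", LogisticRegression(max_iter=1000)),
])

# try several parameter combinations, keep the best by cross-validation
params = {"select__k": [1000, 5000], "clf__C": [0.1, 1.0, 10.0]}
search = GridSearchCV(pipeline, params, cv=3)
search.fit(data.data, data.target)
print(search.best_params_, search.best_score_)
</pre>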
In practice, only a small number of projects require completely new algorithms, or re-implementation of existing algorithms from scratch. In most cases there are many existing libraries/frameworks, such as <a href="http://scikit-learn.org/">Scikit-Learn</a> for Python, <a href="https://mahout.apache.org/">Apache Mahout</a>, <a href="http://spark.apache.org/docs/latest/mllib-guide.html">Apache Spark ML</a>, <a href="http://www.h2o.ai/">H2O</a>, <a href="https://github.com/dmlc/xgboost">XGBoost</a>, libraries for R, and much more - these libraries are developed by many people, they are battle-tested, and most obvious bugs have already been fixed. If you already have a training dataset and you understand which algorithm to apply to it, you can quickly build models using one of these libraries, evaluate their applicability for the problem, and make a decision about implementation.<br />
<br />
My observations on this topic are similar to the following phrase & figure from the very interesting paper <a href="https://papers.nips.cc/paper/5656-hidden-technical-debt-in-machine-learning-systems">Hidden Technical Debt in Machine Learning Systems</a> published by Google's researchers: "It may be surprising to the
academic community to know that only a tiny fraction of the code in many
ML systems is actually devoted to learning or prediction".<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEies-kcZqXMEv4t0S-Z_9nVZ8ImRK9DdqojdxW3RVDczTwYZ1RgaJ-hjqzmbJp6pXT1i44EackruWLgguKcP91UQ_uTWrxM2OG86iqxe5TWiItqYEoVmBNl4Kn0BPwCWiR8qojA/s1600/ml-system.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="209" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEies-kcZqXMEv4t0S-Z_9nVZ8ImRK9DdqojdxW3RVDczTwYZ1RgaJ-hjqzmbJp6pXT1i44EackruWLgguKcP91UQ_uTWrxM2OG86iqxe5TWiItqYEoVmBNl4Kn0BPwCWiR8qojA/s640/ml-system.png" width="640" /></a></div>
<br />
<h3 id="org6143bde" style="text-align: center;">
* * *</h3>
In this post I tried to describe the experience I got while implementing some practical projects. In the next posts I'll try to expand on some of the items above.<br />
<br />
I appreciate any feedback on this post - maybe something isn't described very clearly, and then I'll try to improve it. Thank you.Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com1tag:blogger.com,1999:blog-6862508.post-45685040357043835092016-03-28T17:27:00.000+02:002016-03-28T17:33:50.801+02:00Impressions from the Natural Language Processing course by Prof. RadevLast year I decided to refresh my knowledge in the area of natural language processing (NLP), and took the <a href="https://www.coursera.org/course/nlpintro">"Introduction to Natural Language Processing" course from the University of Michigan</a> that was led by Prof. Dragomir R. Radev. I had already taken the <a href="https://www.coursera.org/course/nlp">NLP course from Stanford University</a> led by Dan Jurafsky & Christopher Manning 4 years ago, but it had a slightly different syllabus compared to the course from Prof. Radev.<br />
<br />
The course from the University of Michigan provided a very good introduction to NLP, discussing the typical topics, such as syntax and parsing, language modeling, part-of-speech tagging, etc., in much detail. A smaller part of the course provided an overview of related areas, such as question answering, text summarization, information retrieval, sentiment analysis, machine translation, etc.<br />
<br />
Every lecture was accompanied by quizzes (25% of the final score), and there were also 3 programming assignments (quite challenging, in Python, 50% of the final score):<br />
<ul class="org-ul">
<li>implement a dependency parsing algorithm for 3 languages: English, Danish & Swedish - this was interesting because these languages are quite different, and you need to work to find the correct sets of features that will help you train a correct model for each language;</li>
<li>build language models using the Brown corpus, and then implement part-of-speech tagging using the Viterbi algorithm (a tiny sketch of the algorithm follows this list). This was the most challenging task from the programming point of view. One of the problems was that the Viterbi algorithm wasn't explained very well in the lecture, but a good explanation could be found in the <a href="https://www.coursera.org/course/nlangp">NLP course from Columbia University</a>, and in Jurafsky's book on NLP;</li>
<li>implement word sense disambiguation using a vector space model, also for 3 languages: English, Spanish & Catalan. This task was relatively easy compared to the other assignments, although it also required separate pieces of code for every language.</li>
</ul>
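<p>Since the Viterbi algorithm was the main stumbling block, here is a tiny sketch of it for a toy HMM, just to show the idea - the tags, words & probabilities are made up, and the real assignment of course worked with models trained on the Brown corpus:</p>
<pre class="src src-python"># Toy Viterbi sketch for POS tagging (all probabilities are made up).
def viterbi(words, tags, start_p, trans_p, emit_p):
    # best[t] = (probability, path) of the best tag sequence ending in tag t
    best = {t: (start_p[t] * emit_p[t].get(words[0], 1e-6), [t]) for t in tags}
    for w in words[1:]:
        new_best = {}
        for t in tags:
            # extend the best predecessor path with tag t
            p, path = max((best[s][0] * trans_p[s][t] * emit_p[t].get(w, 1e-6),
                           best[s][1]) for s in tags)
            new_best[t] = (p, path + [t])
        best = new_best
    return max(best.values())[1]

tags = ["N", "V"]
start_p = {"N": 0.7, "V": 0.3}
trans_p = {"N": {"N": 0.3, "V": 0.7}, "V": {"N": 0.8, "V": 0.2}}
emit_p = {"N": {"dogs": 0.6, "bark": 0.1}, "V": {"dogs": 0.1, "bark": 0.7}}
print(viterbi(["dogs", "bark"], tags, start_p, trans_p, emit_p))  # ['N', 'V']
</pre>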
And at the end of the course there was a 2-hour exam (25% of the final score) that covered all topics from the course.<br />
<br />
Overall impression: a very solid course on natural language processing, although the part on the related topics could easily be dropped, as it didn't provide a deep dive into the corresponding areas. This was pointed out by many people in the post-course survey, and Prof. Radev answered that he'll rework the course to exclude these topics and include more information about new developments in the area of deep learning - during the last year many interesting papers were published showing that DL methods achieve high accuracy on NLP tasks without requiring expensive language modeling. I'm looking forward to the new version of the course, just to get more information about this topic.Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com2tag:blogger.com,1999:blog-6862508.post-82100416785715495162015-12-31T14:51:00.000+01:002016-03-28T17:28:15.468+02:00Looking back to 2015th...2015 was an interesting year for me - a lot of different small & big projects in the areas of machine learning, big data & real-time data processing. (I'll write a separate post about my experience implementing machine learning-based systems.)<br />
I mostly used JVM-based languages: Java 8 for primary development (it's not so bad with lambdas, etc.), Clojure for auxiliary tools & web services, and Scala for Spark programming. But from time to time I also touched C++ & Python (mostly for MOOCs), not to mention a lot of shell scripting. I also spent quite a lot of time evaluating technologies for our projects, including Spark, Storm, Kafka, Solr, the ELK stack (Elasticsearch/Logstash/Kibana), Docker, ...<br />
<br />
Part of my free time was used to participate in MOOCs - this year I was able to finish only 6 of them:<br />
<ul>
<li>Pattern Discovery in Data Mining (Coursera) - very high-level, but provides a good overview of developments in this area;</li>
<li>Cluster Analysis in Data Mining (Coursera) - the same;</li>
<li>Introduction to Natural Language Processing (Coursera) - very interesting & challenging (<a href="http://alexott.blogspot.de/2016/03/impressions-from-natural-language.html">separate post about it</a>);</li>
<li>LAFF: Linear Algebra - Foundations to Frontiers (edX) - to refresh my math skills - very useful, but too long (and I don't like the technical implementation of edX itself);</li>
<li>CS100.1x Introduction to Big Data with Apache Spark (edX) - a short but useful introduction to Spark (<a href="http://alexott.blogspot.de/2015/07/impressions-from-cs1001x-introduction.html">separate blog post about it</a>);</li>
<li>CS190.1x Scalable Machine Learning (edX) - an introduction to machine learning with Spark (<a href="http://alexott.blogspot.de/2015/07/impressions-from-cs1901x-scalable.html">separate blog post about it</a>).</li>
</ul>
Also, a big portion of my free time was spent on reading - I tried to read as much as in previous years, but this year I read <a href="https://www.goodreads.com/user_challenges/1852607">fewer books</a> (mostly work-related), though I also read a lot of papers related to our development topics.<br />
One of the negative implications of not having much free time is that I spent almost no time on open source activity, especially Incanter. The only regularly updated project right now is Planet Clojure.
<br />
I also spent less time biking, switching mostly to running & using a spinning bike at home. I'm still thinking about upgrading to a road bike, but I'm not sure that I'll have much free time to ride it.<br />
Together with my wife we did several long trips (Tenerife, Mallorca, Bayern), and a number of local hiking trips - there are plenty of nice places in our part of Germany.<br />
<br />
<br />
I wish all my readers a Happy New Year & all the best in your lives!Alex Otthttp://www.blogger.com/profile/13001951608173211050noreply@blogger.com0