Friday, August 19, 2016

Opinionated Microservices Framework Lagom

Lagom is a Swedish word meaning “just the right amount”. Microservices are often characterised simply as small services, but Lightbend wants to emphasize that finding the right boundaries between services, aligning them with bounded contexts, business capabilities, and isolation requirements, is the most important aspect when architecting a microservice-based system. This philosophy fits very well with a Domain-Driven Design focused mindset.

Following this approach helps in building a scalable and resilient system that is easy to deploy and manage. According to Lightbend the focus should not be on how small the services are; instead they should be just the right size, “lagom”-sized services. Being an opinionated framework based on the principles of the Reactive Manifesto, Lagom provides a guard-railed “golden path” with good defaults, from which the developer can deviate when necessary.

This blogpost will cover our initial impression of the framework, together with our opinion on the choices made while architecting it. Note that we won’t go into detail on every aspect of the framework; for more details, refer to Lagom’s extensive documentation. As Lightbend is entering the microservices market with Lagom, we feel obliged to make a fair comparison with existing frameworks out there. In the Java world this is predominantly the Spring stack with Spring Boot and Spring Cloud, standing on the shoulders of giants such as the Netflix OSS. At this stage it would be a bit too early to make an in-depth comparison between the two, seeing as you would be comparing a mature project to an MVP. What we can share, though, are our initial observations.

Design philosophy

Lagom’s design rests on the following principles:

  • Message-Driven and Asynchronous: Built upon Akka Streams for asynchronous streaming and the JDK8 CompletionStage API. Streaming is a first-class concept (a short plain-JDK sketch of this asynchronous style follows this list).
  • Distributed persistence: Lagom favours distributed persistence patterns using Event Sourcing with Command Query Responsibility Segregation (CQRS).
  • Developer productivity: Starting all microservices with a single command, code hot reloading and expressive service interface declarations are some examples of Lagom’s high emphasis on developer productivity.
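
As a quick taste of that asynchronous style, here is a plain JDK8 CompletionStage composition; no Lagom APIs are involved, just the standard library the framework builds on:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncComposition {
  public static void main(String[] args) {
    // Kick off an asynchronous computation; the calling thread is never blocked.
    CompletionStage<String> greeting = CompletableFuture
        .supplyAsync(() -> "Hello")                    // runs on the common fork-join pool
        .thenApply(s -> s + ", Lagom")                 // transform the result once it arrives
        .thenCombine(CompletableFuture.supplyAsync(() -> "!"),
                     (left, right) -> left + right);   // merge two independent async results

    // Blocking here is only for the demo; real Lagom code simply returns the CompletionStage.
    System.out.println(greeting.toCompletableFuture().join());
  }
}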

Building blocks

The Lagom framework acts as an abstraction layer on top of several Lightbend technologies and consists of the following core technologies and frameworks:

  • Play Framework and Akka (including Akka Streams) as the runtime foundation
  • Guice for dependency injection
  • Cassandra as the default persistence solution
  • sbt as build tool and development environment
  • ConductR for deployment and orchestration

Since Lagom acts as an abstraction layer, the developer doesn’t need any prior knowledge of Play Framework or Akka in order to use it successfully. Sbt has been chosen as the build tool because it also acts as a development environment. Lagom relies heavily on the following sbt features:

  • Fine-grained tasks
  • Each task may return a value
  • The value returned by a task may be consumed by other tasks

According to Lightbend, Scala’s build tool sbt offers Lagom many handy features, such as fast incremental recompilation, hot code reloading, starting and stopping services in parallel, and automatic injection of configuration defaults. Still, sbt might be seen as a hurdle by many Java developers, since Maven and Gradle (and to a lesser extent Ant) rule most Java projects. Moving towards a microservices framework such as Lagom already constitutes quite a transition, so we think sbt might hold Java developers back from adopting the framework. Lightbend’s rebranding could be interpreted as a move away from being a Scala-oriented company towards a more Java-minded one. In that regard it would make sense to lower the initial learning curve, especially for a rather mundane component such as the build tool. After all, the most important factor in achieving adoption is allowing people to get started easily with the new technology. We think that providing Maven or Gradle integration would have a positive effect on the adoption rate, and although it may not be trivial to implement, it should help convince Java developers to give Lagom a go.

Google’s Guice has been chosen for dependency injection since it is a lightweight framework. What is remarkable is that Guice is also used for inter-microservice calls: Lagom acts as a communication abstraction layer, and it does so by adding a dependency on the interfaces of remote microservices. Just as a shared domain model and shared datastores are antipatterns for microservices, so are code dependencies from one service in another. Changing the code of one microservice should not have an immediate cascading effect on other microservices; this is the very essence of the microservices architecture. In a monolith, code changes in one component can result in immediate breaking changes in other components of the system. While this may be desired in order to keep technical debt low, it is an inherent characteristic of monolithic systems. One of the reasons microservices were introduced is to decouple components on all levels, especially binary coupling. Using protocols between components instead of actual binary dependencies allows us to implement the tolerant reader principle and versioning through, for instance, content negotiation.

Lightbend argues that sharing interfaces as code will increase productivity and performance, but we fear the result of this is a distributed monolith instead of an actually decoupled microservices architecture. While we question the default way of communicating between microservices in Lagom, we are enthusiastic that more ways of making inter-microservice calls are becoming available. Using HTTP is possible as well, and one of the upcoming features is a Lagom Service Client. The Guice approach might also be quite favourable for people migrating from monolithic applications to microservices. In the end it is a trade-off, but one that shouldn’t be taken lightly.
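
To make the trade-off concrete, here is a rough sketch (with hypothetical names such as FrontendService and GreetingsService, and following the same convention as the other snippets in this post of omitting imports) of what consuming another microservice’s interface looks like in Lagom’s Java API: the remote interface is a compile-time dependency, and Guice injects a client proxy for it.

// Hypothetical consuming service; GreetingsService is the *interface* of a remote microservice.
public class FrontendServiceImpl implements FrontendService {

  private final GreetingsService greetings;

  @Inject
  public FrontendServiceImpl(GreetingsService greetings) {
    // Guice injects a client proxy; invoking it performs a remote call via the service locator.
    this.greetings = greetings;
  }

  @Override
  public ServiceCall<String, NotUsed, String> greetUser() {
    return (id, request) ->
        greetings.hello().invoke(id, NotUsed.getInstance())
                 .thenApply(message -> "frontend: " + message);
  }
}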

As the default persistence solution, Apache Cassandra is used due to how well it integrates with CQRS and Event Sourcing. Lagom has support for Cassandra as a datastore, both for reading and writing data. It is possible to use other datastore solutions, but this comes at the cost of not being able to take advantage of Lagom’s persistence module.

ConductR is an orchestration tool for managing Lightbend Reactive Platform applications across a cluster of machines and is Lightbend’s solution for running Lagom systems in production. Note that ConductR comes with a license fee and is mainly targeted at enterprises. The other option we currently have for running a Lagom system in production is to write our own service locator compatible with Lagom. At the time of writing, someone has already started working on Kubernetes support, and we are sure that, given more time, more options will become available. For now though, Lagom is still at an early stage where we either have to pay for a ConductR license, build our own service locator, or wait until someone does the work for us.

Getting started with Lagom

In order to start using Lagom, Activator must be correctly set up. Currently two Lagom templates exist that can be used for creating a new Lagom application: the Lagom Java Seed template, which should be the template of choice, and the Lagom Java Chirper template, an example of a Twitter-like app created with Lagom.

Creating a new Lagom application is as simple as using the following command:

$ activator new my-first-system lagom-java

Afterwards the project can be imported in any of the prominent IDEs as an sbt project.

In order to boot the system, we first need to navigate to the root of the project and start the Activator console:

$ activator

After which we can start all our services using a single simple command, runAll:

> runAll
[info] Starting embedded Cassandra server
.......
[info] Cassandra server running at 127.0.0.1:4000
[info] Service locator is running at http://localhost:8000
[info] Service gateway is running at http://localhost:9000
[info] application - Signalled start to ConductR
[info] application - Signalled start to ConductR
[info] application - Signalled start to ConductR
[info] Service helloworld-impl listening for HTTP on 0:0:0:0:0:0:0:0:24266
[info] Service hellostream-impl listening for HTTP on 0:0:0:0:0:0:0:0:26230
[info] (Services started, use Ctrl+D to stop and go back to the console...)

This command starts a Cassandra server, the service locator and the service gateway. Each of our microservices is started in parallel and registered in the service locator. Additionally, a run command is available to start services individually. Note that the ports are assigned to each microservice by an algorithm and are consistent even across different machines, though it is also possible to assign a specific port explicitly.

Similar to Play Framework, Lagom also supports code hot reloading, allowing you to make changes in the code and immediately see them live without having to restart anything, a feature we’re very fond of. In general, a restart is only required when adding a new microservice API and implementation module to the project.

Anatomy of a Lagom project

helloworld-api           → Microservice API submodule
 └ src/main/java         → Java source code interfaces with model objects
helloworld-impl          → Microservice implementation submodule
 └ logs                  → Logs of the microservice
 └ src/main/java         → Java source code implementation of the API submodule
 └ src/main/resources    → Contains the microservice application config
 └ src/test/java         → Java source code unit tests
logs                     → Logs of the Lagom system
project                  → Sbt configuration files
 └ build.properties      → Marker for sbt project
 └ plugins.sbt           → Sbt plugins including the declaration for Lagom itself
.gitignore               → Git ignore file
build.sbt                → Application build script

Example of a microservice

In order to write a new microservice you create a new API and implementation project. In the API project you define the interface of your microservice:

HelloService.java

public interface HelloService extends Service {
  ServiceCall<String, NotUsed, String> hello();
  
  ServiceCall<String, GreetingMessage, String> useGreeting();

  @Override
  default Descriptor descriptor() {
    return named("helloservice").with(
        restCall(Method.GET,  "/api/hello/:id",       hello()),
        restCall(Method.POST, "/api/hello/:id",       useGreeting())
      ).withAutoAcl(true);
  }
}

A Descriptor defines the service name and the endpoints offered by a service. In our case we define two REST endpoints, a GET and a POST.

GreetingMessage is basically an immutable class with a single String message instance variable. On the subject of immutability the Lagom documentation mentions Immutables, a Java library that helps you create immutable objects via annotation processing. Definitely worth a look seeing as it helps you get rid of boilerplate code.
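
As an illustration of what Immutables buys you (this is our own sketch, not code generated by the template; the seed’s GreetingMessage simply exposes a public final message field), a value class boils down to:

import org.immutables.value.Value;

// The annotation processor generates an immutable implementation named ImmutableGreeting.
@Value.Immutable
public interface Greeting {
  String message();
}

// Usage of the generated class:
// Greeting greeting = ImmutableGreeting.builder().message("Hello ").build();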

In the implementation submodule we implement our API’s interface.

HelloServiceImpl.java

public class HelloServiceImpl implements HelloService {
  @Override
  public ServiceCall<String, NotUsed, String> hello() {
    // The lambda must return the CompletionStage, so use an expression body here.
    return (id, request) -> CompletableFuture.completedFuture("Hello, " + id);
  }

  @Override
  public ServiceCall<String, GreetingMessage, String> useGreeting() {
    return (id, request) -> CompletableFuture.completedFuture(request.message + id);
  }
}

You’ll immediately notice that the service calls are non-blocking by default, using the CompletableFutures introduced in JDK8. It is also interesting to know that Lagom provides support for the publish-subscribe pattern out of the box. We also need to implement the module that binds the HelloService so that it can be served.

HelloServiceModule.java

public class HelloServiceModule extends AbstractModule implements ServiceGuiceSupport {
  @Override
  protected void configure() {
    bindServices(serviceBinding(HelloService.class, HelloServiceImpl.class));
  }
}

We define our module in application.conf:

play.modules.enabled += sample.helloworld.impl.HelloServiceModule

And finally register our microservice in build.sbt with its dependencies and settings:

lazy val helloworldApi = project("helloworld-api")
  .settings(
    version := "1.0-SNAPSHOT",
    libraryDependencies += lagomJavadslApi
  )

lazy val helloworldImpl = project("helloworld-impl")
  .enablePlugins(LagomJava)
  .settings(
    version := "1.0-SNAPSHOT",
    libraryDependencies ++= Seq(
      lagomJavadslPersistence,
      lagomJavadslTestKit
    )
  )
  .settings(lagomForkedTestSettings: _*)
  .dependsOn(helloworldApi)

We can then test our endpoint:

$ curl localhost:24266/api/hello/World
Hello, World

$ curl -H "Content-Type: application/json" -X POST -d '{"message": "Hello "}' http://localhost:24266/api/hello/World
Hello World

Seeing as any good developer writes unit tests for their code, so should we!

public class HelloServiceTest {
  private static ServiceTest.TestServer server;

  @BeforeClass
  public static void setUp() {
    server = ServiceTest.startServer(ServiceTest.defaultSetup());
  }

  @AfterClass
  public static void tearDown() {
    if (server != null) {
      server.stop();
      server = null;
    }
  }

  @Test
  public void shouldRespondHello() throws Exception {
    // given
    HelloService service = server.client(HelloService.class);

    // when
    String hello = service.hello().invoke("Yannick", NotUsed.getInstance()).toCompletableFuture().get(5, SECONDS);

    // then
    assertEquals("Hello, Yannick", hello);
  }

  @Test
  public void shouldRespondGreeting() throws Exception {
    // given
    HelloService service = server.client(HelloService.class);

    // when
    String greeting = service.useGreeting().invoke("Yannick", new GreetingMessage("Hi there, ")).toCompletableFuture().get(5, SECONDS);

    // then
    assertEquals("Hi there, Yannick", greeting);
  }
}

Tests can be executed from the Activator console via the test command:

> test
[info] Test run started
[info] Test sample.helloworld.impl.HelloServiceTest.shouldRespondHello started
[info] Test sample.helloworld.impl.HelloServiceTest.shouldRespondGreeting started
[info] Test run finished: 0 failed, 0 ignored, 2 total, 16.759s
[info] Passed: Total 2, Failed 0, Errors 0, Passed 2
[success] Total time: 21 s, completed Apr 14, 2016 10:06:41 AM

CQRS and Event Sourcing

Being an opinionated framework, Lagom suggests using CQRS and Event Sourcing, seeing as they fit well within the reactive paradigm. In this blogpost we are not going to explain CQRS and Event Sourcing in detail, seeing as they are very well covered in Lagom’s documentation. The gist of it is that each service should own its data and only the service itself should have direct access to its database. Other services need to use the service’s API in order to interact with its data; sharing the database across different services would result in tight coupling. Ideally we want to work with Bounded Contexts, following the core principles of Domain-Driven Design, where each service defines a Bounded Context. Event Sourcing gives us many advantages, such as not only storing the current state of the data but keeping an entire journal of events that tells us how the data reached its current state. With event sourcing we only perform reads and writes; there are no updates or deletes. All this makes it easy to test and debug, and allows us to reproduce scenarios that happened in production by replaying the event log from that environment.

Note that just because Lagom encourages us to use CQRS and Event Sourcing, it isn’t forcing us to do so, as they are not applicable to every use case. It is perfectly possible to, for example, plug in a PostgreSQL database for our persistence layer; someone has already set up PostgreSQL integration using Revenj persistence. However, Lightbend suggests that for the best scalability, preference should be given to asynchronous APIs, as blocking APIs like JDBC and JPA will have an impact on it.
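
If you do end up behind a blocking API such as JDBC, a common mitigation is to confine the blocking calls to their own thread pool and expose the result asynchronously. A minimal plain-JDK sketch (the DataSource and query are placeholders of our own):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.sql.DataSource;

public class BlockingJdbcWrapper {
  // Dedicated pool so blocking JDBC calls never starve the asynchronous machinery.
  private final Executor jdbcExecutor = Executors.newFixedThreadPool(10);
  private final DataSource dataSource;

  public BlockingJdbcWrapper(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  public CompletionStage<String> findGreeting(String id) {
    return CompletableFuture.supplyAsync(() -> {
      // The blocking work happens here, on the dedicated executor.
      try (Connection c = dataSource.getConnection();
           PreparedStatement ps = c.prepareStatement("SELECT message FROM greetings WHERE id = ?")) {
        ps.setString(1, id);
        try (ResultSet rs = ps.executeQuery()) {
          return rs.next() ? rs.getString("message") : "Hello";
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }, jdbcExecutor);
  }
}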

By default, when launching our development environment, a Cassandra server will be booted without having to do any setup ourselves besides adding the lagomJavadslPersistence dependency to our implementation in our build.sbt.

Regarding the code, a persistent entity needs to be defined, combined with a related command, event and state. Note that the following code samples are mainly here to give an idea of the work needed for implementing all this. For more information and a detailed explanation, consult the excellent documentation on the subject.

In the persistent entity we define the behaviour of our entity. In order to interact with event sourced entities, commands need to be sent. We therefore need to specify a command handler for each command class that the entity can receive. Commands are then translated into events which will get persisted by the entity. Each event has its own event handler registered.

Example of a PersistentEntity:

HelloWorld.java

public class HelloWorld extends PersistentEntity<HelloCommand, HelloEvent, WorldState> {
  @Override
  public Behavior initialBehavior(Optional<WorldState> snapshotState) {
    BehaviorBuilder b = newBehaviorBuilder(
        snapshotState.orElse(new WorldState("Hello", LocalDateTime.now().toString())));
    b.setCommandHandler(UseGreetingMessage.class, (cmd, ctx) ->
      ctx.thenPersist(new GreetingMessageChanged(cmd.message),
        evt -> ctx.reply(Done.getInstance())));

    b.setEventHandler(GreetingMessageChanged.class,
        evt -> new WorldState(evt.message, LocalDateTime.now().toString()));

    b.setReadOnlyCommandHandler(Hello.class,
        (cmd, ctx) -> ctx.reply(state().message + ", " + cmd.name + "!"));

    return b.build();
  }
}

Our PersistentEntity requires a state to be defined:

WorldState.java

@Immutable
@JsonDeserialize
public final class WorldState implements CompressedJsonable {
  public final String message;
  public final String timestamp;

  @JsonCreator
  public WorldState(String message, String timestamp) {
    this.message = Preconditions.checkNotNull(message, "message");
    this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
  }

  @Override
  public boolean equals(@Nullable Object another) {
    if (this == another)
      return true;
    return another instanceof WorldState && equalTo((WorldState) another);
  }

  private boolean equalTo(WorldState another) {
    return message.equals(another.message) && timestamp.equals(another.timestamp);
  }

  @Override
  public int hashCode() {
    int h = 31;
    h = h * 17 + message.hashCode();
    h = h * 17 + timestamp.hashCode();
    return h;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper("WorldState").add("message", message).add("timestamp", timestamp).toString();
  }
}

In our command interface we define all the commands that our entity supports. In order to get a complete picture of the commands an entity supports, it is the convention to specify all supported commands as inner classes of the interface.

HelloCommand.java

public interface HelloCommand extends Jsonable {
  @Immutable
  @JsonDeserialize
  public final class UseGreetingMessage implements HelloCommand, CompressedJsonable, PersistentEntity.ReplyType<Done> {
    public final String message;

    @JsonCreator
    public UseGreetingMessage(String message) {
      this.message = Preconditions.checkNotNull(message, "message");
    }

    @Override
    public boolean equals(@Nullable Object another) {
      if (this == another)
        return true;
      return another instanceof UseGreetingMessage && equalTo((UseGreetingMessage) another);
    }

    private boolean equalTo(UseGreetingMessage another) {
      return message.equals(another.message);
    }

    @Override
    public int hashCode() {
      int h = 31;
      h = h * 17 + message.hashCode();
      return h;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper("UseGreetingMessage").add("message", message).toString();
    }
  }

  @Immutable
  @JsonDeserialize
  public final class Hello implements HelloCommand, PersistentEntity.ReplyType<String> {
    public final String name;
    public final Optional<String> organization;

    @JsonCreator
    public Hello(String name, Optional<String> organization) {
      this.name = Preconditions.checkNotNull(name, "name");
      this.organization = Preconditions.checkNotNull(organization, "organization");
    }

    @Override
    public boolean equals(@Nullable Object another) {
      if (this == another)
        return true;
      return another instanceof Hello && equalTo((Hello) another);
    }

    private boolean equalTo(Hello another) {
      return name.equals(another.name) && organization.equals(another.organization);
    }

    @Override
    public int hashCode() {
      int h = 31;
      h = h * 17 + name.hashCode();
      h = h * 17 + organization.hashCode();
      return h;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper("Hello").add("name", name).add("organization", organization).toString();
    }
  }
}

And finally we want to define all events that the entity supports in an event interface. It follows the same convention as with commands, specifying all events as inner classes of the interface.

HelloEvent.java

public interface HelloEvent extends Jsonable {
  @Immutable
  @JsonDeserialize
  public final class GreetingMessageChanged implements HelloEvent {
    public final String message;

    @JsonCreator
    public GreetingMessageChanged(String message) {
      this.message = Preconditions.checkNotNull(message, "message");
    }

    @Override
    public boolean equals(@Nullable Object another) {
      if (this == another)
        return true;
      return another instanceof GreetingMessageChanged && equalTo((GreetingMessageChanged) another);
    }

    private boolean equalTo(GreetingMessageChanged another) {
      return message.equals(another.message);
    }

    @Override
    public int hashCode() {
      int h = 31;
      h = h * 17 + message.hashCode();
      return h;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper("GreetingMessageChanged").add("message", message).toString();
    }
  }
}

With persistence in place, the HelloServiceImpl.java class will look like the following (note that useGreeting now replies with Done instead of a String, so its declaration in the HelloService interface changes accordingly):

public class HelloServiceImpl implements HelloService {
  private final PersistentEntityRegistry persistentEntityRegistry;

  @Inject
  public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry) {
    this.persistentEntityRegistry = persistentEntityRegistry;
    persistentEntityRegistry.register(HelloWorld.class);
  }

  @Override
  public ServiceCall<String, NotUsed, String> hello() {
    return (id, request) -> {
      PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);
      return ref.ask(new Hello(id, Optional.empty()));
    };
  }

  @Override
  public ServiceCall<String, GreetingMessage, Done> useGreeting() {
    return (id, request) -> {
       PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);
       return ref.ask(new UseGreetingMessage(request.message));
    };
  }
}

Wednesday, April 20, 2016

Recommender systems using MLlib

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.ml has the following parameters:

  • numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
  • rank is the number of latent factors in the model (defaults to 10).
  • maxIter is the maximum number of iterations to run (defaults to 10).
  • regParam specifies the regularization parameter in ALS (defaults to 1.0).
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
  • nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).

Note: The DataFrame-based API for ALS currently only supports integers for user and item ids. Other numeric types are supported for the user and item id columns, but the ids must be within the integer value range.
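
These parameters map directly onto setters of the ALS estimator. A minimal Java sketch, simply echoing the defaults quoted in the list above for illustration:

import org.apache.spark.ml.recommendation.ALS;

ALS als = new ALS()
  .setNumBlocks(10)        // numBlocks
  .setRank(10)             // rank
  .setMaxIter(10)          // maxIter
  .setRegParam(1.0)        // regParam
  .setImplicitPrefs(false) // implicitPrefs
  .setAlpha(1.0)           // alpha
  .setNonnegative(false)   // nonnegative
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");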

Explicit vs. implicit feedback

The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to movies.
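
In formula form (our notation, not the Spark documentation’s), the explicit-feedback variant fits the latent factors by minimizing a regularized squared error over the observed ratings, where r_{ui} is the rating user u gave item i and x_u, y_i are the latent factor vectors:

  \min_{X,Y} \sum_{(u,i)\ \mathrm{observed}} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \lVert x_u \rVert^2 + \lVert y_i \rVert^2 \right)

(The section below describes how the \lambda term is scaled in practice.)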

It is common in many real-world use cases to only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares etc.). The approach used in spark.ml to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets. Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks, or the cumulative duration someone spent viewing a movie). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.
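
Concretely, following that paper (again in our notation), each raw observation r_{ui} is split into a binary preference p_{ui} and a confidence weight c_{ui} driven by the alpha parameter above, and the squared error runs over all user-item pairs rather than only the observed ones:

  p_{ui} = 1 \text{ if } r_{ui} > 0, \text{ else } 0, \qquad c_{ui} = 1 + \alpha\, r_{ui}

  \min_{X,Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)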

Scaling of the regularization parameter

We scale the regularization parameter regParam in solving each least squares problem by the number of ratings the user generated in updating user factors, or the number of ratings the product received in updating product factors. This approach is named “ALS-WR” and discussed in the paper “Large-Scale Parallel Collaborative Filtering for the Netflix Prize”. It makes regParam less dependent on the scale of the dataset, so we can apply the best parameter learned from a sampled subset to the full dataset and expect similar performance.
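
In other words (our reading of the ALS-WR paper), with n_u the number of ratings produced by user u and n_i the number of ratings received by item i, the regularization term becomes:

  \lambda \left( \sum_u n_u \lVert x_u \rVert^2 + \sum_i n_i \lVert y_i \rVert^2 \right)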

Examples

In the following example, we load ratings data from the MovieLens dataset, each row consisting of a user, a movie, a rating and a timestamp. We then train an ALS model which assumes, by default, that the ratings are explicit (implicitPrefs is false). We evaluate the recommendation model by measuring the root-mean-square error of rating prediction.

Refer to the ALS Scala docs for more details on the API.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala" in the Spark repo.

If the rating matrix is derived from another source of information (i.e. it is inferred from other signals), you can set implicitPrefs to true to get better results:

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

The same example, implemented with the Java API:

Refer to the ALS Java docs for more details on the API.

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(new Function<String, Rating>() {
    public Rating call(String str) {
      return Rating.parseRating(str);
    }
  });
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
Double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java" in the Spark repo.

As in the Scala example, set implicitPrefs to true when the ratings are inferred from other signals:

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

And the same example, implemented with the Python API (here implicitPrefs defaults to False):

Refer to the ALS Python docs for more details on the API.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
Find full example code at "examples/src/main/python/ml/als_example.py" in the Spark repo.

Likewise in Python, set implicitPrefs to True when the ratings are inferred from other signals:

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")